package flink.demo;

import com.alibaba.fastjson.JSONObject;
import com.enniu.cloud.services.riskbrain.flink.job.EnniuKafkaSource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;


/**
 * Flink Table API demo: Kafka JSON source -> SQL aggregation -> retract stream.
 *
 * @author caicongyang
 * @version id: TableApiJob, v 0.1 2018/4/12 10:30 AM caicongyang1 Exp $$
 */
public class TableApiJob {

    private static final String KAFKATOPIC = "kafka_source_topic";


    private static final String KAFKASERVER = "dev.kafka.com";


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(env);
        // Event time is declared, but this job's query uses no time attributes,
        // so no timestamp/watermark assigner is required here
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", KAFKASERVER);
        kafkaProperties.setProperty("group.id", "TableApiJob");

        //{"id":"int","name":"string","score":"int","currentTimeStamp":"string"}
        //kafka schema
        String schema = "{\"id\":\"int\",\"name\":\"string\",\"score\":\"int\",\"currentTimeStamp\":\"long\"}";
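        // A matching message might look like (hypothetical sample):
        //   {"id":1,"name":"alice","score":10,"currentTimeStamp":1523500000000}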
        // Note: fastjson's JSONObject does not guarantee field order, but the field
        // names and types below stay aligned because both come from the same keySet
        JSONObject jsonObject = JSONObject.parseObject(schema);

        // Mapping from schema type names to Flink TypeInformation
        Map<String, TypeInformation<?>> dic = new HashMap<>();
        dic.put("string", Types.STRING());
        dic.put("int", Types.INT());
        dic.put("long", Types.LONG());

        Set<String> keySet = jsonObject.keySet();


        String[] key = (String[]) keySet.toArray(new String[keySet.size()]);


        List<TypeInformation> valueList = new ArrayList<>();
        for (String i : keySet) {
            valueList.add(dic.get(jsonObject.getString(i)));
        }

        TypeInformation<?>[] value = (TypeInformation<?>[]) valueList.toArray(new TypeInformation<?>[valueList.size()]);

        // specify JSON field names and types
        TypeInformation<Row> typeInfo = Types.ROW(
            key,
            value
        );

        KafkaJsonTableSource tableSource = new EnniuKafkaSource(
            KAFKATOPIC,
            kafkaProperties,
            typeInfo);

        // Fail fast on a missing JSON field; with false, missing fields would be null
        tableSource.setFailOnMissingField(true);

        sTableEnv.registerTableSource("table1", tableSource);

        Table table = sTableEnv.sql("SELECT SUM(score), name FROM table1 GROUP BY name");

        // Convert the continuously updating Table to a retract stream:
        // f0 == true marks an added row, f0 == false retracts a previously emitted row
        DataStream<Tuple2<Boolean, Row>> tuple2DataStream = sTableEnv.toRetractStream(table, Row.class);

        SingleOutputStreamOperator<String> desStream = tuple2DataStream
            .map(new MapFunction<Tuple2<Boolean, Row>, String>() {

                @Override
                public String map(Tuple2<Boolean, Row> value) throws Exception {

                    return value.f1.toString();
                }
            });

        desStream.print();

        env.execute();

    }

}
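
/*
 * A minimal sketch (not part of the original job) of how a test record matching the
 * schema above could be produced, assuming the plain kafka-clients producer API is on
 * the classpath. The topic and broker values mirror the constants in TableApiJob; the
 * sample record itself is hypothetical.
 */
class DemoDataProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "dev.kafka.com");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One JSON record matching {"id":"int","name":"string","score":"int","currentTimeStamp":"long"}
        String record = "{\"id\":1,\"name\":\"alice\",\"score\":10,\"currentTimeStamp\":1523500000000}";

        // close() flushes the asynchronously sent record before the producer shuts down
        try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
                 new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
            producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>("kafka_source_topic", record));
        }
    }
}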
