功能说明:
对kafka中的流数据进行清洗。相关的配置数据存放在数据库中,需要将流数据与数据库中的配置数据进行关联,并输出关联后的结果。

方式一:异步查询

使用异步查询的方式,对数据流中的数据进行查询。代码如下:

package flink.stream.asyncIOSide;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static stranger.PropertyLoader.getPropertiesConfig;

/**
 * @author [tu.tengfei]
 * @description 通过异步的方式,将流表与维表进行连接
 * @date 2019/5/28
 */
public class AsyncIOSideJoinPostgres {
    /** Selects ordered (true) or unordered (false) emission of the async results. */
    private static final boolean USE_ORDERED_WAIT = true;
    /** Maximum number of in-flight async requests per operator instance. */
    private static final int ASYNC_CAPACITY = 20;

    /**
     * Builds and runs the job: reads tab-separated records from Kafka, enriches each one with a
     * score looked up asynchronously in the database, and prints the result.
     *
     * @param args unused
     * @throws Exception if the configuration cannot be loaded or the job fails
     */
    public static void main(String[] args) throws Exception {

        // Load the job configuration.
        final String configPath = "config.properties";
        final Properties pro = getPropertiesConfig(configPath);
        final String topic = "stranger";
        final String groupId = "mainStranger";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source.
        String bootstrapServers = pro.getProperty("bootstrap.servers");
        Properties properties = new Properties();
        // Kafka broker IPs or host names; separate multiple entries with commas.
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group id used by the Flink Kafka consumer.
        properties.setProperty("group.id", groupId);
        FlinkKafkaConsumer011<String> kafkaStream = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties);
        kafkaStream.setStartFromLatest();

        DataStreamSource<String> mainStream = env.addSource(kafkaStream);

        // Parse each record into: time, person id, behavior state, checkpoint id.
        // NOTE(review): a record with fewer than four tab-separated fields throws
        // ArrayIndexOutOfBoundsException here and fails the job.
        DataStream<Tuple4<String, String, String, String>> mainMap = mainStream.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
            @Override
            public Tuple4<String, String, String, String> map(String s) throws Exception {
                String[] split = s.split("\\t");
                return new Tuple4<>(split[0], split[1], split[2], split[3]);
            }
        });

        // Attach the asynchronous lookup.
        DataStream<Tuple5<String, String, String, String, String>> asyncMainStream;
        if (USE_ORDERED_WAIT) {
            // Timeout is 1,000,000 ms here versus 10,000 ms in the unordered variant.
            asyncMainStream = AsyncDataStream.orderedWait(
                    mainMap,
                    new AsyncBehavior(),
                    1000000L,
                    TimeUnit.MILLISECONDS,
                    ASYNC_CAPACITY);
        } else {
            asyncMainStream = AsyncDataStream.unorderedWait(
                    mainMap,
                    new AsyncBehavior(),
                    10000,
                    TimeUnit.MILLISECONDS,
                    ASYNC_CAPACITY);
        }

        asyncMainStream.print();
        env.execute("Async select test");
    }

    /**
     * Asynchronous lookup of the score for a record's behavior field, with a local cache in
     * front of the database.
     */
    private static class AsyncBehavior extends RichAsyncFunction<Tuple4<String, String, String, String>, Tuple5<String, String, String, String, String>> {
        private static final String configPath = "config.properties";
        /** Parameterized lookup; the behavior value is bound instead of concatenated into the SQL. */
        private static final String SCORE_QUERY = "SELECT first_score FROM behavior_info where behavior = ?";

        // All three are created in open() on the task side, so they are transient.
        private transient Vertx vertx;
        private transient SQLClient behaviorSQLClient;
        private transient Cache<String, String> scoreCache;

        /**
         * Creates the cache, the Vert.x instance and the JDBC client.
         *
         * @param parameters configuration passed by Flink
         * @throws Exception if the configuration cannot be loaded
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Cache of behavior -> score; entries expire 10 minutes after last access.
            scoreCache = Caffeine
                    .newBuilder()
                    .maximumSize(1025)
                    .expireAfterAccess(10, TimeUnit.MINUTES)
                    .build();

            Properties pro = getPropertiesConfig(configPath);
            String driver = pro.getProperty("STRANGER.DRIVER");
            String url = pro.getProperty("STRANGER.URL");
            String user = pro.getProperty("STRANGER.USERNAME");
            String password = pro.getProperty("STRANGER.PASSWORD");

            JsonObject clientConfig = new JsonObject();
            clientConfig.put("url", url)
                    .put("driver_class", driver)
                    .put("max_pool_size", 20)
                    .put("user", user)
                    .put("password", password);

            VertxOptions vo = new VertxOptions();
            vo.setEventLoopPoolSize(10);
            vo.setWorkerPoolSize(20);

            // Keep a reference so the instance can be shut down in close().
            vertx = Vertx.vertx(vo);

            behaviorSQLClient = JDBCClient.createNonShared(vertx, clientConfig);
        }

        /**
         * Releases the JDBC client, the Vert.x instance and the cache.
         *
         * @throws Exception if the superclass close fails
         */
        @Override
        public void close() throws Exception {
            super.close();
            if (behaviorSQLClient != null) {
                behaviorSQLClient.close();
            }
            if (vertx != null) {
                // Without this the Vert.x instance created in open() is never shut down.
                vertx.close();
            }
            if (scoreCache != null) {
                scoreCache.cleanUp();
            }
        }

        /**
         * Looks up the score for the record's behavior, first in the cache and then in the database.
         *
         * @param input time, person id, behavior state, location
         * @param out   receives time, person id, behavior state, location, score; receives an empty
         *              collection when no row matches, and the failure cause when the connection
         *              or the query fails
         * @throws Exception declared by the interface
         */
        @Override
        public void asyncInvoke(Tuple4<String, String, String, String> input, ResultFuture<Tuple5<String, String, String, String, String>> out) throws Exception {
            final String behavior = input.f2;

            // Serve from the cache when possible.
            final String cachedScore = scoreCache.getIfPresent(behavior);
            if (cachedScore != null) {
                out.complete(Collections.singleton(new Tuple5<>(input.f0, input.f1, input.f2, input.f3, cachedScore)));
                return;
            }

            behaviorSQLClient.getConnection(conn -> {
                if (conn.failed()) {
                    out.completeExceptionally(conn.cause());
                    return;
                }
                final SQLConnection connection = conn.result();

                connection.queryWithParams(SCORE_QUERY, new io.vertx.core.json.JsonArray().add(behavior), res -> {
                    // Return the connection to the pool only after the query has finished.
                    connection.close();

                    if (res.failed()) {
                        // Report the real cause instead of handing Flink a null collection.
                        out.completeExceptionally(res.cause());
                        return;
                    }

                    List<JsonObject> rows = res.result().getRows();
                    if (rows.isEmpty()) {
                        // No match: emit nothing for this record.
                        out.complete(Collections.emptyList());
                        return;
                    }

                    // A future can be completed once, so cache and emit the same (first) row.
                    String score = String.valueOf(rows.get(0).getInteger("first_score"));
                    scoreCache.put(behavior, score);
                    out.complete(Collections.singleton(new Tuple5<>(input.f0, input.f1, input.f2, input.f3, score)));
                });
            });
        }

    }
}

方式二:广播配置数据

将数据库中的配置数据读取出来,进行广播,然后与实时流进行关联。
参考:https://blog.csdn.net/weixin_43315211/article/details/90201810

package flink.stream.addsource;

import flink.BasicConf;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapStateDescriptor;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

/**
 * 需求:
 * 将postgresql中的数据读取到streamPgSql中,作为配置数据,包含code和name
 * 同时将streamPgSql通过广播,减少数据的内存消耗
 *
 * 将kafka中的数据与postgresql中的数据进行join,清洗,得到相应的数据
 *
 * Broadcast会将state广播到每个task
 * 注意该state并不会跨task传播
 * 对其修改,仅仅是作用在其所在的task
 */
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StreamKafkaJoinPostgres {
    /** Key under which the merged config mapping is stored in the broadcast state. */
    private static final String KEY_WORDS = "keyWords";

    /**
     * Builds and runs the job: reads config pairs from the PostgreSQL source, broadcasts them,
     * and uses them to replace the fourth field of each tab-separated Kafka record.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        final String bootstrap = BasicConf.KafkaConf.bootstrap;
        final String zookeeper = BasicConf.KafkaConf.zookeeper;
        final String topic = "web";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        // Kafka broker IPs or host names; separate multiple entries with commas.
        properties.setProperty("bootstrap.servers", bootstrap);
        // ZooKeeper IPs or host names; separate multiple entries with commas.
        properties.setProperty("zookeeper.connect", zookeeper);
        // Consumer group id used by the Flink Kafka consumer.
        properties.setProperty("group.id", "flinkStream");

        // 1. Read the config records from PostgreSQL.
        DataStream<String> streamPgSql = env.addSource(new PostgresqlSource());

        // Each config record is "key<TAB>value"; wrap it in a single-entry map.
        final DataStream<HashMap<String, String>> conf = streamPgSql.map(new MapFunction<String, HashMap<String, String>>() {
            @Override
            public HashMap<String, String> map(String value) throws Exception {
                String[] tokens = value.split("\\t");
                HashMap<String, String> hashMap = new HashMap<>();
                hashMap.put(tokens[0], tokens[1]);
                System.out.println(tokens[0]+" : "+tokens[1]);
                return hashMap;
            }
        });

        // 2. Descriptor for the broadcast state: String key -> Map<String, String>.
        //    Declared once and shared by broadcast() and the process function.
        final MapStateDescriptor<String, Map<String, String>> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState"
                , BasicTypeInfo.STRING_TYPE_INFO
                , new MapTypeInfo<>(String.class, String.class));

        // 3. Broadcast the config stream.
        final BroadcastStream<HashMap<String, String>> confBroadcast = conf.broadcast(ruleStateDescriptor);

        // Kafka source.
        FlinkKafkaConsumer011<String> webStream = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties);
        webStream.setStartFromEarliest();
        DataStream<String> kafkaData = env.addSource(webStream).setParallelism(1);

        // Sample record (tab-separated):
        // 192.168.108.209	2019-05-07 16:11:09	"GET /class/2.html"	503	https://search.yahoo.com/search?p=java核心编程
        DataStream<Tuple5<String, String, String, String, String>> map = kafkaData.map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
            @Override
            public Tuple5<String, String, String, String, String> map(String value) throws Exception {
                String[] tokens = value.split("\\t");
                return new Tuple5<>(tokens[0], tokens[1], tokens[2], tokens[3], tokens[4]);
            }
        })
                // Connect to the broadcast stream and process both sides together.
                .connect(confBroadcast)
                .process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>, HashMap<String, String>, Tuple5<String, String, String, String, String>>() {

                    /**
                     * Replaces field f3 with its mapped value when the broadcast config has one;
                     * otherwise forwards the record unchanged.
                     */
                    @Override
                    public void processElement(Tuple5<String, String, String, String, String> value, ReadOnlyContext ctx, Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        // The entry is absent until the first config record has been received.
                        Map<String, String> rules = ctx.getBroadcastState(ruleStateDescriptor).get(KEY_WORDS);
                        String mapped = (rules == null) ? null : rules.get(value.f3);
                        if (mapped == null) {
                            out.collect(new Tuple5<>(value.f0, value.f1, value.f2, value.f3, value.f4));
                        } else {
                            out.collect(new Tuple5<>(value.f0, value.f1, value.f2, mapped, value.f4));
                        }
                    }

                    /**
                     * Merges an incoming config record into the mapping held in broadcast state.
                     *
                     * @param value config entries to merge
                     * @param ctx   context giving write access to the broadcast state
                     * @param out   unused; config records produce no output
                     * @throws Exception if state access fails
                     */
                    @Override
                    public void processBroadcastElement(HashMap<String, String> value, Context ctx, Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        // Start from what is already in state rather than an instance field, so
                        // entries restored with the state are kept.
                        Map<String, String> existing = ctx.getBroadcastState(ruleStateDescriptor).get(KEY_WORDS);
                        Map<String, String> merged = (existing == null) ? new HashMap<>() : new HashMap<>(existing);
                        merged.putAll(value);
                        ctx.getBroadcastState(ruleStateDescriptor).put(KEY_WORDS, merged);
                    }
                });

        map.print();
        env.execute("Broadcast test kafka");
    }
}

个人理解:
异步查询:每条未命中缓存的数据都要消耗一次数据库的查询时间,但配置数据具有实时性,当数据库中的数据更新时,流中关联到的配置数据也会随之更新(如果使用了缓存,则要等缓存过期后才会生效)。当数据流的流量过大时,容易造成数据库连接数超过上限,导致程序异常。
广播:将数据库中的数据以流的形式读入,然后进行广播,关联基于内存进行,不消耗查询时间;实时性取决于自己设置的配置数据刷新间隔。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐