我们在做实时数据开发的时候,通常要用spark、flink去消费kafka的数据,拿到数据流后会和外部数据库(Hbase、MySQL等)进行维表关联来把数据流打宽。当然了,有些外部数据库不只是存储维度数据,也会有很多事实数据,并且这些数据更新频繁,数据量巨大,但是我们的Flink流也会去实时的join这些巨大的事实表,这就需要选择一个合适的外部数据库作为支持,这个外部数据库一定要满足海量数据高效的读写性能,这样才能满足实时场景的需求,说到这,我们的目光自然而然的落到了Hbase上,来吧,我们直接上图,下面这张图就是以上所说场景的一个基本架构

那么问题来了,FlinkSQL如何去关联Hbase大表呢,如果关联字段不是hbase维表的rowkey那么将会触发全表扫描,如果这个表很大,全表扫描效率就很不乐观了,耗时少则几秒,多则无限延长,所以我们一定是要走hbase二级索引的,但是很遗憾,FlinkSQL里的Hbase connector不会处理索引,它要么scan,要么就get,那么我们该怎么办呢,别急,我们也有笨办法,那就是我们自己维护索引表,如果你还不懂hbase二级索引的实现方式请自行补充这方面知识,下面的内容就是有关二级索引的试用了,看图吧

 来,我描述一下上图的流程,首先消费到kafka数据后我们的流不能直接去join hbase的数据表而是要先去join索引表,这样就拿到了数据表的rowkey,然后我们再join数据表,这样就不会触发全表扫描了,而是通过rowkey查询,效率就一下子有了质的提升,那么代码是怎么实现呢?上案例!稍等,我先说一下这个案例,我们需要在Flink流中创建三个表:kafka表、hbase维度索引表、hbase维度表,然后我们就可以愉快的写SQL了

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //接入socket流,测试数据如下
        //{'name':'kafka_tb','type':'INSERT','new':{'id':'1','name':'lxz'}}
        DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
        //定义kafka_tb表类型(有序)
        TypeInformation[] kafka_tb_types = new TypeInformation[]{Types.STRING,Types.STRING};
        RowTypeInfo kafka_tb_rowType = new RowTypeInfo(kafka_tb_types);
        //socket接收到的流转换后注册成kafka_tb表
        DataStream<Row> ds = dataStream.flatMap(new FlatMapFunction<String, Row>() {
            @Override
            public void flatMap(String value, Collector<Row> out) throws Exception {
                String type = JSON.parseObject(value).getString("type");
                JSONObject new_row = JSON.parseObject(value).getJSONObject("new");
                switch (type) {
                    case "INSERT":
                        out.collect(Row.ofKind(RowKind.INSERT, new_row.getString("id"), new_row.getString("name")));break;
                }
            }
        }).returns(kafka_tb_rowType);
        //注册kafka表kafka_tb
        Schema schema01 = Schema.newBuilder().build();
        Table tab1 = tEnv.fromChangelogStream(ds,schema01).as("id","name");
        tEnv.createTemporaryView("kafka_tb", tab1);
        //注册Hbase索引表hbase_index_tb
        tEnv.executeSql("CREATE TABLE hbase_index_tb (\n" +
                " ID STRING,\n" +
                " CF ROW<NAME STRING>,\n" +
                " PRIMARY KEY (ID) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'hbase_index_tb',\n" +
                " 'zookeeper.quorum' = 'prod-bigdata-pc10:2181,prod-bigdata-pc14:2181,prod-bigdata-pc15:2181',\n" +
                " 'zookeeper.znode.parent' = '/hbase-unsecure'\n"+
                ")");
        //注册Hbase数据表hbase_data_tb
        tEnv.executeSql("CREATE TABLE hbase_data_tb (\n" +
                " ID STRING,\n" +
                " CF ROW<CITY STRING,AGE INT,SEX STRING,NAME STRING,GRADE FLOAT>,\n" +
                " PRIMARY KEY (ID) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'hbase_data_tb',\n" +
                " 'zookeeper.quorum' = 'prod-bigdata-pc10:2181,prod-bigdata-pc14:2181,prod-bigdata-pc15:2181',\n" +
                " 'zookeeper.znode.parent' = '/hbase-unsecure'\n"+
                ")");
        //执行关联查询
        tEnv.executeSql("select a.* " +
                "from hbase_data_tb a " +
                "join hbase_index_tb b " +
                "on a.ID = b.ID " +
                "join kafka_tb c " +
                "on  c.name=b.NAME").print();
    }

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐