Static table

For a dimension table defined in Flink SQL, the dimension data is generally loaded into memory in one pass when the job starts; when stream records are joined against it afterwards, the join actually runs against that in-memory snapshot.
Sample SQL

create table source_kafka (
  id bigint,
  name string,
  proctime as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'xxx',
  'properties.group.id' = 'test',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);


CREATE TABLE source_hbase (
  id string,
  cf ROW <name string>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'zyd_test',
  'zookeeper.quorum' = 'xxx'
);

create table sink_print (
  id bigint,
  name string
) WITH (
  'connector' = 'print'
);

insert into sink_print
select t1.id, t2.name
from source_kafka t1
left join source_hbase t2
on t1.id = cast(t2.id as bigint);

Test walkthrough (screenshots omitted):
- Kafka test records; the second record was sent after the HBase row had been modified
- the HBase update operation
- the Flink output log
- another row inserted into HBase
- the matching record sent to Kafka
- the Flink result

Temporal (dynamic) table

This exposes the problem above: what happens when the dimension data is updated while the job is running? We need a temporal table join, which pins each lookup to the record's processing time.

create table source_kafka (
  id bigint,
  name string,
  proctime as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'xxx',
  'properties.group.id' = 'test',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);


CREATE TABLE source_hbase (
  id string,              -- customer id
  cf ROW <name string>,   -- reservation id
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'zyd_test',
  'zookeeper.quorum' = 'xxx'
);

create table sink_print (
  id bigint,
  name string
) WITH (
  'connector' = 'print'
);

insert into sink_print
select t1.id, t2.name
from source_kafka t1
left join source_hbase for system_time as of proctime as t2
on cast(t1.id as string) = t2.id;



Writing it the other way round, however, raises an error:

insert into sink_print
select t1.id, t2.name
from source_kafka t1
left join source_hbase for system_time as of proctime as t2
on t1.id = cast(t2.id as bigint);

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal table join requires equivalent condition of the same type, but the condition is id[BIGINT]=id[STRING NOT NULL]
This shows that when Flink SQL joins against a temporal (lookup) table, the stream record's key is first pushed down to the dimension-table lookup, and only afterwards is the remaining SQL logic applied. The lookup key must therefore already match the dimension table's key type exactly, which is why the type-mismatch error is raised; the fix is to cast the stream-side key instead, as in the working query above (cast(t1.id as string) = t2.id).

But then, if the stream volume is large, doesn't every record trigger a lookup and put heavy I/O pressure on the dimension table? That is why Flink SQL introduces the lookup cache concept: lookup results are cached in memory so that repeated keys do not hit the external store.
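The cache is switched on through connector options in the dimension table's DDL. A sketch of the HBase table above with caching enabled, using the lookup options documented for the hbase-2.2 connector (the values here are illustrative, not recommendations):

```sql
CREATE TABLE source_hbase (
  id string,
  cf ROW <name string>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'zyd_test',
  'zookeeper.quorum' = 'xxx',
  -- cache at most 10000 rows; each entry expires 10 minutes after it is written
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min',
  -- retry a failed HBase lookup up to 3 times before failing the task
  'lookup.max-retries' = '3'
);
```

Both max-rows and ttl must be set for the cache to take effect; the trade-off is that a cached entry can be stale for up to the TTL after the HBase row changes, so the TTL should be chosen against the freshness the use case requires.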
