Understanding temporal and static tables in Flink SQL
Static table
A static table is a dimension table defined in Flink SQL: the dimension data is typically loaded into memory once when the job starts, and when stream records later join against it, they are actually matched against that in-memory copy.
Sample SQL
create table source_kafka (
id bigint,
name string,
proctime as proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'test',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE source_hbase (
id string,
cf ROW <name string>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'zyd_test',
'zookeeper.quorum' = 'xxx'
);
create table sink_print(
id bigint,
name string
)
WITH (
'connector' = 'print'
);
insert into sink_print
select
t1.id,t2.name
from
source_kafka t1
left join
source_hbase t2
on t1.id=cast(t2.id as bigint);
Kafka test case; the second record is sent after the HBase row has been modified
HBase operations
Flink output log
Add one more row in HBase
and send a matching record in Kafka as well
Flink result: the join still uses the data scanned when the job started, so neither the HBase update nor the newly added row appears in the output
Temporal table
This raises the problem above: what if the dimension data is updated? To handle that, we need a temporal table join, which pins each lookup to the time of the incoming event.
create table source_kafka (
id bigint,
name string,
proctime as proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'test',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE source_hbase (
id string, -- HBase row key
cf ROW <name string>, -- column family cf, qualifier name
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'zyd_test',
'zookeeper.quorum' = 'xxx'
);
create table sink_print(
id bigint,
name string
)
WITH (
'connector' = 'print'
);
insert into sink_print
select
t1.id,t2.name
from
source_kafka t1
left join
source_hbase for system_time as of t1.proctime as t2
on cast(t1.id as string) = t2.id;
Writing it the other way round, with the cast on the lookup side, raises an error:
insert into sink_print
select
t1.id,t2.name
from
source_kafka t1
left join
source_hbase for system_time as of t1.proctime as t2
on t1.id = cast(t2.id as bigint);
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal table join requires equivalent condition of the same type, but the condition is id[BIGINT]=id[STRING NOT NULL]
You can see that when Flink SQL joins against a temporal table, it first matches the stream records with the temporal table on the lookup key and only then applies the remaining SQL logic; the cast on the lookup side therefore never runs before the key comparison, which is why the type-mismatch error is thrown.
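Since the cast has to live on the probe (stream) side, one way to keep the join condition tidy is to pre-cast the key in a view over the Kafka source. This is only a sketch; the view and column names are made up here, and whether the proctime attribute survives the projection should be checked against your Flink version:

```sql
-- Sketch: move the cast onto the stream side via a view,
-- so the key types already match when the lookup runs.
create view source_kafka_cast as
select
  cast(id as string) as id_str, -- pre-cast the join key to string
  name,
  proctime                      -- keep the processing-time attribute
from source_kafka;

insert into sink_print
select
  cast(t1.id_str as bigint) as id, -- cast back for the bigint sink column
  t2.name
from
  source_kafka_cast t1
left join
  source_hbase for system_time as of t1.proctime as t2
on t1.id_str = t2.id;
```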
But if the stream volume is large, doesn't every record trigger a lookup against the dimension table, causing heavy I/O? That is why Flink SQL introduces the concept of a lookup cache: recently fetched dimension rows are cached on the task side.
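As a sketch, the lookup cache is configured through connector options in the dimension table's DDL. The exact option names and their availability vary by connector and Flink version (the ones below follow the JDBC/HBase connector style, so verify them against the docs for your version):

```sql
CREATE TABLE source_hbase (
  id string,
  cf ROW <name string>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'zyd_test',
  'zookeeper.quorum' = 'xxx',
  -- cache at most 1000 rows, and re-fetch a row once it has been
  -- cached for 10 minutes, trading freshness for fewer lookups
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '10min'
);
```

With the cache enabled, an updated HBase row may keep serving its stale cached value until the TTL expires, so the TTL is effectively an upper bound on how out-of-date the joined dimension data can be.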