Flink整合Kafka&Debezium(三)流转换为表
提取json加入maven依赖<dependency><groupId>com.jayway.jsonpath</groupId><artifactId>json-path</artifactId><version>2.2.0</version></dependency&...
·
提取json
加入maven依赖
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.2.0</version>
</dependency>
使用json path
使用json path来获取json_c字段里的数据,提取出来后为json格式的String(即没有转义的json)字符串
stream
.map(
s-> StringUtils.substringBetween(s,"\"after\":",",\"source\":")
)
.map(row-> JsonPath.read(row,"$.json_c"))//读成String格式
.print();
转换成table
这里使用blink planner来处理
加入maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.0</version>
</dependency>
代码指定blink planner
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
错误的数据去重
一般来说我们表里只需要反应最近的值就可以了,但是如果一个记录被update一次,stream里就会出现一次,print里就有有一条。那么如果希望去除掉旧的记录怎么办呢?下面是一种错误的方法
DataStream<SourceDto> stream=env.addSource(consumer)
.map(this::read)//读成String格式
.filter(Objects::nonNull)
.keyBy("id")
.reduce(this.reduce())
;
stream.print();
上面是先通过id字段来分组,再通过reduce来去重,reduce方法的代码如下
return new ReduceFunction<T>() {
@Override
public T reduce(T t1, T t2) throws Exception {
return t1.getTxId() > t2.getTxId() ? t1 : t2;
}
};
这个是使用了pgsql里的txId来判断最后一条记录的。但是实际过程中由于stream是appendOnly的,当一条DB中执行update后,在stream中会流出一条记录,没有retract
采用SQL去重
这是一条可行的方式
final String tableName="dto_table_";
bsTableEnv.registerDataStream(tableName,stream,"id,txId,id_c,int_c");
Table sourceTable=bsTableEnv.sqlQuery("select t.* from "+tableName+" as t inner join (" +
"select id,max(txId) as txId from "+tableName +" group by id"+
") as f on t.id=f.id and t.txId=f.txId");
bsTableEnv.registerTable("dto_table",sourceTable);
通过inner join的自关联,只保留txId最大的一条记录。但我觉得这不是一种优雅的方式。
执行SQL
剩下的就是执行一条常规的SQL语句了
String sql="select a.id_c ,b.int_c+a.int_c as int_c from dto_table as a left join dto_table as b on a.id_c=b.id_c where a.int_c>0";
Table dtoTable=bsTableEnv.sqlQuery(sql);
Sink
再将SQL执行结果转化为流写出
bsTableEnv
.toRetractStream(dtoTable,SinkDto.class)
.print();
但是这里还是有个问题,貌似retract未能生效。不过对于PGSQL这种支持upsert的应该没有问题
更多推荐
已为社区贡献4条内容
所有评论(0)