提取json

加入maven依赖

		<dependency>
			<groupId>com.jayway.jsonpath</groupId>
			<artifactId>json-path</artifactId>
			<version>2.2.0</version>
		</dependency>

使用json path

使用json path来获取json_c字段里的数据,提取出来后为json格式的String(即没有转义的json)字符串

	stream
	.map(
		s-> StringUtils.substringBetween(s,"\"after\":",",\"source\":")
	)
	.map(row-> JsonPath.read(row,"$.json_c"))//读成String格式
	.print();

转换成table

这里使用blink planner来处理

加入maven依赖

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
			<version>1.9.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.11</artifactId>
			<version>1.9.0</version>
		</dependency>

代码指定blink planner

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

错误的数据去重

一般来说我们表里只需要反应最近的值就可以了,但是如果一个记录被update一次,stream里就会出现一次,print里就有有一条。那么如果希望去除掉旧的记录怎么办呢?下面是一种错误的方法

		DataStream<SourceDto> stream=env.addSource(consumer)
				.map(this::read)//读成String格式
				.filter(Objects::nonNull)
				.keyBy("id")
				.reduce(this.reduce())
				;
		stream.print();

上面是先通过id字段来分组,再通过reduce来去重,reduce方法的代码如下

		return new ReduceFunction<T>() {
            @Override
            public T reduce(T t1, T t2) throws Exception {
                return t1.getTxId() > t2.getTxId() ? t1 : t2;
            }
        };

这个是使用了pgsql里的txId来判断最后一条记录的。但是实际过程中由于stream是appendOnly的,当一条DB中执行update后,在stream中会流出一条记录,没有retract

采用SQL去重

这是一条可行的方式

		final String tableName="dto_table_";
		bsTableEnv.registerDataStream(tableName,stream,"id,txId,id_c,int_c");
		Table sourceTable=bsTableEnv.sqlQuery("select t.* from "+tableName+" as t inner join (" +
				"select id,max(txId) as txId from "+tableName +" group by id"+
				") as f on t.id=f.id and t.txId=f.txId");
		bsTableEnv.registerTable("dto_table",sourceTable);

通过inner join的自关联,只保留txId最大的一条记录。但我觉得这不是一种优雅的方式。

执行SQL

剩下的就是执行一条常规的SQL语句了

String sql="select a.id_c ,b.int_c+a.int_c as int_c  from dto_table as a left join dto_table as b on a.id_c=b.id_c where a.int_c>0";
Table dtoTable=bsTableEnv.sqlQuery(sql);

Sink

再将SQL执行结果转化为流写出

bsTableEnv
	.toRetractStream(dtoTable,SinkDto.class)
	.print();

但是这里还是有个问题,貌似retract未能生效。不过对于PGSQL这种支持upsert的应该没有问题

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐