
Flink SQL data deduplication
-- Flink SQL data deduplication
-- ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime ASC) AS row_num
drop table if exists `my_hive`.`dl_test`.`a3`;
CREATE TABLE `my_hive`.`dl_test`.`a3` (
    `id` VARCHAR(2147483647) NOT NULL,
    `type` VARCHAR(2147483647) NOT NULL,
    `data` VARCHAR(2147483647) NOT NULL,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_test',
    'properties.bootstrap.servers' = 'xxxxx:9092',
    'properties.group.id' = 'flink_test_02',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
;
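Before running the deduplication, it can help to confirm that rows are actually arriving from the topic. A minimal sanity-check query, assuming the `my_hive` Hive catalog is registered in the Flink SQL session and the broker address above is reachable:

-- Optional sanity check: stream the raw rows, including the processing-time column.
select id, type, data, proctime
from `my_hive`.`dl_test`.`a3`
;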
-- Remove duplicate rows on id and keep only the first occurrence,
-- so that repeated messages with the same id are counted once.
select id, sum(data) as sum_data
from (
    SELECT id, type, cast(data as int) as data
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime ASC) AS row_num
        FROM `my_hive`.`dl_test`.`a3`
    )
    WHERE row_num = 1
) t
group by id
;
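The ASC ordering keeps the first row seen for each id. For reference, a sketch of the keep-last variant of the same pattern only flips the ORDER BY direction; note that with DESC the query produces an updating (retract) result, because a newer row can replace the one emitted earlier:

-- Keep the most recent row per id instead of the first one.
SELECT id, type, cast(data as int) as data
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC) AS row_num
    FROM `my_hive`.`dl_test`.`a3`
)
WHERE row_num = 1
;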
-- Kafka test data
{"id":"1","type":"1","data":"1"}
{"id":"1","type":"2","data":"1"}
{"id":"1","type":"3","data":"1"}
{"id":"1","type":"3","data":"2"}
{"id":"2","type":"1","data":"1"}
{"id":"3","type":"1","data":"1"}
Conclusion: Flink SQL streaming deduplication follows the same logic as batch deduplication.
Flink documentation: Deduplication | Apache Flink