问题描述

数据源是从一个多台服务器提供的HA kafka集群中读取,在工作的时候部署了一个简单计算逻辑的flink job到flink服务器上,从kafka执行自带sh命令和查看原始数据文件发现数据都没有问题。但是最终输出的统计结果与真实数据对比不一致。从flink直接读取的数据乱序(例如: 原始文件及命令创建消费者读取kafka数据 为 1,2,3,4。 在flink中注册resource直接打印出来是1,4,2,3)

 

问题原因

最终经过排查由kafka存储机制导致。

因为实验数据源topic配置的partition配置为3,所以当有生产者producer 想向该topic发送数据时,kafka会根据自己的内置逻辑将数据写入到多个partition中(假设partition=3,如果kafka集群只有一台服务器,则是写入到三个文件中。如果kafka集群具有3台及以上服务器,则是分布式存储到不同机器上),所以此时会造成数据只在partition内有序。

 

 

假如有业务系统产生了以下的数据:

时间戳数据
2020-09-18  00:00:001
2020-09-18  00:00:012
2020-09-18  00:00:023
2020-09-18  00:00:034
2020-09-18  00:00:045
2020-09-18  00:00:056

 

数据发送到有部署在A,B,C三台服务器上,且partition设置为3的topic上的话。数据的存储形式可能如下:

服务器A的kafka文件

时间戳数据
2020-09-18  00:00:012
2020-09-18  00:00:056

 

服务器B的kafka文件

时间戳数据
2020-09-18  00:00:001
2020-09-18  00:00:034

 

服务器C的kafka文件

时间戳数据
2020-09-18  00:00:023
2020-09-18  00:00:045

 

 

当flink job 的并发度配置小于kafka数据源的topic 的partition数量时,例如为1.则kafka集群对数据进行reduce之后发送到消费者的时候必然是按照服务器的数据顺序进行堆叠。

假设kafka集群进行reduce的时候的顺序是A -> B -> C

则消费者收到的数据将会发生乱序:

时间戳数据
2020-09-18  00:00:012
2020-09-18  00:00:056
2020-09-18  00:00:001
2020-09-18  00:00:034
2020-09-18  00:00:023
2020-09-18  00:00:045

 

最终会影响消费者的计算结果,因为flink计算引擎自带的事件时间特性(flink 的eventTime概念以后有时间再赘述一下,甚至连计数都不能准确)。

 

解决方案

对于flink之类可支持分布式的计算引擎,可以将job的并发度设置成和topic的partition一样。这样在job执行的时候每个slave节点对应一个kafka数据文件(文件内的数据是有序的,参见上面举例的ABC服务器上的数据),最终由计算引擎的master进行reduce汇总,可以保证数据计算的准确性。

本人遇到的问题的解决方案是,在flink申明环境的时候将 环境的并发度调整为3,与partition=3对应.则问题解决。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐