flink写入到kafka,只写入指定分区问题排查
背景:flink的datastream部署到线上时,发现数据只能写入到kafka的一些分区,其他分区没有数据写入。当把flink的并行度设置大于等于kafka的分区数时,kafka的分区都能写入数据。于是研究了一下源码。FlinkFixedPartitioner源码:package org.apache.flink.streaming.connectors.kafka.partitioner;im
背景:
flink的datastream部署到线上时,发现数据只能写入到kafka的一些分区,其他分区没有数据写入。当把flink的并行度设置大于等于kafka的分区数时,kafka的分区都能写入数据。于是研究了一下源码。
FlinkFixedPartitioner源码:
package org.apache.flink.streaming.connectors.kafka.partitioner;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
@PublicEvolvingpublic
class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
private static final long serialVersionUID = -3785320239953858777L;
private int parallelInstanceId;
public FlinkFixedPartitioner() {
}
public void open(int parallelInstanceId, int parallelInstances) {
Preconditions.checkArgument(parallelInstanceId >= 0,
"Id of this subtask cannot be negative.");
Preconditions.checkArgument(parallelInstances > 0,
"Number of subtasks must be larger than 0.");
this.parallelInstanceId = parallelInstanceId;
}
public int partition(T record, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
Preconditions.checkArgument((partitions != null) &&
(partitions.length > 0), "Partitions of the target topic is empty.");
return partitions[this.parallelInstanceId % partitions.length];
}
public boolean equals(Object o) {
return (this == o) || o instanceof FlinkFixedPartitioner;
}
public int hashCode() {
return FlinkFixedPartitioner.class.hashCode();
}
}
关键代码: partitions[this.parallelInstanceId % partitions.length]
根据源码可以看出:
flink是根据sink的subtask的id 和kafka的partition数量进行取余计算出相应的分区值的。
计算过程如下:
flink并行度为3(F0,F1,F2),partition数量为2(P0,P1),则F0->P0,F1->P1,F2->P0
flink并行度为2(F0,F1),partition数量为3(P0,P1,P2),则F0->P0,F1->P1
因此默认分区器会有以下问题:
1、当 Sink 的并发度低于 Topic 的 partition 个数时,一个 sink task 写一个 partition,会导致部分 partition 完全没有数据,从而导致数据倾斜。
2、如果sink的并行度总数不是topic的partition的倍数时,还是会存在数据倾斜问题。
3、当 topic 的 partition 扩容时,则需要重启作业,以便发现新的 partition,否则新的分区也发现不了。
同时自己去查看flink的kafka connector,发现相关的connector有好几种,最新的flink1.12推荐使用新的jar包。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.12.3</version></dependency>
解决方法:
1、通过设置分区器值为null,走kafka的默认分区器来解决指定分区写不到数据的问题,
resultDs.addSink(new FlinkKafkaProducer<>(productTopic,new KafkaKeyedSerializationSchema(),getPropertiesFromBrokerList(productBootstrapServers),null,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,5))
更多推荐
所有评论(0)