背景:

flink的datastream部署到线上时,发现数据只能写入到kafka的一些分区,其他分区没有数据写入。当把flink的并行度设置大于等于kafka的分区数时,kafka的分区都能写入数据。于是研究了一下源码。

FlinkFixedPartitioner源码:

package org.apache.flink.streaming.connectors.kafka.partitioner;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;


@PublicEvolvingpublic
class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private static final long serialVersionUID = -3785320239953858777L;
    private int parallelInstanceId;

    public FlinkFixedPartitioner() {
    }

    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(parallelInstanceId >= 0,
            "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(parallelInstances > 0,
            "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
    }

    public int partition(T record, byte[] key, byte[] value,
        String targetTopic, int[] partitions) {
        Preconditions.checkArgument((partitions != null) &&
            (partitions.length > 0), "Partitions of the target topic is empty.");

        return partitions[this.parallelInstanceId % partitions.length];
    }

    public boolean equals(Object o) {
        return (this == o) || o instanceof FlinkFixedPartitioner;
    }

    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}

关键代码:   partitions[this.parallelInstanceId % partitions.length]

根据源码可以看出:

flink是根据sink的subtask的id 和kafka的partition数量进行取余计算出相应的分区值的。

计算过程如下:

flink并行度为3(F0,F1,F2),partition数量为2(P0,P1),则F0->P0,F1->P1,F2->P0

flink并行度为2(F0,F1),partition数量为3(P0,P1,P2),则F0->P0,F1->P1

因此默认分区器会有以下问题:

1、当 Sink 的并发度低于 Topic 的 partition 个数时,一个 sink task 写一个 partition,会导致部分 partition 完全没有数据,从而导致数据倾斜。

2、如果sink的并行度总数不是topic的partition的倍数时,还是会存在数据倾斜问题。

3、当 topic 的 partition 扩容时,则需要重启作业,以便发现新的 partition,否则新的分区也发现不了。

同时自己去查看flink的kafka connector,发现相关的connector有好几种,最新的flink1.12推荐使用新的jar包。

<dependency>    
   <groupId>org.apache.flink</groupId>     
   <artifactId>flink-connector-kafka_2.12</artifactId> 
<version>1.12.3</version></dependency>

解决方法:

1、通过设置分区器值为null,走kafka的默认分区器来解决指定分区写不到数据的问题,


 resultDs.addSink(new FlinkKafkaProducer<>(productTopic,new KafkaKeyedSerializationSchema(),getPropertiesFromBrokerList(productBootstrapServers),null,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,5))

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐