Sink部分Flink根据Kafka分为了2个部分

  • 0.11之前
  • 0.11之后

0.11之前因为没有kafka的事务相关 所以没法做到 消息的exactly_once

0.11之后是可以实现的

FlinkKafkakProducer 的创建有多个重载构造方法,当我们没有指定相关的 流checkpoint 语义,

那么默认 是at_least_once

FlinkKafkaProducerBase

public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {

继承 sinkFunction 基于实现checkpointFunction

对于SinkFunction相关的处理逻辑是在invoke,snapshot里面,但我们首先看下其一些前置准备工作 open(), init()

Open

open方法做了一些前置工作的准备,主要是一下几个

  1. 序列化方式
  2. 创建KafkaProducer
  3. metrics
  4. checkpoint配置

Invoke

invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)

如果 transcation 如果是 EXACTLY_ONCE 每次会在每次 checkpoint的时候变化

做一些消息转发的动作, 如果是 exactly_once模式 这时候是不会真正的提交,只有在checkpoint的时候才会

将本次事务的消息进行提交,然后再开启下一个事务

Snapshot checkpoint 阶段 (重点)

  1. FlinkKafkaProducer.snapshot()
    1. super.snapshotState(context) 实际调用 TwoPhaseCommitSinkFunction
  2. TwoPhaseCommitSinkFunction.snapshotState
    1. 获取checkpointId
    2. preCommit 将当前事务的数据进行提交
      1. kafkaProducer: 将exactly_once 和 at_least_once 的数据进行 producer.flush

        3. currentTransactionHolder = beginTransactionInternal() 开启一个新的事务

  •                 这里只有对 exactly_once 模式进行了事务创建
  •                 at_least_once && none 默认复用之前的

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐