Kafka Producer Retries & Idempotence 原理

由于存在网络瞬时抖动;或者kafka集群短暂的不可用,会导致kafka producer发送消息出现异常,生产者无法将消息推送到topic,在这种情况下,消息丢失的可能性很高。因此kafka设计了一套重试机制,来确保在一定的条件下,系统使用重试策略,重新自动发送消息。今天来学习下kafka 生产者重试机制以及重试机制带来的问题。

重试机制

kafka生产者发送消息至服务端-broker时,broker返回成功或失败,表示该消息是否投递成功。成功返回不必多说,broker返回的错误分为两大类:

  • Retriable errors

    可重试错误 - 重试后可以解决的错误,如broker返回NotEnoughReplicasException异常,则允许生产者重发消息,也许下一秒broker副本在线之后,消息投递成功。

  • Nonretriable error

    不可重试错误 - 即无法解决的错误,如broker返回INVALID_CONFIG异常,kafka 生产者再次尝试发送请求也不会更改请求的结果,broker还是返回错误。

开启重试

默认情况下,Kafka Producer 生产者关闭重试功能,需要开发者手动配置重试策略,代码非常简单:

Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
//重试次数
props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

重试次数

retries 设置决定生产者投递消息失败之后,重试的次数。不同的版本重试的默认值并不一致:

  • 0 - kafka 版本 <= 2.0
  • MAX_INTEGER - Kafka版本 >= 2.1时, Integer 类型最大值 - 2147483647

通常情况下,开发者不用设置此配置,而是使用 delivery.timeout.ms控制重试行为

delivery.timeout.ms

如果 retries > 0,例如 retries = 2147483647,生产者不会永远不会进行请求重试,因为它还受到超时时间的限制。

例如,你将Producer的超时时间设置为 delivery.timeout.ms=120000 ( 2 分钟). 消息无法在2分钟之内投递成功则认为失败(注意:此处包含消息重试). 也就是2分钟之内无论消息投递多少次,如果还没有投递成功,则认为失败。delivery.timeout.ms的时间配置包含了好几个部分,在使用时需要特注意,如下图:

在这里插入图片描述

retry.backoff.ms

默认情况下,生产者将在重试之间会停顿等待100ms,开发者可使用retry.backoff.ms参数来控制这一点。

max.in.flight.requests.per.connection

该参数的默认值为5,在该参数值不为1的情况下进行重试,可能会改变消息的顺序。如将两个批次发送到单个分区,并且第一个批次失败并重试,但第二个批次成功,则第二个批中的记录可能会首先出现。如果应用程序对消息顺序有严格要求,请将该参数位置为1.即max.in.flight.requests.perconnection=1,Kafka保证将在某些消息需要多次重试才能成功确认的情况下保留消息顺序。

幂等性

失败消息重试可能会存在数据重复的问题,即两个消息都被成功写入broker,从而导致重复。引发该现象的大概步骤如下:

  1. 生产者发送消息至broker
  2. 消息被成功写入leader、replicated节点
  3. 可能存在瞬时的网络抖动问题,导致生产者并没有接收到服务器发出的ACK
  4. 生产者重新发送消息 而且收到broker的ack
  5. 这种场景下,broker 收到了2条一模一样的消息

成功 VS 重试的流程图如下

在这里插入图片描述

生产幂等性

生产者幂等性机制确保不会因消息重试而出现业务重复消息

在这里插入图片描述

实现原理

当enable.idempotence设置为true时,每个生产者都会被分配一个生产者Id(PID)。每次发送消息时都会携带PID,此外还会附加一个单调递增的序列号,该序列号跟消息待发送的分区存在一一对应的关系(可以简单理解为map结构Key、Value键值对;分区partition为key,序列号为值)。当broker收到重试的消息,由于该分区对应的最后消息的序列号大于、等于新消息序列化,broker拒绝写入,从而实现消息的幂等性。

但是需要注意,这种机制存在很大的缺陷:原因是每个producer的PID在初始化时自动分配,因此只能保证单个生产者的EOS(精确一次)语义,不保证全局消息的幂等性。

开启幂等性

如果已经配置acks=all,那么应该启用幂等性功能。开启的方式也很简单,只需使用生产者配置 enable.idempotence = true。

Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 打开幂等性机制
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

官方建议,通常情况下应当开启 kafka producer 幂等性功能。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐