kafka 提供了 “at least once” 的语义,即消息会发送一次或者是多次。但是人们真正想要的是 “exactly once” 的语义,即重复的消息不会再被发送

导致消息重复的两个常见的原因:

  1. client 发送消息到 cluster 的时候由于网络错误而重试,可能会导致消息重复。如果在消息发送之前就发生网络错误发生,则不会导致消息重复。如果在消息写入日志之后并在响应 client 之前发生网络错误,会有重复的风险,如果 client 不知道消息是否发送成功,所以就会选择重试,或者是放弃重试并声明消息已丢失。
  2. consumer 从 topic 中读取消息,然后挂掉了。然后,这个 consumer 从最新的一个已知的位置重启或者一个新的 consumer 从已知最新位置启动。

第二种情况可以由 kafka consumer 提交 offset 来解决。他们可以将 offset 和他们的输出一起处理,已确保新的 consumer 总是从最新存储的 offset 处开始消费。另外他们也可以将 offset 最为 key ,已消除重复数据。

第一种情况,当前没有很好的解决方案。因为 client 不知道消息的偏移量,因此它没有唯一的方法来标识并检查消息是否发送成功。

对于 consumer 而言,这项提议将会加强 kafka 对 “atomic broadcast” 的保障。

这项提议将引入一组可选的 id ,这些 id 将为消息提供唯一的标识符,以避免重复。

一些问题

Opt-in
如果 producer 不关心幂等性则不用做任何特殊的操作
Transitivity:
Consumers 也会生产数据
考虑一个复杂的案例,从一个源向 kafka topic 中拷贝数据,比如:Mirror Maker ,或者 stream processing,我们希望的是,执行拷贝的进程可以定期保存其在上游 topic /database 中的位置,并始终从此保存的位置恢复。在发生崩溃的情况下,我们希望复制过程能够从上一个已知位置恢复,而不会在目标 topic 中产生重复项。为了实现这一点,复制过程可以保存其输入 offset 和将引入与其下游 topic 相关联的 id。当它在崩溃后重新启动时,它将使用保存的 id 进行初始化。这将有效地使复制的生成请求与上面描述的网络错误重试情况相同。

Fencing
另一个情况是,在 Mirror Maker 或其他消费者故障自动检测的情况下,有可能出现误报,导致至少暂时有两个 consumer 读取相同的输入并产生相同的输出。我们必须正确而优雅地处理这个“脑裂”的问题。

Pipelining
还有一种情况是我们要能够在有重试的情况下按照生成请求的顺序发送消息。当与重试结合使用的时候,这可能会导致消息接收的顺序不一致。如果发送者在不等待回应的情况下,异步发送消息 M1 ,M2 ,M3 ,然后成功接收到了 M1 ,M3 ,但是 M2 失败了,如果重试 M2 成功,这将会导致 topic 中消息的顺序为 M1 ,M3 ,M2 。

Fault tolerance
一种常见的错误是 broker 失败。如果 broker 失败,请求未完成也未确认,也不知道新选择的 master 是否包含该消息,所以将要重试相应的请求。因此,broker 失败的情况下,也是需要等幂机制的。

具体实现方式

一种简单的不切实际的消除重复消息的实现是 client 为每一条消息创建一个 uuid ,server 端会保留它所接收消息的 uuid ,新来的消息将根据已保留的 uuid 进行校验,如果 uuid 存在则拒绝接受。这至少可以满足基本的需求,但是效率非常低,因为将包含 O(num_messages) 条 uuid [译者注: 基本上有两种实现方式一种是使用数据库,但随着数据量的增大,查询效率会越来越低。另外一种方式完全基于内存,类似于key-value的形式,但会占用大量的内存空间] 。实际的实现也应该是类似的去重方法,但需要更少的空间和更快的查询效率。

一种类似的但特别有效率的实现方式是为每一个 producer 指定一个唯一的 pid 和递增的 sequence number ,这两个组合起来可以作为一个 uuid ,broker 不再需要存储所有的 id 来解释它从一个给定的生产者那里收到了什么。利用了 kafka (和 TCP) 有序的属性来确保 broker 只需要为每一个 producter 保存一个 “ highwater mark ” sequence number 并且拒绝接受任何低于此 sequence number 的消息,具体来说,如果 H§ 是 highwatermark,如果代理收到一条带有 PID P 和序列号 S 的消息,那么它将只接受 iff h§<S 的消息。

下一个问题是生产者是否将在它发送的所有消息中维护一个全局 sequence number ,或者仅仅为每个 topic-partition 中维护一个全局 sequence number 。一个全局的 sequence number 在 client 端是很容易实现的。然而如果 sequence number 是基于每一个 topic-partition ,broker 将会有一个很强的约束,即 H§ + 1 = S 。这样的话,我们就可以像处理普通请求一样处理 pipelined 请求。

请注意,当目前为止,我们解决了刚开始提到的关于 consumer/producer 产生重复消息的问题。这个处理的方式可以定期存储其上游的 offset ,以及其 pid 和 sequence number 。当它重新启动时,它将使用 offset 、pid 和 sequence number 重新初始化。它的一些初始请求可能会被拒绝,因为它们发送的sequence number 低于 server 的 highwater mark 。

为了完成这一提案,我们只需要弄清楚如何向 producers 提供独特的 pid,如何为highwater mark 提供容错,以及如何提供上述 “fencing” ,以防止两个具有相同 pid 的 producers 相互干扰。

实现细节

第一件事情我们需要确认的是我们必须确保 server 失败之后没有重复的消息,这意味着无论哪个 server 作为分区的 leader ,都必须拥有与前 leader 相同的所有 producer id 信息。实现这一点的最简单方法是将 pid 字段添加到消息本身,以便将它们复制到 follower 的日志中。

每条消息都有三个新的整数字段组成:pid,sequence_number 和 generation。 如果 pid 为 0,则不实现幂等性,server 将忽略 sequence_number 。另外server 将维护 (pid,topic,partition)=>(generation,sequence_number_highwater) 的映射, server 将检查每个新传入消息上的这些字段,并且只有在序列号正好比其 highwater mark 大一时才会将消息附加到日志中。 此外,generation 必须等于 server 存储的 generation 或更大。增加 generation 将阻止上面 Fencing 提到的,来自“僵尸”生成者的任何消息。

下面,我们需要描述 producer 如何发现其 pid、sequence_number 和 generation。为了实现这一点,我们将添加一个新的API lease_pid,用于分配唯一的 producer id 。API将采用以下格式:

Request:
lease_pid_request => topic partition
Response:
lease_pid_response => error pid generation sequence_number expire_ms

这个请求也可以有一个批处理版本来同时处理多个分区,但是为了简单起见,我描述了单个分区的情况。

此API有几种预期用途:

  1. 当 client 第一次启动并且没有 pid 时,它将相应的每个分区 pid 字段设置为-1,并发出一个 lease_pid_request 。然后 server 将返回唯一的 pid 、随机生成的generation、序列号设置为0。
  2. client 将负责在 pid 过期之前去更新过期时间。
server 端的实现细节

必须需要考虑的一个细节是 pids 的过期方式。最简单的解决方案是将 pid 绑定到连接上,这样在连接断开时可以自动使它们过期。但这并不有效,因为 pid 在连接断开后仍需要存在(这恰恰是它比较重要的目的)。

相反,该提议是假设 cluster 将会在 pid 发出之后有一个固定的过期时间并且 pid 可以重用,也可以允许客户端在其 lease_pid 请求中自定义过期方式,但这需要更复杂的实现,因为所有副本都必须知道每个pid的过期方式。服务器将大致按顺序发布 pids ,因此只有在发布了40亿个 pids 之后才会实际进行重用。(译者注:pid为 int类型的)

每一个 server 分配 pid 都是单调的,也就是说在M>N的情况下,如果 pid N 已经过期了,那么接下来就是 pid M 了。这意味着我们只需保留一个简单的 pid 数组或者是列表,新来的 pid 将添加到一端,并从另一端过期,查找仅基于二分查找。servers 需要固定大小的内存,以环形缓冲的形式来保存固定大小的 pid 数组。

leader 和 followers 都保持这种结构,他们会定期将 pid 数组快照到磁盘并且维护所有 partition 的 offset ,这样发生崩溃时,他们仍然可以用此快照和 offset 从日志中恢复。

请注意,当 map [译者注:上文提到的(pid,topic,partition)=>(generation,sequence_number_highwater) ] 仅因为 product 请求而更新的时候,lease-pid 请求不会更改它!这样做的原因是为了确保所有数据都在复制的日志中,并且我们不需要第二个相同的结构。尽管如此,我们仍需要保证如果一个 server 在生成 pid 然后没有任何消息被产生,然后这个 server 失败的情况下,对应 topic/partition 的 pid 不应该被再次生成,即使新生成的 leader 的 map 中没有这个 pid 。为了确保这一点,我们用一个全局的 zookeeper sequence 来生成 pid。为了提高效率,server 可以一次增加100个 pid ,然后等用完这些 pid 的时候再进行分配(当服务器崩溃时,这可能会浪费一些 pid,但这没问题)。
[译者注:kafka1.0
图片.png]

另外需要注意的是过期时间只是近似的,它是基于 server 对应分区的第一条消息的达到时间。实际上 server 保证至少有那么长的时间才会过期,pids 真正的过期时间却可以更长。这也意味着 follower 可以用消息的到达时间 ( 虽然 follower 的消息到达时间比 leader 消息导到的时间更大 )。在数据恢复的情况下,pid 的循环缓冲区将会被填满并且 所有 pid 的过期时间都是精确的。

#####client 端的实现细节
一般重复数据消除将在 producer 中自动进行。默认情况下,它应该足够简单易用。
要将其集成到像 mirror maker 和 samza 这样的链接 producers 和 consumers 的工具中,我们需要能够保存 producer 的 pid 和 sequence number 。我们可以通过将其包含在 producer 返回的响应中来实现这一点。
producer 需要一个配置来设置初始化时的初始 pid、sequence number 和 generation 。
我们可能会考虑扩展 offsetcommit 请求来同时存储这些字段。

原文地址:
Idempotent Producer

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐