学习笔记之Kafka幂等和事务
文章目录Producer 幂等性Kafka为啥需要幂等性?Kafka的幂等性是如何实现的?幂等性引入之后解决了什么问题?幂等性的限制条件幂等性的实现原理幂等性整体流程消息重试对顺序消息的影响幂等性的使用Producer 事务Kafka引入事务的用途?基本概念事务解决的场景事务保证事务恢复的保证事务原子性的保证事务中 Offset 的提交保证用于事务特性的控制型消息事务流程事务原理流程图寻找 TC
文章目录
Producer 幂等性
幂等性是指发送同样的请求,对系统资源的影响是一致的。结合 Kafka Producer,是指在多次发送同样的消息,Kafka做到发送消息的不丢失和不重复。
Kafka为啥需要幂等性?
在使用Kafka时,需要确保Exactly-Once语义。分布式系统中,一些不可控因素有很多,比如网络、OOM、FullGC等。在Kafka Broker确认Ack前,有可能出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。注,在未达到最大重试次数前,会自动重试(非应用程序代码写的重试)。
Kafka的幂等性是如何实现的?
Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重,Kafka为了实现幂等性,在0.11.0.0之后加入的该新功能,它在底层设计架构中引入了ProducerID和SequenceNumber。那这两个概念的用途是什么呢?
- ProducerID :在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
- SequenceNumber :对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
幂等性引入之后解决了什么问题?
面对这样的问题,Kafka引入了幂等性。那么幂等性是如何解决这类重复发送消息的问题的呢?下面我们可以先来看看流程图:
同样,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
幂等性的限制条件
单独只使用Producer的幂等性是存在一些限制条件的:
- 只能保证 Producer 在单个会话内不丟不重 ,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
- 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性 ,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。
如果需要跨会话、跨多个 topic-partition 的情况,需要使用 Kafka 的事务性来实现。这种幂等性只是保证了再生产端实现了幂等性,在实际场景中往往需要在消息者端实现幂等性,可以最大程度避免重复消费。
幂等性的实现原理
每个新的Producer在初始化的时候会被分配一个唯一的PID(凡是开启幂等性都是需要生成PID,只不过未开启事务的PID可以在任意broker生成,而开启事务只能在TransactionCoordinator节点生成),该PID对用户完全透明而不会暴露给用户。Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其消息序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:
- 如果消息序号比Broker维护的序号大于1以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
- 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber
上述设计解决了 0.11.0 之前版本中的两个问题:
- Broker 保存消息后,发送 ACK 前宕机,Producer 认为消息未发送成功并重试,造成数据重复
- 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序
注:producer_id是从Kafka服务端请求获取的(通过 ProducerIdManager 的 generateProducerId() 方法产生,维护在zk中的 /latest_producer_id_block
节点),消息序列号是Producer端生成的,初始值为0,之后自增加1,每个分区都有独立的序列号。。这里需要说明下,Kafka发送消息都是以batch的格式发送,batch包含了多条消息。所以Producer发送消息batch的时候,只会设置该batch的第一个消息的序列号,后面消息的序列号可以根据第一个消息的序列号计算出来。
幂等性整体流程
由于幂等性和事务都是在kafka0.11.0.0版本引入的,幂等流程会涉及事务的的一些部分,事务详情见下文介绍,首先看一个整体流程:
首先 KafkaProducer 在初始化时会初始化一个 TransactionManager 实例,它的作用有以下几个部分:
- 记录本地的事务状态(事务性时必须);
- 记录一些状态信息以保证幂等性,比如:每个 topic-partition 对应的下一个 sequence numbers 和 last acked batch(最近一个已经确认的 batch)的最大的 sequence number 等;
- 记录 ProducerIdAndEpoch 信息(PID 信息)。
Client 端的发送流程:
- 1、应用通过 KafkaProducer 的 send() 方法将数据添加到 RecordAccumulator 中,添加时会判断是否需要新建一个 ProducerBatch,这时这个 ProducerBatch 还是没有 PID 和 sequence number 信息的;
- 2、Producer 后台发送线程 Sender,在 run() 方法中,会先根据 TransactionManager 的 shouldResetProducerStateAfterResolvingSequences() 方法判断当前的 PID 是否需要重置,重置的原因是因为:如果有 topic-partition 的 batch 重试多次失败最后因为超时而被移除,这时 sequence number 将无法做到连续,因为 sequence number 有部分已经分配出去,这时系统依赖自身的机制无法继续进行下去(因为幂等性是要保证不丢不重的),相当于程序遇到了一个 fatal 异常,PID 会进行重置,TransactionManager 相关的缓存信息被清空(Producer 不会重启),只是保存状态信息的 TransactionManager 做了 clear+new 操作,遇到这个问题时是无法保证 exactly once 的(有数据已经发送失败了,并且超过了重试次数);
- 3、 Sender 线程通过 maybeWaitForProducerId() 方法判断是否需要申请 PID,如果需要的话,这里会阻塞直到获取到相应的 PID 信息;
- 4、Sender 线程通过 sendProducerData() 方法发送数据,整体流程与之前的 Producer 流程相似,不同的地方是在 RecordAccumulator 的 drain() 方法中,在加了幂等性之后,drain() 方法多了如下几步判断:
4.1、常规的判断:判断这个 topic-partition 是否可以继续发送(如果出现前面2中的情况是不允许发送的)、判断 PID 是否有效、如果这个 batch 是重试的 batch,那么需要判断这个 batch 之前是否还有 batch 没有发送完成,如果有,这里会先跳过这个 Topic-Partition 的发送,直到前面的 batch 发送完成,最坏情况下,这个 Topic-Partition 的 in-flight request 将会减少到1(这个涉及也是考虑到 server 端的一个设置,文章下面会详细分析);
4.2、如果这个 ProducerBatch 还没有这个相应的 PID 和 sequence number 信息,会在这里进行相应的设置;
- 5、最后 Sender 线程再调用 sendProduceRequests() 方法发送 ProduceRequest 请求,后面的就跟之前正常的流程保持一致了。
幂等性时 Server 端如何处理 ProduceRequest 请求:
当 Broker 收到 ProduceRequest 请求之后,会通过 handleProduceRequest() 做相应的处理,其处理流程如下(这里只讲述关于幂等性相关的内容):
- 1、如果请求是事务请求,检查是否对 TXN.id 有 Write 权限,没有的话返回 TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
- 2、如果请求设置了幂等性,检查是否对 ClusterResource 有 IdempotentWrite 权限,没有的话返回 CLUSTER_AUTHORIZATION_FAILED;
- 3、验证对 topic 是否有 Write 权限以及 Topic 是否存在,否则返回 TOPIC_AUTHORIZATION_FAILED 或 UNKNOWN_TOPIC_OR_PARTITION 异常;
- 4、检查是否有 PID 信息,没有的话走正常的写入流程;
- 5、LOG 对象会在 analyzeAndValidateProducerState() 方法先根据 batch 的 sequence number 信息检查这个 batch 是否重复(server 端会缓存 PID 对应这个 Topic-Partition 的最近5个 batch 信息),如果有重复,这里当做写入成功返回(不更新 LOG 对象中相应的状态信息,比如这个 replica 的 the end offset 等);
- 6、有了 PID 信息,并且不是重复 batch 时,在更新 producer 信息时,会做以下校验:
6.1、检查该 PID 是否已经缓存中存在(主要是在 ProducerStateManager 对象中检查);
6.2、如果不存在,那么判断 sequence number 是否 从0 开始,是的话,在缓存中记录 PID 的 meta(PID,epoch, sequence number),并执行写入操作,否则返回 UnknownProducerIdException(PID 在 server 端已经过期或者这个 PID 写的数据都已经过期了,但是 Client 还在接着上次的 sequence number 发送数据);
6.3、如果该 PID 存在,先检查 PID epoch 与 server 端记录的是否相同;
6.4、如果不同并且 sequence number 不从 0 开始,那么返回 OutOfOrderSequenceException 异常;
6.5、如果不同并且 sequence number 从 0 开始,那么正常写入;
6.6、如果相同,那么根据缓存中记录的最近一次 sequence number(currentLastSeq)检查是否为连续(会区分为 0、Int.MaxValue 等情况),不连续的情况下返回 OutOfOrderSequenceException 异常。
- 7、下面与正常写入相同。
消息重试对顺序消息的影响
我们知道正常情况下,单个分区内的消息是按发送时间有序的,但发生消息重试时候(例如前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序),kafka如果保证分区内顺序有序?
- 关闭自动重试,即
message.send.max.retries=0
,同步发送消息producer.send(record).get()
,应用程序代码自行执行任何重试。 - 开启自动重试,在未启用幂等性的情况下,可以把
max.in.flight.requests.per.connection=1
。让broker处永远只有一条message在排队,类似单线程处理这种模式,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)。
参数
max.in.flight.requests.per.connection
(默认为5)代表在发生阻塞之前,客户端的一个连接上允许出现未确认请求的最大数量,也就是在broker处排队等待acks确认的message数量,设置此值是1 表示kafka broker在响应请求之前client不能再向同一个broker发送请求。
- 开启自动重试,在启用幂等性的情况下,可以保证单分区的消息顺序,在kafka2.0.0及以上源码版本,无需再设置
max.in.flight.requests.per.connection=1
了,如果想显式设置max.in.flight.requests.per.connection
,则必须小于等于5。
在kafka2.0.0及以上源码版本可以实现动态调节
max.in.flight.requests.per.connection
值,当出现重试时,max-in-flight-request 可以动态减少到 1,在正常情况下还是按 5来处理(如果不开启幂等性无法实现动态调节,主要因为其实现依赖sequence number)。保证消息顺序的实现:首先是Server 端在处理 ProduceRequest 请求时,还会检查 batch 的 sequence number 值,它会要求这个值必须是连续的,如果不连续都会返回异常,Client 会进行相应的重试,举个栗子:假设 Client 发送的请求顺序是 1、2、3、4、5(分别对应了一个 batch),如果中间的请求 2 出现了异常,那么会导致 3、4、5 都返回异常进行重试(因为 sequence number 不连续);在Client端发送请求时,会先在遍历 Topic-Partition 对应的 queue 中的 batch 时,如果发现 batch 已经有了 sequence number 的话,则证明这个 batch 是重试的 batch,因为没有重试的 batch 其 sequence number 还没有设置,这时候会做一个判断,会等待其 in-flight-requests 中请求发送完成,才允许再次发送这个 Topic-Partition 的数据。
注:在启用幂等性情况下,如果想显式设置max.in.flight.requests.per.connection
,则必须小于等于5,主要原因是:Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据(这个5是硬编码写死的,至于为什么是5,可能跟经验有关,此时性能相对来说较高),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。例如:发送的请求顺序是 1、2、3、4、5、6,这时候 server 端只能缓存 2、3、4、5、6 请求,ProducerStateManager 就会将最旧的 batch 中的数据1清除。假设请求1发送失败,需要重试,当重试的请求发送过来后,首先在已缓存的batch数据中检查是否为重复的 batch,这时候检查的结果是非重复,之后会开始 check 其 sequence number值,这时候只会返回一个 OutOfOrderSequenceException 异常,client 在收到这个异常后,会再次进行重试,直到超过最大重试次数或者超时,这样不但会影响 Producer 性能,还可能给 Server 带来压力(相当于client 狂发错误请求)。
幂等性的使用
需要在生产端配置参数enable.idempotence = true
,当幂等性开启的时候acks即为all。如果显性的将acks设置为0或-1,那么将会报错 Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence
。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "1", "hello world")).get();
kafkaProducer.close();
Producer 事务
Kafka引入事务的用途?
引入事务作用为:
- Producer多次发送消息可以封装成一个原子性操作,即同时成功,或者同时失败;
- Consumer-Transform-Producer模式下,因为消费者提交偏移量出现问题,导致在重复消费消息时,生产者重复生产消息。需要将这个模式下消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。
基本概念
为了支持事务,Kafka 0.11.0版本引入以下概念:
- 1、事务协调者(Transaction Coordinator):类似于消费组负载均衡的协调者,每一个实现事务的生产端都被分配到一个事务协调者。
- 2、引入一个内部Kafka Topic(__transaction_state)作为事务Log:类似于消费管理Offset的Topic,事务Topic本身也是持久化的,日志信息记录事务状态信息,由事务协调者写入。
- 3、引入控制消息(Control Messages):这些消息是生产者产生的并写入到主题的特殊消息,但对于消费者来说不可见。它们是用来让broker告知消费者之前拉取的消息是否被原子性提交。
- 4、引入TransactionId:不同生产实例使用同一个TransactionId表示是同一个事务,可以跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作,避免事务僵死。
- 5、每个生产者增加一个epoch:用于标识同一个事务Id在一次事务中的epoch,每次初始化事务时会递增,从而让服务端可以知道生产者请求是否旧的请求。
- 6、Producer ID:每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。主要是为提供幂等性时引入的。
- 7、Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。
- 8、幂等性:保证发送单个分区的消息只会发送一次,不会出现重复消息。增加一个幂等性的开关enable.idempotence,可以独立与事务使用,即可以只开启幂等但不开启事务。
注:其中6、7和8是支持幂等特性引入,幂等特性可以单独使用,亦可与事务协调使用。
事务解决的场景
- 只有Producer模式,Producer多次发送消息可以封装成一个原子操作,要么都成功,要么失败
- 消费消息和生产消息并存模式,即是Consumer-Transform-Producer模式下,因为消费者提交偏移量出现问题,导致在重复消费消息时,生产者重复生产消息。需要将这个模式下消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。该场景常见于 Kafka Stream 应用中,同时包含 Consumer 和 Producer,前者负责从 Kafka 中获取消息,后者负责将处理完的数据写回 Kafka 的其它 Topic 中。
- 只有Consumer模式,此时使用事务和手动提交offset效果一样,而且也不是事务属性引入的目的,一般也不在这种情况下使用。
事务保证
事务恢复的保证
为了实现有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。应用程序必须提供一个稳定的(重启后不变)唯一的 ID,也即Transaction ID。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。
另外,为了保证新的 Producer 启动后,旧的具有相同Transaction ID的 Producer 即失效,每次 Producer 通过Transaction ID拿到 PID 的同时,还会获取一个单调递增的 epoch。由于旧的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。
有了Transaction ID
和epoch
后,Kafka 可保证:
- 跨 Session 的数据幂等发送。当具有相同Transaction ID的新的 Producer 实例被创建且工作时,旧的且拥有相同Transaction ID的 Producer 将不再工作。
- 跨 Session 的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么 Commit 要么 Abort,使得新实例从一个正常状态开始工作。
事务原子性的保证
事务原子性是指 Producer 将多条消息作为一个事务批量发送,要么全部成功要么全部失败。 引入了一个服务器端的模块,名为Transaction Coordinator,用于管理 Producer 发送的消息的事务性。
该Transaction Coordinator维护Transaction Log,该 log 存于一个内部的 Topic 内。由于 Topic 数据具有持久性,因此事务的状态也具有持久性。
Producer 并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。
Transaction Log的设计与Offset Log用于保存 Consumer 的 Offset 类似。
事务中 Offset 的提交保证
在Kafka Stream 应用中同时包含 Consumer 和 Producer(即Consumer-Transform-Producer),前者负责从 Kafka 中获取消息,后者负责将处理完的数据写回 Kafka 的其它 Topic 中。
为了实现该场景下的事务的原子性,Kafka 需要保证对 Consumer Offset 的 Commit 与 Producer 对发送消息的 Commit 包含在同一个事务中。否则,如果在二者 Commit 中间发生异常,根据二者 Commit 的顺序可能会造成数据丢失和数据重复:
- 如果先 Commit Producer 发送数据的事务再 Commit Consumer 的 Offset,即At Least Once语义,可能造成数据重复。
- 如果先 Commit Consumer 的 Offset,再 Commit Producer 数据发送事务,即At Most Once语义,可能造成数据丢失。
用于事务特性的控制型消息
为了区分写入 Partition 的消息被 Commit 还是 Abort,Kafka 引入了一种特殊类型的消息,即Control Message。该类消息的 Value 内不包含任何应用相关的数据,并且不会暴露给应用程序。它只用于 Broker 与 Client 间的内部通信。
对于 Producer 端事务,Kafka 以 Control Message 的形式引入一系列的Transaction Marker。Consumer 即可通过该标记判定对应的消息被 Commit 了还是 Abort 了,然后结合该 Consumer 配置的隔离级别决定是否应该将该消息返回给应用程序。
注:Kafka事务的回滚,并不是删除已写入的数据,而是将写入数据的事务标记为 Rollback/Abort 从而在读数据时过滤该数据。
事务流程
事务原理流程图
上图中的 Transaction Coordinator 运行在 Kafka 服务端,下面简称 TC 服务。
__transaction_state 是 TC 服务持久化事务信息的 topic 名称,下面简称事务 topic。
Producer 向 TC 服务发送的 commit 消息,下面简称事务提交消息。
TC 服务向分区发送的消息,下面简称事务结果消息。
寻找 TC 服务地址
Producer 会首先从 Kafka 集群中选择任意一台机器,然后向其发送请求,获取 TC 服务的地址。Kafka 有个特殊的事务 topic,名称为__transaction_state ,负责持久化事务消息。这个 topic 有多个分区,默认有50个,每个分区负责一部分事务。事务划分是根据 transaction id, 计算出该事务属于哪个分区。这个分区的 leader 所在的机器,负责这个事务的TC 服务地址。
事务初始化
Producer 在使用事务功能,必须先自定义一个唯一的 transaction id。有了 transaction id,即使客户端挂掉了,它重启后也能继续处理未完成的事务。
Kafka 实现事务需要依靠幂等性,而幂等性需要指定 producer id 。所以Producer在启动事务之前,需要向 TC 服务申请 producer id。TC 服务在分配 producer id 后,会将它持久化到事务 topic。
发送消息
Producer 在接收到 producer id 后,就可以正常的发送消息了。不过发送消息之前,需要先将这些消息的分区地址,上传到 TC 服务。TC 服务会将这些分区地址持久化到事务 topic。然后 Producer 才会真正的发送消息,这些消息与普通消息不同,它们会有一个字段,表示自身是事务消息。
这里需要注意下一种特殊的请求,提交消费位置请求,用于原子性的从某个 topic 读取消息,并且发送消息到另外一个 topic。我们知道一般是消费者使用消费组订阅 topic,才会发送提交消费位置的请求,而这里是由 Producer 发送的。Producer 首先会发送一条请求,里面会包含这个消费组对应的分区(每个消费组的消费位置都保存在 __consumer_offset topic 的一个分区里),TC 服务会将分区持久化之后,发送响应。Producer 收到响应后,就会直接发送消费位置请求给 GroupCoordinator。
发送提交请求
Producer 发送完消息后,如果认为该事务可以提交了,就会发送提交请求到 TC 服务。Producer 的工作至此就完成了,接下来它只需要等待响应。这里需要强调下,Producer 会在发送事务提交请求之前,会等待之前所有的请求都已经发送并且响应成功。
提交请求持久化
TC 服务收到事务提交请求后,会先将提交信息先持久化到事务 topic 。持久化成功后,服务端就立即发送成功响应给 Producer。然后找到该事务涉及到的所有分区,为每 个分区生成提交请求,存到队列里等待发送。
读者可能有所疑问,在一般的二阶段提交中,协调者需要收到所有参与者的响应后,才能判断此事务是否成功,最后才将结果返回给客户。那如果 TC 服务在发送响应给 Producer 后,还没来及向分区发送请求就挂掉了,那么 Kafka 是如何保证事务完成。因为每次事务的信息都会持久化,所以 TC 服务挂掉重新启动后,会先从 事务 topic 加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。
发送事务结果信息给分区
后台线程会不停的从队列里,拉取请求并且发送到分区。当一个分区收到事务结果消息后,会将结果保存到分区里,并且返回成功响应到 TC服务。当 TC 服务收到所有分区的成功响应后,会持久化一条事务完成的消息到事务 topic。至此,一个完整的事务流程就完成了。
事务完整流程图
找到Transaction Coordinator
由于Transaction Coordinator是分配 PID 和管理事务的核心,因此 Producer 要做的第一件事情就是通过向任意一个 Broker 发送FindCoordinator请求找到Transaction Coordinator的位置。
注意:只有应用程序为 Producer 配置了Transaction ID时才可使用事务特性,也才需要这一步。另外,由于事务性要求 Producer 开启幂等特性,因此通过将transactional.id设置为非空从而开启事务特性的同时也需要通过将enable.idempotence设置为 true 来开启幂等特性。
获取 PID
找到Transaction Coordinator后,具有幂等特性的 Producer 必须发起InitPidRequest请求以获取 PID。
注意:只要开启了幂等特性即必须执行该操作,而无须考虑该 Producer 是否开启了事务特性。
如果事务特性被开启
InitPidRequest会发送给Transaction Coordinator。如果Transaction Coordinator是第一次收到包含有该Transaction ID的 InitPidRequest 请求,它将会把该<TransactionID, PID>存入Transaction Log,如上图中步骤 2.1 所示。这样可保证该对应关系被持久化,从而保证即使Transaction Coordinator宕机该对应关系也不会丢失。
除了返回 PID 外,InitPidRequest还会执行如下任务:
增加该 PID 对应的 epoch。具有相同 PID 但 epoch 小于该 epoch 的其它 Producer(如果有)新开启的事务将被拒绝。
恢复(Commit 或 Abort)之前的 Producer 未完成的事务(如果有)。
注意:InitPidRequest的处理过程是同步阻塞的。一旦该调用正确返回,Producer 即可开始新的事务。
另外,如果事务特性未开启,InitPidRequest可发送至任意 Broker,并且会得到一个全新的唯一的 PID。该 Producer 将只能使用幂等特性以及单一 Session 内的事务特性,而不能使用跨 Session 的事务特性。
开启事务
Kafka 从 0.11.0.0 版本开始,提供beginTransaction()方法用于开启一个事务。调用该方法后,Producer 本地会记录已经开启了事务,但Transaction Coordinator只有在 Producer 发送第一条消息后才认为事务已经开启。
Consume-Transform-Produce模式
这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。
- AddPartitionsToTxnRequest
一个 Producer 可能会给多个<Topic, Partition>发送数据,给一个新的<Topic, Partition>发送数据前,它需要先向Transaction Coordinator发送AddPartitionsToTxnRequest。
Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN,如上图中步骤 4.1 所示。有了该信息后,我们才可以在后续步骤中为每个Topic, Partition>设置 COMMIT 或者 ABORT 标记(如上图中步骤 5.2 所示)。
另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。- ProduceRequest
Producer 通过一个或多个ProduceRequest发送一系列消息。除了应用数据外,该请求还包含了 PID,epoch,和Sequence Number。该过程如上图中步骤 4.2 所示。- AddOffsetsToTxnRequest
为了提供事务性,Producer 新增了sendOffsetsToTransaction方法,该方法将多组消息的发送和消费放入同一批处理内。
该方法先判断在当前事务中该方法是否已经被调用并传入了相同的 Group ID。若是,直接跳到下一步;若不是,则向Transaction Coordinator发送AddOffsetsToTxnRequests请求,Transaction Coordinator将对应的所有<Topic, Partition>存于Transaction Log中,并将其状态记为BEGIN,如上图中步骤 4.3 所示。该方法会阻塞直到收到响应。- TxnOffsetCommitRequest
作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer 也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的 Offset 持久化到内部的__consumer_offsets中,如上图步骤 4.4 所示。
在此过程中,Consumer Coordinator会通过 PID 和对应的 epoch 来验证是否应该允许该 Producer 的该请求。
这里需要注意:
(1)写入__consumer_offsets的 Offset 信息在当前事务 Commit 前对外是不可见的。也即在当前事务被 Commit 前,可认为该 Offset 尚未 Commit,也即对应的消息尚未被完成处理。
(2)Consumer Coordinator并不会立即更新缓存中相应<Topic, Partition>的 Offset,因为此时这些更新操作尚未被 COMMIT 或 ABORT。
Commit 或 Abort 事务
一旦上述数据写入操作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。
- EndTxnRequest
commitTransaction方法使得 Producer 写入的数据对下游 Consumer 可见。abortTransaction方法通过Transaction Marker将 Producer 写入的数据标记为Aborted状态。下游的 Consumer 如果将isolation.level设置为READ_COMMITTED,则它读到被 Abort 的消息后直接将其丢弃而不会返回给客户程序,也即被 Abort 的消息对应用程序不可见。
无论是 Commit 还是 Abort,Producer 都会发送EndTxnRequest请求给Transaction Coordinator,并通过标志位标识是应该 Commit 还是 Abort。
收到该请求后,Transaction Coordinator会进行如下操作
(1)将PREPARE_COMMIT或PREPARE_ABORT消息写入Transaction Log,如上图中步骤 5.1 所示
(2)通过WriteTxnMarker请求以Transaction Marker的形式将COMMIT或ABORT信息写入用户数据日志以及Offset Log中,如上图中步骤 5.2 所示
(3)最后将COMPLETE_COMMIT或COMPLETE_ABORT信息写入Transaction Log中,如上图中步骤 5.3 所示
补充说明:对于commitTransaction方法,它会在发送EndTxnRequest之前先调用 flush 方法以确保所有发送出去的数据都得到相应的 ACK。对于abortTransaction方法,在发送EndTxnRequest之前直接将当前 Buffer 中的事务性消息(如果有)全部丢弃,但必须等待所有被发送但尚未收到 ACK 的消息发送完成。
上述第二步是实现将一组读操作与写操作作为一个事务处理的关键。因为 Producer 写入的数据 Topic 以及记录 Comsumer Offset 的 Topic 会被写入相同的Transactin Marker,所以这一组读操作与写操作要么全部 COMMIT 要么全部 ABORT。
- WriteTxnMarkerRequest
上面提到的WriteTxnMarkerRequest由Transaction Coordinator发送给当前事务涉及到的每个<Topic, Partition>的 Leader。收到该请求后,对应的 Leader 会将对应的COMMIT(PID)或者ABORT(PID)控制信息写入日志,如上图中步骤 5.2 所示。
该控制消息向 Broker 以及 Consumer 表明对应 PID 的消息被 Commit 了还是被 Abort 了。
这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的 Offset 存于__consumer_offsets中,Transaction Coordinator也需要向该内部 Topic 的各 Partition 的 Leader 发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息。
写入最终的COMPLETE_COMMIT或COMPLETE_ABORT消息
写完所有的Transaction Marker后,Transaction Coordinator会将最终的COMPLETE_COMMIT或COMPLETE_ABORT消息写入Transaction Log中以标明该事务结束,如上图中步骤 5.3 所示。
此时,Transaction Log中所有关于该事务的消息全部可以移除。当然,由于 Kafka 内数据是 Append Only 的,不可直接更新和删除,这里说的移除只是将其标记为 null 从而在 Log Compact 时不再保留。
另外,COMPLETE_COMMIT或COMPLETE_ABORT的写入并不需要得到所有 Rreplica 的 ACK,因为如果该消息丢失,可以根据事务协议重发。
补充说明,如果参与该事务的某些<Topic, Partition>在被写入Transaction Marker前不可用,它对READ_COMMITTED的 Consumer 不可见,但不影响其它可用<Topic, Partition>的 COMMIT 或 ABORT。在该<Topic, Partition>恢复可用后,Transaction Coordinator会重新根据PREPARE_COMMIT或PREPARE_ABORT向该<Topic, Partition>发送Transaction Marker。
总结
- PID与Sequence Number的引入实现了写操作的幂等性
- 写操作的幂等性结合At Least Once语义实现了单一 Session 内的Exactly Once语义
- Transaction Marker与PID提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性
- Offset 的更新标记了消息是否被读取,从而将对读操作的事务处理转换成了对写(Offset)操作的事务处理
- Kafka 事务的本质是,将一组写操作(如果有)对应的消息与一组读操作(如果有)对应的 Offset 的更新进行同样的标记(即Transaction Marker)来实现事务中-- 涉及的所有读写操作同时对外可见或同时对外不可见
- Kafka 只提供对 Kafka 本身的读写操作的事务性,不提供包含外部系统的事务性
与分布式事务机制对比
两阶段提交原理
二阶段提交的算法思路可以概括为:协调者询问参与者是否准备好了提交,并根据所有参与者的反馈情况决定向所有参与者发送commit或者rollback指令(协调者向所有参与者发送相同的指令)。
所谓的两个阶段是指
- 准备阶段 又称投票阶段。在这一阶段,协调者询问所有参与者是否准备好提交,参与者如果已经准备好提交则回复Prepared,否则回复Non-Prepared。
- 提交阶段 又称执行阶段。协调者如果在上一阶段收到所有参与者回复的Prepared,则在此阶段向所有参与者发送commit指令,所有参与者立即执行commit操作;否则协调者向所有参与者发送rollback指令,参与者立即执行rollback操作。
两阶段提交中,协调者和参与方的交互过程如下图所示。
Kafka两阶段提交对比
Kafka的事务机制与上述所介绍的两阶段提交机制看似相似,都分PREPARE阶段和最终COMMIT阶段,但又有很大不同。
- Kafka事务机制中,PREPARE时即要指明是PREPARE_COMMIT还是PREPARE_ABORT,并且 只须在Transaction Log中标记即可 ,无须其它组件参与。而两阶段提交的PREPARE需要发送给所有的分布式事务参与方,并且事务参与方需要尽可能准备好,并根据准备情况返回Prepared或Non-Prepared状态给事务管理器。
- Kafka事务中,一但发起PREPARE_COMMIT或PREPARE_ABORT,则确定该事务最终的结果应该是被COMMIT或ABORT 。而分布式事务中,PREPARE后由各事务参与方返回状态,只有所有参与方均返回Prepared状态才会真正执行COMMIT,否则执行ROLLBACK
- Kafka事务机制中,某几个Partition在COMMIT或ABORT过程中变为不可用,只影响该Partition不影响其它Partition。两阶段提交中,若唯一收到COMMIT命令参与者Crash,其它事务参与方无法判断事务状态从而使得整个事务阻塞
- Kafka事务机制引入事务超时机制,有效避免了挂起的事务影响其它事务的问题
- Kafka事务机制中存在多个Transaction Coordinator实例,而分布式事务中只有一个事务管理器
事务过期机制
事务超期通过transaction.timeout.ms
配置
终止过期事务
当 Producer 失败时,Transaction Coordinator必须能够主动的让某些进行中的事务过期。否则没有 Producer 的参与,Transaction Coordinator无法判断这些事务应该如何处理,这会造成:
- 如果这种进行中事务太多,会造成Transaction Coordinator需要维护大量的事务状态,大量占用内存
- Transaction Log内也会存在大量数据,造成新的Transaction Coordinator启动缓慢
- READ_COMMITTED的 Consumer 需要缓存大量的消息,造成不必要的内存浪费甚至是 OOM
- 如果多个Transaction ID不同的 Producer 交叉写同一个 Partition,当一个 Producer 的事务状态不更新时,READ_COMMITTED的 Consumer 为了保证顺序消费而被阻塞
为了避免上述问题,Transaction Coordinator会周期性遍历内存中的事务状态 Map,并执行如下操作
- 如果状态是BEGIN并且其最后更新时间与当前时间差大于
transaction.remove.expired.transaction.cleanup.interval.ms
(默认值为 1 小时),则主动将其终止:1)未避免原 Producer 临时恢复与当前终止流程冲突,增加该 Producer 对应的 PID 的 epoch,并确保将该更新的信息写入Transaction Log;2)以更新后的 epoch 回滚事务,从而使得该事务相关的所有 Broker 都更新其缓存的该 PID 的 epoch 从而拒绝旧 Producer 的写操作 - 如果状态是PREPARE_COMMIT,完成后续的 COMMIT 流程————向各<Topic, Partition>写入Transaction Marker,在Transaction Log内写入COMPLETE_COMMIT
- 如果状态是PREPARE_ABORT,完成后续 ABORT 流程
终止Transaction ID
某Transaction ID的 Producer 可能很长时间不再发送数据,Transaction Coordinator没必要再保存该Transaction ID与PID等的映射,否则可能会造成大量的资源浪费。因此需要有一个机制探测不再活跃的Transaction ID并将其信息删除。
Transaction Coordinator会周期性遍历内存中的Transaction ID与PID映射,如果某Transaction ID没有对应的正在进行中的事务并且它对应的最后一个事务的结束时间与当前时间差大于transactional.id.expiration.ms
(默认值是 7 天),则将其从内存中删除并在Transaction Log中将其对应的日志的值设置为 null 从而使得 Log Compact 可将其记录删除。
高可用保证
-
Transaction Coordinator 服务
通过上述对 Kafka 事务的简述,可以看到 TC 服务起着很重要的作用。事实上 Kafka 集群中运行着多个 TC 服务,每个TC 服务负责事务 topic 的一个分区读写,也就是这个分区的 leader。Producer 根据 transaction id 的哈希值,来决定该事务属于事务 topic 的哪个分区,最后找到这个分区的 leader 位置。
既然 TC 服务负责事务 topic 的一个分区 leader,我们知道当一个分区的 leader挂掉之后,Kafka 会保证这个的分区的 follower 会转换为 leader 角色,会继续对外提供服务。这么 TC 服务的高可用就达到了。 -
消息持久化
TC 服务为了支持重启后,也能恢复到之前的状态,所以它将每次重要的消息都会持久化起来,并且保存到事务 topic 的时候,指定 leader 分区和 follower 分区必须都存储成功。这样每次 TC 服务启动的时候,都会从事务 topic 读取之前的状态,加载到缓存里。比如当TC 服务在响应客户端的事务提交请求后,还没来得及向各分区发送事务结果请求,就已经挂掉了。之后 TC 服务重启,会去事务 topic 加载数据,它发现事务的最后状态为 PrepareCommit,并且事务数据还包括了分区列表,这样 TC 服务会继续未完成的事务,会向列表中的各个分区发送事务结果请求。 -
超时处理
如果 Producer 发起了一个事务,但是由于网络问题,TC 服务迟迟没有接下来的请求,那么该事务就会被认为超时。TC 服务会有个线程,会定期检查处理 Ongoing 状态的事务,如果该事务的开始时间和当前时间的差,超过了指定的超时时间(在发送申请producer id请求时可以指定),那么 TC 服务就会回滚该事务,更新和持久化事务的状态,并且发送事务回滚结果给分区。
事务客户端API
Producer提供了五种事务方法,代码定义在org.apache.kafka.clients.producer.Producer<K,V>接口中,具体定义接口如下:
// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 提交消费位置, offsets表示每个分区的消费位置, consumerGroupId表示消费组的名称
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction() throws ProducerFencedException;
// 放弃回滚事务
void abortTransaction() throws ProducerFencedException;
事务参数
Broker configs:
- ransactional.id.timeout.ms :在ms中,事务协调器在生产者TransactionalId提前过期之前等待的最长时间,并且没有从该生产者TransactionalId接收到任何事务状态更新。默认是604800000(7天)。这允许每周一次的生产者作业维护它们的id
- max.transaction.timeout.ms :事务允许的最大超时。如果客户端请求的事务时间超过此时间,broke将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防止客户机超时过大,从而导致用户无法从事务中包含的主题读取内容。默认值为900000(15分钟)。这是消息事务需要发送的时间的保守上限。
- transaction.state.log.replication.factor :事务状态topic的副本数量。默认值:3
- transaction.state.log.num.partitions :事务状态主题的分区数。默认值:50
- transaction.state.log.min.isr :事务状态主题的每个分区ISR最小数量。默认值:2
- transaction.state.log.segment.bytes :事务状态主题的segment大小。默认值:104857600字节
Producer configs
- enable.idempotence :开启幂等
- transaction.timeout.ms :事务超时时间,事务协调器在主动中止正在进行的事务之前等待生产者更新事务状态的最长时间。这个配置值将与InitPidRequest一起发送到事务协调器。如果该值大于max.transaction.timeout。在broke中设置ms时,请求将失败,并出现InvalidTransactionTimeout错误。默认是60000。这使得交易不会阻塞下游消费超过一分钟,这在实时应用程序中通常是允许的。
- transactional.id:用于事务性交付的TransactionalId。这支持跨多个生产者会话的可靠性语义,因为它允许客户端确保使用相同TransactionalId的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId,则生产者仅限于幂等交付。
Consumer configs
- solation.level:设置隔离级别
read_uncommitted:以偏移顺序使用已提交和未提交的消息。
read_committed:仅以偏移量顺序使用非事务性消息或已提交事务性消息。为了维护偏移排序,这个设置意味着我们必须在使用者中缓冲消息,直到看到给定事务中的所有消息。
代码示例
只有Producer模式
创建一个事务,在这个事务操作中,只有生成消息操作。创建生成者,需要:配置transactional.id
属性和配置enable.idempotence
属性
代码如下:
/**
* 需要:
* 1、设置transactional.id
* 2、设置enable.idempotence
* @return
*/
private Producer buildProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "first-transactional"); // 设置事务id
props.put("enable.idempotence",true); // 设置幂等性
props.put("acks", "all");
props.put("retries", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
return producer;
}
/**
* 在一个事务只有生产消息操作
*/
public void onlyProduceInTransaction() {
Producer producer = buildProducer();
// 1.初始化事务
producer.initTransactions();
// 2.开启事务
producer.beginTransaction();
try {
// 3.1 do业务逻辑
// 3.2 发送消息
producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));
producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
// 4.事务提交
producer.commitTransaction();
} catch (Exception e) {
// 5.放弃事务
producer.abortTransaction();
}
}
消费消息和生产消息并存模式
在一个事务中,既有生产消息操作又有消费消息操作,即常说的Consume-tansform-produce模式。
创建消费者代码,需要:
- 将配置中的自动提交属性(auto.commit)进行关闭
- 而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
- 设置isolation.level
/**
* 需要:
* 1、关闭自动提交 enable.auto.commit
* 2、isolation.level为read_committed
*/
public Consumer buildConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group0323");
props.put("isolation.level","read_committed"); // 设置隔离级别
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
return consumer;
}
/**
* 在一个事务内,即有生产消息又有消费消息
*/
public void consumeTransferProduce() {
// 1.构建上产者
Producer producer = buildProducer();
// 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作
producer.initTransactions();
// 3.构建消费者和订阅主题
Consumer consumer = buildConsumer();
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 4.开启事务
producer.beginTransaction();
// 5.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);
try {
// 5.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 5.2.1 读取消息,并处理消息。
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
// 5.2.2 记录提交的偏移量
commits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
// 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息
producer.send(new ProducerRecord<String, String>("test", "data2"));
}
// 7.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");
// 8.事务提交
producer.commitTransaction();
} catch (Exception e) {
// 7.放弃事务
producer.abortTransaction();
}
}
}
只有Consumer模式
创建一个事务,在这个事务操作中,只有生成消息操作,如下代码。这种操作其实没有什么意义,跟使用手动提交效果一样,无法保证消费消息操作和提交偏移量操作在一个事务。
/**
* 在一个事务只有消息操作
*/
public void onlyConsumeInTransaction() {
Producer producer = buildProducer();
// 1.初始化事务
producer.initTransactions();
// 2.开启事务
producer.beginTransaction();
// 3.kafka读消息的操作集合
Consumer consumer = buildConsumer();
while (true) {
// 3.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);
try {
// 3.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 3.2.1 处理消息 print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
// 3.2.2 记录提交偏移量
commits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
}
// 4.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");
// 5.事务提交
producer.commitTransaction();
} catch (Exception e) {
// 6.放弃事务
producer.abortTransaction();
}
}
}
幂等性和事务性的关系
事务属性实现前提是幂等性,即在配置事务属性transaction id时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。
- enable.idempotence = false,transactional.id不设置:没有事务属性和幂等性的kafka
- enable.idempotence = false,transactional.id设置:无法获取到PID,此时会报错
- enable.idempotence = true,transactional.id不设置:只支持幂等性
- enable.idempotence = true,transactional.id设置:支持事务属性和幂等性
参考
Kafka生产者事务和幂等:http://www.heartthinkdo.com/?p=2040
Kafka 事务性之幂等性实现:http://matt33.com/2018/10/24/kafka-idempotent/
Kafka幂等性原理及实现剖析:https://blog.51cto.com/14230003/2458500
Kafka 事务实现原理:https://zhmin.github.io/2019/05/20/kafka-transaction/
以上为Kafka网上资料及个人理解整理的笔记,欢迎指正!
更多推荐
所有评论(0)