一,kafka消息传递的三种语义

  • At most once:最多一次,消息可能会丢失,但不会重复。
  • At least once:最少一次,消息不会丢失,可能会重复。
  • Exactly once:只且一次,消息不丢失不重复,只且消费一次。

我们使用kafka肯定是希望Exactly once,要保证Exactly once效率上面需要做出一定的牺牲,那么是要Exactly once还是效率就要结合具体业务来分析了。但是整体的消息投递语义需要 Producer 端和 Consumer 端两者来保证。

二,Producer 消息生产端

当 producer 向 broker 发送一条消息,这时网络出错了,producer 无法得知 broker 是否接受到了这条消息。网络出错可能是发生在消息传递的过程中,也可能发生在 broker 已经接受到了消息,并返回 ack 给 producer 的过程中。

这时,producer 会有两种选择:

  • 不管了(At most once),消息可能会丢失
  • 再发一次(At least once),消息可能会重复

那么如何保证Exactly once,就要结合Ack和send返回结果来保证了。

1,ack确认机制:
  • Ack=0,相当于异步发送,意味着producer不等待broker同步完成,消息发送完毕继续发送下一批信息。提供了最低延迟,但持久性最弱,当服务器发生故障时很可能发生数据丢失。如果leader死亡,producer继续发送消息,broker接收不到数据就会造成数据丢失。
  • Ack=1,producer要等待leader成功收到消息并确认,才发送下一条message。提供较低的延迟性以及较好的持久性。但是如果partition下的leader死亡,而follower尚未复制数据,数据就会丢失。
  • Ack=-1,leader收到所有消息,且follower同步完数据,才发送下一条数据。延迟性最差,持久性最好(即可靠性最好)。
2,send发送函数:

Kafka为生产者生产消息提供了一个 send(producerRecord) 方法,另有一个重载的方法send(producerRecord, callback)

1,通过send方法返回的future,同步阻塞拿到结果。

Future<RecordMetadata> future = producer.send(record)

上面的示例代码也可以看到,send返回的是一个 Future,也就是说其实你是可以 Future.get()获取返回值,但是future是同步阻塞的,这种效率比较低。

2,通过send方法提供的异步回到拿到结果。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic", key, value);
producer.send(myRecord,
         new Callback() {
             public void onCompletion(RecordMetadata metadata, Exception e) {
            
             }
         });

send(msg, callback)该方法可以将一条消息发送出去,并且可以从callback回调中得到该条消息的发送结果, 并且callback是异步回调,所以在兼具性能的情况下,也对消息具有比较好的掌控。

3,幂等producer

所谓幂等producer指producer.send的逻辑是幂等的,即发送相同的Kafka消息,broker端不会重复写入消息。同一条消息Kafka保证底层日志中只会持久化一次,既不会丢失也不会重复。幂等性可以极大地减轻下游consumer系统实现消息去重的工作负担,因此是非常实用的功能。值得注意的是,幂等producer提供的语义保证是有条件的:

  • 单分区幂等性:幂等producer无法实现多分区上的幂等性。如前所述,若要实现多分区上的原子性,需要引入事务。
  • 单会话幂等性:幂等producer无法跨会话实现幂等性。即使同一个producer宕机并重启也无法保证消息的EOS语义。

虽然有上面两个限制,幂等producer依然是一个非常实用的新功能。下面我们来讨论下它的设计原理。如果要实现幂等性, 通常都需要花费额外的空间来保存状态以执行消息去重。Kafka的幂等producer整体上也是这样的思想。

首先,producer对象引入了一个新的字段:Producer ID(下称PID),它唯一标识一个producer,当producer启动时Kafka会为每个producer分配一个PID(64位整数),因此PID的生成和分配对用户来说是完全透明的,用户无需考虑PID的事情,甚至都感受不到PID的存在。

其次,0.11 Kafka重构了消息格式,引入了序列号字段(sequence number,下称seq number)来标识某个PID producer发送的消息。和consumer端的offset类似,seq number从0开始计数并严格单调增加。同时在broker端会为每个PID(即每个producer)保存该producer发送过来的消息batch的某些元信息,比如PID信息、消息batch的起始seq number及结束seq number等。这样每当该PID发送新的消息batch时,Kafka broker就会对比这些信息,如果发生冲突(比如起始seq number和结束seq number与当前缓存的相同),那么broker就会拒绝这次写入请求。倘若没有冲突,那么broker端就会更新这部分缓存然后再开始写入消息。

这就是Kafka实现幂等producer的设计思路:

  1. 为每个producer设置唯一的PID。
  2. 引入seq number以及broker端seq number缓存更新机制来去重。

要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。

4,producer事务

事务可以支持多分区的数据完整性,原子性。并且支持跨会话的exactly once处理语义,也就是说如果producer宕机重启,依旧能保证数据只处理一次。

开启事务也很简单,首先需要开启幂等性,即设置enable.idempotence为true。然后对producer发送代码做一些小小的修改。

//初始化事务
producer.initTransactions();
try {
    //开启一个事务
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    //提交
    producer.commitTransaction();
} catch (KafkaException e) {
    //出现异常的时候,终止事务
    producer.abortTransaction();
}

所以无论开启幂等还是事务的特性,都会对性能有一定影响,这是必然的。所以kafka默认也并没有开启这两个特性,而是交由开发者根据自身业务特点进行处理。

三,Consumer 消息消费端

1,at-most-once

最多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是:

  • enable.auto.commit设置为true。
  • auto.commit.interval.ms设置为一个较低的时间范围。

由于上面的配置,此时kafka会有一个独立的线程负责按照指定间隔提交offset。

消费者的offset已经提交,但是消息还在处理中(还没有处理完),这个时候程序挂了,导致数据没有被成 功处理,再重启的时候会从上次提交的offset处消费,导致上次没有被成功处理的消息就丢失了。

2,at-least-once

实现最少一次消费语义的消费者也很简单:

  • 设置enable.auto.commit为false。
  • 消息处理完之后手动调用consumer.commitSync()。

这种方式是在消费数据之后,手动调用函数consumer.commitSync()异步提交offset,有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次

3,exactly-once

这种语义可以保证数据只被消费处理一次。

  • 将enable.auto.commit设置为false。
  • 使用consumer.seek(topicPartition,offset)来指定offset。
  • 在处理消息的时候,要同时保存住每个消息的offset。

以原子事务的方式保存offset和处理的消息结果,这个时候相当于自己保存offset信息了,把offset和具体的数据绑定到一块,数据真正处理成功的时候才会保存offset信息,这样就可以保证数据仅被处理一次了。

4,exactly-once实现

从kafka的消费机制,我们可以得到是否能够精确的消费关键在消费进度信息的准确性,如果能够保证消费进度的准确性,也就保证了消费数据的准确性:

  • 数据有状态:可以根据数据信息进行确认数据是否重复消费,这时候可以使用手动提交的最少一次消费语义实现,即使消费的数据有重复,可以通过状态进行数据去重,以达到幂等的效果
  • 存储数据容器具备幂等性:在数据存入的容器具备天然的幂等(比如ElasticSearch的put操作具备幂等性,相同的数据多次执行Put操作和一次执行Put操作的结果是一致的),这样的场景也可以使用手动提交的最少一次消费语义实现,由存储数据端来进行数据去重。

但是当数据无状态,并且存储容器不具备幂等时,这种场景需要自行控制offset的准确性:

  1. 利用consumer.seek(topicPartition,offset)方法可以指定offset进行消费,在启动消费者时查询数据库中记录的offset信息,如果是第一次启动,那么数据库中将没有offset信息,需要进行消费的元数据插入,然后从offset=0开始消费。
  2. 关系型数据库具备事务的特性,当数据入库时,同时也将offset信息更新,借用关系型数据库事务的特性保证数据入库和修改offset记录这两个操作是在同一个事务中进行
  3. 使用ConsumerRebalanceListener来完成在分配分区时和Relalance时作出相应的处理逻辑

要弄清楚的是,我们在消费的时候,关闭了自动提交,我们也没有通过consumer.commitAsync()手动提交我们的位移信息,而是在每次启动一个新的consumer的时候,触发rebalance时,读取数据库中的位移信息,从该位移中开始读取partition的信息(初始化的时候为0),在没有出现异常的情况下,我们的consumer会不断从producer读取信息,这个位移是最新的那个消息位移,而且会同时把这个位移更新到数据库中,但是,当出现了rebalance时,那么consumer就会从数据库中读取开始的位移。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐