一 生产者发送的同步与异步

生产者发送消息依靠send方法,主要要同步和异步两种:

异步发送

producer.send(record,callback)

callback就是对发送消息后的回调。该方法输入参数是metaData和exception:当消息异步发送成功则exception为空;反之若发送失败则metadata为空exception非空。

同步发送

producer.send(record).get()

通过调用feature.get无限等待。若没有失败则返回对应的metaData包括消息发送的topic  part offset。

二 消费者提交消费offset的同步与异步

consumer通过向所属的协调者发送请求来实现位移提交。一般有手动和自动提交两种方式,手动提交通过enable.auto.commit=false设置。提交会存在三种消费语义:

at most once  先提交再消费 ,最多消费一次,存在数据丢失可能,但不会重复消费;

at least once 先消费再提交 ,最少消费一次, 存在重复消费可能(kafka默认),但是无丢失;

exaclty once 通过事务等机制保证一定消费且只一次,现实难以实现。

手动提交主要包括同步commitSync和异步commitAsync两种方法实现。这里异步提交不是指consumer单独起一个线程进行位移提交,consumer会在用户主线程poll中不断轮旋这次异步提的结果。

提交时可以指定一个map参数,key是分区,value是下一条待消费消息位移

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐