kafka生产者消费者同步与异步
一 生产者发送的同步与异步生产者发送消息依靠send方法,主要要同步和异步两种:异步发送producer.send(record,callback)callback就是对发送消息后的回调。该方法输入参数是metaData和exception:当消息异步发送成功则exception为空;反之若发送失败则metadata为空exception非空。同步发送producer.send(record).g
一 生产者发送的同步与异步
生产者发送消息依靠send方法,主要要同步和异步两种:
异步发送
producer.send(record,callback)
callback就是对发送消息后的回调。该方法输入参数是metaData和exception:当消息异步发送成功则exception为空;反之若发送失败则metadata为空exception非空。
同步发送
producer.send(record).get()
通过调用feature.get无限等待。若没有失败则返回对应的metaData包括消息发送的topic part offset。
二 消费者提交消费offset的同步与异步
consumer通过向所属的协调者发送请求来实现位移提交。一般有手动和自动提交两种方式,手动提交通过enable.auto.commit=false设置。提交会存在三种消费语义:
at most once 先提交再消费 ,最多消费一次,存在数据丢失可能,但不会重复消费;
at least once 先消费再提交 ,最少消费一次, 存在重复消费可能(kafka默认),但是无丢失;
exaclty once 通过事务等机制保证一定消费且只一次,现实难以实现。
手动提交主要包括同步commitSync和异步commitAsync两种方法实现。这里异步提交不是指consumer单独起一个线程进行位移提交,consumer会在用户主线程poll中不断轮旋这次异步提的结果。
提交时可以指定一个map参数,key是分区,value是下一条待消费消息位移
更多推荐
所有评论(0)