SpringBoot整合kafka实战之带回调的生产者
本文来说下SpringBoot整合kafka部分知识内容文章目录概述概述
·
本文来说下SpringBoot整合kafka部分知识内容
带回调的生产者
前面我们说了简单的生产和消费,本文说下带回调的生产者。kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法。回调函数只是保证消息可以从生产者到kafka服务器发送成功i,至于消费者从kafka消费消息是保证不了的。
方式一
生产者
@Slf4j
@RestController
@RequestMapping("/api/kafka")
@Api(tags = "kafka测试开发")
public class KafkaController {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/callbackOne")
@ApiOperation(value = "带回调的生产者")
public void sendMessage2(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) {
// 带回调的生产者
kafkaTemplate.send("test", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
log.info("发送消息成功:" + topic + "-" + partition + "-" + offset + "-" + callbackMessage);
}, failure -> {
log.info("发送消息失败:" + failure.getMessage());
});
}
}
消费者
@Component
@Slf4j
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"test"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
log.info("==============================================>");
StringBuffer sb = new StringBuffer();
// 主题
sb.append(record.topic() + "-");
// 分区
sb.append(record.partition() + "-");
// 需要消费的值
sb.append(record.value() + "-");
// 位移
sb.append(record.offset());
log.info("消费者进行消费:"+ sb);
}
}
测试结果
方式二
只是回调者的实现方式有点不同,消费者和方式一类似
@GetMapping("/callbackTwo")
@ApiOperation(value = "带回调的生产者方式二")
public void sendMessage3(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) {
kafkaTemplate.send("test", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> result) {
RecordMetadata rm = result.getRecordMetadata();
log.info("发送消息成功:" + rm.topic() + "-" + rm.partition() + "-" + rm.offset() + "-" + callbackMessage);
}
@Override
public void onFailure(Throwable ex) {
log.info("发送消息失败:"+ex.getMessage());
}
});
}
程序测试
本文小结
本文介绍了SpringBoot整合kafka实战之带回调的生产者
更多推荐
已为社区贡献7条内容
所有评论(0)