本文来说下SpringBoot整合kafka部分知识内容


带回调的生产者

前面我们说了简单的生产和消费,本文说下带回调的生产者。kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法。回调函数只是保证消息可以从生产者到kafka服务器发送成功i,至于消费者从kafka消费消息是保证不了的。


方式一

生产者

@Slf4j
@RestController
@RequestMapping("/api/kafka")
@Api(tags = "kafka测试开发")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/callbackOne")
    @ApiOperation(value = "带回调的生产者")
    public void sendMessage2(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) {

        // 带回调的生产者
        kafkaTemplate.send("test", callbackMessage).addCallback(success -> {

            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            log.info("发送消息成功:" + topic + "-" + partition + "-" + offset + "-" + callbackMessage);
        }, failure -> {
            log.info("发送消息失败:" + failure.getMessage());
        });
    }

}

消费者

@Component
@Slf4j
public class KafkaConsumer {

    // 消费监听
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> record){

        // 消费的哪个topic、partition的消息,打印出消息内容
        log.info("==============================================>");
        StringBuffer sb = new StringBuffer();
        // 主题
        sb.append(record.topic() + "-");
        // 分区
        sb.append(record.partition() + "-");
        // 需要消费的值
        sb.append(record.value() + "-");
        // 位移
        sb.append(record.offset());

        log.info("消费者进行消费:"+ sb);
    }
}

测试结果

在这里插入图片描述
在这里插入图片描述


方式二

只是回调者的实现方式有点不同,消费者和方式一类似

 @GetMapping("/callbackTwo")
 @ApiOperation(value = "带回调的生产者方式二")
 public void sendMessage3(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) {

        kafkaTemplate.send("test", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {

            @Override
            public void onSuccess(SendResult<String, Object> result) {

                RecordMetadata rm = result.getRecordMetadata();
                log.info("发送消息成功:" + rm.topic() + "-" + rm.partition() + "-" + rm.offset() + "-" + callbackMessage);
            }

            @Override
            public void onFailure(Throwable ex) {
                log.info("发送消息失败:"+ex.getMessage());
            }

        });
    }

程序测试

在这里插入图片描述
在这里插入图片描述


本文小结

本文介绍了SpringBoot整合kafka实战之带回调的生产者

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐