写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢! 

在之前的两篇文章,对于生产者而言,我们做到了使用KafkaTemplate发送消息、将消息发送到指定partition、我们还使用了带回调的addCallback方法判断消息是发送成功还是失败,并做下一步处理,今天再完善一下关于消费者的使用。

想个场景,咱们在玩游戏的时候,是不是经常因为队友太菜,对面太强而破口大骂呢,当你打字骂人的时候,有很多字是发不出去的,都会被替换为**类似的字符,对于kafka的生产者其实也有类似的功能,在消息发送之前对消息进行定制化处理,即:生产者拦截器ProducerInterceptor。生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤掉不符合要的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

感觉学一个东西困难的不是怎么学,而是不知道它能干嘛,知道了它能干嘛结合官方文档和网上大佬们的博客,学起来还是挺容易的,就像这篇生产者拦截器的使用,其实很简单,只要实现ProducerInterceptor接口,并重写它的方法就可以,关键是我知不知道有生产者拦截器这个事,废话不多说,直接贴一下生产者拦截器的测试代码:

@Component
public class CustomProducerInterceptor implements ProducerInterceptor {

    /**
     * 将消息序列化和计算分区之前调用,对消息进行定制化操作
     */
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        String newValue = "prefix:" + record.value();
        ProducerRecord producerRecord = new ProducerRecord(record.topic(),newValue);
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

如上,从ProducerInterceptor接口的实现方法名就大体猜到onSend方法就是对消息的处理方法,我在方法中只是在消息前加了一个前缀简单测试下,具体的处理根据实际需求来。

要使自定义的拦截器生效,需要配置一下,在KafkaProducerConfig类中producerConfigs方法追加如下代码:

// 配置生产者拦截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.example.springbootkafka.interceptor.CustomProducerInterceptor");

启动项目,访问http://localhost:8080/send3?message=test3 结果如下:

控制台打印的消息加上了prefix:前缀,说明配置的拦截器生效了。

一般要是处理逻辑复杂的话,可以将onSend中的处理逻辑拿出来,我单独建了一个工具类,用来处理消息,代码如下:

@Component
public class SendMessageInterceptorUtil {

    public ProducerRecord execute(ProducerRecord record) {
        System.out.println("====== 处理消息 ======");
        String newValue = "prefix:" + record.value();
        ProducerRecord producerRecord = new ProducerRecord(record.topic(),newValue);
        return producerRecord;
    }
}

然后在CustomProducerInterceptor住注入SendMessageInterceptorUtil,在onSend中调用,如下:

    @Autowired
    private SendMessageInterceptorUtil sendMessageInterceptorUtil;    

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        return sendMessageInterceptorUtil.execute(record);
    }

启动项目,访问http://localhost:8080/send3?message=test3 结果如下:

如上,不但拦截器没生效,而且注入的SendMessageInterceptorUtil为空,还报空指针的错误了,网上百度了一会也没搜到有用的信息,看着拦截器中的configure方法名像是加载配置的,看看它是个啥,

从结果看这个方法应该是加载生产者配置的,可以试一下将刚才定义的处理类加到configs中再试下,如下:

// 配置拦截器消息处理类
props.put("interceptorUtil","com.example.springbootkafka.util.SendMessageInterceptorUtil");
@Override
public void configure(Map<String, ?> configs) {
    System.out.println("======" + configs + "======");
    sendMessageInterceptorUtil = (SendMessageInterceptorUtil) configs.get("interceptorUtil");
}

启动项目,访问http://localhost:8080/send3?message=test3 结果如下:

从错误类转换异常可以知道,只要直接设置SendMessageInterceptorUtil对象就可以了,咱之前设置的类路径它解析不了,当做字符串处理了,改下:

// 配置拦截器消息处理类
SendMessageInterceptorUtil sendMessageInterceptorUtil = new SendMessageInterceptorUtil();
props.put("interceptorUtil",sendMessageInterceptorUtil);

启动项目,访问http://localhost:8080/send3?message=test3 结果如下:

可以看到消息发送成功了,同时也说明kafka的拦截器由kafka进行管理和spring无关。所以无法使用Spring依赖注入功能。但是我们可以将需要的bean添加到kafka配置,使用拦截器提供的config()方法来手动来获取这些bean依赖。

对于拦截器中的close()方法,负责关闭拦截器时执行资源的清理工作;onAcknowledgement方法:消息应答之前或者消息发送失败时调用,先于用户设定的Callback之前执行,这个方法运行在Producer的I/O线程中,实现逻辑越简单越好,否则影响消息发送速度。

看网上有好多统计消息发送成功失败条数的文章,感兴趣的小伙伴可以参考这篇试一下https://www.cnblogs.com/huxi2b/p/7072447.html  这篇还是比较全的。

顺便提下,除了生产者有拦截器,其实消费者也有消费者拦截器,类似生产者拦截器,实现消费者拦截器只要实现ConsumerInterceptor接口,重写它的方法即可,代码就不贴了。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐