Producer 拦截器

拦截器(interceptor)是个相当新的功能,它是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

拦截器作为一个非常小众的功能,Kafka 拦截器自 0.10 版本被引入后并未得到太多的实际应用。

Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景,对于监控我们其实是利用拦截器实现了埋点类似的功能

Spring 中的拦截器

如果你用过 Spring Interceptor 或是 Apache Flume,那么应该不会对拦截器这个概念感到陌生,其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。

它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。下面这张图展示了 Spring MVC 拦截器的工作原理:

img

拦截器 1 和拦截器 2 分别在请求发送之前、发送之后以及完成之后三个地方插入了对应的处理逻辑。

而 Flume 中的拦截器也是同理,它们插入的逻辑可以是修改待发送的消息,也可以是创建新的消息,甚至是丢弃消息。这些功能都是以配置拦截器类的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑。

Kafka 拦截器借鉴了这样的设计思路。你可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后。

kafka 中的拦截器

Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。

值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor其定义的方法包括:

public interface ProducerInterceptor<K, V> extends Configurable {
    // 该方法会在消息发送之前被调用
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
  	// 该方法会在消息成功提交或发送失败之后被调用
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    // 关闭interceptor,主要用于执行一些资源清理工作
    public void close();
    void configure(Map<String, ?> configs);
}

onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用,我们知道异步发送有回调通知 callback, onAcknowledgement 的调用要早于 callback 的调用。

值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全。

还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。

定义拦截器

这里我们定义一个统计消息数量的拦截器,从而掌握某个业务线(Topic)的数据量

public class CountRecordProducerInterceptor implements ProducerInterceptor<String, String> {
    private static Jedis jedis;
    static {
        jedis = new Jedis("localhost");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalMessageCount");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception==null){
            jedis.incr("totalSuccessMessageCount");
        }else {
            jedis.incr("totalFailedMessageCount");
        }
    }
  
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
使用拦截器

所用拦截器也是比较简单的,我们只需要将其加入Properties 传入KafkaProducer的构造函数。

public class ProducerInterceptor {
    KafkaProducer<String, String> producer = null;

    @Before
    public void setup() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        List<String> interceptors = new ArrayList<>();
        // 拦截器 1
        interceptors.add("com.kingcall.clients.producer.interceptors.interceptorEntity.producer.CountRecordProducerInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        producer = new KafkaProducer<String, String>(props);
    }

    @Test
    public void baseSend() {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "Precision Products", "France");
        try {
            Future<RecordMetadata> recordMetadataFuture = producer.send(record);
            System.out.println(recordMetadataFuture.get().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
查看统计结果

image-20210310114903267

注意:线上不要使用 keys *

总结

  1. 拦截器本身虽然是一个比较小众的功能,但是可以很好的帮助我们解决遇到的问题
  2. 拦截器可能会影响客户端的性能,所以要合理使用,注意监控
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐