Consumer Interceptors

We covered how interceptors work in detail earlier; see the Kafka series post on Producer interceptors (06). Interceptors show up in many technologies, and we discussed their typical use cases in the previous installment, so in this one we go straight to using interceptors on the consumer side. If you want to really learn Kafka, I suggest opening the Kafka source and browsing around to see what is there; the kafka-clients package is a good place to start.

[Screenshot: contents of the kafka-clients source package]

The ConsumerInterceptor we introduce today lives under the consumer package, where you can also find the code behind many of the things we covered earlier, such as the partition assignment strategies and the rebalance listener.

[Screenshot: contents of the org.apache.kafka.clients.consumer package]

ConsumerInterceptor

/**
 * A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case
 * is for third-party components to hook into the consumer applications for custom monitoring, logging, etc.
 * This one sentence already states the main use case. It also tells you this is a plugin, i.e. optional,
 * and that it is mainly meant for third-party components to do logging and monitoring.
 */
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {

    /**
     * This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}
     * When it fires: right before the records are handed back to the client, i.e. just before poll() returns.
     * <p>
     * This method is allowed to modify consumer records, in which case the new records will be returned.
     * You may modify the records (the record collection) here, and the new collection is what gets returned.
     * There is no limitation on number of records that could be returned from this
     * method. I.e., the interceptor can filter the records or generate new records.
     * There is no limit on how many records you return: you can filter records here or generate new ones.
     */
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    /**
     * This is called when offsets get committed.
     * Called after offsets have been committed.
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    /**
     * This is called when interceptor is closed
     */
    public void close();
}

onConsume: called just before the records are returned to the Consumer application by poll(). In other words, before you start processing the messages, the interceptor gets a chance to step in and do its work, and only then are the records handed back to you.

onCommit: called after the Consumer commits offsets. This is a good place for bookkeeping-style actions, such as writing a log entry.
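As a minimal sketch of that bookkeeping idea (the class name and log format here are our own, not from the Kafka source), an interceptor that just logs every committed offset might look like this:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical example: log every committed offset for auditing
public class CommitLoggingInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        return records; // pass the batch through unchanged
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Invoked after a commit (manual or auto) succeeds
        offsets.forEach((tp, oam) ->
                System.out.println("committed " + tp + " -> " + oam.offset()));
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}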

ConsumerRecords and ConsumerRecord

Notice one thing here: the object ConsumerInterceptor.onConsume works on is a ConsumerRecords, unlike the Producer interceptor, whose parameter is a single ProducerRecord. The plural name already suggests it is a collection, but a collection of what?

[Screenshot: the ConsumerRecords class definition]

It is simply a collection of ConsumerRecord objects, grouped by partition. That is because the producer produces records one at a time, while the consumer consumes them in batches.
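A quick sketch of how the two types relate (assuming a consumer that is already configured and subscribed, as in the demo below): ConsumerRecords keys the batch by TopicPartition, and you can walk it as a whole or one partition at a time.

// Iterating a batch returned by poll(): the whole batch, or per partition
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
System.out.println("batch size: " + records.count());

for (TopicPartition tp : records.partitions()) {
    // records.records(tp) is the List<ConsumerRecord<K, V>> for that partition
    for (ConsumerRecord<String, String> record : records.records(tp)) {
        System.out.println(tp + " @ " + record.offset() + " : " + record.value());
    }
}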

Interceptor examples

Modifying record values

Here is our interceptor implementation:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ChangeValueInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        if (records == null || records.isEmpty()) {
            return records;
        }
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // Replace the value of any record whose key is "kingcall"
            if ("kingcall".equals(record.key())) {
                record = new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
                        record.key(), "刀光剑影江湖情,摧枯拉朽浪滔滔。功名利禄拂衣去,山高水远路迢迢");
            }
            // Regroup the (possibly replaced) records by partition to rebuild ConsumerRecords
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            List<ConsumerRecord<String, String>> consumerRecords =
                    newRecords.getOrDefault(topicPartition, new ArrayList<>());
            consumerRecords.add(record);
            newRecords.put(topicPartition, consumerRecords);
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
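One design detail worth noting: ConsumerRecord exposes no setters, so to change a value we have to build a replacement record and reassemble a new ConsumerRecords from a Map keyed by TopicPartition. This is also exactly what makes it possible for onConsume to filter records out or add new ones, as the Javadoc above points out.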

Below is our consumer code:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;

public class ConsumerInterceptorDemo {
    private static KafkaConsumer<String, String> consumer;

    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 1000);
        props.put("session.timeout.ms", 30000);
        props.put("max.poll.records", 1000);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    @Test
    public void changeValue() {
        Properties configs = initConfig();
        List<String> interceptors = new ArrayList<>();
        // Interceptor 1
        interceptors.add("com.kingcall.clients.interceptors.interceptorEntity.consumer.ChangeValueInterceptor");
        configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            // The argument is the maximum time poll() will block waiting for
            // records, not a fixed polling interval
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            records.forEach((ConsumerRecord<String, String> record) -> {
                System.out.println("receive: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
            });
        }
    }
}

Now we send two test messages from the producer side:

ProducerRecord<String, String> record1 = new ProducerRecord<>("test", "你好祖国", "你好祖国");
ProducerRecord<String, String> record2 = new ProducerRecord<>("test", "kingcall", "kingcall");

The output looks like this:

[Screenshot: console output; the value of the "kingcall" record has been rewritten by the interceptor]

Computing average message latency

How does this work? Back in the Producer interceptor lesson we recorded the total number of messages sent. Here we only need to accumulate the total end-to-end latency; dividing it by that count gives the average latency, which tells us whether our current processing capacity can keep up with the business requirements.
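For reference, a minimal sketch of that producer-side counter (the class name and Redis keys here are assumptions modeled on the consumer example below, not code copied from the earlier post): it increments the totalMessageCount key in Redis for every successfully acknowledged message.

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import redis.clients.jedis.Jedis;

// Hypothetical producer-side interceptor that maintains the "totalMessageCount" counter
public class CountingProducerInterceptor implements ProducerInterceptor<String, String> {

    private static final Jedis jedis = new Jedis("localhost");

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            jedis.incr("totalMessageCount"); // count only successful sends
        }
    }

    @Override
    public void close() {
        jedis.close();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}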

Defining the interceptor

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import redis.clients.jedis.Jedis;

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private static Jedis jedis;
    static {
        jedis = new Jedis("localhost");
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Sum, over the batch, the time between when each record was produced
        // (record.timestamp()) and now
        long latency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            latency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", latency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        // totalMessageCount is maintained by the producer-side interceptor
        long totalSentMsgs = Long.parseLong(jedis.get("totalMessageCount"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
        jedis.close();
        jedis = null;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Using the interceptor

@Test
public void avgLatency() {
    Properties configs = initConfig();
    List<String> interceptors = new ArrayList<>();
    // Interceptor 1
    interceptors.add("com.kingcall.clients.interceptors.interceptorEntity.consumer.AvgLatencyConsumerInterceptor");
    configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    consumer = new KafkaConsumer<String, String>(configs);
    consumer.subscribe(Arrays.asList("test"));

    while (true) {
        // The argument is the maximum time poll() will block waiting for records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println("receive: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
    }
}

Result:

[Screenshot: run result showing the computed average latency]

Summary

  1. Consumer interceptors let us run custom logic before messages reach us and after offsets are committed; the main use cases are monitoring-related.
  2. Used judiciously, interceptors can cover quite a few special requirements.
  3. Interceptors sit on the hot path and can affect throughput, so monitor their overhead.