kafka系列之Consumer 拦截器(9)
Consumer 拦截器前面我们详细介绍了拦截器的原理,可以参考kafka系列之Producer 拦截器(06),其实拦截器的在很多技术中都有,关于拦截器的应用场景我们在前面一节中也介绍过了,这一节我们直接看一下消费者端拦截器的使用。我觉得为了学习kafka ,你可以打开kafka 的源码包,看看都有什么,你可以从kafka-client 包开始我们今天要介绍的ConsumerIntercepto
Consumer 拦截器
前面我们详细介绍了拦截器的原理,可以参考kafka系列之Producer 拦截器(06),其实拦截器的在很多技术中都有,关于拦截器的应用场景我们在前面一节中也介绍过了,这一节我们直接看一下消费者端拦截器的使用。我觉得为了学习kafka ,你可以打开kafka 的源码包,看看都有什么,你可以从kafka-client 包开始
我们今天要介绍的ConsumerInterceptor
就是在consumer 包下面,我么前面介绍的很多东西你都可以在这里看到响应的代码,例如分区的分配策略、再均衡监听器等等。
ConsumerInterceptor
/**
* A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case
* is for third-party components to hook into the consumer applications for custom monitoring, logging, etc.
* 其实这一句已经说明了它的主要使用场景,而且告诉你它是个插件,也就是可有可无的,主要用在第三方组件里,用来记录和监控
*/
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
/**
* This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}
* 发生的时机:在返回给客户端之前,也就是poll() 方法之前
* <p>
* This method is allowed to modify consumer records, in which case the new records will be returned.
* 这个方法允许你修改records(记录集合),然后信息的记录集合被返回
* There is no limitation on number of records that could be returned from this
* method. I.e., the interceptor can filter the records or generate new records.
* 没有返回记录条数上的限制,你可以在这里可以可以过滤或者是生成新的记录
*/
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
/**
* This is called when offsets get committed.
* 当offset 被调教之后条用
*/
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
/**
* This is called when interceptor is closed
*/
public void close();
}
onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
ConsumerRecords 和 ConsumerRecord
这里我们注意到一个问题,那就是ConsumerInterceptor.onConsume
处理的对象是ConsumerRecords,这个Producer 拦截器的参数ProducerRecord不一样,而且从名称上我们就可以看出来这是个集合,那是什么集合呢
它呢其实就是ConsumerRecord的一个集合,这是因为我们生产者生产记录是一条条生产的,而我们的消费者是一批批消费的。
拦截器案例
修改记录内容
这就是我们的拦截器实现
public class ChangeValueInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
if (records == null || records.isEmpty()) {
return records;
}
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
if ("kingcall".equals(record.key())) {
record = new ConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), "刀光剑影江湖情,摧枯拉朽浪滔滔。功名利禄拂衣去,山高水远路迢迢");
}
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
List<ConsumerRecord<String, String>> consumerRecords = newRecords.getOrDefault(topicPartition, new ArrayList<ConsumerRecord<String, String>>());
consumerRecords.add(record);
newRecords.put(topicPartition, consumerRecords);
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
下面是我们的consumer 代码
public class ConsumerInterceptorDemo {
private static KafkaConsumer<String, String> consumer;
private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 1000);
props.put("session.timeout.ms", 30000);
props.put("max.poll.records", 1000);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return props;
}
@Test
public void changeValue() {
Properties configs = initConfig();
List<String> interceptors = new ArrayList<>();
// 拦截器 1
interceptors.add("com.kingcall.clients.interceptors.interceptorEntity.consumer.ChangeValueInterceptor");
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
}
}
}
当我们尝试发送两条消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "你好祖国", "你好祖国");
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "kingcall", "kingcall");
输出如下:
计算消息延时
这个怎么玩呢,前面我们在学习Producer 拦截器的时候记录下了,总共发送的数据条数,这里我们只要计算出来总的记录延迟,我们就可以算出来平均延时,从了解我们当前的处理能力,是否可以满足业务的需求
定义拦截器
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private static Jedis jedis;
static {
jedis = new Jedis("localhost");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalMessageCount"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
jedis.close();
jedis = null;
}
@Override
public void configure(Map<String, ?> configs) {
}
}
使用拦截器
@Test
public void avgLatency() {
Properties configs = initConfig();
List<String> interceptors = new ArrayList<>();
// 拦截器 1
interceptors.add("com.kingcall.clients.interceptors.interceptorEntity.consumer.AvgLatencyConsumerInterceptor");
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
}
}
结果:
“
总结
- Consumer 拦截器可以在我们获取消息之前和提交offset 之后进行一些操作,主要还是监控相关的
- 我们可以合理使用拦截器来满足我们的一些特殊需求
- 拦截器可能对吞吐有影响,注意监控
更多推荐
所有评论(0)