监控消费进度 : 看滞后程度:消费者 Lag , Consumer Lag

滞后程度 : 消费者落后于生产者的程度

  • 如 : Kafka 生产者向某主题成功生产 100 万条消息,消费者消费 80 万条消息
  • 那消费者就滞后 20 w条,即 Lag = 20 w

Kafka 监控 Lag 是在分区上的层级 :

  • 主题的 Lag = 手动汇总主题下所有分区的 Lag

Lag : 反映消费者的运行情况

  • 正常工作的消费者,它的 Lag 很小,表明能及时消费消息,滞后程度很小
  • 当消费者 Lag 值很大,表明它无法跟上生产者的速度,最终 Lag 会越来越大,导致拖慢下游消息的处理速度

当消费者的速度无法匹及生产者的速度 :

  • 可能出现消费数据不在页缓存中,就无法享受 Zero Copy
  • 消费者就要从磁盘上读取数据,会拉大了与生产者的差距,出现马太效应
  • 那些 Lag 大的消费者会越来越慢,Lag 会越来越大

生产环境中要时刻关注消费者的消费进度 :

  • 出现 Lag 逐步增加的趋势,要定位问题,及时处理,避免造成业务损失

消费进度的监控方法 :

  • 命令行工具 kafka-consumer-groups
  • Java Consumer API 编程
  • JMX 监控指标

Kafka 自带命令

Kafka 自带的命令行工具 : bin/kafka-consumer-groups.sh

  • kafka-consumer-groups : 监控消费者消费进度的工具

该脚本在 Kafka bin 目录下,查看某个给定消费者的 Lag 值:

bin/kafka-consumer-groups.sh \
--bootstrap-server <Kafka broker 连接信息> \
--describe --group <group 名称>

Kafka 连接信息 = < 主机名:端口 >

  • group 名 : 消费者设置的 group.id 值

例子:

  • 主题、分区
  • LOG-END-OFFSET : 每个分区当前最新生产的消息的位移值
  • CURRENT-OFFSET : 该消费者组当前最新消费消息的位移值
  • LAG 值(前两者的差值)、消费者实例 ID
  • 消费者连接 Broker 的主机名 , 消费者的 CLIENT-ID 信息

在这里插入图片描述

Java Consumer API

Java Consumer API :

  • 查询当前分区最新消息位移
  • 查询消费者组最新消费消息位移

用 Consumer API 监控消费者组的 Lag 值:

  • 只适用于 Kafka 2.0.0 及以上的版本
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
    try (AdminClient client = AdminClient.create(props)) {
        // 获取给定消费者组的最新消费消息的位移
        ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
        try {
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            
            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // 获取订阅分区的最新消息位移
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), 
                                                                               // 执行减法操作,获取 Lag 值并封装进一个 Map 对象
                                                                               entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // 处理中断异常
            // ...
            return Collections.emptyMap();
        } catch (ExecutionException e) {
            // 处理 ExecutionException
            // ...
            return Collections.emptyMap();
        } catch (TimeoutException e) {
        	throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
        }
    }
}

JMX 监控指标

Kafka 消费者的 JMX 指标 : kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"

  • records-lag-max : 窗口内曾经达到的最大的 Lag 值
  • records-lead-min : 最小的 Lead 值

Lead : 消费者最新消费消息的位移与分区当前第一条消息位移的差值

  • Lag 越大,Lead 越小
  • Lead 快接近于 0 时,消费者就可能丢消息

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"

  • records-lag-avg : 平均的 Lag 值
  • records-lead-avg : 平均的 Lead 值
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐