由于消息消费速度处理慢或是消费端故障会导致数据产生积压。
那怎么查看数据积压量呢?
Consumer-Groups管理;
在Kafka 的bin目录下提供了 kafka-consumer-groups.sh 脚本。此脚本用于管理消费情况。
查询消费者组

$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查询消费者组详情

$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname

在这里插入图片描述
消费积压情况分析

LogEndOffset :下一条将要被加入到日志的消息的位移

CurrentOffset :当前消费的位移

LAG :消息堆积量

消息堆积量:消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量也称之为消费滞后量

在这里插入图片描述
在这里插入图片描述
消息发送到LeaderA之后会更新LEO的值,Follower1和Fllower2也会实时拉取LeaderA中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移。也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所有HW必然不会与Leader的LEO相等,即LEO>=HW

重设消费者组位移

最早处

$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute

最新处

$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute

某个位置

$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute

调整到某个时间之后得最早位移

$KAFKA_DIR/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-of
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐