Kafka消费端问题处理的一次实践,如下:

kafka消费端日志:

2024-07-19 14:27:35.645 transmission_loan_history_consumer-0-C-1 INFO   o.a.k.c.c.internals.Fetcher.initializeCompletedFetch:1274 - [Consumer clientId=consumer-transmissiongroup-11, groupId=transmissiongroup] Fetch offset 304363 is out of range for partition transmission_loan_history-0, resetting offset
2024-07-19 14:27:35.654 transmission_loan_history_consumer-0-C-1 INFO   o.a.k.c.c.i.SubscriptionState.maybeSeekUnvalidated:397 - [Consumer clientId=consumer-transmissiongroup-11, groupId=transmissiongroup] Resetting offset for partition transmission_loan_history-0 to offset 306840.
# 查看当前group消费情况 {ip}:{host}换成自己的ip和端口 ,{group}换成自己的group
./kafka-consumer-groups.sh --bootstrap-server {ip}:{host} --group {group} --describe

执行上面命令,发现该topic消息堵塞。

------------以上是问题及现象描述--------------------

由上,去网上查了原因:

Fetch offset is out of range for partition resetting offset · Issue #1493 · spring-projects/spring-kafka · GitHub

Kafka: Fetch offset is out of range for partition. How to avoid data loss? - Stack Overflow

Kafka Consumer configuration - How does auto.offset.reset controls the message consumption - Stack Overflow

这些是从网上找的一些分析,大概意思就是有些消息丢失了,至于是怎么丢失的,It Depends.

到这里,不就是消息堵塞、消息丢失了嘛,由此至少能大概能有一些应对办法:

1、增加分区

2、延长消息过期时间

3、db增加线程处理消费

上面这三个方法都是想要尽量让消费端消费快一点。

----------------------具体过程如下---------------------------------------

一、增加分区

# {ip}、{host}换成自己的kafka ip和端口
./kafka-topics.sh --bootstrap-server {ip}:{host} --alter --topic transmission_item --partitions 30

二、延长消息过期时间

二、延长consumer过期时间,具体就是改大session.timeout.ms参数的值
props.put("session.timeout.ms", "500000");

三、平衡max.poll.interval.ms和max.poll.records

三、平衡max.poll.interval.ms和max.poll.records
max.poll.records 表示每次默认拉取消息条数,默认值为 500

一次 kafka 消息堆积问题排查-腾讯云开发者社区-腾讯云

四、增加消费程序线程

经过以上参数控制变量修改,发现1、增加分区最有用,以上。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐