Kafka production issue: messages keep piling up and are never consumed; consumption only resumes after a service restart
Messages kept piling up and were not being consumed, as if the consumers had died. After restarting the service, consumption resumed, but it stopped again after a while.
We tried increasing the number of consumers and the number of pods (nodes), but neither fully fixed it; the problem kept coming back.
A thread dump showed that the consumers' coordinator heartbeat threads were all in the WAITING state, i.e. suspended and waiting indefinitely:
"kafka-coordinator-heartbeat-thread | CID_alikafka_xxx" #125 daemon prio=5 os_prio=0 tid=0x00007f1aa57fa000 nid=0x86 in Object.wait() [0x00007f1a8af80000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:920) - locked <0x00000000e798f558> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) Locked ownable synchronizers: - None "kafka-coordinator-heartbeat-thread | CID_alikafka_xxx" #124 daemon prio=5 os_prio=0 tid=0x00007f1aa546b800 nid=0x85 in Object.wait() [0x00007f1a8b081000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:920) - locked <0x00000000e798f888> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) Locked ownable synchronizers: - None
Later, while going through the official documentation, I found this passage:
https://docs.spring.io/spring-kafka/docs/2.6.3-SNAPSHOT/reference/html/
You should understand that the retry discussed in the preceding section suspends the consumer thread (if a BackOffPolicy is used). There are no calls to Consumer.poll() during the retries. Kafka has two properties to determine consumer health. The session.timeout.ms is used to determine if the consumer is active. Since kafka-clients version 0.10.1.0, heartbeats are sent on a background thread, so a slow consumer no longer affects that. max.poll.interval.ms (default: five minutes) is used to determine if a consumer appears to be hung (taking too long to process records from the last poll). If the time between poll() calls exceeds this, the broker revokes the assigned partitions and performs a rebalance. For lengthy retry sequences, with back off, this can easily happen.
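To make the two properties concrete, here is a minimal sketch with a plain kafka-clients consumer; the broker address, topic, group id and timeout values are placeholders, not the settings from this incident:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SlowConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // session.timeout.ms only covers the background heartbeat thread.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        // max.poll.interval.ms is the maximum gap allowed between two poll() calls (default 300000 = 5 min).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // If handling the whole batch takes longer than max.poll.interval.ms,
                    // the coordinator considers this consumer hung, revokes its partitions
                    // and rebalances the group - exactly the symptom described above.
                    handle(record);
                }
            }
        }
    }

    private static void handle(ConsumerRecord<String, String> record) {
        // placeholder for the (slow) business logic
    }
}
```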
So the consumers were being treated as hung because the gap between poll() calls exceeded max.poll.interval.ms (five minutes by default). The real culprit was that the business logic processing the fetched messages was too slow; that part still needs to be optimized.
Raising spring.kafka.properties.max.poll.interval.ms to 600000 (10 minutes) later resolved the issue.
Related settings:
- spring.kafka.consumer.max-poll-records: maximum number of records fetched in one poll (150 here)
- spring.kafka.properties.max.poll.interval.ms: maximum allowed interval between two poll() calls (default 5 minutes)
- spring.kafka.producer.batch-size: producer batch size in bytes (default 16384); applies to message producers only
- spring.kafka.listener.concurrency: number of consumers; Kafka partitions are shared evenly among them (e.g. 24 partitions)
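A sketch of how those settings could be applied in a Spring Boot application using spring-kafka (assuming Boot's KafkaProperties is on the classpath; bean names and the concurrency value are illustrative):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerTuning {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        // Allow up to 10 minutes between poll() calls (the fix applied above).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
        // Fetch fewer records per poll so each batch finishes well within that window.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 150);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Equivalent of spring.kafka.listener.concurrency: consumers share the partitions evenly.
        factory.setConcurrency(3);
        return factory;
    }
}
```

Setting the same values through spring.kafka.* entries in application.properties or application.yml achieves the same effect without Java configuration.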