1. The stack trace
[2016-07-01 15:58:55,889] ERROR Commit of Thread[WorkerSinkTask-beaver_http_response-connector-0,5,main] offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:180)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:861)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:828)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2016-07-01 15:58:55,889] ERROR Commit of Thread[WorkerSinkTask-beaver_http_response-connector-8,5,main] offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
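2. Cause
One concept matters a lot here: Kafka itself manages how partitions are assigned to the consumers in a group.
(1) If a consumer has consumed data but does not check back in with the group coordinator within a set time, Kafka assumes the consumer has died and starts a rebalance; an offset commit that arrives after that fails with the exception above. The window is tied to the heartbeat settings: "heartbeat.interval.ms", which in turn must stay below "session.timeout.ms".
(2) Every time a consumer polls Kafka, the amount of data a single poll can return is capped. The cap is set per partition by "max.partition.fetch.bytes".
Leaving Kafka Connect aside for a moment, these two settings have to be balanced against each other. If max.partition.fetch.bytes is too large and heartbeat.interval.ms is too short, congratulations: you are all but guaranteed to hit the stack trace above.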
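To get a feel for how the two settings interact, here is a rough back-of-the-envelope sketch in Python. It is not Kafka code: the function names, the partition count, and the sink throughput are made up for illustration. It assumes a poll returns at most max.partition.fetch.bytes per assigned partition and that the consumer cannot check in with the broker while a batch is still being processed.

```python
def worst_case_poll_gap_ms(assigned_partitions, max_partition_fetch_bytes, sink_bytes_per_sec):
    """Upper bound (in ms) on the time spent processing one full poll() batch.

    Assumption: one poll returns at most max_partition_fetch_bytes per
    assigned partition, and the sink drains data at a steady rate.
    """
    batch_bytes = assigned_partitions * max_partition_fetch_bytes
    return batch_bytes / sink_bytes_per_sec * 1000.0


def risks_rebalance(poll_gap_ms, timeout_ms):
    """True if the gap between two polls reaches the configured window."""
    return poll_gap_ms >= timeout_ms


if __name__ == "__main__":
    # Hypothetical numbers: 10 partitions, a 1 MiB per-partition cap,
    # and a sink that writes 256 KiB per second.
    gap = worst_case_poll_gap_ms(10, 1048576, 262144)
    print(gap, risks_rebalance(gap, 30000))
```

With these made-up numbers a full batch takes about 40 seconds to drain, which is longer than a 30-second window, so the next poll comes too late. Shrinking the fetch cap or widening the window both bring the gap back under the limit.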
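Now let's drag Kafka Connect into the picture. It has a very low-key setting: "offset.flush.interval.ms". This setting matters a great deal: it is the interval at which offsets are committed, and it defaults to 60 seconds. I checked my own configuration: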
"heartbeat.interval.ms":"30000"
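As I read it, that means a rebalance kicks in if the Kafka connector has not committed its offsets to Kafka within 30 seconds, while by default it only commits every 60 seconds. After I changed the configuration, the errors stopped and the world went quiet.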
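A small sanity check along these lines might have caught my setting earlier. This is only a sketch: check_timeouts is a hypothetical helper, not part of Kafka or Kafka Connect. It encodes the rule from the Kafka consumer documentation that heartbeat.interval.ms must be lower than session.timeout.ms and is typically kept at no more than a third of it. The fallback of 30000 ms for session.timeout.ms is what I believe the 0.9 documentation lists as the default; treat it as an assumption and check your own version.

```python
def check_timeouts(cfg):
    """Return a list of warnings for a consumer config given as a dict.

    Hypothetical helper for illustration; values may be strings, as in a
    JSON connector config.
    """
    heartbeat = int(cfg["heartbeat.interval.ms"])
    # Assumed default for session.timeout.ms (0.9-era docs); verify for your version.
    session = int(cfg.get("session.timeout.ms", 30000))
    warnings = []
    if heartbeat >= session:
        warnings.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat * 3 > session:
        warnings.append("heartbeat.interval.ms is usually kept at or below 1/3 of session.timeout.ms")
    return warnings


if __name__ == "__main__":
    # The value from my configuration above.
    for warning in check_timeouts({"heartbeat.interval.ms": "30000"}):
        print(warning)
```

Run against my value of 30000 with the assumed default session timeout, the check flags the first rule.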
3. Open question
This stack trace only shows up when the data volume is very large. Why does it not show up when the volume is small?
\kafka-2.0.0\clients\src\main\java\org\apache\kafka\clients\consumer\KafkaConsumer.java
The pollOnce method of the class above may hold the answer.
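Until I have read that code properly, here is one plausible explanation as a sketch. It is a guess, not something confirmed from pollOnce. The idea is that a poll can only return what is actually waiting in each partition, so with little backlog the batch is small and the next poll comes around quickly. The helper name and all numbers below are hypothetical.

```python
def break_even_backlog_bytes(timeout_ms, sink_bytes_per_sec, assigned_partitions):
    """Per-partition backlog (bytes) at which processing one poll() batch
    takes exactly timeout_ms.

    Assumption: the batch is split evenly across partitions and the sink
    drains data at a steady rate.
    """
    total_bytes = timeout_ms / 1000.0 * sink_bytes_per_sec
    return total_bytes / assigned_partitions


if __name__ == "__main__":
    # Hypothetical: 30 s window, sink writes 256 KiB/s, 10 partitions.
    print(break_even_backlog_bytes(30000, 262144, 10))
```

With these numbers the break-even point is 768 KiB per partition. As long as each partition has less than that waiting, a batch is processed inside the window; once the backlog grows past it (and the fetch cap allows larger batches), polls start arriving late.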
For those who like reading in English, here is a related Stack Overflow question:
http://stackoverflow.com/questions/35658171/kafka-commitfailedexception-consumer-exception
The configuration reference is here:
https://kafka.apache.org/090/configuration.html