1. 上栈
[2016-07-01 15:58:55,889] ERROR Commit of Thread[WorkerSinkTask-beaver_http_response-connector-0,5,main] offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:180)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:861)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:828)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2016-07-01 15:58:55,889] ERROR Commit of Thread[WorkerSinkTask-beaver_http_response-connector-8,5,main] offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
      
      
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
2. 原因

有个很重要的概念,kafka会管理如何分配分区的消费。 
(1)当某个consumer消费了数据,但是在特定的时间内没有commit,它就认为consumer挂了。这个时候就要reblance了。这个时间和“heartbeat.interval.ms”配置相关。 
(2)每次consumer从kafka poll数据时,每次poll会有一个量的控制,“max.partition.fetch.bytes”配置决定。

抛开kafka connect而言,上面这个两个配置要协调好,如果max.partition.fetch.bytes过大,heartbeat.interval.ms时间过短。恭喜你,肯定就开始上面的栈了。

好,把kafka connect拉过来piu piu吧。它有个很低调的配置:“offset.flush.interval.ms”。这个配置意义很重大,它是commit offset的时间间隔,默认是60秒。我检查了下我的配置:

"heartbeat.interval.ms":"30000"
      
      
  • 1

也就是说30秒如果kafka connector不给kafka commit offset,就要reblance了。改了配置后,整个世界清静了。

3. 遗留问题

这个栈只有数据量灰常大的时候才会出现,为什么数据量小的时候不出现?

\kafka-2.0.0\clients\src\main\java\org\apache\kafka\clients\consumer\KafkaConsumer.java
      
      
  • 1

上面的类的pollOnce方法中也许有答案。

喜欢看洋文的骚年,这里有相关文章: 
http://stackoverflow.com/questions/35658171/kafka-commitfailedexception-consumer-exception 
配置说明,这里找: 

https://kafka.apache.org/090/configuration.html

原文地址:http://blog.csdn.net/xianzhen376/article/details/51802736

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐