Kafka Application Issues
Some problems encountered while using Kafka.
1. Problem 1
Connection to node 2 could not be established. Broker may not be available.
Solutions:
1. Check whether the firewall on the broker host allows the relevant port (9092 by default).
2. If Kafka is deployed on a cloud server, also check whether the cloud provider's security group opens that port.
3. Check whether Kafka's listener addresses are configured correctly (see the sketch below).
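For point 3, a minimal sketch of the relevant entries in server.properties, assuming a single PLAINTEXT listener; the hostname is a placeholder for whatever address clients actually use to reach the broker:

# Bind on all interfaces inside the host/container
listeners=PLAINTEXT://0.0.0.0:9092
# Address the broker hands back to clients; on a cloud server this must be
# the public IP or hostname, not the internal one
advertised.listeners=PLAINTEXT://your.public.hostname:9092

A mismatch here is a common cause of "Broker may not be available": the client reaches the bootstrap address, but then tries to connect to each broker at its advertised address and fails.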
2. Problem 2
Problem description:
The following error can leave some topics unable to be consumed.
2022-04-11 14:13:45.628 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.KafkaException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1368)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:497)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
at com.sun.proxy.$Proxy402.poll(Unknown Source)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
... 4 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition fish_killed_number-0 could be determined
Cause and solution:
In the project's Kafka configuration, the value of enable-auto-commit should be false.
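A minimal sketch of the corresponding Spring Boot properties, assuming the standard spring-kafka auto-configuration (the group id is a placeholder):

# application.properties
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.group-id=my-consumer-group
# BATCH is the default ack mode: the listener container commits the offsets
# of each poll's records after the listener has processed them
spring.kafka.listener.ack-mode=BATCH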
When enable.auto.commit is true, offsets are handed to Kafka to manage and are committed automatically on the default schedule. In that case, if offsets are committed too slowly while messages are still being produced, a partition can remain locked for a long time, the consumer can no longer consume data from the affected topics, and dirty metadata builds up in ZooKeeper.
When enable.auto.commit is false, offset management is effectively handed over to Spring, which performs the commits on our behalf after the records have been processed, avoiding the problem of offsets being committed too slowly.
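To illustrate, a minimal sketch of a listener under this setting; the topic name is taken from the log above, while the String payload type and the class name are assumptions:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class FishKilledNumberListener {

    @KafkaListener(topics = "fish_killed_number", groupId = "my-consumer-group")
    public void onMessage(ConsumerRecord<String, String> record) {
        // With enable-auto-commit=false the Kafka client no longer commits in
        // the background; the Spring listener container commits the offset
        // (per its ack-mode) after this method returns without throwing.
        System.out.println(record.value());
    }
}

No manual acknowledgment is needed here: under the default BATCH ack mode, Spring commits only after the listener finishes, so commits track actual processing instead of a timer.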