1.问题一

Connection to node 2 could not be established. Broker may not be available.

解决办法:
1.检查防火墙是否开放相关端口
2.如果是部署在云服务器,检查云服务器是否开放相关端口
3.检查kafka的监听地址是否配置正确

2.问题二

问题描述:
以下问题导致kafka数据消费可能出现部分topic无法消费的情况。

2022-04-11 14:13:45.628 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.KafkaException's; no record information is available
	at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
	at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1368)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:422)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:497)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
	at com.sun.proxy.$Proxy402.poll(Unknown Source)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
	... 4 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition fish_killed_number-0 could be determined

问题原因及解决办法
项目的kafka配置里面enable-auto-commit的值应该为false

当enable.auto.commit为true时,offset交给kafka来管理,offset进行默认的提交模式。这个时候,如果在向kafka里面生产消息时,offset的提交速度过慢,将会导致kafka的partition长期被锁住,进而导致消费者无法消费相关topic的数据,且在zookeeper里面的元数据会产生脏数据。

当enable.auto.commit为false时,相当于把offset交给了spring管理,由spring来代替我们做人工提交,有效避免了offset的提交速度过慢的问题。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐