Fixing the java.io.EOFException error reported by Kafka

In this project we use Kafka to consume data pushed to us by a third-party producer. While the project was running, the consumer.poll(1000) call kept throwing java.io.EOFException, as shown below:

java.io.EOFException
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96) ~[kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) ~[kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) ~[kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) ~[kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.common.network.Selector.poll(Selector.java:467) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-2.1.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) [kafka-clients-2.1.0.jar:?]
	at com.stream.task.hr.HRKafkaConsumer.hrEmpSync(HRKafkaConsumer.java:102) [FMP-TASK-V1.0.0.jar:?]
	at com.stream.task.hr.HrEmpSyncTask.execute(HrEmpSyncTask.java:43) [FMP-TASK-V1.0.0.jar:?]
	at com.stream.task.job.SchedulerJob$TaskRunnable.run(SchedulerJob.java:68) [FMP-TASK-V1.0.0.jar:?]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_171]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_171]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_171]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_171]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_171]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]

Posts online suggested that this is a Kafka version problem. Checking the project, it was using kafka-clients-0.10.0.0.jar, pulled in by this dependency:

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.0.0</version>
		</dependency>

With that kafka-clients version there is a problem: the call ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); enters an endless loop, repeatedly throwing java.io.EOFException. I tried several other kafka-clients versions with no luck, and finally commented out the kafka-clients dependency and switched to the kafka artifact instead:

		<!--<dependency>-->
			<!--<groupId>org.apache.kafka</groupId>-->
			<!--<artifactId>kafka-clients</artifactId>-->
			<!--<version>0.10.0.0</version>-->
		<!--</dependency>-->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.12</artifactId>
			<version>2.1.0</version>
		</dependency>

In the code, the corresponding poll() call has to change as well, to ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));. After restarting the project, the endless loop of errors was gone. A minimal sketch of the updated consumer loop follows below.
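For reference, here is a minimal sketch of such a consumer loop, assuming the kafka-clients 2.1.0 API that the kafka_2.12 2.1.0 dependency pulls in transitively. The broker address, group id, topic and class name are placeholders, not values from the original project:

	import java.time.Duration;
	import java.util.Collections;
	import java.util.Properties;

	import org.apache.kafka.clients.consumer.ConsumerRecord;
	import org.apache.kafka.clients.consumer.ConsumerRecords;
	import org.apache.kafka.clients.consumer.KafkaConsumer;

	// Minimal consumer-loop sketch against kafka-clients 2.1.0.
	// Broker address, group id, topic and class name are placeholders.
	public class HrEmpSyncConsumerSketch {

	    public static void main(String[] args) {
	        Properties props = new Properties();
	        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
	        props.put("group.id", "hr-emp-sync");             // placeholder consumer group
	        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

	        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
	            kafkaConsumer.subscribe(Collections.singletonList("hr-emp-topic")); // placeholder topic

	            while (true) {
	                // poll(Duration) replaces the deprecated poll(long) used with the old client
	                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
	                for (ConsumerRecord<String, String> record : records) {
	                    System.out.printf("offset=%d, key=%s, value=%s%n",
	                            record.offset(), record.key(), record.value());
	                }
	            }
	        }
	    }
	}

You can confirm which kafka-clients version actually ends up on the classpath with mvn dependency:tree; with the kafka_2.12 2.1.0 dependency it should resolve to kafka-clients 2.1.0, which also matches the jar versions shown in the stack trace above.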

As for how to use Kafka in general, there are plenty of posts on the forums, so I won't go over it again here. This is just a record of the error and the fix in this project, kept as a reference for the future.
