创作不易,请勿抄袭,转载请注明出处。如有疑问,请加微信 wx15151889890,谢谢。
[本文链接:]https://blog.csdn.net/wx740851326/article/details/https://blog.csdn.net/wx740851326/article/details/84032929
写代码的,先前是在代码里直接写的kafka信息,后来将kafka的信息改为从配置文件读取了,结果就出错了。开始懵逼······

18/11/12 09:08:22 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at scala.util.Either.fold(Either.scala:98)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
	at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
	at com.cvc.wj.handle.RealTimeAlarm.main(RealTimeAlarm.java:53)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)

看一看报错信息,是在创建读取kafka的流时出现了问题,创建流的代码如下

JavaPairInputDStream<String, String> lines = KafkaUtils
		.createDirectStream(jssc, String.class, // key类型
				String.class, // value类型
				StringDecoder.class, // 解码器
				StringDecoder.class, kafkaParams, topics);

一个是用到了spark,此处的spark只是初始化,不涉及其他,因此可以排除问题。剩下就是kafka 的配置参数和topic信息了。
然后把topic打出来看,发现是kafka的信息没有获取到,把配置文件里的topic配置正确之后,问题解决。
总结下,尤其是像这种从本来是好的状态到报错,很可能就是配置信息的问题,细心查看kafka的配置项,相信问题不会出现在kafka上。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐