今天在做Spark Streaming的测试实验时,打包成Jar后,Submit提交任务时,报错:

   

16/01/04 16:43:56 INFO VerifiableProperties: Property group.id is overridden to kafkatest
16/01/04 16:43:56 INFO VerifiableProperties: Property zookeeper.connect is overridden to slave9:2181,slave10:2181,slave11:2181
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
	at scala.Option.map(Option.scala:145)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
	at scala.util.Either$RightProjection.flatMap(Either.scala:523)
    然后各中百度,谷歌,找了很多资料,终于在:  http://stackoverflow.com/questions/34145483/spark-streaming-kafka-stream?answertab=votes#tab-top  找到了同样的错误,上面说是由于Spark Streaming默认使用的是Kafka 0.8.2.1,但是使用的时候使用了Kafka 0.9.0.0,貌似跟我的情况一样啊!于是,我兴高采烈的将SBT依赖的Kafka版本改成0.8.2.1,重新编译打包,再次Submit,可问题依旧,这很不科学啊!!

    后来,我仔细分析了一下错误信息,估计还是哪里序列化导致的问题,于是,我分析自己代码里,涉及到序列化的地方:

    val messages: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicsSet).map(_._2)
     初步分析,这里会涉及到Kafka序列化的问题。于是,我仔细一想: StringDecoder 这个接口是从哪一个包里引入进来的?

    查看代码最前的引入包部分,有这么一行:

   import _root_.kafka.serializer.StringDecoder

    怎么前面会有_root_ ?哦,原来刚开始时我是没有引入StringDecoder相关的包得,我是使用Idea中Alt + Enter,然后从相互来的列表中选择包,让Idea自动帮我引入包的,但是为 什么会有_root_这一个前缀,我就不知道啦!感觉这里就怪怪的,于是我改成了:

   import kafka.serializer.StringDecoder

    再次编译打包,Submit,OK!成功运行,居然被这么一个问题浪费了一下午的时间,罪过!在此备忘,也希望能帮到遇到跟我一样问题的同学!


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐