Environment:
scala-2.11.8
kafka_2.11-0.11.0.1
spark-2.0.0-bin-hadoop2.7
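
(If you want to reproduce the setup, the Kafka side looks roughly like this; the topic name, hosts, and ports below are placeholders, not taken from my logs:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordcount
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordcount

The console producer then feeds whatever you type into the topic.)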

The first problem
I set up a WordCount: one end automatically generates messages, the other end reads them so the word counts can be inspected from Spark. The consumer end, however, refused to start no matter what, failing as follows:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/kafka/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
        at org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:19)
        at org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 11 more

The build.sbt was initially written like this:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
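
(In hindsight: the SLF4J lines are only a duplicate-binding warning; the real failure is the NoClassDefFoundError, which means the spark-streaming-kafka classes never made it onto the runtime classpath. Plain sbt package does not bundle dependencies into the jar, so spark-submit has to be told where to find the connector. One common way, sketched here with placeholder jar and argument names, is to let spark-submit fetch it from Maven:

/usr/local/spark/bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
  --class org.apache.spark.examples.streaming.KafkaWordCount \
  target/scala-2.11/simple-project_2.11-1.0.jar \
  localhost:2181 my-group wordcount 1

Passing the connector jar explicitly with --jars, or building a fat jar with sbt-assembly, works as well.)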

Then it hit me that the spark-streaming-kafka and spark-streaming versions did not match, so I changed the version numbers and repackaged. Same problem. Utterly discouraged, I resolved to swap out Spark 2.0.0 and try again. Lunch first; I'll pick this up when I'm back.

Back at it after lunch. The version swap worked no miracle: still the same old error, and I still couldn't tell what it was. But after also switching Kafka to kafka_2.11-0.8.2.0, a new error appeared, which counted as progress of a sort. The message:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/kafka/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.streaming.dstream.DStream.validateAtStart(DStream.scala:242)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$validateAtStart$8.apply(DStream.scala:275)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$validateAtStart$8.apply(DStream.scala:275)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.streaming.dstream.DStream.validateAtStart(DStream.scala:275)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$4.apply(DStreamGraph.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$4.apply(DStreamGraph.scala:48)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.streaming.DStreamGraph.start(DStreamGraph.scala:48)
        at org.apache.spark.streaming.scheduler.JobGenerator.startFirstTime(JobGenerator.scala:194)
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:100)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:102)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
        at org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:25)
        at org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The cause was a missing checkpoint: the job never called StreamingContext.checkpoint(), and adding that call fixed everything (a sketch of the fixed consumer follows below). So the final working environment:

scala-2.11.8
kafka_2.11-0.8.2.0
spark-2.1.0-bin-hadoop2.7
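
For reference, here is a minimal sketch of the consumer side with the missing call added, modeled on the stock KafkaWordCount example that the stack traces point at; the checkpoint path, batch interval, and window sizes are my assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Expected arguments: zookeeper quorum, consumer group, topic list, thread count.
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // The line that was missing: windowed reduces with an inverse function
    // require a checkpoint directory ("checkpoint" is an assumed path).
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based stream from the 0-8 connector; each record is (key, message).
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val wordCounts = lines.flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2))
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}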
