集群环境:CDH 5.13.0Spark 2.2.0Scala 2.11.0

ps:没营养的错误。。有些难受。还是要多看看源码啊。。


1.重头:Kafka整合SparkStreaming

官网整合文档:http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
这里写图片描述

此处选择kafka 0.10版本
点进去首先就能看到关键东西 - Maven坐标,我们选择:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

这个整合依赖包包含很多组件,可以ctrl点击进去查看
但除了这个,我还加了两个包,便于本地测试:

<!--spark的核心包在spark-core在spark上已经有了-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

Scope设置为provided,打包时就不用加进去了,因为集群环境本地存在这些包。

注:打包插件:

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.yj.test1</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

通过Spark集群运行代码:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


def main(args: Array[String]): Unit = {

    //配置spqrkconf参数
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_kafka_direct").setMaster("local[2]")
    //构建sparkContext对象
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //构建StreamingContext对象,每个批处理的时间间隔
    val ssc: StreamingContext = new StreamingContext(sc, Milliseconds(3000))
    //设置checkpoint
//    ssc.checkpoint("/user/spark/sparkstreaming/checkpoint")

    //设置broker的地址信息,有多个可以写多个,用逗号隔开
    val brokers = "cdh1:9092,cdh2:9092,cdh3:9092"

    //设置参数信息
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fodr_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("test")

    // 创建一个DirectStream
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )


    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    val words: DStream[String] = stream.map(_.value()).flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//    val total: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    val windows: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((v1: Int, v2: Int) =>
      v1 + v2, Seconds(3), Seconds(6))
    windows.print()
//    total.print()

    ssc.start()
    ssc.awaitTermination()
  }

注:若想使用 updateStateByKey 则必须设置 checkpoint

集群服务器提交job:

spark2-submit   --master yarn \
--deploy-mode client \
--num-executors 16 \
--executor-cores 2 \
--executor-memory 8g \
--class com.yj.test1 \
--driver-memory 2g \
/root/scala_test_bak-1.0-SNAPSHOT-jar-with-dependencies.jar

基本OK了。。


2.各种错误

错误1:首当其冲的一个bug:NoSuchMethodError

Exception in thread "streaming-start" java.lang.NoSuchMethodError:
 org.apache.kafka.clients.consumer.KafkaConsumer.subscribe

这个错误真的太经典了,在整合Kafka和SparkStreaming时。。百度谷歌过一堆,很常见,但真正有营养的没几个

总结:发生这种错误,99%都是因为版本没整合好!
比如:
Spark版本是不是2.2.0
Scala版本是不是2.11.0
kafka版本是不是高于0.10.0
我的问题就在于Spark默认的kafka版本为0.9,根本不是0.10!
这里写图片描述
改为0.10就ok了

仔细看看你的集群环境,版本吧

错误2:检查checkpoint,文件不存在
这里写图片描述
以为是checkpoint那行代码有问题,hdfs路径、权限、需要提前创建什么的,看了下源码:

def checkpoint(directory: String) {
    if (directory != null) {
      val path = new Path(directory)
      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
      fs.mkdirs(path)
      val fullPath = fs.getFileStatus(path).getPath().toString
      sc.setCheckpointDir(fullPath)
      checkpointDir = fullPath
    } else {
      checkpointDir = null
    }
  }

并没有问题啊!(集群默认路径为hdfs)
排查发现:spark提交命令没写executor。。

错误3
这里写图片描述
task数疯狂弹跳上涨,感觉是checkpoint有问题,写线程被无限拒绝,task无限重启失败
解决:换了一个topic就好了。。

错误4:Xshell5终端消费消息看不到,无论怎么生产消息,就是看不到被消费
解决:换了一个SSH渠道就看到消息了(xshell显示问题)。。。


还有一些小问题夹杂其中,都有些莫名其妙。。日后使用时出现问题再总结吧

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐