Spark Streaming整合kafka的第二种方式

1. Direct Approach (No Receivers)

  • 这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。
  • 替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

优点:

简化并行读取:sparkRDD
  如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。
所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

高性能:不需要开启WAL机制,数据仅备份一次
  如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

一次且仅一次的失误机制:可以实现Exactly once,而不是At least once
  基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

降低资源: 不需要单独申请executor来接收数据
  Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。

降低内存:实时计算不需要设置大缓存来为异步服务
  Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右

鲁棒性好:不会因为特殊情况下异步导致的数据堆积,引起的程序失败
  Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

缺点:

此方法的一个缺点是它不会更新Zookeeper中的偏移量,因此使得基于zookeeper的consumer offsets的监控工具都会失效。但是,您可以在每个批次中访问通过此方法处理的偏移量,并自己更新Zookeeper

2. 偏移量解决方案

自动提交偏移量

如果自动提交偏移量,如60S提交一次。可能会出现以下问题

  1. 数据处理失败,自动提交偏移量。会出现数据丢失
  2. 数据处理成功了,自动提交偏移量(比较理想),有可能出现自动提交偏移量失败,从而导致数据重复消费
spark streaming 整合kafka1.0版本以下

pom.xml

<dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
       <version>2.2.0</version>
</dependency> 

代码:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object kafka08 {
  /**
    * sparkStreaming使用kafka 0.8API基于Direct直连来接受消息
    * spark direct API接收kafka消息,从而不需要经过zookeeper,直接从broker上获取信息。
    */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("kafkaDirect08").setMaster("local[2]")
    val sc = new StreamingContext(conf,Seconds(2))
    val kafkaParams =  Map("metadata.broker.list"->"node01:9092,node02:9092,node03:9092",
    "group.id"->"KafkaDirect08")
    val topics = Set("test")


    val kafkaDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](sc,kafkaParams,topics)
    //获得kafka的topic数据
    val data: DStream[String] = kafkaDS.map(_._2)
    val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()

    sc.start()
    sc.awaitTermination()
  }

手动提交偏移量

解决方案
在这里插入图片描述
代码:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}


object kafka08NotLoseData {
  def main(args: Array[String]): Unit = {
    //todo:1、创建SparkConf 提交到集群中运行 不要设置master参数
    val conf = new SparkConf().setAppName("KafkaManagerOffset08").setMaster("local[4]")

    //todo: 2、设置SparkStreaming,并设定间隔时间
    val ssc = new StreamingContext(conf, Seconds(5))

    //todo:3、指定相关参数

    //指定组名
    val groupID = "consumer-kafka08NotLoseData"
    //指定消费者的topic名字
    val topic = "wordcount"
    //指定kafka的broker地址
    val brokerList = "node01:9092,node02:9092,node03:9092"

    //指定zookeeper的地址,用来存放数据偏移量数据,也可以使用Redis MySQL等
    val zkQuorum = "node01:2181,node02:2181,node03:2181"

    //创建Stream时使用的topic名字集合,SparkStreaming可同时消费多个topic
    val topics: Set[String] = Set(topic)

    //创建一个 ZKGroupTopicDirs 对象,就是用来指定在zk中的存储目录,用来保存数据偏移量
    val topicDirs = new ZKGroupTopicDirs(groupID, topic)

    //获取 zookeeper 中的路径 "/consumers/consumer-kafka08NotLoseData/offsets/wordcount"
    val zkTopicPath: String = topicDirs.consumerOffsetDir

    //构造一个zookeeper的客户端 用来读写偏移量数据
    val zkClient = new ZkClient(zkQuorum)

    //准备kafka的参数
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupID,
      "enable.auto.commit" -> "false"
    )

    //todo:4、定义kafkaStream流
    var kafkaStream: InputDStream[(String, String)] = null

    //todo:5、获取指定的zk节点的子节点个数

    val childrenNum = getZkChildrenNum(zkClient,zkTopicPath)


    //todo:6、判断是否保存过数据 根据子节点的数量是否为0
    if (childrenNum > 0) {
        //构造一个map集合用来存放数据偏移量信息
        var fromOffsets: Map[TopicAndPartition, Long] = Map()

        //遍历子节点
        for (i <- 0 until childrenNum) {
          //获取子节点  /consumers/consumer-kafka08NotLoseData/offsets/wordcount/0
          val partiitonOffset: String = zkClient.readData[String](s"$zkTopicPath/$i")
          // /wordcount-----0
          val tp = TopicAndPartition(topic,i)
          //获取数据偏移量  将不同分区内的数据偏移量保存到map集合中
          //  wordcount/0 -> 1001
          fromOffsets += (tp->partiitonOffset.toLong)
        }
        // 泛型中 key:kafka中的key   value:hello tom hello jerry
        //创建函数 解析数据 转换为(topic_name, message)的元组
      val messageHandler = (mmd : MessageAndMetadata[String,String]) => (mmd.topic,mmd.message())
        //todo:7、利用底层的API创建DStream 采用直连的方式(之前已经消费了,从指定的位置消费)
      kafkaStream =  KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)
    }else {
      //todo:7、利用底层的API创建DStream 采用直连的方式(之前没有消费,这是第一次读取数据)
      //zk中没有子节点数据 就是第一次读取数据 直接创建直连对象
      kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    }
      //todo:8、直接操作kafkaStream
      //依次迭代DStream中的kafkaRDD 只有kafkaRDD才可以强转为HasOffsetRanges  从中获取数据偏移量信息
      //之后是操作的RDD 不能够直接操作DStream 因为调用Transformation方法之后就不是kafkaRDD了获取不了偏移量信息
    kafkaStream.foreachRDD(kafkaRDD=>{
      //强转为HasOffsetRanges 获取offset偏移量数据
      val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      //获取数据
      val lines: RDD[String] = kafkaRDD.map(_._2)
      //todo:9、接下来就是对RDD进行操作 触发action
      lines.foreachPartition(partitions=>{
        partitions.foreach(println)
      })
      //todo: 10、手动提交偏移量到zk集群上
      for (o <- offsetRanges){
        //拼接zk路径   /consumers/consumer-kafka08NotLoseData/offsets/wordcount/0
        val zkPath = s"${topicDirs.consumerOffsetDir}/${o.untilOffset.toString}"
        //将 partition 的偏移量数据 offset 保存到zookeeper中
        ZkUtils.updatePersistentPath(zkClient,zkPath,o.untilOffset.toString)
      }

    })
    //开启SparkStreaming 并等待退出
    ssc.start()
    ssc.awaitTermination()
}
  /**
    * 获取zk节点上的子节点的个数
    * @param zkClient
    * @param zkTopicPath
    * @return
    */
    def getZkChildrenNum(zkClient : ZkClient,zkTopicPath : String):Int={
      //查询该路径下是否有子节点,即是否有分区读取数据记录的读取的偏移量
      // /consumers/consumer-kafka08NotLoseData/offsets/wordcount/0
      // /consumers/consumer-kafka08NotLoseData/offsets/wordcount/1
      // /consumers/consumer-kafka08NotLoseData/offsets/wordcount/2

      //子节点的个数
      val childrenNum: Int = zkClient.countChildren(zkTopicPath)
      childrenNum
    }
}
spark streaming 整合kafka1.0版本
  • 推荐使用1.0版本:0.8版本不能用原生API实现自动感知kafka分区的动态变化

pom.xml文件

 <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
 </dependency>

代码:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.dstream.InputDStream


object kafka10 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("receiverKafka10").setMaster("local[2]")
    val sc = new StreamingContext(conf,Seconds(2))

    val kafkaParams=Map(
      "bootstrap.servers" ->"node01:9092,node02:9092,node03:9092",
      "group.id" -> "KafkaDirect10",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "enable.auto.commit" -> "false"
    )
    val topics = Set("test")
    //数据封装成ConsumerRecord对象
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      sc,
      //数据本地性策略  Use this in most cases, it will consistently distribute partitions across all executors
      LocationStrategies.PreferConsistent,
      //指定要订阅的topic
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    //处理数据
    kafkaDStream.foreachRDD(rdd=>{
      val dataRDD: RDD[String] = rdd.map(_.value())
      dataRDD.foreach(line=>{
        println(line)
      })
      //偏移量封装在RDD中,提交偏移量,将偏移量添加到kafka中
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //手动提交偏移量
      kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    sc.start()
    sc.awaitTermination()
  }
}

参考:https://blog.csdn.net/a3125504x/article/details/108473421

使用spark-submit提交
在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐