Spark Streaming基于Offset消费Kafka数据


1、官方提供的消费Kafka数据示例

 // Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array[OffsetRange]()
	
 // Capture the ranges in the FIRST operation on the direct stream: only the RDD produced by the
 // direct stream itself implements HasOffsetRanges, so the cast fails further down a chain.
 // The transform function itself runs on the driver, once per batch.
 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           // placeholder for the application's per-record processing
           ...
 }.foreachRDD { rdd =>
   // Print each partition's range for this batch (fromOffset inclusive, untilOffset exclusive).
   // This is where a job would typically persist untilOffset once the batch output is written.
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }


官网链接:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

核心思想:在创建Kafka数据流的同时获取每个批次的偏移量(offset),在该批次数据消费完成后再更新偏移量。


2、自定义管理Kafka消费偏移量(offset)的KafkaManager

注意:需要把该类放在 org.apache.spark.streaming.kafka 包下,与Spark源码中 KafkaCluster 所在的包一致(部分Spark版本中 KafkaCluster 的访问权限仅限于Spark自身的包,只有放在该包下才能直接使用)。

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import scala.reflect.ClassTag
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.StreamingContext

/**
 * Manages the consumer offsets of a Spark Streaming Kafka direct stream, reading and committing
 * them for `group.id` through Spark's `KafkaCluster` helper.
 *
 * The class lives in package `org.apache.spark.streaming.kafka` because it uses `KafkaCluster`
 * from that package (whose visibility is restricted to Spark packages in some Spark versions —
 * TODO confirm for the Spark version in use).
 *
 * @param kafkaParams Kafka consumer configuration. Must contain `group.id`. `auto.offset.reset`
 *                    decides where partitions without a committed offset start: `smallest`
 *                    (case-insensitive) selects the earliest offsets, anything else the latest.
 */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
   * Number of offsets rewound per unit of `day` in `updateZKOffsetsFromoffsetRanges`.
   * Presumably an estimate of how many messages one partition receives per day — TODO confirm.
   */
  private val rewindOffsetsPerDay = 1150 * 10000L

  /**
   * Creates a direct Kafka stream that starts from the offsets committed for `group.id`.
   *
   * Before the committed offsets are read, `setOrUpdateOffsets` repairs them: offsets older than
   * the earliest offset still retained by Kafka are moved forward, and partitions that have no
   * committed offset yet are initialised according to `auto.offset.reset` (taken from the
   * constructor's `kafkaParams`).
   *
   * @param ssc         the streaming context the stream belongs to
   * @param kafkaParams Kafka consumer configuration for the stream; must contain `group.id`
   * @param topics      topics to consume
   * @tparam K  message key type
   * @tparam V  message value type
   * @tparam KD decoder for the key
   * @tparam VD decoder for the value
   * @return a stream of `(key, message)` pairs
   * @throws SparkException if `group.id` is missing, or partitions / offsets cannot be fetched
   *                        or committed
   */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = groupIdOf(kafkaParams)
    // Fix up the committed offsets before reading them back as the stream's starting point.
    setOrUpdateOffsets(topics, groupId)

    val partitions = orThrow(kc.getPartitions(topics), "get kafka partition failed")
    val consumerOffsets =
      orThrow(kc.getConsumerOffsets(groupId, partitions), "get kafka consumer offsets failed")
    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
  }

  /**
   * Repairs the committed offsets of `groupId` for every partition of `topics` before a stream
   * is created from them.
   *
   *  - Partitions with a committed offset: if the job would otherwise hit
   *    `kafka.common.OffsetOutOfRangeException`, the committed offset points into log data that
   *    Kafka's retention has already deleted. Such offsets (smaller than the earliest leader
   *    offset) are moved forward to the earliest leader offset; other partitions are untouched.
   *  - Partitions without a committed offset: initialised to the earliest leader offset when
   *    `auto.offset.reset` is `smallest`, otherwise to the latest leader offset.
   *
   * @param topics  topics whose partitions are checked
   * @param groupId consumer group whose offsets are repaired
   * @throws SparkException if partitions or leader offsets cannot be fetched, or a commit fails
   */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach { topic =>
      val partitions = orThrow(kc.getPartitions(Set(topic)), "get kafka partition failed")

      // Look partitions up one by one: KafkaCluster.getConsumerOffsets fails for the whole request
      // as soon as a single partition has no committed offset. Querying the whole topic at once
      // would reset every partition after a new partition is added, skipping unread data.
      // As before, any lookup failure is treated as "never consumed".
      val committed: Map[TopicAndPartition, Long] = partitions.toSeq.flatMap { tp =>
        kc.getConsumerOffsets(groupId, Set(tp)) match {
          case Right(offsets) => offsets.get(tp).map(tp -> _)
          case Left(_) => None
        }
      }.toMap

      if (committed.nonEmpty) {
        val earliest =
          orThrow(kc.getEarliestLeaderOffsets(committed.keySet), "get earliest leader offsets failed")
        // Only partitions whose committed offset has fallen behind retention are moved forward.
        val stale: Map[TopicAndPartition, Long] = committed.collect {
          case (tp, n) if n < earliest(tp).offset => tp -> earliest(tp).offset
        }
        stale.foreach { case (tp, earliestLeaderOffset) =>
          println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
            " offsets已经过时,更新为" + earliestLeaderOffset)
        }
        if (stale.nonEmpty) commitOrThrow(groupId, stale)
      }

      val fresh = partitions.filterNot(tp => committed.contains(tp))
      if (fresh.nonEmpty) {
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val leaderOffsets =
          if (reset == Some("smallest"))
            orThrow(kc.getEarliestLeaderOffsets(fresh), "get earliest leader offsets failed")
          else
            orThrow(kc.getLatestLeaderOffsets(fresh), "get latest leader offsets failed")
        commitOrThrow(groupId, leaderOffsets.map { case (tp, leader) => tp -> leader.offset })
      }
    }
  }

  /**
   * Commits the end offset (`untilOffset`) of every partition in `rdd` for `group.id`, so a
   * restarted job resumes exactly where this batch stopped.
   *
   * The commit is best-effort: a failure is printed, not thrown.
   *
   * @param rdd an RDD produced directly by the Kafka direct stream; it must implement
   *            `HasOffsetRanges`, i.e. must not be the result of a further transformation
   * @throws SparkException           if `group.id` is missing from `kafkaParams`
   * @throws IllegalArgumentException if `rdd` does not carry Kafka offset ranges
   */
  def updateZKOffsets(rdd: RDD[_]): Unit = {
    val groupId = groupIdOf(kafkaParams)
    val offsetRanges = rdd match {
      case withRanges: HasOffsetRanges => withRanges.offsetRanges
      case other => throw new IllegalArgumentException(
        s"RDD ${other.id} does not carry Kafka offset ranges; pass the RDD produced by the direct stream")
    }
    val offsets = offsetRanges.map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap
    commitOrLog(groupId, offsets)
  }

  /**
   * Commits, for every partition in `offsetRanges`, its end offset rewound by
   * `rewindOffsetsPerDay * day` offsets (never below 0), so that a restarted job re-reads that
   * much history.
   *
   * Per the original author, the resulting duplicates are tolerated because the calling job
   * de-duplicates inside its window functions; this method itself does not de-duplicate.
   * The rewound offset may fall below the earliest offset Kafka still retains; on the next
   * start `createDirectStream` moves such offsets forward to the earliest retained offset.
   *
   * The commit is best-effort: a failure is printed, not thrown.
   *
   * @param offsetRanges offset ranges of the last processed batch (e.g. captured in `transform`)
   * @param day          how far to rewind, in multiples of `rewindOffsetsPerDay`; fractional
   *                     values are allowed
   * @throws SparkException if `group.id` is missing from `kafkaParams`
   */
  def updateZKOffsetsFromoffsetRanges(offsetRanges: Array[OffsetRange], day: Double): Unit = {
    val groupId = groupIdOf(kafkaParams)
    val rewind = (rewindOffsetsPerDay * day).toLong

    val rewound = offsetRanges.map { range =>
      println("offsets.untilOffset " + range.untilOffset)
      // Clamp at 0: comparing against the un-scaled step (as before) let day > 1 go negative
      // and needlessly reset to 0 for day < 1.
      val offsetStreaming = math.max(0L, range.untilOffset - rewind)
      println("offsetStreaming " + offsetStreaming)
      TopicAndPartition(range.topic, range.partition) -> offsetStreaming
    }.toMap

    commitOrLog(groupId, rewound)
  }

  /** Returns `group.id` from `params`, failing with a descriptive error when it is absent. */
  private def groupIdOf(params: Map[String, String]): String =
    params.getOrElse("group.id", throw new SparkException("kafkaParams must contain \"group.id\""))

  /** Unwraps a `KafkaCluster` result, turning a failure into `SparkException("$what: $err")`. */
  private def orThrow[E, A](result: Either[E, A], what: String): A = result match {
    case Right(value) => value
    case Left(err) => throw new SparkException(s"$what: $err")
  }

  /** Commits `offsets` for `groupId` in one request; a failure aborts startup. */
  private def commitOrThrow(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit = {
    orThrow(kc.setConsumerOffsets(groupId, offsets), "set consumer offsets failed")
    ()
  }

  /** Best-effort commit of `offsets` for `groupId` in one request: failures are printed. */
  private def commitOrLog(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit =
    if (offsets.nonEmpty) {
      kc.setConsumerOffsets(groupId, offsets) match {
        case Left(err) => println(s"Error updating the offset to Kafka cluster: $err")
        case Right(_) => ()
      }
    }
}


KafkaManager 提供两种更新偏移量的方法:

1) updateZKOffsets:记录消费结束的位置,下次从该位置继续消费;

2) updateZKOffsetsFromoffsetRanges:把消费结束的位置向前回退一段偏移量后再记录,下次从回退后的位置重新消费。


3、使用案例

    val conf = new SparkConf().setAppName("NewsTopNRealRankOffset") //.setMaster("local[3]")

    conf.set("spark.streaming.blockInterval", "50ms")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Fraction of executor memory used for cached data (the original note cites a default of
    // 0.6); lowered to 0.4 because the remainder left for task execution was considered too small.
    conf.set("spark.storage.memoryFraction", "0.4")
    conf.set("spark.locality.wait", "6000") // 6000 ms
    // Upper bound on messages read per second from each topic partition.
    conf.set("spark.streaming.kafka.maxRatePerPartition", "35000")

    // Shuffle tuning
    conf.set("spark.shuffle.consolidateFiles", "true")
    conf.set("spark.reducer.maxSizeInFlight", "150m")
    conf.set("spark.shuffle.file.buffer", "128k")
    conf.set("spark.shuffle.io.maxRetries", "8")
    conf.set("spark.shuffle.io.retryWait", "6s")
    conf.set("spark.shuffle.memoryFraction", "0.3")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(25))
    // Keep generated RDDs around for two days (60 * 24 * 2 minutes).
    ssc.remember(Minutes(60 * 24 * 2))

    val sqlContext = new HiveContext(sc)

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    // 1. Register UDFs
    val udf = UDFUtils()
    udf.registerUdf(sqlContext)

    // 2. Kafka data processing
    val kafkaServiceOffset = KakfaServiceOffset()

    val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "group.id" -> "tracklogPrdNewsTopN_TrackLogT",
      "auto.offset.reset" -> "largest") // "largest" or "smallest"

    val topics = Set("TrackLogT")

    // Kafka source built through the custom KafkaManager, which repairs the committed offsets
    // before the stream starts.
    val km = new KafkaManager(kafkaParams)
    val kafkaStream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    var offsetRanges = Array[OffsetRange]()

    // Capture each batch's offset ranges. This must be the first operation on the direct stream:
    // only the RDD it produces implements HasOffsetRanges. The function runs on the driver.
    val kafkaStreamOffset = kafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    val urlClickLogPairsDStream = kafkaServiceOffset.kafkaDStreamForNewsOffset(ssc, kafkaStreamOffset)

    // 3. Cache data from Hive
    val cacheUtils = CacheUtils()
    cacheUtils.cacheHiveData(sqlContext)

    // 4. Windowed click counts: 35-hour window, sliding every 250 seconds.
    val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => v1 + v2,
      Seconds(3600 * 35),
      Seconds(250))

    // Schema of the rows built from each windowed (key, count) pair; the same for every batch,
    // so it is built once instead of per batch.
    val structType = StructType(Array(
      StructField("datetime", StringType, true),
      StructField("cookieid", StringType, true),
      StructField("url", StringType, true),
      StructField("artId", StringType, true),
      StructField("click_count", IntegerType, true)))

    // 5. Business logic
    urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {
      // Snapshot first: the transform above keeps overwriting offsetRanges as newer batches are
      // generated, possibly while this (slow) windowed job is still running.
      val processedRanges = offsetRanges

      val urlClickCountRowRDD = urlClickCountsRDD.map { case (key, clickCount) =>
        // The key holds date, time, cookieid, url and artId separated by the SOH (0x01) character;
        // split it once instead of once per field.
        val fields = key.split("\u0001")
        Row(fields(0) + " " + fields(1), fields(2), fields(3), fields(4), clickCount)
      }

      val categoryProductCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType)
      categoryProductCountDF.registerTempTable("url_click_log_day")

      cacuDayTopN(sqlContext)
      cacuHourTopN(sqlContext)

      // Commit only after the batch has been fully processed, so a failure above keeps the
      // previously committed position (rewound by one day's worth of offsets).
      km.updateZKOffsetsFromoffsetRanges(processedRanges, 1)
    })

    // 6. Start the streaming job
    ssc.start()
    ssc.awaitTermination()

  

注意:"group.id" 即消费者组名,该组在Kafka中的消费偏移量以此名称保存在ZooKeeper中(Kafka 0.8 的路径为 /consumers/<group.id>/offsets/<topic>/<partition>)。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐