Kafka Offset Management Summary

1: Basic Concepts


1.1: Offset Management in Spark Streaming

Since Kafka 0.9 there is no longer a distinction between the high-level and low-level consumer APIs; the consumer API differs slightly from earlier versions.
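
As a hedged illustration of that difference, here is a minimal sketch of the new (0.9+) consumer API with manual offset commits; the broker list, group id, and topic are reused from the tests below, while the object name and poll loop are made up for this sketch:

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object NewConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hdp01:9092,hdp02:9092,hdp03:9092")
    props.put("group.id", "comsumer1")
    props.put("enable.auto.commit", "false") // commit manually instead of auto-committing
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("spark01"))
    try {
      while (true) {
        // the 0.9/0.10 clients take a poll timeout in milliseconds
        val records = consumer.poll(1000)
        for (record <- records.asScala) {
          println(s"partition=${record.partition} offset=${record.offset} value=${record.value}")
        }
        // manual commit: the group and its offsets become visible to Kafka tooling
        consumer.commitSync()
      }
    } finally {
      consumer.close()
    }
  }
}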

1.3: Offset Management


2: Hands-on Development Tests

To find out in which cases the offsets can be inspected with Kafka's consumer-group command and monitored with Kafka management tools, the following scenarios were tested.
Log level settings, to reduce the log volume:

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
  Logger.getLogger("org.spark-project").setLevel(Level.WARN)

2.1: Kafka 0.8 + Spark Streaming: offsets saved to checkpoint + Kafka

Offsets are saved in two ways at the same time.
This tests the offset-saving approaches of the legacy Kafka + Spark integration, checking whether offsets can be inspected with the consumer-group command and monitored with Kafka monitoring software.

2.1.1: Auto-commit offsets

Test topic: spark01
"enable.auto.commit" -> "true",
Consumer-group command: no consumer group is shown
Kafka monitoring: not tried
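
For reference, a minimal sketch of the kafkaParams used for this test, assuming the same brokers, group, and topic as the code in 2.1.2; only enable.auto.commit differs:

  val kafkaParams = Map[String, String](
    "bootstrap.servers" -> "hdp01:9092,hdp02:9092,hdp03:9092",
    "group.id" -> "comsumer1",
    "auto.offset.reset" -> "smallest",
    // the only difference from 2.1.2: let the consumer auto-commit
    "enable.auto.commit" -> "true")

Note that, as the KafkaUtils documentation quoted in 2.1.5 states, the 0.8 direct stream tracks offsets itself and does not store them in ZooKeeper, which is consistent with no consumer group showing up here.
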
2.1.2: External management: checkpoint only

Test results

Auto-commit disabled.
With the checkpoint alone, offset information cannot be viewed from Kafka.
Consumer-group command: the consumer group is not listed and offsets cannot be viewed
Monitoring tool: cannot monitor

Code

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// test with Kafka 0.8 + Spark Streaming
object _04StreamingGetOrCreate {
  // can be a local path or an HDFS path
  val checkpointPath = "file:///C:\\Users\\Administrator.SC-201905261418\\Desktop\\bigdata开发实例\\checkpoint6"

  def main(args: Array[String]): Unit = {
    val context = StreamingContext.getOrCreate(checkpointPath, createSSC)
    context.start()
    context.awaitTermination()
  }

  def createSSC(): StreamingContext = {
    val conf = new SparkConf().setAppName("local test checkpoint").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "hdp01:9092,hdp02:9092,hdp03:9092",
      "group.id" -> "comsumer1",
      // "smallest" reads from the earliest available offset; commonly used in development
      // in this demo the offsets themselves are saved in the checkpoint
      "auto.offset.reset" -> "smallest",
      "enable.auto.commit" -> "false",
      // we are consuming data here, so specify the deserializers: classes implementing org.apache.kafka.common.serialization.Deserializer
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")
    val topic = Set("spark01")
    ssc.checkpoint(checkpointPath)
    // dstreams is a stream of RDDs
    val dstreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    // periodically write the DStream to the checkpoint; recommended interval: 5-10x the batch interval
    dstreams.checkpoint(Seconds(100))

    dstreams.foreachRDD((rdd, mtime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val stu = new scala.collection.mutable.HashMap[String, Long]
      for (elem <- offsetRanges) {
        val topic = elem.topic
        val offset: Long = elem.untilOffset
        val partition = elem.partition
        // optionally also save the offset to ZooKeeper (see 2.1.3)
       //zk_Client.client("comsumer1", topic, offset, partition)
      }

      rdd.foreachPartition(partition => {
        partition.foreach(p => {
          // the record value is the second element of the (key, value) tuple
          val value: String = p._2
          print("value ")
          print(value)
        })
      })
    })
  
    ssc
  }
}
2.1.3: Manually committing offsets to ZooKeeper (ZkClient) + checkpoint

Auto-commit disabled; offsets are committed manually to ZooKeeper.

Results

Consumption works.

Consumer-group command: the consumer group can be seen, but its offset information cannot, because the ids directory is missing. Logging into the ZooKeeper client confirms the directory is indeed absent; it can be created manually when the offsets are written. Offsets stored in ZooKeeper can also be exported with a tool (see Kafka: The Definitive Guide).
Monitoring tool: cannot view
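
A hedged sketch of the workaround mentioned above, assuming the same ZkClient dependency as the commit code below: create the missing ids node once when the group first writes its offsets (/consumers/<group>/ids is the standard 0.8 ZooKeeper layout), so the consumer-group command can find the group. The helper object and method names are made up for this sketch:

import org.I0Itec.zkclient.ZkClient

object EnsureGroupIdsDir {
  def ensure(zkQuorum: String, groupid: String): Unit = {
    val zkClient = new ZkClient(zkQuorum)
    val idsPath = s"/consumers/$groupid/ids"
    if (!zkClient.exists(idsPath)) {
      // createPersistent(path, createParents = true) creates the whole chain of parent nodes
      zkClient.createPersistent(idsPath, true)
    }
    zkClient.close()
  }
}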

ZooKeeper commit code

package com.desheng.bigdata.spark.scala.zookeeperUtils

import kafka.utils.ZKGroupTopicDirs
import org.I0Itec.zkclient.ZkClient

object zk_Client {
  val zkQuorum = "hdp01:2181,hdp02:2181,hdp03:2181"

  def client(groupid: String, topic: String, untilOffset: Long, partition: Int): Unit = {
    val zkClient = new ZkClient(zkQuorum)
    // create a ZKGroupTopicDirs object: it specifies the directory in ZooKeeper where the offsets are written
    val topicDirs = new ZKGroupTopicDirs(groupid, topic)
    // path in ZooKeeper under which the offsets for this group/topic are stored
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
    print(zkTopicPath)
    // make sure the parent path exists, otherwise creating the per-partition node fails on a fresh ZooKeeper
    if (!zkClient.exists(zkTopicPath)) zkClient.createPersistent(zkTopicPath, true)
    val zkSavePath = s"${topicDirs.consumerOffsetDir}/${partition}"
    if (!zkClient.exists(zkSavePath)) {
      print("path does not exist, creating it and writing the offset")
      zkClient.createPersistent(zkSavePath, untilOffset)
    } else {
      print("path exists, updating the offset")
      zkClient.writeData(zkSavePath, untilOffset)
    }
    zkClient.close()
  }
}

main code


 dstreams.foreachRDD((rdd, mtime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val stu = new scala.collection.mutable.HashMap[String, Long]
      for (offset <- offsetRanges) {
        val count: Long = offset.count()
        val topic = offset.topic
        val untilOffset: Long = offset.untilOffset
        val partition = offset.partition
        // also save the offset to ZooKeeper
        zk_Client.client("comsumer1", topic, untilOffset, partition)
      }
      // use foreach rather than map: map on an iterator is lazy, so its side effects would never run
      rdd.foreachPartition(partition => partition.foreach(
        line => {
          print("key : " + line._1 + " value: " + line._2)
          println()
        }
      ))
    })
2.1.4: Manually committing to Kafka: checkpoint + Kafka (via KafkaCluster)

Same underlying approach as 2.1.5 (manual commit to Kafka via a rewritten KafkaManager).

KafkaCluster is a utility class for operations related to connecting to the Kafka cluster. For each partition of a topic we can build an offset map, Map((topicAndPartition, offsets.untilOffset)), and then call KafkaCluster's setConsumerOffsets method to update the information stored in ZooKeeper. This updates the consumer offsets, so monitoring software such as KafkaOffsetMonitor can track the topic's consumption.
Auto-commit disabled; offsets are committed manually to Kafka.
Consumer-group command: the group is visible, but offsets cannot be viewed
Kafka monitoring software: can view
 dstreams.foreachRDD((rdd, mtime) => {
      // commit the offsets
      val kc = new KafkaCluster(kafkaParams)
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val stu = new scala.collection.mutable.HashMap[String, Long]
      for (offset <- offsetRanges) {

        val topic = offset.topic
        val untilOffset: Long = offset.untilOffset
        val partition = offset.partition
        val topicAndPartition = TopicAndPartition(topic, partition)
        val map = scala.collection.mutable.Map[TopicAndPartition, Long]()
        map += (topicAndPartition -> untilOffset)
        val immap: Map[TopicAndPartition, Long] = map.toMap
        // note: kafkaParams.get("group.id").toString would yield "Some(...)"; pass the plain group id
        kc.setConsumerOffsets(kafkaParams("group.id"), immap)
      }
      rdd.foreachPartition(partition => {
        partition.foreach(p => {
          // the record value is the second element of the (key, value) tuple
          val value: String = p._2
          print("value ")
          print(value)
        })
      })

    })
2.1.5: Manually committing to Kafka (2): rewriting KafkaManager

Result:
The consumer group cannot be viewed with the command, but it can be monitored.

Write a KafkaManager class and call it when committing offsets: import the class, create an instance, and invoke it.

package com.desheng.bigdata.spark.scala.kafkaUtils


import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag

class KafkaManager (val kafkaParams: Map[String, String]) extends Serializable{
  private val logger =LoggerFactory.getLogger(KafkaCluster.getClass)
  private val kc = new KafkaCluster(kafkaParams)

  /** You need to provide this method yourself. The description below is taken from: https://github.com/apache/spark/blob/v1.6.0/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
    * Create an input stream that directly pulls messages from Kafka Brokers
    * without using any receiver. This stream can guarantee that each message
    * from Kafka is included in transformations exactly once (see points below).
    *
    * Points to note:
    *  - No receivers: This stream does not use any receiver. It directly queries Kafka
    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
    *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
    *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    *    You can access the offsets used in each batch from the generated RDDs (see
    *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
    *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
    *    in the [[StreamingContext]]. The information on consumed offset can be
    *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
    *  - End-to-end semantics: This stream ensures that every records is effectively received and
    *    transformed exactly once, but gives no guarantees on whether the transformed data are
    *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
    *    that the output operation is idempotent, or use transactions to output records atomically.
    *    See the programming guide for more details.
    *
    * @param ssc StreamingContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
    *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
    *   host1:port1,host2:port2 form.
    *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
    *   to determine where the stream starts (defaults to "largest")
    * @param topics Names of the topics to consume
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    * @tparam KD type of Kafka message key decoder
    * @tparam VD type of Kafka message value decoder
    * @return DStream of (Kafka message key, Kafka message value)
    */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag]
  ( ssc: StreamingContext, kafkaParams: Map[String, String],
    topics: Set[String]
  ): InputDStream[(K, V)] =  {

    val groupId = kafkaParams.get("group.id").get
    // before reading offsets from ZooKeeper, reconcile them with the actual state of the cluster
    setOrUpdateOffsets(topics, groupId)

    // read the offsets from ZooKeeper and start consuming messages from there
    val messages = {
      val partitionsE = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /**
    * Before creating the stream, update the consumer offsets according to the actual consumption state
    *
    * @param topics   topics
    * @param groupId  consumer group id
    */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) { // the group has consumed before
        /**
          * If the streaming job throws kafka.common.OffsetOutOfRangeException,
          * the offsets saved in ZooKeeper are stale: Kafka's retention policy has already
          * deleted the log segments that contained them.
          * In that case, compare the consumerOffsets in ZooKeeper with earliestLeaderOffsets;
          * if consumerOffsets are smaller than earliestLeaderOffsets they are out of date,
          * so update consumerOffsets to earliestLeaderOffsets.
          */
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get

        // possibly only some partitions have stale consumerOffsets, so update only those partitions to earliestLeaderOffsets
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case(tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            logger.warn("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets are stale, updating to " + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        })
        if (!offsets.isEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else { // the group has not consumed yet
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  /**
    * Update the consumer offsets stored in ZooKeeper
    * @param rdd rdd
    */
  def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      logger.warn("update offset ..................................................")
      if (o.isLeft) {
        logger.warn(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
    logger.warn("end  update offset ..................................................")
  }

}

Test demo

package com.desheng.bigdata.spark.scala.streaming.p3.extactly

import com.desheng.bigdata.spark.scala.kafkaUtils.KafkaManager
import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}

object savaOffsetToZK {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
  Logger.getLogger("org.spark-project").setLevel(Level.WARN)
  // can be a local path or an HDFS path
  val checkpointPath = "file:///C:\\Users\\Administrator.SC-201905261418\\Desktop\\bigdata开发实例\\checkpoint7"


  def main(args: Array[String]): Unit = {
    val context = StreamingContext.getOrCreate(checkpointPath, createSSC)
    context.start()
    context.awaitTermination()
  }

  def createSSC(): StreamingContext = {
    val conf = new SparkConf().setAppName("local test checkpoint").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "hdp01:9092,hdp02:9092,hdp03:9092",
      "group.id" -> "comsumer2",
      // "smallest" reads from the earliest available offset; commonly used in development
      // in this demo the offsets themselves are saved in the checkpoint
      "auto.offset.reset" -> "smallest",
      "enable.auto.commit" -> "false",
      // we are consuming data here, so specify the deserializers: classes implementing org.apache.kafka.common.serialization.Deserializer
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")
    val topic = Set("spark01")
    ssc.checkpoint(checkpointPath)
    // dstreams is a stream of RDDs
    val dstreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    // periodically write the DStream to the checkpoint; recommended interval: 5-10x the batch interval
    dstreams.checkpoint(Seconds(100))

    dstreams.foreachRDD((rdd, mtime) => {
      // append the batch time so each batch writes to a fresh directory (a fixed path fails on the second batch)
      rdd.saveAsTextFile("file:///C:\\Users\\Administrator.SC-201905261418\\Desktop\\bigdata开发实例\\spark\\localFile-" + mtime.milliseconds)
      // test call: update the offsets via KafkaManager
      val kafkaManager = new KafkaManager(kafkaParams)
      kafkaManager.updateZKOffsets(rdd)
      
      println("foreach rdd ")
      rdd.foreachPartition(partition => {
        partition.foreach(p => {
          // p is the (key, value) pair of the record
          print("k " + p._1 + " v " + p._2)
        })
      })

    })
    // extract the message values as lines
    val lines = dstreams.map(_._2)
    lines.print()
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    // print() is an output operation that returns Unit, so call it directly
    wordCounts.print()
    ssc
  }
}


2.2: Kafka 0.10 + Spark Streaming offset management

Kafka 0.10 offset management test.

The new consumer provides a new API for manually committing offsets to Kafka, so there is no need to create a KafkaCluster to commit.
commitSync and commitAsync commit synchronously and asynchronously, respectively.

2.2.1: Manual commit to Kafka + checkpoint

Both the Kafka monitoring tool and the shell command can view the offsets.

Manual-commit code:

  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
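
For context, a fuller hedged sketch of the 0.10 integration with manual commits; it assumes the spark-streaming-kafka-0-10 dependency and reuses the brokers and topic from the earlier tests (the group id comsumer3 and the object name are made up for this example):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object Kafka010ManualCommit {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka010 manual commit").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdp01:9092,hdp02:9092,hdp03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "comsumer3",
      // the 0.10 client uses earliest/latest instead of smallest/largest
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Set("spark01"), kafkaParams))

    stream.foreachRDD { rdd =>
      // capture the offset ranges on the driver before any transformation changes the partitioning
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach(record => println(s"k ${record.key} v ${record.value}"))
      // commit asynchronously back to Kafka once the batch's output work is done
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

With this, both the consumer-group shell command and the monitoring tool see the group's committed offsets, matching the result above.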

3: Kafka 0.8 producer and consumer tests

Monitoring test with Kafka's own producer and consumer:
offsets can be viewed, both with the consumer-group command and with the monitoring tool.

4: Kafka monitoring tools

If you only need monitoring, KafkaOffsetMonitor is recommended; if the focus is on Kafka cluster management, Kafka Manager is recommended.

4.1: Do-it-yourself management

4.1.1: Exporting offsets from ZooKeeper

The export contains the consumed offset positions, not the latest (log-end) positions. Note that this tool is expected to be removed after version 0.11, so watch for changes in your Kafka version.

./kafka-run-class.sh kafka.tools.ExportZkOffsets --group comsumer2 --zkconnect hdp01:2181,hdp02:2181,hdp03:2181 --output ./offsets

4.2: KafkaOffsetMonitor

The program runs as a single jar, so deployment is simple. It only provides monitoring, which also makes it relatively safe to use.

4.3: Kafka Manager

Geared toward Kafka cluster management; careless operations can easily cause cluster failures. It obtains real-time produce and consume metrics via JMX and does not record offsets, lag, or similar information.