Spark Streaming: Consuming Kafka Data Based on Offsets
1. The official example of consuming Kafka data
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}
Official documentation:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
The core idea: capture the offset ranges at the moment the Kafka stream's RDDs are created, and commit the updated offsets as soon as each batch has been consumed.
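The official snippet above only prints the offsets; the part that actually lets a restarted job resume correctly is writing them back after the batch's work is done. A minimal sketch of that write-back step, placed at the end of the `foreachRDD` body above: it assumes `offsetRanges` and `kafkaParams` are the ones from that snippet, and that the code is compiled into the `org.apache.spark.streaming.kafka` package so the package-private `KafkaCluster` (the same helper the `KafkaManager` in section 2 relies on) is visible.

```scala
import kafka.common.TopicAndPartition

// After the batch has been processed, write the captured offset ranges back
// so a restart resumes from where this batch finished.
// `offsetRanges` and `kafkaParams` come from the snippet above (assumption).
val kc = new KafkaCluster(kafkaParams)
val groupId = kafkaParams("group.id")
for (o <- offsetRanges) {
  val tp = TopicAndPartition(o.topic, o.partition)
  kc.setConsumerOffsets(groupId, Map(tp -> o.untilOffset)).left.foreach { err =>
    println(s"Error committing offset for $tp: $err")
  }
}
```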
2. A custom manager for Kafka offsets
Note that this class must be placed in the org.apache.spark.streaming.kafka package, the same package as the Spark source it builds on, because the KafkaCluster helper it uses is package-private.
package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import scala.reflect.ClassTag
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.StreamingContext

class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)
  // Rewind step used by updateZKOffsetsFromoffsetRanges: about 11.5 million messages per "day"
  private val flag = 1150 * 10000L

  /**
   * Create the direct stream.
   * @param ssc
   * @param kafkaParams
   * @param topics
   * @tparam K
   * @tparam V
   * @tparam KD
   * @tparam VD
   * @return
   */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get
    // Before reading offsets from ZooKeeper, reconcile them with what the cluster actually holds
    setOrUpdateOffsets(topics, groupId)
    // Read the offsets from ZooKeeper and start consuming messages from there
    val messages = {
      val partitionsE = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /**
   * Before creating the stream, update the stored consumer offsets according to the actual consumption state.
   * @param topics
   * @param groupId
   */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) { // the group has consumed this topic before
        /**
         * If the streaming job throws kafka.common.OffsetOutOfRangeException,
         * the offsets saved in ZooKeeper are stale: Kafka's retention policy has already
         * deleted the log segments containing them. In that case compare the consumer
         * offsets in ZooKeeper with the earliest leader offsets; any consumer offset that
         * is smaller than the earliest leader offset is stale and is reset to the
         * earliest leader offset.
         */
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get
        // Possibly only some partitions are stale, so only those are reset to the earliest leader offsets
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({
          case (tp, n) =>
            val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
            if (n < earliestLeaderOffset) {
              println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
                " offsets are stale, resetting to " + earliestLeaderOffset)
              offsets += (tp -> earliestLeaderOffset)
            }
        })
        if (!offsets.isEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else { // the group has never consumed this topic
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  /**
   * Update the consumer offsets in ZooKeeper:
   * write the offsets just consumed by this batch.
   *
   * @param rdd
   */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }

  /**
   * Update the consumer offsets in ZooKeeper:
   * rewind the just-consumed offsets by a fixed amount before writing them,
   * so that range is consumed again after a restart.
   *
   * @param offsetRanges
   * @param day
   */
  def updateZKOffsetsFromoffsetRanges(offsetRanges: Array[OffsetRange], day: Double): Unit = {
    val groupId = kafkaParams.get("group.id").get
    for (offsets <- offsetRanges) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      var offsetStreaming = 0L
      println("offsets.untilOffset " + offsets.untilOffset)
      /**
       * If the streaming job dies, consumption restarts `flag * day` messages before the
       * committed offset. The window operation in the streaming job deduplicates records,
       * so re-consuming that range does not produce duplicate results.
       */
      if (offsets.untilOffset >= flag) {
        offsetStreaming = offsets.untilOffset - (flag * day).toLong
      } else {
        offsetStreaming = 0
      }
      println("offsetStreaming " + offsetStreaming)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsetStreaming)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}
The manager thus supports two commit strategies:
1) Commit the offset where this batch finished, so the next run resumes exactly from there (updateZKOffsets); see the sketch after this list.
2) Rewind the offset from where this batch finished before committing, so that range is consumed again after a restart (updateZKOffsetsFromoffsetRanges).
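The full example in section 3 uses the second strategy. For the first one, a minimal sketch might look like the following; the stream setup mirrors section 3, and the processing step is a placeholder.

```scala
// Sketch of strategy 1: commit exactly where each batch finished.
// Calling foreachRDD directly on the stream returned by createDirectStream
// keeps the underlying KafkaRDD intact, so the HasOffsetRanges cast inside
// updateZKOffsets still succeeds.
val km = new KafkaManager(kafkaParams)
val stream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
  // ... process this batch ...
  km.updateZKOffsets(rdd) // writes untilOffset for every partition of the batch
}
```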
3. Usage example
val conf = new SparkConf().setAppName("NewsTopNRealRankOffset") //.setMaster("local[3]")
conf.set("spark.streaming.blockInterval", "50ms")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.storage.memoryFraction", "0.4") // fraction of executor memory given to the cache; the default 0.6 leaves only 40% for task execution, which is a bit small here, so it is lowered to 0.4
conf.set("spark.locality.wait", "6000") // 6000 milliseconds
conf.set("spark.streaming.kafka.maxRatePerPartition", "35000") // cap on messages consumed per second from each partition of the topic
// shuffle tuning
conf.set("spark.shuffle.consolidateFiles", "true")
conf.set("spark.reducer.maxSizeInFlight", "150m")
conf.set("spark.shuffle.file.buffer", "128k")
conf.set("spark.shuffle.io.maxRetries", "8")
conf.set("spark.shuffle.io.retryWait", "6s")
conf.set("spark.shuffle.memoryFraction", "0.3")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(25))
// keep generated RDDs around (2 days) for the long window below
ssc.remember(Minutes(60 * 24 * 2))
val sqlContext = new HiveContext(sc)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

// 1. Register UDFs
val udf = UDFUtils()
udf.registerUdf(sqlContext)

// 2. Kafka data processing
val kafkaServiceOffset = KakfaServiceOffset()
val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder",
  "group.id" -> "tracklogPrdNewsTopN_TrackLogT", "auto.offset.reset" -> "largest") // largest or smallest
val topics = Set("TrackLogT")

// Obtain the Kafka source through the custom KafkaManager
val km = new KafkaManager(kafkaParams)
val kafkaStream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
var offsetRanges = Array[OffsetRange]()
// Capture the offset ranges while creating the Kafka stream
val kafkaStreamOffset = kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}
val urlClickLogPairsDStream = kafkaServiceOffset.kafkaDStreamForNewsOffset(ssc, kafkaStreamOffset)

// 3. Cache data from Hive
val cacheUtils = CacheUtils()
cacheUtils.cacheHiveData(sqlContext)

// 4. Windowed aggregation over the click stream
val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => {
    v1 + v2
  },
  Seconds(3600 * 35),
  Seconds(250))

// 5. Business logic
urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {
  val urlClickCountRowRDD = urlClickCountsRDD.map(tuple => {
    val datetime = tuple._1.split("\001")(0) + " " + tuple._1.split("\001")(1)
    val cookieid = tuple._1.split("\001")(2)
    val url = tuple._1.split("\001")(3)
    val artId = tuple._1.split("\001")(4)
    val click_count = tuple._2
    Row(datetime, cookieid, url, artId, click_count)
  })
  val structType = StructType(Array(
    StructField("datetime", StringType, true),
    StructField("cookieid", StringType, true),
    StructField("url", StringType, true),
    StructField("artId", StringType, true),
    StructField("click_count", IntegerType, true)))
  val categoryProductCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType)
  categoryProductCountDF.registerTempTable("url_click_log_day")
  // The batch has been consumed, so update (rewind and commit) the offsets
  km.updateZKOffsetsFromoffsetRanges(offsetRanges, 1)
  cacuDayTopN(sqlContext)
  cacuHourTopN(sqlContext)
})

// 6. Start the streaming job
ssc.start()
ssc.awaitTermination()
Note: the "group.id" setting is the name under which this consumer group's offsets for the Kafka topic are tracked in ZooKeeper.
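To confirm that a commit actually landed under that group.id, one option (not from the original post, just a minimal sketch) is to read the offsets back with the same KafkaCluster helper; like the KafkaManager, this snippet must be compiled inside the org.apache.spark.streaming.kafka package because KafkaCluster is package-private.

```scala
// Read back the offsets that were just committed for the group.
// `kafkaParams` is the map used above; "TrackLogT" is the topic from the example.
val kc = new KafkaCluster(kafkaParams)
val groupId = kafkaParams("group.id")
kc.getPartitions(Set("TrackLogT")).right.foreach { partitions =>
  kc.getConsumerOffsets(groupId, partitions).right.foreach { offsets =>
    offsets.foreach { case (tp, offset) =>
      println(s"${tp.topic}-${tp.partition} committed offset: $offset")
    }
  }
}
```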