Creating a Kafka input stream with the spark-streaming-kafka-0-10_2.11-2.0.0 dependency
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStream {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf; remove .setMaster("local[2]") when submitting to a cluster
    val conf = new SparkConf().setAppName("DirectStream").setMaster("local[2]")
    // Create a StreamingContext with a 5-second batch interval (it creates a SparkContext internally)
    val streamingContext = new StreamingContext(conf, Seconds(5))
    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node-1.xiaoniu.com:9092,node-2.xiaoniu.com:9092,node-3.xiaoniu.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test123",
      // Where to start when no committed offset exists: "earliest" or "latest"
      "auto.offset.reset" -> "earliest",
      // Disable auto-commit so offsets are committed manually after processing
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("xiaoniu")
    // Record the consumed offsets back in Kafka itself
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      // Location strategy: distribute partitions evenly across the available executors
      LocationStrategies.PreferConsistent,
      // Consumer strategy: subscribe to a fixed collection of topics
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // Iterate over the RDDs (KafkaRDDs) in the DStream, one per batch interval
    stream.foreachRDD { rdd =>
      // Get the offset ranges backing this RDD
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Process the corresponding records
      rdd.foreach { line =>
        println(line.key() + " " + line.value())
      }
      // Asynchronously commit the offsets back to Kafka,
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
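If you need to know, on the executors, exactly which offsets each task is processing, the integration guide's variant of the loop above indexes offsetRanges by partition id (each partition of a KafkaRDD maps 1:1 to a Kafka partition). A minimal sketch, reusing the stream and imports from the code above plus org.apache.spark.TaskContext:

import org.apache.spark.TaskContext

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // Look up this task's offset range by its partition id
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    iter.foreach(record => println(record.key() + " " + record.value()))
  }
}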
Project dependencies
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '2.1.0'
    compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '2.1.0'
    compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.10', version: '2.2.0'
}
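Note that the Scala suffix (_2.10 here, _2.11 in the title) and the Spark version should be consistent across all three Spark artifacts; mixing 2.1.0 core with a 2.2.0 connector risks binary incompatibilities. An aligned variant of the connector line, assuming Spark 2.1.0 on Scala 2.10:

    compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.10', version: '2.1.0'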
See the official documentation for details:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets
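The "storing offsets" section there also covers restarting from offsets kept in your own store (e.g. a transactional database) instead of Kafka. In that case the stream is created with ConsumerStrategies.Assign, seeded with the stored offsets. A minimal sketch, with hardcoded placeholder offsets standing in for whatever your store returns:

import org.apache.kafka.common.TopicPartition

// Placeholder values; in practice read these from your external store
val fromOffsets = Map(
  new TopicPartition("xiaoniu", 0) -> 0L,
  new TopicPartition("xiaoniu", 1) -> 0L
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  // Assign pins the consumer to explicit partitions and starting offsets
  ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)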