SparkStreaming Reading Data from Kafka (Spark 2.3.2)
@羲凡——只为了更好的活着
Stream processing almost always involves Kafka, so reading Kafka data with SparkStreaming is one of the essential skills for stream processing.
1. First, add the dependencies to pom.xml
${spark.version} is your Spark version; mine is Spark 2.3.2, and my Kafka is kafka_2.11-0.10.2.2.
<!--kafka-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>compile</scope>
</dependency>
<!--Kafka 0.10 Source For Structured Streaming-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>compile</scope>
</dependency>
2. Example code:
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestKafkaDirectStreamHA {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TestDirectStreamHA")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val checkpointDir = "/aarontest/sparkstreaming/checkpoint/TestDirectStreamHA"

    // Builds a new StreamingContext; only called when no checkpoint exists yet
    def creatingFunc(): StreamingContext = {
      val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
      ssc.checkpoint(checkpointDir)
      val topic = Set("aarontest")
      val kafkaParams = Map(
        "bootstrap.servers" -> "deptest104:9092,deptest105:9092,deptest108:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "KafkaDirectStreamHA",
        //"auto.offset.reset" -> "latest", // consume from the latest offsets; remove this line if you pass explicit offsets below
        "enable.auto.commit" -> (false: java.lang.Boolean))
      // Start each partition of topic "aarontest" from offset 10
      val offsets = Map(
        new TopicPartition("aarontest", 0) -> 10L,
        new TopicPartition("aarontest", 1) -> 10L,
        new TopicPartition("aarontest", 2) -> 10L
      )
      val directKafkaDStream = KafkaUtils
        .createDirectStream(ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topic, kafkaParams, offsets))
        .map(_.value())
      // Word count on each batch
      val resultDStream = directKafkaDStream
        .mapPartitions(iter => {
          iter.filter(_.nonEmpty).flatMap(_.split(" ")).map((_, 1))
        }).reduceByKey(_ + _)
      resultDStream.print()
      ssc
    }

    // Recover from the checkpoint if it exists, otherwise create a new context
    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
    ssc.start()
    ssc.awaitTermination()
  }
}
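The example sets enable.auto.commit to false and relies on the checkpoint for recovery. If you also want to inspect or commit the consumed offsets yourself, the kafka-0-10 integration exposes HasOffsetRanges and CanCommitOffsets. This is not part of the original example; the sketch below reuses the ssc, topic and kafkaParams values defined above and works on the stream before any transformation such as map():

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

// Keep a reference to the raw stream; HasOffsetRanges is only available on its RDDs,
// not on the DStream returned by map()/mapPartitions()
val rawStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))

rawStream.foreachRDD { rdd =>
  // Offset range of every Kafka partition read in this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(o => println(s"${o.topic}-${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))
  // Commit the offsets back to Kafka after the batch has been processed
  rawStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}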
3. Results:
-------------------------------------------
Time: 1551682105000 ms
-------------------------------------------
-------------------------------------------
Time: 1551682110000 ms
-------------------------------------------
(successed,1)
(spark,1)
(read,1)
(kafka,1)
-------------------------------------------
Time: 1551682115000 ms
-------------------------------------------
4. Notes:
a. The checkpoint is what makes the SparkStreaming job highly available: after the job stops and is restarted, it can still continue consuming Kafka data from where it left off.
b. If no data is produced to Kafka, the job still keeps printing the batch timestamps.
c. auto.offset.reset = "latest" means consuming from the latest offsets; if you pass explicit offsets, this parameter is not needed (see the sketch after these notes).
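For note c, here is a minimal sketch of the same subscription without explicit offsets, so that auto.offset.reset takes effect; it reuses the ssc, the StringDeserializer import and the broker addresses from the example above:

// No offsets map: a new consumer group starts from the latest offsets ("latest")
val kafkaParamsLatest = Map(
  "bootstrap.servers" -> "deptest104:9092,deptest105:9092,deptest108:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "KafkaDirectStreamHA",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val latestStream = KafkaUtils.createDirectStream(
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Set("aarontest"), kafkaParamsLatest))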
====================================================================
@羲凡——只为了更好的活着
If you have any questions about this post, feel free to leave a comment.