@羲凡 - Just to live a better life

Spark Streaming: reading data from Kafka (Spark 2.3.2)

Stream processing almost always involves Kafka, so reading Kafka data with Spark Streaming is one of the essential skills for stream processing.

1. First add the following to pom.xml

${spark.version} is your Spark version; mine is Spark 2.3.2, and my Kafka is kafka_2.11-0.10.2.2 (see the properties sketch after the dependencies).

<!--kafka-->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
	<version>${spark.version}</version>
	<scope>compile</scope>
</dependency>
<!--Kafka 0.10 Source For Structured Streaming-->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
	<version>${spark.version}</version>
	<scope>compile</scope>
</dependency>
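For reference, ${spark.version} resolves to a Maven property that you define yourself in the same pom.xml. A minimal sketch, assuming the versions mentioned above:

<properties>
	<!-- must match the Spark you run against; 2.3.2 in this post -->
	<spark.version>2.3.2</spark.version>
</properties>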
2. Example code:
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies,KafkaUtils,LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestKafkaDirectStreamHA {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TestDirectStreamHA")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    // checkpoint directory used to recover the StreamingContext after a restart
    val checkpointDir = "/aarontest/sparkstreaming/checkpoint/TestDirectStreamHA"

    // builds a fresh StreamingContext; getActiveOrCreate calls this only when
    // neither an active context nor a usable checkpoint exists
    def creatingFunc(): StreamingContext = {
      val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
      ssc.checkpoint(checkpointDir)
      val topic = Set("aarontest")
      val kafkaParams = Map(
        "bootstrap.servers" -> "deptest104:9092,deptest105:9092,deptest108:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "KafkaDirectStreamHA",
        //"auto.offset.reset" -> "latest",//从最新的开始消费,如果送指定消费offsets者需要去掉此行
        "enable.auto.commit" -> (false: java.lang.Boolean))
      // explicit starting offset for each partition of the topic
      val offsets = Map(
        new TopicPartition("aarontest", 0) -> 10L,
        new TopicPartition("aarontest", 1) -> 10L
      )
      val directKafkaDStream = KafkaUtils
        .createDirectStream(ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topic, kafkaParams, offsets))
        .map(_.value())
      val resultDStream = directKafkaDStream
        .mapPartitions(iter => {
          // drop empty records, split each line into words, and count the words
          iter.filter(_.nonEmpty).flatMap(_.split(" ")).map((_, 1))
        }).reduceByKey(_ + _)
      resultDStream.print()
      ssc
    }

    // reuse the active context, recover it from the checkpoint, or build a new one
    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
    ssc.start()
    ssc.awaitTermination()
  }
}
3. Results:
-------------------------------------------
Time: 1551682105000 ms
-------------------------------------------

-------------------------------------------
Time: 1551682110000 ms
-------------------------------------------
(successed,1)
(spark,1)
(read,1)
(kafka,1)

-------------------------------------------
Time: 1551682115000 ms
-------------------------------------------
4. Notes:

a. Checkpointing keeps the Spark Streaming job highly available: after the job stops and is restarted, it can still continue consuming Kafka data from where it left off.
b. If no new data is produced to Kafka, the job still prints the batch timestamps.
c. auto.offset.reset = "latest" means consuming from the latest offsets; it is not needed when the starting offsets are specified explicitly (see the sketch below).
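As a companion to note c, here is a minimal sketch of the alternative: subscribing without an explicit offsets map and letting auto.offset.reset decide where a new consumer group starts. It reuses the broker list, topic, and group id from the example above; ssc is the StreamingContext built in creatingFunc, and everything else is illustrative:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// no starting-offsets map: "auto.offset.reset" decides where a consumer group with
// no committed offsets begins ("latest" = only new records, "earliest" = from the beginning)
val kafkaParamsLatest = Map[String, Object](
  "bootstrap.servers" -> "deptest104:9092,deptest105:9092,deptest108:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "KafkaDirectStreamHA",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val latestStream = KafkaUtils
  .createDirectStream[String, String](ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Set("aarontest"), kafkaParamsLatest))
  .map(_.value())

Note that once the group already has committed offsets in Kafka, those take precedence and auto.offset.reset is ignored.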

====================================================================

@羲凡 - Just to live a better life

If you have any questions about this post, feel free to leave a comment.
