采用老版本spark(1.6)在离线批处理环境下,将DataFrame快速写入kafka,通常可以查到的大部分都是针对Spark-Streaming进行Kafka写入的说明,但是

在离线批处理环境下,也希望将批量计算的DataFrame中的数据直接写入到某个topic中,在spark2.0以上,可以直接采用spark-streaming的写入方式,只要

用read 代替 resdStream 和用 write代替 writeStream 即可。

例如,引入以下同步kafka的包

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
      <version>2.4.0</version>
</dependency>

然后可以按以下方式写入kafka

package com.sparkbyexamples.spark.streaming.batch
import org.apache.spark.sql.SparkSession
object WriteDataFrameToKafka {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    val data = Seq (("iphone", "2007"),("iphone 3G","2008"),
      ("iphone 3GS","2009"),
      ("iphone 4","2010"),
      ("iphone 4S","2011"),
      ("iphone 5","2012"),
      ("iphone 8","2014"),
      ("iphone 10","2017"))

    val df = spark.createDataFrame(data).toDF("key","value")
    /* since we are using dataframe which is already in text, selectExpr is optional. If the bytes of the Kafka records represent UTF8 strings, we can simply use a cast to convert the binary data into the correct type. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") */
    df.write
      .format("kafka")
      .option("kafka.bootstrap.servers","192.168.1.100:9092")
      .option("topic","text_topic")
      .save()
  }
}

读取:

package com.sparkbyexamples.spark.streaming.batch
import org.apache.spark.sql.SparkSession
object ReadDataFromKafka {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("https://SparkByExamples.com")
      .getOrCreate()

    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.100:9092")
      .option("subscribe", "text_topic")
      .load()

    df.printSchema()

    val df2 = df.selectExpr("CAST(key AS STRING)", 
             "CAST(value AS STRING)","topic")
    df2.show(false)
  }
}

然而,当前应用的环境并不是spark2.0而是老版本spark1.6,这个版本是无法按此方式进行写入的。
因此,开始的时候尝试直接定义producer进行遍历写入:

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    val producer = createKafkaProducer()
    producer.send(message)
    producer.close()
  }
}

这种写入会产生较大问题,因为每条数据都要创建producer和kafka进行连接,因此执行效率非常差,每秒只能写入几十条数据,显然无法工作。

然而又无法将producer连接定义到foreach外,因为spark 执行过程中需要经过序列化的变量,如果外部变量没有序列化,foreach内部是无法调用的。

尝试增加注解:@transient,但是仍然报没有序列化的错误。
这个错误的详细说明,在streaming的官方文档里有详细的描述,可以用foreachPartition 循环来优化,将producer定义到partion的循环体中,然后发送每个partition的数据,

如下所示:

dataframe.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      connection.send(message)
    }
    producer.close()
  }
}

这样处理后,效率有非常大的提升,可以达到4000条/秒,基本可以支持业务了。

当然还有更优的解决方案,经过查找,发现有将连接定义成序列化的类,利用lazy和广播实现更好的性能,示例如下:

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}

object KafkaSink {
  def apply(config:Properties ): KafkaSink = {
    val f = () => {
      new KafkaProducer[String, String](config)
    }
    new KafkaSink(f)
  }
}

使用时:

val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    kafkaSink.value.send(topicname,message)
  }
}

经过以上改造,发送topic的速度已经飙升到7000条/秒了。

在这里插入图片描述

参考文章:https://allegro.tech/2015/08/spark-kafka-integration.html

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐