spark离线批处理写入kafka调优
采用老版本spark(1.6)在离线批处理环境下,将DataFrame快速写入kafka,通常可以查到的大部分都是针对Spark-Streaming进行Kafka写入的说明,但是在离线批处理环境下,也希望将批量计算的DataFrame中的数据直接写入到某个topic中,在spark2.0以上,可以直接采用spark-streaming的写入方式,只要用read 代替 resdStream 和用 w
采用老版本spark(1.6)在离线批处理环境下,将DataFrame快速写入kafka,通常可以查到的大部分都是针对Spark-Streaming进行Kafka写入的说明,但是
在离线批处理环境下,也希望将批量计算的DataFrame中的数据直接写入到某个topic中,在spark2.0以上,可以直接采用spark-streaming的写入方式,只要
用read 代替 resdStream 和用 write代替 writeStream 即可。
例如,引入以下同步kafka的包
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
然后可以按以下方式写入kafka
package com.sparkbyexamples.spark.streaming.batch
import org.apache.spark.sql.SparkSession
object WriteDataFrameToKafka {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
val data = Seq (("iphone", "2007"),("iphone 3G","2008"),
("iphone 3GS","2009"),
("iphone 4","2010"),
("iphone 4S","2011"),
("iphone 5","2012"),
("iphone 8","2014"),
("iphone 10","2017"))
val df = spark.createDataFrame(data).toDF("key","value")
/* since we are using dataframe which is already in text, selectExpr is optional. If the bytes of the Kafka records represent UTF8 strings, we can simply use a cast to convert the binary data into the correct type. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") */
df.write
.format("kafka")
.option("kafka.bootstrap.servers","192.168.1.100:9092")
.option("topic","text_topic")
.save()
}
}
读取:
package com.sparkbyexamples.spark.streaming.batch
import org.apache.spark.sql.SparkSession
object ReadDataFromKafka {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("https://SparkByExamples.com")
.getOrCreate()
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.1.100:9092")
.option("subscribe", "text_topic")
.load()
df.printSchema()
val df2 = df.selectExpr("CAST(key AS STRING)",
"CAST(value AS STRING)","topic")
df2.show(false)
}
}
然而,当前应用的环境并不是spark2.0而是老版本spark1.6,这个版本是无法按此方式进行写入的。
因此,开始的时候尝试直接定义producer进行遍历写入:
dstream.foreachRDD { rdd =>
rdd.foreach { message =>
val producer = createKafkaProducer()
producer.send(message)
producer.close()
}
}
这种写入会产生较大问题,因为每条数据都要创建producer和kafka进行连接,因此执行效率非常差,每秒只能写入几十条数据,显然无法工作。
然而又无法将producer连接定义到foreach外,因为spark 执行过程中需要经过序列化的变量,如果外部变量没有序列化,foreach内部是无法调用的。
尝试增加注解:@transient,但是仍然报没有序列化的错误。
这个错误的详细说明,在streaming的官方文档里有详细的描述,可以用foreachPartition 循环来优化,将producer定义到partion的循环体中,然后发送每个partition的数据,
如下所示:
dataframe.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val producer = createKafkaProducer()
partitionOfRecords.foreach { message =>
connection.send(message)
}
producer.close()
}
}
这样处理后,效率有非常大的提升,可以达到4000条/秒,基本可以支持业务了。
当然还有更优的解决方案,经过查找,发现有将连接定义成序列化的类,利用lazy和广播实现更好的性能,示例如下:
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}
object KafkaSink {
def apply(config:Properties ): KafkaSink = {
val f = () => {
new KafkaProducer[String, String](config)
}
new KafkaSink(f)
}
}
使用时:
val kafkaSink = sparkContext.broadcast(KafkaSink(conf))
dstream.foreachRDD { rdd =>
rdd.foreach { message =>
kafkaSink.value.send(topicname,message)
}
}
经过以上改造,发送topic的速度已经飙升到7000条/秒了。
参考文章:https://allegro.tech/2015/08/spark-kafka-integration.html
更多推荐
所有评论(0)