大数据系列之Spark Streaming和Structured Streaming对比
本文对Spark Streaming和Structured Streaming在流模型、API使用、时延性能以及和Kafka对接等方面进行了对比
本文对Spark Streaming和Structured Streaming在流模型、API使用、时延性能以及和Kafka对接等方面进行了对比,如下表所示:
1、流模型
- Spark Streaming
Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次,新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的,批次间隔一般设在500毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个RDD,以Spark作业的方式处理并生成其他的RDD。处理的结果可以以批处理的方式传给外部系统。
- Structured Streaming
Structured Streaming中的关键思想是将实时数据流当作可以连续追加的表,这样可以将流计算以静态表的方式进行处理。
在Structured Streaming中将input data stream作为“input table”,对input table查询会生成一个“result table”。在每次查询时候,新的记录会追加到input table中,同时也会更新到Result Table中,当result table更新的时候,这些更新的数据需要写到外部存储中。
Output有不同的模式:
- Complete mode:整个更新的result table写到外部存储
- Append mode:上一次写入以来新的追加数据写到外部存储
- Update mode:上一次写入以来新的更新数据写到外部存储
2、API使用
2.1 Spark Streaming API使用
1)Input Streaming
Spark Streaming有两种内置的Streaming源:
- Basic source:StreamingContext API可用的源,比如文件系统、socket连接
- Advanced source:比如kafka、flume等
2)Output输出
使用foreachRDD设计模式,通过维护一个静态的对象连接池,在多个RDDs/batches之间重用连接,降低消耗:
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
注:连接池中的连接按需创建,并且一段时间不使用时会超时,实现了将数据发送到外部系统的最有效方法。
3)RDD算子操作
- Transform操作
允许任意的RDD-RDD函数应用于DStream,例如,可以通过将输入数据流与预先计算的垃圾信息(也可以使用Spark生成)进行实时数据清理,然后基于此进行过滤。
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
- Window操作
Spark Streaming支持窗口计算,这样允许在滑动创建进行转换,如下图所示:
当窗口滑过源DStream,源RDDs被组合产生窗口DStream的RDDs。Windows需要2个参数:窗口长度和滑动间隔。
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
- Join操作
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
2.2 Structured Streaming API使用
Structured Streaming代码编写完全复用Spark SQL的 batch API,也就是对一个或者多个 stream或者table进行query。Query的结果是result table,可以以多种不同的模式(append, update, complete)输出到外部存储中。
1)Input输入
Structured Streaming输入源有以下:
- File source:以文件作为输入流数据,支持txt、CSV、JSON等格式
- Kafka Source:从Kafka中读取数据
- Socket source:从socket连接中读取UTF8 text数据
- Rate Source
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
.readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
2)Output输出
Streaming计算完成后,需要使用DataStreamWriter 写入到外部存储中,指定以下参数:
- Output sink的信息如Data format、location等
- Output mode,append、complete还是update
- Query name:query的名称
- Trigger interval:指定trigger间隔,如果没有指定,会在之前数据处理完成后立即触发
- Checkpoint location:对于容错要求高的,指定checkpoint写入的位置
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "updates")
.start()
3)Streaming dataframe操作
- 聚合操作
df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
- Window操作
如下图所示,统计10分钟窗口、5分钟更新的流数据
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
- JOIN操作
Structured Streaming支持Streaming和Data Frame之间的join操作
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") # right outer join with a static DF
3、时延对比
Spark Streaming是基于micro-batch模式,在处理实时流数据的时候存在process-time和event-time窗口时延。
Structured Streaming的continuous mode是实时处理的,只要一有数据就会进行处理,时延基本在毫秒级别。
4、对接Kafka数据
4.1 Spark Streaming对Kafka支持
参考“大数据系列之Spark接入Kafka数据”
1)读取Kafka数据
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
2)发送数据到Kafka
Spark Streaming写入数据到Kafka是通过调用KafkaProducer模块实现的
from kafka import KafkaProducer
to_kafka = KafkaProducer(bootstrap_servers=broker_list)
to_kafka.send(topic_name,send_msg,encode(‘utf8’))
to_kafka.flush()
4.2 Structured Streaming对Kafka支持
1)从Kafka中读取数据,并将二进制流数据转为字符串
# Construct a streaming DataFrame that reads from topic1
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.112.101:2181 ") \
.option("subscribe", "kafka_topic") \
.option("startingOffsets", "earliest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
2)使用Spark作为Producer发送Kafka数据
# Write key-value data from a DataFrame to a Kafka topic specified in an option
query = df \
.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers","192.168.112.101:2181 ") \
.option("topic", " kafka_topic ") \
.option("checkpointLocation", "/path/to/HDFS/dir") \
.start()
5、与其它流引擎比较
Structured Streaming和其它流计算引擎比较,在时延性能、API简洁、聚类计算等方面都具有一定的优势。
参考资料
- http://spark.apache.org/docs/latest/streaming-programming-guide.html
- http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
- https://www.slidestalk.com/u2909/FromSparkStreamingtoStructuredStreaming58639
- https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
- https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
转载请注明原文地址:https://blog.csdn.net/solihawk/article/details/116607167
文章会同步在公众号“牧羊人的方向”更新,感兴趣的可以关注公众号,谢谢!
更多推荐
所有评论(0)