1 概述

Spark Streaming是Spark核心API的一个扩展,对于实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。Spark Streaming可以从
kafka、flume、Twitter、 ZeroMQ、Kinesis、TCP套接字等数据源获取数据,同时可以用由high-level函数(如map、reduce、join、window等)组成的复杂算法处理数据。最后,处理后的数据可以被推到文件系统、数据库和现场仪表盘。实际上,你可以将处理后的数据应用到Spark的机器学习算法、 图处理算法中去。

这里写图片描述

它的工作原理图如下。Spark Streaming接收实时的输入数据流,然后将这些数据切分为批数据(batch)供Spark引擎(Spark engine)处理,Spark引擎将数据处理后生成最终的结果集,结果集仍然为batch。

这里写图片描述

Spark Streaming提供一个高层次的抽象,叫做离散流(discretized stream)或者DStream,它代表一个连续的数据流。DStream可以通过
如Kafka, Flume, and Kinesis等数据源的输入数据流创建,也可以在其他DStream上进行高级操作创建。在内部,DStream由一系列的RDDs组成。

这篇指南将会指导你如何利用DStream开启Spark Streaming的编程。用户可以用Scala、Java或者Python编写Spark Streming的程序,在官方文档对着三种语言都介绍了,本文翻译重点介绍Java语言。

2 一个快速的例子之HelloWorld

在我们深入研究如何编写自己的Spark Streaming程序的细节之前,让我们快速地看一下一个简单的Spark Streaming程序是什么样子的。在这个Hello world版本的例子的,程序从监听TCP套接字的服务器获取文本数据,让后统计单词的数目。其做法如下:

首先,我们创建一个JavaStreamingContext对象,它是Spark所有流操作的主要入口。我们创建一个具有两个执行线程以及1秒批间隔时间(即以秒为单位分割数据流)的本地StreamingContext。

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1))

利用这个上下文,我们能够创建一个DStream,它表示从TCP套接字数据源(如host为localhost,端口为9999)获取的流数据

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

这个名为lines的DStream表示即将从数据服务器接收的数据流。在这流中每一行记录是一行文本。然后,我们要把这行文本分割为一个个的单词。

// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });

flatmap是一个DStream操作,它将源DStream中的的每条记录都生成多条新记录来创建一个新的DStream。在这个例子中,每一行文本被分割成多个单词,这些被分割形成的单词流用words DStream表示。需要注意的是,我们用了一个FlatMapFunction对象定义了一个转换操作。在接下来的讲解中,我们将会发现在Java API中有很多遍历方别的类来定义DStream的转换操作。

接下来,来统计这些单词:

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

通过PairFunction 对象,words 这个DStream被mapper(一对一转换操作)成了一个新的DStream,新的DStream由(word,1)对组成。
然后,通过使用Function2对象,我们就可以用这个新的DStream计算每批数据的单词频率。最后,我们用 wordCounts.print() 打印每秒计算的词频。

需要注意的是,当上述的代码被执行时,Spark Streaming只是准备好了它要执行的运算,实际上并没有执行,只有当所有的转换操作( transformation)准备好后,最后调用start()方法,才正在执行计算。

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate

完整的代码请参考NetworkWordCount(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

如果你已经下载和构建了Spark的环境,你姐可以用如下的方法运行这个HelloWorld的实例。首先你需要运行netcat作为数据服务器通过使用如下的命令:

$ nc -lk 9999

然后,在不同的终端,你可以通过如下方式运行例子

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999

然后,每隔一秒在netcat server上输入的每行信息将会计算和打印在终端上。就像下面的这样:

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world



...
# TERMINAL 2: RUNNING JavaNetworkWordCount

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

3 基本的概念

接下来,我们在这个简单的例子基础上开始阐述Spark Streaming的基础知识。

3.1 关联

与Spark类似,Spark Streaming也可以利用maven仓库。编写自己的Spark Streaming程序的时候,你需要引入下面的依赖到自己的Maven项目中。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

如何你想从像Kafka、Flume、Kinesis这些不在Spark核心API的数据源获取数据时,你你必须添加相应的spark-streaming-xyz_2.10依赖。

这里写图片描述

为了获取最新的列表,请访问Apache repository(http://search.maven.org/

3.2 初始化StreamingContext

为了初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是Spark Streaming所有流操作的主要入口。StreamingContext 对象可以用SparkConf对象创建。

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration(1000));

参数appName是应用程序显示在集群UI上的名称。master 是一个Spark、Mesos、YARN集群URL 或者一个表示程序用本地模式运行的特殊字符串”local[ * ]” 当程序运行在集群中时,你并不希望在程序中将master硬编码,而是希望用 spark-submit启动应用程序,并从 spark-submit 中得到 master 的值。然而,在本地测试或者单元测试的时候,你可以传递”local[*]”来运行Spark。需要注意的是,它在内部创建了一个JavaSparkContext对象,你可以通过 ssc.sparkContext访问这个JavaSparkContext对象。

批时间片(batch interval)需要根据你的程序的潜在需求以及集群的可用资源来设定,具体的详细信息可以参考性能调优那一节。

一个JavaStreamingContext对象也可以从现有的JavaSparkContext创建。

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

当定义好一个上下文之后,接下来,你需要做几件事

  1. 定义输入源;
  2. 定义流计算指令(DStream的转化操作和输出操作);
  3. 通过使用StreamingContext.start()开始接收和处理数据;
  4. 使用StreamingContext.awaittermination()(手动或由于任何错误)等待处理停止;
  5. 利用StreamingContext.stop() 手动停止处理;

几点需要注意的地方:

  • 一旦一个context已经启动,就不能有新的流算子建立或者是添加到context中;
  • 一旦一个context已经停止,它就不能再重新启动;
  • 在一个JVM中,同一时间只能有一个StreamingContext是活跃状态的;
  • 在StreamingContext上调用 stop()方法,SparkContext对象也会stop;如果只想关闭StreamingContext对象,只需将stop()方法里面的stopSparkContext参数设置为false;
  • 一个SparkContext对象可以重复利用去创建多个StreamingContext对象,但是,前提条件是前面的StreamingContext在后面
    StreamingContext创建之前关闭(不关闭SparkContext);

3.3 离散流(DStreams)

离散流或者DStreams是Spark Streaming提供的基本的抽象。它代表一个连续的数据流,它是是从数据源中获取的输入流,或者是输入流通过转换算子生成的处理后的数据流。在内部,DStreams有一系列RDDs组成(RDD是Spark中分布式数据集的抽象)。DStreams中的每个RDD都
包含确定时间间隔内的数据,如下图所示:

这里写图片描述

任何对DStream的操作都会转化为对DStream中RDD的操作。举个例子,在前面的例子中, flatMap 操作应用于 lines 这个DStreams的每个RDD,生成 words 这个DStreams的 RDD,如下图所:

这里写图片描述

Spark引擎将计算这些隐含RDD的转换算子。DStreams操作隐藏了大部分的细节,并且为了更便捷,为开发者提供了high-level的API。下面几节将具体讨论这些操作的细节。

3.4 输入DStreams和接收器(Receivers)

输入DStreams代表从数据源接收的输入数据流的dstreams。在上述Helloworld例子中, lines 表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入DStream与一个Recevier对象相关,Recevier从数据源获取数据,将其存储在Spark内存中用于处理。

Spark Streaming提供两类数据源:

  • 基本源:这些源在StreamingContext API中直接可用。例如文件系统、套接字连接、Akka的actor等。
  • 高级源:像Kafka、Flume、Kinesis、Twitter等可以通过额外的工具类来使用,我们在3.1节一节讨论了这些类的依赖

接下来,我们将介绍这两种类型数据源中一部分数据源。

需要注意的是,如果你想在一个流应用中并行地接收多个数据流,你需要创建多个输入DStream(这将在性能调优那一节介绍) 。它将创建多个Receiver同时接收多个数据流。但是注意的是,Spark worker/executor是一个长期运行的任务,因此,recevier占据了分配给Spark Streaming应用的一个核。所以,要为Spark Streaming应用程序分配足够的
核(如果是本地运行,那么是线程) 去处理接收到的数据并且运行 receiver 是非常重要的。

几点需要注意的地方:

  • 当在本地运行一个Spark Streaming程序时,不要使用“local”或“local[1]”作为master URL,这两个都意味着只有一个线程来运行本地任务。如果你使用的是基于一个Recevier的输入DStream,然后单线程将运行Recevier,将没有线程用于处理接收的数据。因此,当在本地运行时,使用”local[n]”作为master URL,其中n大于运行的Recevier数量。
  • 分配给Spark Streaming应用程序的核的数量必须大于Receivers的数量,否则,系统只能够接收数据而不能处理它们。

基本源

我们已经在HelloWorld的例子中看到, ssc.socketTextStream(…) 方法用来把从TCP套接字获取的文本数据创建成DStream。除了TCP 套接字,StreamingContext API也支持把文件 以及Akka actors作为输入源创建DStream。

1 文件流

从任何与HDFS API兼容的文件系统中读取数据,一个DStream可以用如下方式创建

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

Spark Streaming将会监控 dataDirectory 目录,并且处理目录下生成的任何文件(嵌套目录不被支持)。需要注意的是:

1 所有文件具有相同的数据格式;
2 所有文件必须在"*dataDirectory*"目录下创建,文件是自动的移动和重命
  名到数据目录下
3 一旦移动,文件必须被修改。如果文件被持续的追加(append)数据,新的数
  据不会被读取。

对于简单地文本文件,有更加简单的方法:

streamingContext.textFileStream(dataDirectory)

文件流不需要运行一个receiver,所以不需要分配核。

2 基于自定义actor的流

DStream可以调用如下方法从Akka actors获取的数据流来创建,具体信息参考自定义Recevier指南(http://blog.csdn.net/ouyang111222/article/details/50414621

streamingContext.actorStream(actorProps, actor-name)

3 RDD队列作为数据流

为了用测试数据测试Spark Streaming应用程序,人们可以用如下的方法创建基于RDDs队列的DStream。

streamingContext.queueStream(queueOfRDDs)

每个push到队列的RDD都被 当做DStream的批数据,像流一样处理。

高级源

这种类型的数据源需要非Spark库接口,并且它们中的部分还需要复杂的依赖。因此,为了减少依赖的版本冲突的相关问题,从这些源创建DStream的功能已经被移到了独立的库中,你可以在3.1节查看详细的细节。例如,如果你想用来自推特的流数据创建DStream,你需要按照如下步骤操作:

  1. 关联:添加spark-streaming-twitter_2.10到maven项目
  2. 编程:导入TwitterUtils 类,利用TwitterUtils.createStream创建DStream
  3. 部署:将编写的程序以及其所有的依赖(包括spark-streaming-twitter_2.10的依赖以及它的传递依赖)打为jar包,然后
    部署。这在部署章节将会详细的讲解。
import org.apache.spark.streaming.twitter.*;

TwitterUtils.createStream(jssc);

需要注意的是,这些高级的源在 spark-shell 中不能被使用,因此基于这些源的应用程序无法在shell中测试。如果你想在spark-shell中使用,你必须下载相应的jar包并将其添加到class path中。

下面介绍一些高级数据源:

  • Kafka:Spark Streaming 1.5.2能够从kafka 0.8.2.1中获取数据,可以查看kafka集成指南了解详细信息;
  • Flume:Spark Streaming 1.5.2能够从flume 1.6.0中获取数据,可以查看flume集成指南了解详细信息;
  • Kinesis:Spark Streaming 1.5.2能够从 Kinesis Client Library
    1.2.1中获取数据,查看Kinesis集成指南了解详细信息;
  • Twitter:Spark Streaming 1.5.2 利用 Twitter4j 3.0.3获取公共的推特流,这些推文通过推特流API获得。认证信息可以通过Twitter4J库支持的任何方法获得。你既能够得到公共流,也能够得到基于关键字过滤后的流。

自定义的源

当然,输入DStream也可以通过自定义源创建,你需要做的是实现用户自定义的 receiver ,这个 receiver 可以从自定义源接收数据以及将数据推到Spark中。你可以通过自定义receiver指南了解详细信息(http://blog.csdn.net/ouyang111222/article/details/50414621)。

Receiver可靠性

基于可靠性有两类数据源。像Kafka、flume数据源允许确认/应答发送的数据。如果从这些可靠的源获取数据的系统能够正确的应答所接收的数据,它就能够确保在任何情况下不丢失数据。这样,就有两种类型receiver:

1 可靠的接收器:对于可靠的消息来源,允许发送的数据被确认,一个可靠的接收器正确地确认数据被接收器接收同时被可靠地存储在spark中。通常,实现可靠的接收器需仔细考量消息确认的语义。

2 不可靠的接收器:不可靠的接收器不向数据源发送确认信息。它可用于不支持确认机制的数据源,或者那些可靠的数据源但是我们不需要其使用复杂的确认机制。

详细信息请参考自定义Receiver指南(http://blog.csdn.net/ouyang111222/article/details/50414621

3.5 DStream中的转换(Transformations )

和RDD类似,转换允许从输入DStream来的数据被修改。DStreams支持很多在RDD中可用的转换。一些常用的转换(算子)如下所示:

这里写图片描述

其中一些转换需要重点讨论一下。

UpdateStateByKey操作

updateStateByKey操作允许保持任意状态,同时允许不断用新信息更新它。如果你想使用它,你需要做两步:

  1. 定义状态-状态可以是任意的数据类型。
  2. 定义状态更新函数-指定一个函数如何使用之前的状态和一个输入流的新值来更新状态

在每一batch中,无论batch中是否有新数据,对于所有存在的key,Spark都会应用状态更新函数。如果更新函数返回None,该键值对将被消除。

现在来举个例子说明。你想保持一个文本数据流中每个单词的运行次数,运行次数用一个state表示,它的类型是整数,我们定义如下的更新函数:

import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
      Integer newSum = ...  // add the new values with the previous running count to get the new count
      return Optional.of(newSum);
    }
  };

这个函数被用到了DStream包含的单词上

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

更新函数将会被每个单词调用, newValues 拥有一系列的1(从 (word, 1)而来),*runningCoun*t记录之前的次数。要看完整的
代码,参考JavaStatefulNetworkWordCount.java.(https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java

需要注意的是,使用updateStateByKey要求 配置checkpoint目录, 这个这将会在checkpointing 章节详细讨论。

Transform操作

transform 操作允许在DStream运行任何RDD-to-RDD函数。它能够被用来应用任何没有在DStream API中提供的RDD操作。例如,连接数据流中的每个batch和另外一个数据集的功能并没有在DStream API中提供,然而你可以简单的利用 transform 方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流 来清理实时数据,然后基于它们过滤,你可以按如下方法来做:

import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
  new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
    @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
      rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
      ...
    }
  });

需要注意的是,在每一个批处理时间间隔中调用所提供的函数。这使得你可以做时变的RDD操作,也就是说,RDD操作、分区的数量、广播变量等都可以在batch之间改变。

Window 操作
Spark Streaming也支持窗口计算,它允许你在一个滑动窗口数据上应用转换(transformation)。下图阐明了这个滑动窗口:

这里写图片描述

如图显示,窗口在源DStream上滑动,合并和操作作用于窗内的源RDDs,产生窗口DStream的RDDs。在这个具体的例子中,程序在3个时间单元的数据上进行窗口操作,并且每2个时间单元滑动一次。 这说明,任何一个窗口操作都需要指定两个参数:

  • 窗口长度:窗口的持续时间
  • 滑动的时间间隔:窗口操作执行的时间间隔

这两个参数必须是batch时间的倍数。

下面举例说明窗口操作。你想扩展前面的HelloWorld例子用来计算过去30秒的词频,间隔时间是10秒。为了达到这个目的,我们必须在过去30秒的 pairs DStream上应用 reduceByKey 操作。用方法reduceByKeyAndWindow 实现。

// Reduce function adding two integers, defined separately for clarity
Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
  @Override public Integer call(Integer i1, Integer i2) {
    return i1 + i2;
  }
};

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));

下面是一些常用的窗口操作,这些操作都需要用到上文提到的两个参数:窗口长度和滑动的时间间隔

这里写图片描述

Join操作

最后,值得强调的是你可以轻松地在你的Spark Streaming程序中执行不同种类的join操作。

(1)流与流join(Stream-stream join)

JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

在这里,在每个batch的时间间隔,stream1产生的RDD 会和stream2产生的RDD 连接(join)。你也可以做左外连接(leftOuterJoin)、右外连接(rightOuterJoin)、全外连接(fullOuterJoin)。此外,做窗口流的join非常有用,也很容易。

JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);

(2) 流与数据集join(Stream-dataset joins)

在之前解释 DStream.transform 操作的时候已经展示了流与数据集的join。这里有另外一个关于窗口Stream与数据集的join。

JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(
    new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
        @Override 
        public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
            return rdd.join(dataset);
        }
    }
);

此外,你可以动态的改变你要join的数据集。

3.6 DStream上的输出操作

输出操作允许DStream上的操作被push到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发了DStream转换的实际执行。目前,定义了下面几种输出操作:

这里写图片描述

3.7 利用foreachRDD的设计模式

dstream.foreachRDD是一个强大的原语,它允许数据被发送到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要。下面几点介绍了如何避免一般错误:

向外部系统写数据经常需要创建一个连接对象(如到远程服务器的TCP连接),同时用它发送数据到远程系统。举个例子,开发人员可能不经意的在Spark驱动中创建一个连接对象,但在Spark worker中尝试调用这个连接对象保存记录,如下(用scala):

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这些连接对象很少在机器间传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该 在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。

然而,这可能导致另外一个常见的错误—为每一个记录创建一个连接对象,如下:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

通常,创建一个连接对象有资源和时间的开销。因此,为每个记录创建和销毁连接对象会导致非常高的开销,同时极大的降低了系统的整体吞吐量。一个更好的解决办法是利用 rdd.foreachPartition 方法。 为RDD的分区(partition)创建一个连接对象,用这个对象发送分区内的所有记录。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

这就将连接对象的创建开销分摊到分区的所有记录上了。最后,可以通过在多个RDD或者batch间重用连接对象做更进一步的优化。开发者可以持有一个静态的连接池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

需要注意的是,池中的连接对象应该根据需要懒创建,且在空闲一段时间后自动超时。这样就实现了最有效的方式将数据发送到外部系统。

其他几点需要注意的:

  • 输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,DStream上的输出操作中的RDD action推动了接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作 dstream.foreachRDD() 中没有任何RDD action操作,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。
  • 默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。

3.8 DataFrame and SQL 操作
你可以在流数据上很容易使用DataFrame 和 SQL 操作。可以通过SparkContext创建SQLContext。此外,它可以在驱动故障时实现重启。下面的例子是上述helloworld例子的修改版本,它使用了DataFrames 和SQL进行word count。每一个RDD都转化为一个DataFrame,DataFrame被注册成一个临时表,之后使用SQL进行查询。

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ... 

words.foreachRDD(
  new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) {

      // Get the singleton instance of SQLContext
      SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

      // Convert RDD[String] to RDD[case class] to DataFrame
      JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
        public JavaRow call(String word) {
          JavaRow record = new JavaRow();
          record.setWord(word);
          return record;
        }
      });
      DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);

      // Register as table
      wordsDataFrame.registerTempTable("words");

      // Do word count on table using SQL and print it
      DataFrame wordCountsDataFrame =
          sqlContext.sql("select word, count(*) as total from words group by word");
      wordCountsDataFrame.show();
      return null;
    }
  }
);

你可以在不同的线程上对定义流数据的表使用SQL查询(即异步运行StreamingContext)。只要确保你设置StreamingContext记得大量的流数据以确保运行查询。否则StreamingContext,无法感知任何异步的SQL查询,它将会在查询完成前删除旧的数据流。例如,如果你想要查询最后的batch,但是你的查询5分钟才运行一次,因此,需要调用这样的设置:streamingContext.remember(Minutes(5))

3.9 缓存/持久化

和RDD相似,DStreams也允许开发者持久化流数据到内存中。在DStream上使用 persist() 方法可以自动地持久化DStream中的RDD到内存中。如果DStream中的数据需要计算多次,这种做法将极大地提升新能。像 reduceByWindow 和 reduceByKeyAndWindow 这种窗口操作、 updateStateByKey 这种基于状态的操作,默认是持久化的,不需要开发者调用 persist() 方法。

例如通过网络(如kafka,flume等)获取的输入数据流,默认的持久化策略是复制数据到两个不同的节点以容错。

注意,与RDD不同的是,DStreams默认持久化级别是存储序列化数据到内存中,这将在性能调优章节介绍,更多的信息请看rdd持久化。

3.10 Checkpointing

一个流应用程序必须全天候运行,因此必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要检查点(checkpoint)足够的信息到容错存储系统中, 以使系统从故障中恢复。

1 Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括:

配置 :创建Spark Streaming应用程序的配置信息
DStream 操作 :定义Streaming应用程序的操作集合
未完成的batches:操作存在队列中的未完成的批

2 Data checkpointing:保存生成的RDD到可靠的存储系统中。这在有状态转换(transformation)(如结合跨多个批次的数据)中是必须的。
在这些transformation中,依赖之前的RDD生成的RDD,随着时间的推移,这个依赖链的长度会持续增长。在恢复的过程中,为了避免这种无限增长,有状态的transformation的中间RDD将会定时地存储到可靠存储系统中,以截断这个依赖链。

总之,元数据checkpoint主要是为了从driver故障中恢复数据;如果状态transformation操作被用到了,数据checkpoint即使在简单的操作
中都是必须的。

何时启用Checkpointing

Spark Streaming应用程序在以下两种情况下必须开启checkpoint

  • 使用有状态的transformation:如果在应用程序中用到了updateStateByKey 或者reduceByKeyAndWindow ,checkpoint目录必需提供用以定期RDD checkpoint。
  • 从运行应用程序的driver的故障中恢复过来:使用元数据checkpoint恢复进度信息。

需要注意的是,没有前述的有状态的transformation的简单流应用程序在运行时可以不开启checkpoint。在这种情况下,从driver故障的恢复将是部分恢复(接收到了但是还没有处理的数据将会丢失)。这通常是可以接受的,许多运行的Spark Streaming应用程序都是这种方式。在将来将会支持非hadoop的环境。

怎样配置Checkpointing

在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。这可以使用如下的方式来实现:

streamingContext.checkpoint(checkpointDirectory)

这样做之后,将允许你使用之前介绍的 有状态transformation。另外,如果你想从driver故障中恢复,你应该以下面的方式重写你的Streaming应用程序:

  • 当应用程序是第一次启动,新建一个StreamingContext,启动所有Stream,然后调用 start() 方法;
  • 当应用程序因为故障重新启动,它将会根据checkpoint目录下的checkpoint数据重新创建StreamingContext;
// Create a factory object that can create a and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

如果 checkpointDirectory 存在,上下文将会利用checkpoint数据重新创建。如果这个目录不存在,将会调用contextFactory函数创建一个新的上下文,建立DStreams。 请看NetworkWordCount例子。

除了使用getOrCreate方法,必须确保驱动在失败的时候自动重启。

值得注意的是,RDD的checkpointing有存储成本,这会导致批数据的处理时间增加。因此,需要小心的设置checkpointing的时间间隔。在最小的批容量(包含1秒的数据)情况下,checkpoint每批数据会显著的减少操作的吞吐量。相反,checkpointing太少会导致任务大小增大,这会产生不利的影响。因为有状态的transformation需要RDD checkpoint。默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过如下方法来设置:

 dstream.checkpoint(checkpointInterval).

典型的情况下,设置checkpoint间隔是DStream的滑动间隔的5-10大小是一个不错的选择。

3.11 部署应用程序

本章节将讨论Spark Streaming应用程序的部署步骤。

要求

运行一个Spark Streaming应用程序,必须做到以下几点:

  • 集群:这是任何Spark应用都必须的,详情请见部署指南。
  • 将应用打成jar包:编译你的程序将其打成jar,如果你用spark-submit启动应用程序,你不需要将Spark和Spark Streaming打包进这个jar。 如果你的应用程序用到了高级源(如kafka,flume),你需要将它们关联的外部artifact以及它们的依赖打包进需要部署的应用程序jar包中。例如,一个应用程序用到了 TwitterUtils ,那么就需要将 spark streaming-twitter_2.10 以及它的所有依赖打包到应用程序jar中。
  • 为executor配置足够的内存:因为接收的数据必须存储在内存中,executors必须配置足够的内存用来保存接收的数据。需要注意的是,如果你正在做10分钟的窗口操作,系统的内存要至少能保存10分钟的数据。所以,应用程序的内存需求依赖于使用它的操作。
  • 配置checkpointing: 如果stream应用程序需要checkpointing,然后一个与Hadoop API兼容的容错存储目录必须配置为checkpoint的目录,Spark Streaming应用程序将checkpoint信息写入该目录用于错误恢复。具体详情参考3.10节。
  • 配置应用程序driver的自动重启:为了自动从driver故障中恢复,运行Spark Streaming应用程序的部署设施必须能监控driver进程,如果
    失败了能够重启它。不同的集群管理器,有不同的工具可以实现该功能。

(1)Spark Standalone:一个Spark应用程序driver可以提交到Spark独立集群运行,也就是说driver运行在一个worker节点上。进一步来说,Standalone集群管理器能够被指示用来监控driver,并且在driver失败(或者是由于非零的退出代码如exit(1), 或者由于运行driver的节点的故障)的情况下重启driver。

(2)YARN:YARN为自动重启应用程序提供了类似的机制。

(3)Mesos: Mesos可以用Marathon提供该功能

  • 配置WAL(write ahead logs):为了获得强大的容错保证,自Spark 1.2之后,我们引入了预写日志(write ahead logs)。如果WAL开启了,从receiver获取的所有数据会将预写日志写入配置的checkpoint目录。 这可以防止driver故障丢失数据,从而保证零数据丢失。可以通过设置如下参数来开启WAL。
spark.streaming.receiver.writeAheadLogs.enable = true;

然而,这些较强的语义以receiver的接收吞吐量为代价,这可以通过并行运行多个receiver增加吞吐量来解决。另外,当WAL开启时,Spark中的复制数据的功能推荐不用,因为该日志已经存储在了一个副本在存储系统中。可以通过设置输入DStream的存储级别为 StorageLevel.MEMORY_AND_DISK_SER 获得该功能。

  • 设置最大的接收数据率:如果集群资源有限,不足以应对Spark Streaming应用程序处理数据,可以为接收器设置一个最大的接受率(记录/秒)。以下两个配置参数值得关注:
spark.streaming.receiver.maxRate 
spark.streaming.kafka.maxRatePerPartition

在Spark 1.5中,我们引入一个新的特征叫做backpressurebackpressure 消除了设置设置最大数据接收率的需求,取而代之的是Spark Streaming自动确定、计算速率限制并动态的调整它们。你可以通过如下的设置启动backpressure

spark.streaming.backpressure.enabled = true

升级应用程序代码

如果运行的Spark Streaming应用程序需要升级,有如下两种可能的方法:

  1. 启动升级的应用程序,使其与未升级的应用程序并行运行。一旦新的程序已经准备就绪,旧的应用程序就可以关闭。需要注意的是,这种方法支持将数据发送到两个不同的目的地(新程序一个,旧程序一个)
  2. 首先,平滑的关闭( StreamingContext.stop(…) 或JavaStreamingContext.stop(…) )现有的应用程序,平滑的关闭意味着在关闭之前,要保证已经接收的数据完全处理完。然后,就可以启动升级的应用程序,升级的应用程序会接着旧应用程序的点开始处理。这种方法仅支持具有源端缓存功能的输入源(如flume,kafka),这是因为当旧的应用程序已经关闭,升级的应用程序还没有启动的时候,数据需要被缓存。

3.12 监控应用程序

除了Spark的监控功能,Spark Streaming增加了一些专有的功能。当应用一个StreamingContext的时候,Spark web UI 显示添加的 Streaming 菜单,用以显示运行的receivers(receivers是否是存活状态、接收的记录数、receiver错误等)和完成的batch的统计信息(批处理时间、队列等待等待)。这可以用来监控Spark Streaming应用程序的处理进度。

在Web UI中有两个度量指标非常重要。

  • 处理时间(Processing Time):表示批数据处理的时间;
  • 调度延迟(Scheduling Delay ):表示前面的batch处理完毕之后,当前batch在队列中的等待时间。

    如果batch处理时间比batch间隔时间持续更长或者队列等待时间
    持续增加,这就表示系统处理数据的速度跟不上batch数据产生的速度,整个处理过程滞后了。在这种情况下,考虑减少批处理时间。

Spark Streaming程序的处理过程也可以通过StreamingListener接口来监控,这个接口允许你获得receiver状态和处理时间。注意,这个接口是开发者API,它有可能在未来提供更多的信息。

3.13 性能调优

集群中的Spark Streaming应用程序获得最好的性能需要一些调整与优化。这章将介绍几个参数和配置,提高Spark Streaming应用程序的性能。在更高层面,你需要考虑两件事情:

  1. 高效地利用集群资源减少批数据的处理时间;
  2. 设置正确的batch size,使数据的处理速度能够赶上数据的接收速度

减少batch的处理时间

在Spark中减少的batch处理时间的有很多的优化的方法,这些可以在优化指南中作了讨论,这节重点讨论几个重要的。

数据接收的并行度

通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。需要注意的是,每个输入DStream创建一个 receiver (运行在worker机器上) 接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个kafka输入DStream可以被切分为两个kafka输入流,
每个接收一个topic。这将在两个worker上运行两个receiver ,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作
可以运用在合并的DStream上。

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

另外一个需要考虑的参数是 receiver 的阻塞时间,阻塞时间由spark.streaming.blockInterval决定。

spark.streaming.blockInterval

对于大部分的 receiver,在存入Spark内存之前,接收的数据都被合并成
了一个大数据块。每批数据中块的个数决定了任务的个数。这些任务是用类似map的transformation操作接收的数据。每个recevier每个batch的任务数都相似。例如,如果block interva为200ms,那么每2个batch将会创建10个任务。如果任务数量非常少(也就是说,低于每一台机器的core),那么这将是低效的。对于一个给定的批处理间隔增加其任务数,减少block interva。然而,建议的最小值的块间隔是约50 ms,低于这个值任务重启开销可能是一个问题。

多输入流或者多receiver 的可选的方法是明确地重新分配输入数据流(利用 inputStream.repartition() ),在进一步操作之前,通过集群的机器数分配接收的批数据。

数据处理的并行水平

如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。例如,对于分布式reduce操作如 reduceByKeyreduceByKeyAndWindow ,默认的并发任务数通过配置属性来确定
spark.default.parallelism 。你可以通过参数(PairDStreamFunctions)
传递并行度,或者设置参数spark.default.parallelism 修改默认值。

数据序列化

通过优化序列化方法可以减少数据序列化的开销,在Spark Streaming中,有两类数据需要被序列化:

输入数据:默认情况下,通过receiver接受到的数据存储在executor的内存中采用storagelevel.memory_and_disk_ser_2。也就是说,数据序列化为字节减少GC开销,同时为了容忍executor故障数据被复制。此外,数据首先被保存到内存中,只有到内存不足以支撑所有必须的输入数据的操作时,才会溢出到磁盘上。这种序列化具有明显的开销—recevier必须对接收的数据进行反序列化,当用spark的序列化格式时需要重新的序列化。

Spark中RDD数据的序列化:Streaming操作产生的RDD可能需要持久化到内存。例如,窗口操作将数据持久化到内存,因为它们将被处理多次。然而,不像Spark Core默认的storagelevel.memory_only,Streaming操作产生的RDD持久化默认使用storagelevel.memory_only_ser去减少GC开销。

在这两种情况下,使用kryo序列化可以减少CPU和内存开销。参考Spark调优指南查看详细信息。对于kryo,考虑注册自定义类对象,并禁用引用跟踪(在配置指南查看kryo相关的配置)。

在特殊情况下,Streaming应用需要保留的数据量不是很大,持久化两种类型的数据都是可行的,因为反序列化对象没有产生过多的GC开销。例如,如果你的batch间隔就几秒,且没有窗口操作,你可以在通过显示的设置存储级别在数据持久化时禁用序列化。这将减少由于序列化带来的CPU开销,由于没有太多GC开销而提高性能。

任务重启开销

如果每秒钟启动的任务数是非常大的(50或者更多),发送任务到slave的花费明显,这使请求很难获得亚秒(sub-second)级别的反应。通过下面的改变可以减小开支:

  • 任务序列化:运行kyro序列化任何可以减小任务的大小,从而减小任务发送到slave的时间。
  • 执行模式:在Standalone模式下或者粗粒度的Mesos模式下运行Spark可以在比细粒度Mesos模式下运行Spark获得更短的任务启动时间。可以在Mesos下运行Spark中获取更多信息。

设置正确的batch间隔

为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据,换句话说,处理速度应该大于或等于接收数据的速度。这可以通过流的网络UI观察得到,批处理时间应该小于批间隔时间。

根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。例如,考虑 WordCountNetwork 这个例子(前面提到的HelloWorld例子),对于一个特定的数据处理速率,系统可能每2秒打印一次单词计数 (批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。

找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。为了验证你的系统是否能满足数据处理速率,你可以通过检查端到端的延迟值来判断(可以在 Spark驱动程序的log4j日志中查看”Total delay”或者利用StreamingListener口)。如果延迟维持稳定,那么系统是稳定的。如果延迟持续增长,那么系统无法跟上数据处理速率,是不稳定的。 你能够尝试着增加数据处理速率或者减少批容量来作进一步的测试。注意,因为瞬间的数据处理速度增加导致延迟瞬间的增长可能是正常的,只要延迟能重新回到了低值(小于批容量)。

内存优化

调整内存的使用以及Spark应用程序的垃圾回收行为已经在Spark优化指南中详细介绍。强烈推荐对Spark优化指南中进行阅读。在这一章节,我们重点介绍一下在Spark Streaming中特殊的优化参数。

Spark Streaming应用要求的集群内存严重依赖于转换类型的使用。例如,如果你想对最近10分钟的数据进行窗口操作,那么你的集群应该有足够大的内存来存储数据。如果你想对大量的key使用updateStateByKey操作,那么内存要求更大。相反,如果你只是做简单的map-filter-store操作,那么内存要求想对较低。

一般来说,由于通过receiver接收到的数据其存储采用storagelevel.memory_and_disk_ser_2,不适合在内存的数据会溢出到磁盘。这将降低streming应用程序的性能,因此建议为应用程序提供足够的内存。最好小规模下看看内存使用量小,并做好相应的估计。

内存优化的另一方面是垃圾回收。对于spark streaming应用程序来说要求低延迟,由JVM的垃圾回收导致的大停顿是不可取的。

这里有一下参数能更好的帮你优化内存使用和GC消耗。

(1)Default persistence level of DStreams:在数据序列化章节,输入数据和RDDs默认持久化为字节。与反序列化持久化想不,这减少了内存使用和GC负担。如果采用Kryo序列化将进一步减少序列化大小与内存使用。内存使用量的减少还可以通过压缩(Spark中配置spark.rdd.compress)实现,前提是消耗CPU时间。

(2)Clearing old data:默认情况下,所有的输入数据和DStream 转换产生的、被持久化的RDD会被自动的清理。Spark Streaming在转换的基础上决定何时清理数据。例如,如果你使用一个10分钟的窗口操作,Spark Streaming将会保持最后10分钟的数据,丢弃旧的数据。通过设置streamingContext.remember,数据可以被保留更长时间。

(3)CMS Garbage Collector:使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。确保你在驱动(using –driver-java-options in spark-submit)和executors(using Spark configuration spark.executor.extraJavaOption)都设置了CMS GC。

ps:关于Spark Steaming的语义问题未纳入翻译的范畴

原文地址:https://spark.apache.org/docs/latest/streaming-programming-guide.html

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐