Spark消费Kafka的两种方式
在这里插入代码片
·
介绍
kafka版本,kafka0.8支持Receiver和Direct
Kafka版本大于等于0.10.0,且Spark版本大于等于Spark 2.3.0,应使用spark-streaming-kafka-0-10,
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets
- 这个版本好像不支持Receiver方式
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.2</version>
</dependency>
<!--spark-streaming-kafka-plugin-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.2</version>
</dependency>
Receiver方式
code
// 1、Kafka配置
// 配置zookeeper集群、消费者组
val kafkaParams = Map(
"zookeeper.connect" -> "localhost:2181",
"group.id" -> groupID)
// 2、topic_name与numThreads的映射
// topic有几个partition,就写几个numThreads。
// 每个partition对应一个单独线程从kafka取数据到Spark Streaming
val topics = Map(topicName -> numThreads)
// 3、ReceiverInputDStream
// 注意:应先import kafka.serializer.StringDecoder再import org.apache.spark.streaming._
val kafkaStream= KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
StorageLevel.MEMORY_AND_DISK_SER_2)
原理
Spark Streaming启动时,会在Executor中同时启动Receiver异步线程用于从Kafka持续获取数据,数据保存在Executor中,Executor挂了会导致数据丢失
- Spark Streaming启动时,会在Executor中同时启动Receiver异步线程用于从Kafka持续获取数据
- 获取的数据先持久化在Receiver中(存储方式由StorageLevel决定,一般是内存+磁盘)
- 当Batch Job触发后,这些数据会被转移到剩下的Executor中被处理
- 处理完毕后,Receiver会自动更新Zookeeper中的Offset
这个是以前记笔记写的,我现在感觉这个需要验证,到底是什么时候提交的offset
如何保证数据不丢失
接受数据不会立即更更新offset,而是等到持久化结束后更新offset
但是,如何节点挂了,持久化到内存的数据就丢失了
生产下,为保证数据完全不丢失,一般需要启用WAL(Write Ahead Log)预写日志机制,需要配置WAL,spark.streaming.receiver.writeAheadLog.enable,将Receiver收到的数据再备份一份到更可靠的系统如HDFS分布式文件中,以冗余的数据来换取数据不丢失
虽然WAL可以确保数据不丢失,它并不能对所有的数据源保证exactly-once语义,这就是数据重复问题
但是会导致数据重复问题
虽然WAL可以确保数据不丢失,它并不能对所有的数据源保证exactly-once语义
- 接收器接收到输入数据,并把它存储到WAL中
- 接收器在更新Zookeeper中Kafka的偏移量之前突然挂掉了
- Spark Streaming假设输入数据已成功收到(因为它已经写入到WAL中)
- 然而Kafka认为数据被没有被消费,因为相应的偏移量并没有在Zookeeper中更新
- 那些被保存到WAL中但未被处理的数据被重新读取
- 一旦从WAL中读取所有的数据之后,接收器开始从Kafka中消费数据
- 因为接收器是采用Kafka的High-Level Consumer API实现的,它开始从Zookeeper当前记录的偏移量开始读取数据,但是因为接收器挂掉的时候偏移量并没有更新到Zookeeper中,所有有一些数据被处理了2次
优点
- Receiver底层实现中使用了Kafka高级消费者API,因此,不需要自己管理Offset,只需指定Zookeeper和消费者组GroupID,系统便会自行管理
缺点
- 启用WAL机制虽然可以防止数据丢失,但是浪费存储空间也影响效率,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制会对数据复制一份,而这里又会复制一份到WAL中
- 采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度
- 会出现重复消费问题
- 需要使用单独的Receiver线程来异步获取Kafka数据。
- 需要专门的Receivers来读取Kafka数据且不参与计算,浪费Executor
- 由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费
- Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,实时任务一直在增加,而Receiver则在一直接收数据,但是计算不过来,导致数据堆积
Direct(No Receiver)方式
code
// 1、Kafka配置
// auto.offset.reset=latest 无提交的offset时,从最新的开始消费
// enable.auto.commit=false 禁用后台自动提交offset,自己手动管理
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"group.id" -> groupID)
// 2、DirectKafkaInputDStream
// LocationStrategies:本地策略。为提升性能,可指定Kafka Topic Partition的消费者所在的Executor。
// LocationStrategies.PreferConsistent:一致性策略。一般情况下用这个策略就OK。将分区尽可能分配给所有可用Executor。
// LocationStrategies.PreferBrokers:特殊情况,如果Executor和Kafka Broker在同一主机,则可使用此策略。
// LocationStrategies.PreferFixed:特殊情况,当Kafka Topic Partition负荷倾斜,可用此策略,手动指定Executor来消费特定的Partition.
// ConsumerStrategies:消费策略。
// ConsumerStrategies.Subscribe/SubscribePattern:可订阅一类Topic,且当新Topic加入时,会自动订阅。一般情况下,用这个就OK。
// ConsumerStrategies.Assign:可指定要消费的Topic-Partition,以及从指定Offset开始消费。
val kafkaStream=KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](List(topicName),kafkaParams)
)
特点
- 为了解决由WAL引入的性能损失,并且保证 exactly-once 语义,不需要使用单独的Receiver线程从Kafka获取数据
- 使用Kafka简单消费者API,不需要ZooKeeper参与,直接从Kafka Broker获取数据
- 当action真正触发时才会去kafka里接数据,需要计算时再读取数据
- 为保证整个应用安全性, Offset管理一般需要借助外部存储实现。如Mysql、HBase等
优点
- 不需要单独的Receiver线程从Kafka获取数据,所有的Executor都参与计算。所以相同的资源申请,Direct方式能够支持更大的业务
- Receiver方式持续不断接收数据,业务量大时需要提高内存,但是Executor计算用不到这么多内存,Direct方式节省了这部分内存,只需要考虑批量计算所需要的内存即可,实际应用中我们可以把原先的10G降至现在的2-4G左右。
- 当action真正触发时才会去kafka里接数据,需要计算时再读取数据,实时任务堆积时,不会导致数据堆积
- Spark Streaming会创建和Kafka Topic Partition一样多的RDD Partition,而且是一对一的映射关系,这样,就可以并行读取,大大提高了性能
- Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次
- 不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复
缺点
- 提高开发成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本
- 提高监控成本,没有监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。
更多推荐


所有评论(0)