「官文译」Spark 结构Streaming-2.1.1 + Kafka 集成指南 (Kafka broker version 0.10.0 or higher)
创建Kafka源(Batch批次)源中的每一行都具有以下模式:Each row in the source has the following schema:ColumnTypekeybinaryvaluebinarytopicstringpartitioni
注:
Spark Streaming + Kafka集成指南
Apache Kafka是作为分布式,分区,复制的提交日志服务的发布订阅消息。在开始使用Spark集成之前,请仔细阅读Kafka文档。
Kafka项目在0.8和0.10之间引入了新的消费者api,所以有两个单独的相应的Spark Streaming包可用。请为您的经纪人选择正确的包装和所需功能; 请注意,0.8集成与以后的0.9和0.10经纪人兼容,但0.10集成与早期经纪人不兼容。
火花流 - 卡夫卡0-8 | 火花流 - 卡夫卡0-10 | |
---|---|---|
经纪人版 | 0.8.2.1以上 | 0.10.0以上 |
Api稳定性 | 稳定 | 试验 |
语言支持 | Scala,Java,Python | Scala,Java |
接收器DStream | 是 | 没有 |
直接DStream | 是 | 是 |
SSL / TLS支持 | 没有 | 是 |
偏移提交Api | 没有 | 是 |
动态主题订阅 | 没有 | 是 |
创建Kafka源(Batch批次)
Each row in the source has the following schema:
Column | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
必须为(Batch处理)和(streaming queries.流式查询)的Kafka源设置以下选项。
Option | value | meaning |
---|---|---|
assign (分配) | json string {"topicA":[0,1],"topicB":[2,4]} (json字符窜) | Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (具体主题分配消费。只能为Kafka源指定“assign”,“subscribe”或“subscribePattern”选项之一。) |
subscribe (订阅) | A comma-separated list of topics (以逗号分隔的主题列表) | The topic list to subscribe. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (要订阅的主题列表。只能为Kafka源指定“assign”,“subscribe”或“subscribePattern”选项之一。) |
subscribePattern (如上2选1) | Java regex string (Java正则表达式字符串) | The pattern used to subscribe to topic(s). Only one of "assign, "subscribe" or "subscribePattern" options can be specified for Kafka source. (用于订阅主题的模式。只能为Kafka源指定“assign”,“subscribe”或“subscribePattern”选项之一。) |
kafka.bootstrap.servers | A comma-separated list of host:port (主机:端口的逗号分隔列表) | The Kafka "bootstrap.servers" configuration.bootstrap(引导程序).servers" configuration. (Kafka“bootstrap.servers”配置。) "bootstrap.servers" -> "localhost:9092,anotherhost:9092" |
val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092", //,anotherhost:9092 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) )
以下配置是可选的:
选项 | 值 | 默认 | 查询类型 | 含义 |
---|---|---|---|---|
startingOffsets | "earliest", "latest" (streaming only), “最早”,“最新”(仅限流媒体) 或json string“”“{”topicA“:{”0“:23,”1“: - 1},”topicB“:{”0“: - 2} }“”“ | “最新”流式传输,“最早”批量 | 流和批 | 查询开始时的起点,从最早的偏移“最早”开始,最新的“最新”,即最新偏移量, 或指定每个TopicPartition的起始偏移量的json字符串。 在json中,-2作为偏移可以用来指最早的,-1表示最新的。 注意:对于批处理查询,不允许使用最新的(隐式的或通过使用-1)。 对于流式查询,这仅在新查询启动时适用,并且恢复将始终从查询离开的地方获取。查询期间新发现的分区最早开始。 |
endingOffsets | 最新或者json string {“topicA”:{“0”:23,“1”: - 1},“topicB”:{“0”: - 1}} | latest | 批次查询 | 批处理查询结束时的终点,即最新的最新参数,或者指定每个TopicPartition的结束偏移量的json字符串。在json中,-1作为偏移量可以用来指代最新的,而-2(最早的)作为偏移是不允许的。 |
failOnDataLoss | true or false | true | 流式查询 | 是否可能丢失数据时失败查询(例如,主题被删除或偏移超出范围)。这可能是一个虚惊。当您无法正常工作时,您可以禁用它。如果由于丢失的数据而无法从提供的偏移中读取任何数据,批量查询将始终失败。 |
kafkaConsumer.pollTimeoutMs | long | 512 | 流和批 | 以毫秒为单位的执行者轮询Kafka数据的超时时间。 |
fetchOffset.numRetries | INT | 3 | 流和批 | 在放弃获取Kafka抵消之前重试的次数。 |
fetchOffset.retryIntervalMs | long | 10 | 流和批 | 毫秒级别,然后重试以获取Kafka抵消 |
maxOffsetsPerTrigger | long | none | 流和批 | 每个触发间隔处理的最大偏移量的速率限制。指定的总偏移量将按不同卷的topicPartitions按比例分割。 |
卡夫卡自己的配置可以通过设置DataStreamReader.option
与kafka.
前缀,例如 stream.option("kafka.bootstrap.servers", "host:port")
。有关可能的kafkaParams,请参阅 Kafka消费者配置文档。
请注意,以下Kafka参数无法设置,Kafka源将抛出异常:
- group.id:Kafka源将自动为每个查询创建唯一的组ID。
"group.id" -> "use_a_separate_group_id_for_each_stream",
- auto.offset.reset:设置source选项
startingOffsets
以指定从哪里开始。结构化流式管理哪些偏移量在内部消耗,而不是依靠kafka消费者来做。当新的主题/分区被动态订阅时,这将确保不会丢失任何数据。请注意,startingOffsets
只有在启动新的流式查询时才适用,并且恢复将始终从查询离开的地方获取。"auto.offset.reset" -> "latest",
- key.deserializer:使用ByteArrayDeserializer的键始终反序列化为字节数组。使用DataFrame操作显式反序列化键。
"key.deserializer" -> classOf[StringDeserializer],
- value.deserializer:值始终使用ByteArrayDeserializer反序列化为字节数组。使用DataFrame操作来显式反序列化值。
"value.deserializer" -> classOf[StringDeserializer],
- enable.auto.commit:Kafka源不提交任何偏移量。
"enable.auto.commit" -> (false: java.lang.Boolean)
- interceptor.classes:Kafka源总是读取键和值作为字节数组。使用ConsumerInterceptor是不安全的,因为它可能会中断查询。
部署
与任何Spark应用程序一样,spark-submit
用于启动应用程序。spark-sql-kafka-0-10_2.11
并且其依赖性可以直接添加到spark-submit
使用中--packages
,例如,
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 ...
有关提交具有外部依赖关系的应用程序的更多详细信息,请参阅申请提交指南。
更多推荐
所有评论(0)