flink Datastream开发之kafka(source&sink)
本文介绍基于kafkasource和sink的flink Datastream开发,包括消费kafka消息和将结果sink到kafka中
Flink在流处理中常见的Source
Flink在流处理中常见对的Source 和在批处理中的source基本一致.大致分为4大类
1.基于本地集合的source(Collection-based-source)
2.基于文件的source(File-based-source):
- 读取文本文件,既符合TextInPutFormat规范的文件,并将其作为字符串返回;
3.基于网络套接字的source(Socket-base-source):
- 从 socket 读取。元素可以用分隔符切分;
4.自定义的source(Custom-source)
从官网可以看到目前支持的数据源
基于kafka数据源的source操作
Flink 提供的 Kafka 连接器,用于向 Kafka 主题读取或写入数据。Flink Kafka Consumer集成了Flink 的检查点机制,可提供一次性处理语义。为实现这一目标,Flink 并不完全依赖 kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。
不同版本的兼容
各个版本实现类关系图
构造函数参数详解:
1. topic:
- kafka消息主题
2. valueDeseralizer/deserializer:
- 用于反序列化来自 Kafka 的数据
3. props: 指定kafka集群的属性:
- “bootstrap.servers”(以逗号分隔的 Kafka 经纪人名单)
- “zookeeper.connect”(逗号分隔的 Zookeeper 服务器列表)
- “group.id”消费者群组的 ID
使用案例:
DeserializationSchema / KeyedDeserializationSchema反序列化
kafka传过来的数据是二进制的,所以FlinkKafkaConsumer需要知道如何将kafka中的二进制数据转换为java/Scala对象,也是反序列化
DeserialzationSchema(只反序列化 value)
通过 T deserialize(byte[] message) FlinkKafkaConsumer 从 kafka 获 取 的 每 条 消 息 都 会 通 过 DeserialzationSchema 的 T deserialize(byte[] message)反序列化处理反序列化 Schema 类型(接口)
KeyedDeserializationSchema(反序列化 key 和 value)
常用反序列化Schema
Schema | 描述 |
---|---|
SimpleStringSchema | 可以将消息反序列化为字符串。当我们接收到消息并且反序列化失败的时候,会出现以下两种情况: |
1) Flink 从 deserialize(…)方法中抛出异常,这会导致 job 的失败,然后 job 会重启; | |
2) 在 deserialize(…) 方法出现失败的时候返回 null,这会让 Flink Kafka consumer 默默的忽略这条消息。如果配置了 checkpoint 为 enable,由于 consumer 的失败容忍机制,失败的消息会被继续消费,因此还会继续失败,这就会导致 job 被不断自动重启 | |
JSONKeyValueDeserializationSchema | |
JSONDeserializationSchema | 可以把序列化后的 Json 反序列化成 ObjectNode,ObjectNode 可以通过objectNode.get(“field”).as(Int/String/…)() 来访问指定的字段 |
TypeInformationSerializationSchema | |
TypeInformationKeyValueSerializationSchema | (适合读写均是 flink 的场景)他们会基于 Flink 的 TypeInformation 来创建 schema。这对于那些从Flink 写入,又从 Flink 读出的数据是很有用的。这种 Flink-specific 的反序列化会比其他通用的序列化方式带来更高的性能。 |
Kafka Consumers消费模式
消费模式直接关系到从哪里开始消费
消费模式 | 说明 | |
---|---|---|
setStartFromEarliest | 从头开始,最早的记录 | 内部的 consumer 递交到 kafka/zk 的偏移量将被忽略 |
setStartFromLatest | 从尾开始,最新的记录 | 内部的 consumer 递交到 kafka/zk 的偏移量将被忽略以 consumer 递交到 kafka/zk 中的偏移量为起始位置开始消费 |
setStartFromGroupOffsets | 默认值,从当前消费组记录的偏移量开始,接着上次的偏移量消费 | group.id 设置在 consumer 的 properties 里面;如果没有找到记录的偏移量,则使用 consumer 的 properties 的auto.offset.reset 设置的策略 |
setStartFromSpecificOffsets(Map<TopicPartition, Long>的参数) | 从指定的具体位置开始消费 | |
setStartFromTimestamp(long) | 从指定的时间戳开始消费 | 对于每个分区,时间戳大于或者等于指定时间戳的记录将用作起始位置,如果一个分区的最新时间早于时间戳,那么只需要从最新记录中读取该分区,在此模式下,kafka/zk 中递交的偏移量将被忽略,时间戳指的是 kafka 中消息自带的时间戳 |
- 也就是说:如果是默认行为(setStartFromGroupOffsets),那么任务从检查点重启,按照重启前的 offset 进行消费,如果直接重启不从检查点重启并且 group.id 不变,程序会按照上次提交的 offset 的位置继续消费。如果 group.id 改变了,则程序按照 auto.offset.reset 设置的属性进行消费。但是如果程序带有状态的算子,还是建议使用检查点重启。如果是 setStartFromEarliest()/ setStartFromLatest():那么任务只会从最新或最老消费。
注意: properties.setProperty(“auto.offset.reset”, ”latest“)kafkaConsumer010. setStartFromLatest()不是一回事
从每个分区指定的偏移量读取(setStartFromSpecificOffsets)
- 上面的示例将使用者配置为从主题的分区 0,1 和 2 的指定偏移量开始 myTopic。偏移值应该是消费者应为每个分区读取的下一条记录。
注意 :
-
如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为。
-
当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在保存点或检查点中的偏移量确定
flink 按照时间戳消息 (kafkasetStartFromTimestamp)
- flink 允许用户从指定的时间戳消费 kafka 中的数据,这极大方便了我们对 kafka 的各种操作,比如补历史数据,查看某个时间点的日志等等,使用方式很简单,指定一个时间戳就可,单位毫秒。
consumer.setStartFromTimestamp(...);//从指定的epoch时间戳(毫秒)开始
说明: 从时间戳消费者忽略 Zookeeper / Kafka 代理中任何提交的组偏移量。消费者将查找时间戳大于或等于的最早偏移量到 Kafka 的特定时间戳。如果没有这样的偏移量,消费者将使用从 kafka 读取数据的最新偏移量。
flink 操作kafka容错性
- 启用 Flink 的检查点后,Flink Kafka Consumer 将使用主题中的记录,并以一致的方式定期检查其所有 Kafka 偏移以及其他操作的状态。如果作业失败,Flink 会将流式程序恢复到最新检查点的状态,从存储在检查点中的偏移量开始重新使用来自 Kafka 的消息数据。因此,设置检查点的间隔定义了程序在发生故障时最多可以返回多少。
flink 在使用 kafka 是要实现容错,需要在执行环境中启用拓扑的检查点:
val env =StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000);//设置检查时间间隔为5000毫秒
- 如果未启用检查点,Kafka(kafka 0.9 以前)使用者将定期向 Zookeeper 提交偏移量,kafka以后提交到 broker,都是将 topic 提交给__consumer_offsets 函数来执行注意 :如果 flink 任务 不通过检查点重启,而是直接重启(groupId 不变),可能会丢失数据。
原因:kafka 自动更新 offset 时,fetch 到消息后就可以定期更新 offset,无论是否消费成功。如果在 kafka 更新 offset 的间期内数据没有写入第三方介质,任务挂掉这部分数据就会丢失。
Kafka动态分区检测
- 对于有实时处理业务需求的企业,随着业务增长数据量也会同步增长,将导致原有的 Kafka分区数不满足数据写入所需的并发度,需要扩展 Kafka 的分区或者增加 Kafka 的 topic,这时就要求实时处理程序。SparkStreaming(与 Kafka 0.10 版本结合支持动态分区检测)、Flink(创建一个线程,该线程会定期检测 Kafka 新增分区,然后将其添加到 kafkaFetcher 里) 都能动态发现新增 topic 分区并消费处理新增分区的数据。
Spark 无 需 做 任 何 配 置 就 可 动 态 发 现 Kafka 新 增 分 区 , 而 Flink 需 要 将 flink.partition-discovery.interval-millis 该属性设置为大于 0 ,属性值为时间间隔 单位为毫秒。
//动态感知 kafka 主题分区的增加 单位毫秒
properties.setProperty("flink.partition-discovery.interval-millis", "5000");
kafka主题分区数和flink并行度的关系
- 对于 kafka 的每一个 topic,获取其 partition 的数量,然后把(topic_name, partition_id)存入 KafkaTopicPartition 类型的 List 中。举个例子,假如我们的 topic 名字是 T,在 kafka中一共有 4 个 partition,那么这个 List 的内容类似于如下的格式:(T,0),(T,1),(T,2),( T,3 )最后,将这个列表中的内容注册,依然调用父类 FlinkKafkaConsumerBase 的setSubscribedPartitions,即消费 kafka 的数据。
消费Kafka时,Source Operator的并发度
- 如果没有指定,Source Operator 的个数与集群中的 TaskManager 的个数相等。如果手动设置,建议使用的 slot 个数=Kafka Partition 的个数/TaskManager 的个数。此时,Slot 的个数需大于等于 2.因为其中有一个 Source Operator。也不建议在一个 Slot 中启用多线程。也就是 每一个 TaskManager 的 slot 的个数等于(kafka 分区总数/taskManager 的总数)
- 这里讨论下 kafka 中 topic 的 partition 数量与 Flink 中 consumer 的线程数的关系,通过上面分析看出,一个 topic 最后生成的 list 的个数就是 partition 的数量,如果 Flink 消费时, consumer 数量大于 partition 数量,则多余的 consumer 不会消费到任何数据,也就是说,consumer的线程数,最好是等于 topic 的 partition 的数量,这样可以保证低延迟下达到最高的吞吐量。而且,每一个 consumer 线程只能保证消费的 partition 内的数据是有序的,并不保证全局 topic是有序消费的。
使用案例
package cn.itcast.stream.source
import java.util
import java.util.Properties
import org.apache.commons.collections.map.HashedMap
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
object StreamingKafkaSourceScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment //隐式转换
import org.apache.flink.api.scala._
//指定消费者主题
val topic = "test"
val props = new Properties();
props.setProperty("bootstrap.servers","node01:9092");
props.setProperty("group.id","test091601");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//动态感知 kafka 主题分区的增加 单位毫秒
props.setProperty("flink.partition-discovery.interval-millis", "5000");
val myConsumer = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), props)
/**
Map<KafkaTopicPartition, Long> Long 参数指定的 offset 位置
KafkaTopicPartition 构造函数有两个参数,第一个为 topic 名字,第二个为分区数
获取 offset 信息,可以用过 Kafka 自带的 kafka-consumer-groups.sh 脚本获取
*/
val offsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]();
offsets.put(new KafkaTopicPartition(topic, 0), 11111111l);
offsets.put(new KafkaTopicPartition(topic, 1), 222222l);
offsets.put(new KafkaTopicPartition(topic, 2), 33333333l);
/**
* Flink 从 topic 中最初的数据开始消费
*/
myConsumer.setStartFromEarliest();
/** * Flink 从 topic 中指定的时间点开始消费,指定时间点之前的数据忽略
*/ myConsumer.setStartFromTimestamp(1559801580000l);
/** * Flink 从 topic 中指定的 offset 开始,这个比较复杂,需要手动指定 offset */
myConsumer.setStartFromSpecificOffsets(offsets);
/**
* Flink 从 topic 中最新的数据开始消费
*/
myConsumer.setStartFromLatest();
/**
* Flink 从 topic 中指定的 group 上次消费的位置开始消费,所以必须配置 group.id 参数
*/
myConsumer.setStartFromGroupOffsets();
//添加消费源
val text = env.addSource(myConsumer)
text.print()
env.execute("StreamingFromCollectionScala")
}
}
sink到kafka
package cn.itcast.stream.sink
package cn.itcast.stream
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer011, FlinkKafkaProducer09} import org.apache.flink.api.scala._
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
object StreamKafkaSink {
val sinkTopic = "test"
//样例类
case class Student(id: Int, name: String, addr: String, sex: String)
val mapper: ObjectMapper = new ObjectMapper()
package cn.itcast.stream.sink
package cn.itcast.stream
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer011, FlinkKafkaProducer09} import org.apache.flink.api.scala._
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
object StreamKafkaSink { val sinkTopic = "test"
//样例类
case class Student(id: Int, name: String, addr: String, sex: String)
val mapper: ObjectMapper = new ObjectMapper()
//将对象转换成字符串
def toJsonString(T: Object): String = {
mapper.registerModule(DefaultScalaModule)
mapper.writeValueAsString(T)
def main(args: Array[String]): Unit = {
//1.创建流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.准备数据
val dataStream: DataStream[Student] = env.fromElements(
Student(8, "xiaoming", "beijing biejing", "female")
)
//将 传入的值student 转换成字符串
val studentStream: DataStream[String] = dataStream.map(student => toJsonString(student)
// 这里需要显示 SerializerFeature 中的某一个,否则会报同时匹配两个方法的错误
)
//studentStream.print()
val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092")
val myProducer = new FlinkKafkaProducer011[String](sinkTopic, new KeyedSerializationSchemaWrapper[String](new
SimpleStringSchema()), prop)
studentStream.addSink(myProducer)
studentStream.print()
env.execute("Flink add sink")
}
}
}
更多推荐
所有评论(0)