Flink Kafka Connector 关于Partition动态发现
背景Kafka的topic进行扩容,出现丢数据的情况,只能通过重启的方式来解决。解决方法KafkaSource创建的时候,在Properties中,通过设置参数flink.partition-discovery.interval-millis 来打开自动发现功能。此参数的功能是间隔多久(interval)获取一次kakfa的元数据。默认是关闭的,只要设置interval大于0...
·
背景
Kafka的topic进行扩容,出现丢数据的情况,只能通过重启的方式来解决。
解决方法
KafkaSource创建的时候,在Properties中,通过设置参数 flink.partition-discovery.interval-millis 来打开自动发现功能。
此参数的功能是间隔多久(interval)获取一次kakfa的元数据。
默认是关闭的,只要设置interval大于0即可开启,此时FlinkKafkaConsumer会启动一个线程根据interval定期获取Kafka最新的元数据。
需要注意的是,新增的partiton会从EARLIEST位置开始消费。
val properties = new Properties()
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, parameter.getRequired(SOURCE_KAFKA_BROKERS))
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
properties.setProperty("flink.partition-discovery.interval-millis",(10 * 1000).toString) // 自动发现消费的partition变化
备注
如果需要动态发现topic,也是该参数控制的,不过仅限通过正则表达式指定topic的方式。
更多推荐
已为社区贡献1条内容
所有评论(0)