背景

Kafka的topic进行扩容,出现丢数据的情况,只能通过重启的方式来解决。

解决方法

KafkaSource创建的时候,在Properties中,通过设置参数 flink.partition-discovery.interval-millis 来打开自动发现功能。

此参数的功能是间隔多久(interval)获取一次kakfa的元数据。

默认是关闭的,只要设置interval大于0即可开启,此时FlinkKafkaConsumer会启动一个线程根据interval定期获取Kafka最新的元数据。

需要注意的是,新增的partiton会从EARLIEST位置开始消费。

    val properties = new Properties()
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, parameter.getRequired(SOURCE_KAFKA_BROKERS))
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    properties.setProperty("flink.partition-discovery.interval-millis",(10 * 1000).toString)  // 自动发现消费的partition变化

备注

如果需要动态发现topic,也是该参数控制的,不过仅限通过正则表达式指定topic的方式

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐