下面是Flink读取Kafka数据的代码,其中就有五种读取offset的方式,并配置相应的介绍

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

Properties props = new Properties();
props.setProperty("bootstrap.servers",KAFKA_BROKER);
props.setProperty("zookeeper.connect", ZK_HOST);
props.setProperty("group.id",GROUP_ID);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), props);

/**
* Map<KafkaTopicPartition, Long> Long参数指定的offset位置
* KafkaTopicPartition构造函数有两个参数,第一个为topic名字,第二个为分区数
* 获取offset信息,可以用过Kafka自带的kafka-consumer-groups.sh脚本获取
*/
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("maxwell_new", 0), 11111111l);
offsets.put(new KafkaTopicPartition("maxwell_new", 1), 222222l);
offsets.put(new KafkaTopicPartition("maxwell_new", 2), 33333333l);

/**
* Flink从topic中最初的数据开始消费
*/
consumer.setStartFromEarliest();

/**
* Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略
*/
consumer.setStartFromTimestamp(1559801580000l);

/**
* Flink从topic中指定的offset开始,这个比较复杂,需要手动指定offset
*/
consumer.setStartFromSpecificOffsets(offsets);

/**
* Flink从topic中最新的数据开始消费
*/
consumer.setStartFromLatest();

/**
* Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
*/
consumer.setStartFromGroupOffsets();
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐