Kafka consumer: consuming from a specified timestamp
Obtain the offset corresponding to a specified timestamp, then consume data starting from that offset.
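The core of the approach is the Kafka Consumer#offsetsForTimes API, which maps each requested (partition, timestamp) pair to the earliest offset whose record timestamp is greater than or equal to the requested timestamp, or null if the partition has no such record. A minimal standalone sketch of just that lookup (the topic name "demo-topic" and an already-configured consumer are assumptions for illustration):

// Minimal sketch, assuming "consumer" is an already-configured Consumer<String, String>
// and "demo-topic" is a hypothetical topic name.
Map<TopicPartition, Long> query = new HashMap<>();
for (PartitionInfo p : consumer.partitionsFor("demo-topic")) {
    query.put(new TopicPartition(p.topic(), p.partition()), timestamp);
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
// result.get(tp) is null when the timestamp is newer than that partition's latest record

The class below wraps this lookup, seeks every partition to the resolved offset, and then polls a small batch of records.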
@Slf4j
public class KafkaConsumer extends KafkaBase {

    private KafkaConsumerDtoNew kafkaConsumerDto;
    private Consumer<String, String> consumer;
    private String consumerGroup;

    public KafkaConsumer(KafkaDataSource kafkaDataSource, KafkaConsumerDtoNew kafkaConsumerDto) {
        super(kafkaDataSource);
        this.kafkaConsumerDto = kafkaConsumerDto;
        if (StringUtils.isNotBlank(kafkaConsumerDto.getGroupId())) {
            this.consumerGroup = kafkaConsumerDto.getGroupId();
        } else {
            // no group id supplied: derive a unique per-user group name
            this.consumerGroup = "consumer-" + kafkaConsumerDto.getLoginUserName() + "-"
                    + ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSS"));
        }
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaDataSource.getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaDataSource.getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaDataSource.getValueDeserializer());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.consumerGroup);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerDto.getOffset()); // earliest or latest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually via commitSync()
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
    }
    public List<ConsumerRecordDtoNew> consumerByTimestamp(long timestamp) {
        // ===== timestamp-based offset lookup =====
        /* assign() and assignment() must be used together, otherwise
           consumer.assignment() returns an empty set:
           consumer.assign(Arrays.asList(new TopicPartition("t7", 2)));
           Set<TopicPartition> partitionInfos = consumer.assignment(); */
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(kafkaConsumerDto.getTopic());
        if (null != partitionInfos && !partitionInfos.isEmpty()) {
            // ask for the offset of the first record at or after the timestamp,
            // in every partition of the topic
            Map<TopicPartition, Long> map = new HashMap<>();
            for (PartitionInfo p : partitionInfos) {
                map.put(new TopicPartition(p.topic(), p.partition()), timestamp);
            }
            Map<TopicPartition, OffsetAndTimestamp> offsetTimestamp = consumer.offsetsForTimes(map);
            // assign() and seek() must be used together; assign all partitions up front,
            // since re-assigning a singleton inside the loop would discard the
            // previously assigned partitions
            consumer.assign(offsetTimestamp.keySet());
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetTimestamp.entrySet()) {
                TopicPartition key = entry.getKey();
                OffsetAndTimestamp value = entry.getValue();
                // determine the starting offset from the looked-up timestamp
                long position;
                if (value != null) {
                    position = value.offset();
                } else {
                    // null when the requested timestamp is newer than the partition's
                    // latest record: fall back to the end of the partition
                    consumer.seekToEnd(Collections.singleton(key));
                    position = consumer.position(key);
                }
                consumer.seek(key, position);
            }
        }
        // ===== offset setup complete =====
        // poll until a small batch (5 records) is collected, or a poll returns
        // nothing (otherwise this would spin forever on a near-empty topic)
        List<ConsumerRecordDtoNew> list = new ArrayList<>(5);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                list.add(new ConsumerRecordDtoNew(record.topic(), record.partition(), record.offset(),
                        record.timestamp(), record.key(), record.value(), this.consumerGroup));
            }
            if (list.size() >= 5 || records.isEmpty()) {
                break;
            }
        }
        // auto-commit is disabled, so persist the consumed positions explicitly
        consumer.commitSync();
        log.info("consumer records: " + list);
        return list;
    }
    @Override
    public void close() {
        consumer.close();
    }
}
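A minimal usage sketch, assuming dataSource (a KafkaDataSource) and consumerDto (a KafkaConsumerDtoNew carrying the topic, group and deserializer settings) are populated elsewhere:

// Hypothetical caller: pull a batch of records written since 2023-01-01 00:00 local time.
long timestamp = LocalDateTime.of(2023, 1, 1, 0, 0)
        .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
KafkaConsumer kafkaConsumer = new KafkaConsumer(dataSource, consumerDto);
try {
    List<ConsumerRecordDtoNew> records = kafkaConsumer.consumerByTimestamp(timestamp);
    records.forEach(System.out::println);
} finally {
    kafkaConsumer.close();
}

Note that the explicit seek overrides any committed offsets, so the auto.offset.reset setting only matters for partitions that are never explicitly sought.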