// Kafka: reset a consumer group's offsets to a given point in time.
/**
 * @Title RestOffset.java
 * @Package com.dc.config.kafka.init
 * @Description Resets the Kafka consumer group's offsets for the configured
 *              topics to the positions matching a configured timestamp.
 * @author zsh11619
 * @date 2019-09-19
 * @version V1.0
 */
package com.dc.config.kafka.init;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import com.dc.common.util.ThrowableUtils;

import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
/**
 * Resets the configured consumer group's offsets to the positions whose record
 * timestamps match {@code kafka.consumer.restOffset.toTime} (epoch millis),
 * for every topic in {@code kafka.consumer.topic}.
 *
 * <p>Notes: (1) if the target timestamp is newer than a partition's latest
 * record, {@code offsetsForTimes} returns {@code null} for that partition and
 * it is left untouched; (2) all consumers of the group must be stopped while
 * the reset runs, otherwise the group assignment conflicts with this one.
 *
 * @author zsh11619
 * @date 2018-09-19
 */
@Component
public class RestOffset {

    private static final Logger logger = LoggerFactory.getLogger(RestOffset.class);

    // Thread-safe replacement for the former SimpleDateFormat instance field
    // (SimpleDateFormat is not thread-safe); used only to render log output.
    private static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());

    @Value("${kafka.consumer.dbfrom}")
    private String consumerDbfrom;

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("#{'${kafka.consumer.topic}'.split(',')}")
    private String[] topics;

    // Epoch-millisecond timestamp the offsets are reset to; 0 = not configured.
    @Value("${kafka.consumer.restOffset.toTime:0}")
    private Long fetchDataTime;

    // Expiry time (yyyy-MM-dd HH:mm:ss). Once this moment has passed the reset
    // is skipped, so a stale config value cannot rewind offsets on a later restart.
    @Value("${kafka.consumer.restOffset.toTime.invalidTime:}")
    private String invalidTime;

    @Value("${kafka.security.protocol:}")
    private String securityProtocol;

    @Value("${kafka.sasl.mechanism:}")
    private String saslMechanism;

    @Value("${kafka.sasl.jaas.config:}")
    private String saslJaasConfig;

    /**
     * Validates the configuration and, if valid, resets the offsets of every
     * configured topic to the positions matching {@code fetchDataTime}.
     * Returns silently when validation fails (reasons are logged).
     */
    public void run() {
        if (!checkParameter()) {
            return;
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        if (!ObjectUtils.isEmpty(securityProtocol) && !ObjectUtils.isEmpty(saslMechanism)
                && !ObjectUtils.isEmpty(saslJaasConfig)) {
            props.put("security.protocol", securityProtocol);
            props.put("sasl.mechanism", saslMechanism);
            props.put("sasl.jaas.config", saslJaasConfig);
            logger.info("####重置位置点开启SASL####");
        }
        // BUG FIX: the original created one consumer per topic inside the loop
        // but closed only the last, leaking the others — and since close() is
        // what commits the seeked positions under auto-commit, their resets
        // were silently lost. try-with-resources closes each consumer.
        for (String topic : topics) {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                restOffset(consumer, topic, fetchDataTime);
            }
        }
    }

    /**
     * Seeks every partition of {@code topic} to the earliest offset whose
     * record timestamp is {@code >= fetchDataTime}. Partitions with no record
     * at or after that timestamp are skipped. Any failure is logged and
     * swallowed so one bad topic does not abort the remaining resets.
     *
     * @param consumer      consumer used for the lookup and seek (not closed here)
     * @param topic         topic whose partitions are reset
     * @param fetchDataTime target position as an epoch-millisecond timestamp
     */
    private void restOffset(KafkaConsumer<String, String> consumer, String topic, long fetchDataTime) {
        try {
            // partitionsFor may return null for an unknown topic — guard it.
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            if (ObjectUtils.isEmpty(partitionInfos)) {
                logger.error("{},未获取到分区信息", topic);
                return;
            }
            List<TopicPartition> topicPartitions = new ArrayList<>(partitionInfos.size());
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            for (PartitionInfo partitionInfo : partitionInfos) {
                TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                topicPartitions.add(tp);
                timestampsToSearch.put(tp, fetchDataTime);
            }
            logger.info("设置读取时间戳,{}", fetchDataTime);
            consumer.assign(topicPartitions);
            // Look up, per partition, the earliest offset with timestamp >= target.
            Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
            logger.info("{},开始设置各分区初始偏移量...", topic);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
                // Value is null when the target timestamp is newer than the
                // partition's latest record — leave that partition untouched.
                OffsetAndTimestamp offsetTimestamp = entry.getValue();
                if (offsetTimestamp != null) {
                    int partition = entry.getKey().partition();
                    long timestamp = offsetTimestamp.timestamp();
                    long offset = offsetTimestamp.offset();
                    logger.info("partition = {}, time = {}, offset = {}", partition,
                            TIME_FORMAT.format(Instant.ofEpochMilli(timestamp)), offset);
                    consumer.seek(entry.getKey(), offset);
                }
            }
            logger.info("{},设置各分区初始偏移量结束...", topic);
        } catch (Exception e) {
            logger.error("{},重置位置点异常:{}", topic, ThrowableUtils.getThrowableMsg(e));
        }
    }

    /**
     * Checks that the reset configuration is usable.
     *
     * @return {@code true} when the expiry time is present, well-formed and in
     *         the future, topics are configured, and the target timestamp is a
     *         positive value; {@code false} otherwise (reason is logged)
     */
    private boolean checkParameter() {
        if (ObjectUtils.isEmpty(invalidTime) || ObjectUtils.isEmpty(DateUtil.parse(invalidTime))) {
            logger.error("{},未配置失效时间,或不合法yyyy-MM-dd hh:mm:ss ", consumerDbfrom);
            return false;
        }
        Date expiry = DateUtil.parse(invalidTime);
        // Signed difference in seconds between the expiry moment and now:
        // >= 0 means the expiry time has already passed and the reset is stale.
        long secondsPastExpiry = DateUtil.between(expiry, new Date(), DateUnit.SECOND, false);
        if (secondsPastExpiry >= 0) {
            logger.error("{},需要重置的时间戳已经失效", consumerDbfrom);
            return false;
        }
        if (ObjectUtils.isEmpty(topics)) {
            logger.error("{},未获取到主题", consumerDbfrom);
            return false;
        }
        if (ObjectUtils.isEmpty(fetchDataTime)) {
            logger.error("{},需要重置的时间戳为空", consumerDbfrom);
            return false;
        }
        if (fetchDataTime <= 0) {
            // BUG FIX: this branch previously reused the "timestamp is empty"
            // message even though the value is present but non-positive.
            logger.error("{},需要重置的时间戳不合法,必须为正数", consumerDbfrom);
            return false;
        }
        return true;
    }
}
// Note: 1) If the target timestamp is later than the newest record in a
// partition, the reset has no effect on that partition. 2) All consumers of
// the group must be stopped while the reset operation runs.