springboot获取Kafka的Lag
直接上代码pom文件<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.10</version></dependency>application
·
直接上代码
pom文件
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.10</version>
</dependency>
application.yml设置auto-offset-reset: earliest,如果不设置这个值,默认的是latest
spring:
application:
name: sd-km
kafka:
bootstrap-servers: 192.188.234.12:9092
consumer:
auto-offset-reset: earliest
group-id: test
auto-offset-reset值的含义解释
- earliest
- 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest
- 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none
- topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
工具类:
@Component
public class KafkaOffsetTools {
private static KafkaOffsetTools _this;
@Autowired
private ConsumerFactory<Long, String> consumerFactory;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostConstruct
public void init() {
_this = this;
}
/**
* 获取lag
**/
public static Long getLAG(String topic) {
return getDescribe(topic)[2];
}
/**
* 获取分组下的表述信息
**/
private static long[] getDescribe(String topic) {
long[] describe = new long[3];
Consumer<Long, String> consumer = createConsumer();
List<PartitionInfo> partitionInfos = _this.kafkaTemplate.partitionsFor(topic);
List<TopicPartition> tp = new ArrayList<>();
partitionInfos.forEach(str -> {
TopicPartition topicPartition = new TopicPartition(topic, str.partition());
tp.add(topicPartition);
long logEndOffset = consumer.endOffsets(tp).get(topicPartition);
consumer.assign(tp);
//consumer.position(topicPartition);
long currentOffset = consumer.position(topicPartition);
System.out.println("logEndOffset : " + logEndOffset + ", currentOffset : "+ currentOffset);
describe[0] += currentOffset;
describe[1] += logEndOffset;
describe[2] = describe[1] - describe[0];
tp.clear();
});
System.out.println(Arrays.toString(describe));
return describe;
}
/**
* 创建消费者
**/
private static Consumer<Long, String> createConsumer() {
return _this.consumerFactory.createConsumer();
}
}
在需要的地方调用:
KafkaOffsetTools.getLAG("topicname")
更多推荐


所有评论(0)