kafka指定偏移量拉取与偏移量半自动提交
离去年写了有关偏移量有关文章快一年了,但最近在偏移量方面遇到了些小问题,在这里记录下。还有关于偏移量半自动提交,是个很经典的问题,顺便也记录下。
·
离去年写了有关偏移量有关文章快一年了,但最近在偏移量方面遇到了些小问题,在这里记录下。还有关于偏移量半自动提交,是个很经典的问题,顺便也记录下。
关于拉取指定偏移量
应该只有用consumer.assign(topicPartitionList);和consumer.seek(topicPartition,offset);这种指定分区的方法才能指定偏移量。
在sparkstreaming中同样也是用assign的方法提交的最后实现拉取指定偏移量的方法。
KafkaUtils.createDirectStream(JSSC, LocationStrategies.PreferConsistent(),
ConsumerStrategies.Assign(topicPartitionList, kafkaParams, repairOffset));
但是在这个过程中,配置的自动提交偏移量失去作用,必须手动提交偏移量,即使用consumer.commitAsync();。
半自动提交偏移量
在任务执行结束后提交一次偏移量,同时在执行失败后提交一次偏移量,尽量保证偏移量不丢失,
try {
while(true) {
consumerRecords = this.consumer.poll(100);
for (ConsumerRecord<String, String> record : consumerRecords) {
String value = record.value();
if (StringUtils.isNotBlank(value)) {
//业务逻辑
}
consumer.commitAsync();
}
}
}catch (Exception e) {
System.out.println("commit failed");
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
问题:
之前因为环境调整,在卸载kafkamanager的时候,新建了一个groupid。但在重装kafkamanager后,这个groupid丢失。具体原因有待排查。
更多推荐
已为社区贡献4条内容
所有评论(0)