离去年写了有关偏移量有关文章快一年了,但最近在偏移量方面遇到了些小问题,在这里记录下。还有关于偏移量半自动提交,是个很经典的问题,顺便也记录下。


关于拉取指定偏移量

应该只有用consumer.assign(topicPartitionList);和consumer.seek(topicPartition,offset);这种指定分区的方法才能指定偏移量。

在sparkstreaming中同样也是用assign的方法提交的最后实现拉取指定偏移量的方法。

KafkaUtils.createDirectStream(JSSC, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Assign(topicPartitionList, kafkaParams, repairOffset));

但是在这个过程中,配置的自动提交偏移量失去作用,必须手动提交偏移量,即使用consumer.commitAsync();。


半自动提交偏移量

在任务执行结束后提交一次偏移量,同时在执行失败后提交一次偏移量,尽量保证偏移量不丢失,

  try {
            while(true) {
                consumerRecords = this.consumer.poll(100);
                for (ConsumerRecord<String, String> record : consumerRecords) {
          
                    String value = record.value();
                    if (StringUtils.isNotBlank(value)) {
                        //业务逻辑
                    }
                    consumer.commitAsync();
                   
                }
            }
        }catch (Exception e) {
            System.out.println("commit failed");
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }

问题:

之前因为环境调整,在卸载kafkamanager的时候,新建了一个groupid。但在重装kafkamanager后,这个groupid丢失。具体原因有待排查。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐