在Kafka服务端中每个分区的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者客户端而言,也有一个offset,表示的是消费到分区中某个消息所在的位置。
消费者客户端每次调用poll时返回的是未被消费的消息,为了能够知道哪些消息被消费过,哪些消息未被消费,消费者客户端使用offset来记录上一次消费位置。为了做到这一点,需要将消费位移进行持久化而不是简单的保存在内存中,这样当消费者客户端重启或者新的消费者加入进行分区再平衡时能够保存之前的消费位移。Kafka在旧版消费者客户端offset是保存在zookeeper中的,在新版中kafka将其保存在_consumer_offsets中。需要注意的是如果消费者消费位置是x,提交的消费者位移是x+1而不是x…
在kafka消费者客户端中默认的位移提交时自动的,消费者客户端参数enable.auto.commit配置,默认为true。自动提交是间隔一定周期提交,周期由消费者客户端参数auto.commit.interval.ms配置,默认是5秒。
kafka中消费者客户端逻辑处理中offset提交时一个比较麻烦的事情,默认的自动提交策略虽然免去了维护offset的麻烦,但是会带来消息重复和消息丢失的问题。假设第一次消费完了,准备提交位移,但是这时候消费者客户端崩溃了,当消费者客户端重新启动的时候,又需要在开始的位置消费消息,造成了消息的重复。
kafka消费者客户端提供了手动管理位移的方法操作,分为同步提交和异步提交。如果想用手动提交offset需要设置消费者客户端参数enable.auto.commit=false。同步提交和异步提交对应了KafkaConsumer中commitSynccommitAsync两个方法。


    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-test-001");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "false");
        return properties;
    }

    public static void main(String[] args) throws Exception {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("topic=" + record.topic() + ",partition=" + record.partition()
                        + ",offset=" + record.offset() + ",key=" + record.key() + ",value=" + record.value());
                long offset = record.offset();
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                // 每次消费一条就提交一次
                consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1)));
            }
        }
    }

或者每次消费一条消息就提交一次:


    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-test-001");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"false");
        return properties;
    }

    public static void main(String[] args) throws Exception{
        Properties props = initConfig();
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(1000);
            try {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic=" + record.topic() + ",partition=" + record.partition()
                            + ",offset=" + record.offset() + ",key=" + record.key() + ",value=" + record.value());
                }
            }finally {
                consumer.commitSync();
            }
        }
    }

异步提交offset示例:


    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-test-001");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"false");
        return properties;
    }

    public static void main(String[] args) throws Exception{
        Properties props = initConfig();
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(1000);
            try {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic=" + record.topic() + ",partition=" + record.partition()
                            + ",offset=" + record.offset() + ",key=" + record.key() + ",value=" + record.value());
                }
            }finally {
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if(exception != null){
                            System.out.println("commit offset success");
                        }else{
                            System.out.println("commit offset failed with error msg "+exception.getMessage());
                        }
                    }
                });
            }
        }
    }

当一个消费者组建立或者订阅了新的主体是没有位移可以查找的,另外_consumer_offsets主体中关于这个消费组的位移信息过期而被删除后也是没有位移相关信息的。除了查找不到消费者位移,位移越界(一般和seek执行有关)也会触发auto.offset.rest参数的执行。每当消费者查找不到消费位移记录时,会根据消费者客户端参数auto.offset.reset配置来决定从何处开始消费,默认这是latest,从分区的末尾开始消费。如果是earliest则会从起始处开始消费,就是offset=0开始消费。还有一个值是none表示既不从消息开始出消费也不从消息最新出开始消费,如果找不到消费位移,会抛异常。
kafka消费者客户端提供了seek方法,用来在指定的位移处开始消费。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐