一.基本概念

1.消费者和消费组

Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:

 

Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

 

二.消息接收

1.必要参数设置

/**
 * Kafka 消费者分析
 */
@Slf4j
public class KafkaConsumerAnalysis {
    public static final String brokerList = "192.168.37.129:9092";
    public static final String topic = "test";
    public static final String groupId = "group.yfy";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);
​
    public static Properties initConfig() {
        Properties props = new Properties();
        // 与KafkaProducer中设置保持一致
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 必填参数,该参数和KafkaProducer中的相同,制定连接Kafka集群所需的broker地址清单,可以设置一个或者多个
        props.put("bootstrap.servers", brokerList);
        // 消费者隶属于的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
        props.put("group.id", groupId);
        // 指定KafkaConsumer对应的客户端ID,默认为空,如果不设置KafkaConsumer会自动生成一个非空字符串
        props.put("client.id", "consumer.client.id.demo");
​
        return props;
    }
    
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
​
        // 正则订阅主题
//        consumer.subscribe(Pattern.compile("test.*"));
​
        // 指定订阅的分区
//        consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
​
        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
                    System.out.println("key = " + record.key() + ", value = " + record.value());
                    //do something to process record.
                }
            }
        } catch (Exception e) {
            log.error("occur exception ", e);
        } finally {
            consumer.close();
        }
    }
}

1)这是 个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询kafka请求数据。

2)消费者必须持续对 Kafka进行轮询,否则会被认为己经死亡,分区会被移交给群组里的其他消费者。传给poll() 方法的参数是一个超时时间,用于控制poll()方法的的阻塞时间。如果该参数被设为 0,poll ()会立即返回 ,否则它会在指定的毫秒数内一直等待 broker 返回数据。

3)poll ()方法能返回一个记录列表。每条记录都包含了记录所属主题的信息、记录分区的信息、记录在分区里的偏移量以及记录的键值对。我们一般会遍历这个列表 ,逐条处理这些记录。 poll()方法有一个超时参数,它指定了方法在多久之后可以返回,不管有没有可用的数据都要返回。 超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。4)在退出应用程序之前 close()方怯关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳井认定它已死亡,因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息

2.订阅主题和分区

创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:

consumer.subscribe(Pattern.compile("test.*"));

指定订阅的分区

consumer.assign(Arrays.asList(new TopicPartition("test", 0)));

3.反序列化

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

4.位移提交

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。

当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。

自动提交

这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

同步提交

public class CheckOffsetAndCommit {
    public static final String brokerList = "192.168.37.129:9092";
    public static final String topic = "test";
    public static final String groupId = "group.yfy";
    private static AtomicBoolean running = new AtomicBoolean(true);
​
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
​
        // 手动提交开启
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }
​
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
​
​
        TopicPartition tp = new TopicPartition(topic, 0);
        consumer.assign(Arrays.asList(tp));
        long lastConsumedOffset = -1;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            if (records.isEmpty()) {
                break;
            }
            List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
            lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync();//同步提交消费位移
        }
        System.out.println("comsumed offset is " + lastConsumedOffset);
        OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
        System.out.println("commited offset is " + offsetAndMetadata.offset());
        long posititon = consumer.position(tp);
        System.out.println("the offset of the next record is " + posititon);
    }
}

异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

public class OffsetCommitAsyncCallback extends ConsumerClientConfig {
​
    private static AtomicBoolean running = new AtomicBoolean(true);
​
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
​
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    //do some logical processing.
                }
                // 异步回调
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                           Exception exception) {
                        if (exception == null) {
                            System.out.println(offsets);
                        } else {
                            log.error("fail to commit offsets {}", offsets, exception);
                        }
                    }
                });
            }
        } finally {
            consumer.close();
        }
​
        try {
            while (running.get()) {
                consumer.commitAsync();
            }
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,我们可以使用同步和异步提交结合的方式提交。在消费者关闭前commitSync()提交一下。

        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                consumer.commitAsync();
            }
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
  • 如果一切正常,我们使用commitAsync()方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。

  • 如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync()方法也会一直重试,直到提交成功或发生无法恢复的错误。

5.指定位移消费

到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。

seek()方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费

public class SeekDemo extends ConsumerClientConfig {
​
​
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        // timeout参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待
        consumer.poll(Duration.ofMillis(2000));
        // 获取消费者所分配到的分区
        Set<TopicPartition> assignment = consumer.assignment();
        System.out.println(assignment);
        for (TopicPartition tp : assignment) {
            // 参数partition表示分区,offset表示指定从分区的哪个位置开始消费
            consumer.seek(tp, 10);
        }
//        consumer.seek(new TopicPartition(topic,0),10);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //consume the record.
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ":" + record.value());
            }
        }
    }
​
}

增加判断是否分配到了分区

public class SeekDemoAssignment extends ConsumerClientConfig {
​
​
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        long start = System.currentTimeMillis();
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            assignment = consumer.assignment();
        }
        long end = System.currentTimeMillis();
        System.out.println(end - start);
        System.out.println(assignment);
        for (TopicPartition tp : assignment) {
            consumer.seek(tp, 10);
        }
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //consume the record.
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ":" + record.value());
            }
        }
    }
}
​

指定从分区末尾开始消费

        // 指定从分区末尾开始消费
        Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
        for (TopicPartition tp : assignment) {
            consumer.seek(tp, offsets.get(tp));
        }

演示位移越界操作,修改代码如下:

        // 指定从分区末尾开始消费
        Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
        for (TopicPartition tp : assignment) {
            consumer.seek(tp, offsets.get(tp)+1);
        }

6.消费者拦截器

消费者拦截器主要是在消费到消息或者在提交消费位移时进行的一些定制化的操作。

public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
    private static final long EXPIRE_INTERVAL = 10 * 1000;
​
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("before:" + records);
        long now = System.currentTimeMillis();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords
                = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            for (ConsumerRecord<String, String> record : tpRecords) {
                if (now - record.timestamp() < EXPIRE_INTERVAL) {
                    newTpRecords.add(record);
                }
            }
            if (!newTpRecords.isEmpty()) {
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }
​
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, offset) ->
                System.out.println(tp + ":" + offset.offset()));
    }
​
    @Override
    public void close() {
    }
​
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

实现自定义拦截器之后,需要在KafkaConsumer中配置指定这个拦截器,如下

props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class.getName());

发送端同时发送两条消息,其中一条修改timestamp的值来使其变得超时,如下:

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
        ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 1000, "Kafka-demo-001", "hello, Kafka!->超时");

启动消费端运行如下,只收到了未超时的消息:

before:org.apache.kafka.clients.consumer.ConsumerRecords@7adda9cc
topic = test, partition = 1, offset = 18
key = Kafka-demo-001, value = hello, Kafka!

三.其它消费者参数

fetch.min.bytes

这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。

2.fetch.max.wait.ms

上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。

3.max.partition.fetch.bytes

这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。

4.max.poll.records

这个参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐