深入理解 Kafka 的 ConsumerRebalanceListener

在分布式系统中,数据的一致性和可靠性是至关重要的。Apache Kafka 作为一个流行的分布式流处理平台,提供了强大的数据传输和处理能力。在 Kafka 中,消费者组(Consumer Group)的概念允许多个消费者实例共同处理一个主题的数据。然而,当消费者实例的个数发生变化时,如何确保数据的平衡和一致性呢?这就引出了我们今天要讨论的主题——ConsumerRebalanceListener

ConsumerRebalanceListener 简介

ConsumerRebalanceListener 是 Kafka 提供的一个回调接口,用户可以实现该接口来监听分区重新平衡(partition rebalance)事件。当消费者组中的分区分配发生变化时,Kafka 会触发重新平衡操作。这个接口有两个主要的方法:

  1. onPartitionsRevoked(Collection<TopicPartition> partitions):在重新平衡操作期间,当消费者需要放弃一些分区时调用。
  2. onPartitionsAssigned(Collection<TopicPartition> partitions):在分区重新分配完成并且消费者开始获取数据之前调用,并且只有在调用 Consumer#poll() 方法时才会触发。

示例配置

在开始之前,我们需要配置 Kafka 服务器和相关的生产者与消费者属性。以下是示例配置代码:

package com.logicbig.example;

import java.util.Properties;

public class ExampleConfig {
  public static final String BROKERS = "localhost:9092";

  public static Properties getProducerProps() {
      Properties props = new Properties();
      props.put("bootstrap.servers", BROKERS);
      props.put("acks", "all");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      return props;
  }

  public static Properties getConsumerProps() {
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", BROKERS);
      props.setProperty("group.id", "testGroup");
      props.setProperty("enable.auto.commit", "false");
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      return props;
  }
}

创建示例主题

接下来,我们创建一个具有 3 个分区的主题:

package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicCreator {
  public static void main(String[] args) throws Exception {
      createTopic("example-topic-2020-6-24", 3);
  }

  private static void createTopic(String topicName, int numPartitions) throws Exception {
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);
      AdminClient admin = AdminClient.create(config);

      // 检查主题是否已存在
      boolean alreadyExists = admin.listTopics().names().get().stream()
                                   .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
      if (alreadyExists) {
          System.out.printf("topic already exits: %s%n", topicName);
      } else {
          // 创建新主题
          System.out.printf("creating topic: %s%n", topicName);
          NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
          admin.createTopics(Collections.singleton(newTopic)).all().get();
      }

      // 描述主题
      System.out.println("-- describing topic --");
      admin.describeTopics(Collections.singleton(topicName)).all().get()
           .forEach((topic, desc) -> {
               System.out.println("Topic: " + topic);
               System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                           desc.partitions()
                               .stream()
                               .map(p -> Integer.toString(p.partition()))
                               .collect(Collectors.joining(",")));
           });

      admin.close();
  }
}

使用 ConsumerRebalanceListener

最后,我们通过一个示例来展示 ConsumerRebalanceListener 的使用:

package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ConsumerRebalanceListenerExample {

  public static void main(String[] args) throws InterruptedException {
      ExecutorService executorService = Executors.newFixedThreadPool(3);
      for (int i = 0; i < 3; i++) {
          int finalI = i;
          executorService.execute(() -> startConsumer("consumer-" + finalI));
          Thread.sleep(3000);
      }
      executorService.shutdown();
      executorService.awaitTermination(3, TimeUnit.MINUTES);
  }

  private static KafkaConsumer<String, String> startConsumer(String name) {
      Properties consumerProps = ExampleConfig.getConsumerProps();
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
      consumer.subscribe(Collections.singleton("example-topic-2020-6-24"),
              new ConsumerRebalanceListener() {
                  @Override
                  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                      System.out.printf("onPartitionsRevoked - consumerName: %s, partitions: %s%n", name,
                              formatPartitions(partitions));
                  }

                  @Override
                  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                      System.out.printf("onPartitionsAssigned - consumerName: %s, partitions: %s%n", name,
                              formatPartitions(partitions));
                  }
              });
      System.out.printf("starting consumerName: %s%n", name);
      consumer.poll(Duration.ofSeconds(10));
      System.out.printf("closing consumerName: %s%n", name);
      consumer.close();
      return consumer;
  }

  private static List<String> formatPartitions(Collection<TopicPartition> partitions) {
      return partitions.stream().map(topicPartition ->
              String.format("topic: %s, partition: %s", topicPartition.topic(), topicPartition.partition()))
                     .collect(Collectors.toList());
  }
}

在这个示例中,我们首先启动了一个消费者实例,随后每隔 3 秒启动一个新的消费者实例。可以看到,随着消费者实例的增加和关闭,分区的分配也在不断变化。通过 ConsumerRebalanceListener,我们可以在分区分配变化时执行一些自定义操作。

总结

通过本文的示例,我们可以看到 ConsumerRebalanceListener 在 Kafka 中的应用。它允许我们在分区重新平衡时执行一些自定义操作,从而更好地管理消费者组中的分区分配。这对于确保数据的一致性和可靠性至关重要。

项目依赖和技术

以下是本示例项目中使用的一些关键依赖和技术:

  • Apache Kafka 2.5.0
  • JDK 8
  • Maven 3.5.4

希望本文能帮助你更好地理解 Kafka 中的分区重新平衡机制以及如何使用 ConsumerRebalanceListener

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐