This article uses hands-on coding to verify and implement some of the principles covered in 《MQ: 一张图读懂kafka工作原理》 (MQ: Understanding How Kafka Works in One Diagram).

1. Version notes

The code below depends on the following versions; other versions are not guaranteed to work:

  • Kafka server version: 2.11-1.0.1
  • kafka-clients.jar version: 2.2.0
  • spring-kafka.jar version: 1.3.5.RELEASE
  • spring-boot version: 1.5.10.RELEASE

2. Integrating Kafka

pom.xml

First, add the Spring dependency for Kafka; this package provides Producer- and Consumer-related operations.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.3.5.RELEASE</version>
</dependency>

To perform Topic- and Partition-related operations, also add the following package:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.2.0</version>
</dependency>

application.properties

Only the most basic configuration is shown here; add further properties as needed for tuning.

spring.kafka.bootstrap-servers=127.0.0.1:9092
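For illustration, a few commonly tuned keys under Spring Boot 1.5.x follow; the property names are standard Spring Boot keys, but the values here are examples only, not recommendations:

```properties
# Examples only; tune for your own workload
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
```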

3. Example code: Topic CRUD

Topic CRUD operations go through the AdminClient and mainly rely on kafka-clients.jar.

3.1. Obtaining the Kafka admin client

    /**
     * Obtain the Kafka admin client
     */
    private static AdminClient getKafkaAdminClient() {
        Map<String, Object> props = new HashMap<>(1);
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.126.153.155:9092");
        return KafkaAdminClient.create(props);
    }

3.2. Listing all topic names

    /**
     * List the names of all topics
     */
    private static Collection<String> getAllTopic(AdminClient client) throws InterruptedException, ExecutionException {
        return client.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
    }

3.3. Showing the details of specified Topics

    /**
     * Show the details of the specified topics
     */
    private static void showTopicInfo(AdminClient client, Collection<String> topics, List<String> wantedTopicList) throws InterruptedException, ExecutionException {
        client.describeTopics(topics).all().get().forEach((topic, description) -> {
            if (wantedTopicList.contains(topic)) {
                log.info("==== Topic {} Begin ====", topic);
                for (TopicPartitionInfo partition : description.partitions()) {
                    log.info(partition.toString());
                }
                log.info("==== Topic {} End ====", topic);
            }
        });
    }

3.4. Creating Topics

Multiple Topics can be created in one call; each topic needs a name, a partition count, and a replica count.

The replica count cannot exceed the number of brokers. The Kafka setup used in this article has only one broker.

        // Create topics: the replica count cannot exceed the broker count
        client.createTopics(Lists.newArrayList(
                // Chat room: 3 partitions
                new NewTopic(TOPIC_CHAT_ROOM, 3, (short) 1),
                // Mail: 3 partitions
                new NewTopic(TOPIC_MAIL, 3, (short) 1),
                // SMS: 1 partition
                new NewTopic(TOPIC_SMS, 1, (short) 1)
        ));

3.5. Deleting Topics

Multiple Topics can be deleted in one call.

    client.deleteTopics(Lists.newArrayList("topic-send-sms", "topic-send-mail"));

3.6. Modifying Topics

An existing Topic's configuration, such as its partition count, cannot simply be edited in place with this approach; the topic has to be deleted and recreated. (kafka-clients does provide AdminClient#createPartitions, which can increase, but never decrease, a topic's partition count.)

3.7. Test code and run results

    /**
     * Chat room: 3 partitions
     */
    public static final String TOPIC_CHAT_ROOM = "topic-hc-chat-room";
    public static final String PERSON_LORA = "Lora";
    public static final String PERSON_JACK = "Jack";
    public static final String PERSON_PAUL = "Paul";

    /**
     * Mail: 3 partitions
     */
    public static final String TOPIC_MAIL = "topic-hc-mail";

    public static final String CONSUMER_GROUP_MAIL_1 = "MailConsumer-Group-1";
    public static final String CONSUMER_MAIL = "MailConsumer-ALL";

    public static final String CONSUMER_GROUP_MAIL_2 = "MailConsumer-Group-2";
    public static final String CONSUMER_MAIL_PARTITION_0 = "MailConsumer-P0";
    public static final String CONSUMER_MAIL_PARTITION_12 = "MailConsumer-P1,P2";

    public static final String CONSUMER_GROUP_MAIL_3 = "MailConsumer-Group-3";
    public static final String CONSUMER_MAIL_MULTI_0 = "MailConsumer-M0";
    public static final String CONSUMER_MAIL_MULTI_1 = "MailConsumer-M1";
    public static final String CONSUMER_MAIL_MULTI_2 = "MailConsumer-M2";
    public static final String CONSUMER_MAIL_MULTI_3 = "MailConsumer-M3";

    /**
     * SMS: 1 partition
     */
    public static final String TOPIC_SMS = "topic-hc-sms";
    public static final String CONSUMER_SMS = "SmsConsumer";

    /**
     * Show, create, and delete topics
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Obtain the Kafka admin client
        AdminClient client = getKafkaAdminClient();

        // List all topics
        Collection<String> topics = getAllTopic(client);

        // Create topics: the replica count cannot exceed the broker count
        client.createTopics(Lists.newArrayList(
                // Chat room: 3 partitions
                new NewTopic(TOPIC_CHAT_ROOM, 3, (short) 1),
                // Mail: 3 partitions
                new NewTopic(TOPIC_MAIL, 3, (short) 1),
                // SMS: 1 partition
                new NewTopic(TOPIC_SMS, 1, (short) 1)
        ));

        // Show topic details
        List<String> wantedTopicList = Lists.newArrayList(TOPIC_CHAT_ROOM, TOPIC_MAIL, TOPIC_SMS);
        showTopicInfo(client, topics, wantedTopicList);

        // Delete topics: to change configuration such as partitions, a topic must be deleted and recreated
        client.deleteTopics(Lists.newArrayList("topic-send-sms", "topic-send-mail"));
    }

Run results

The output shows each Topic's Partition, Leader, Replicas, and ISR settings.

 INFO - Kafka version: 2.2.0 
 INFO - Kafka commitId: 05fcfde8f69b0349 
 INFO - ==== Topic topic-hc-chat-room Begin ==== 
 INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
 INFO - (partition=1, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
 INFO - (partition=2, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
 INFO - ==== Topic topic-hc-chat-room End ==== 
 INFO - ==== Topic topic-hc-mail Begin ==== 
 INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
 INFO - (partition=1, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
 INFO - (partition=2, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
 INFO - ==== Topic topic-hc-mail End ==== 
 INFO - ==== Topic topic-hc-sms Begin ==== 
 INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
 INFO - ==== Topic topic-hc-sms End ==== 

4. Example code: Producer sending messages

The sending side is straightforward; you only need to pay attention to the required and optional parameters passed when sending.

As covered in the previous article, the send parameters are: topic (required), partition (optional), key (optional), and message (required).

For the routing rules that decide which Topic-Partition a message goes to based on these parameters, again see the previous article.
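As a rough sketch of that routing rule (an illustration only: real Kafka hashes the serialized key with murmur2, and its null-key behavior depends on the client version; the helper below is a hypothetical stand-in, not the actual partitioner):

```java
public class RoutingSketch {
    /**
     * Simplified partition-routing decision:
     * 1) an explicit partition always wins;
     * 2) otherwise a non-null key is hashed modulo the partition count
     *    (real Kafka uses murmur2 on the serialized key, not hashCode);
     * 3) otherwise -1 here stands for "let the producer pick one itself".
     */
    static int route(Integer partition, String key, int numPartitions) {
        if (partition != null) {
            return partition;
        }
        if (key != null) {
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(route(2, null, 3));    // explicit partition wins: 2
        System.out.println(route(null, "9", 3));  // key-based: deterministic, in [0, 3)
        System.out.println(route(null, null, 3)); // producer's choice: -1
    }
}
```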

The following REST API demonstrates this:

/**
 * <p>Producer</p>
 *
 * @author hanchao
 */
@Slf4j
@RestController
public class ProducerController {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send messages (optionally in a batch)
     */
    @GetMapping("/kafka/batch-send")
    public boolean batchSendMessage(@RequestParam(required = false) String producer,
                                    @RequestParam String topic,
                                    @RequestParam(required = false) Integer partition,
                                    @RequestParam(required = false) String key,
                                    @RequestParam String value,
                                    @RequestParam(required = false) Integer batch) {
        producer = Objects.isNull(producer) ? "Message Producer" : producer;
        batch = Objects.isNull(batch) ? 1 : batch;

        for (int i = 0; i < batch; i++) {
            sendMessage(producer, topic, partition, key, value + i);
        }
        return true;
    }

    /**
     * Send a single message
     */
    private void sendMessage(String producer, String topic, Integer partition, String key, String value) {
        log.info("======>>> " + producer + ": topic={}, [partition={}], [key={}], value={}", topic, partition, key, value);
//        log.info("{} -- sent message: {}", producer, value);
        if (Objects.nonNull(partition)) {
            if (StringUtils.isNotEmpty(key)) {
                kafkaTemplate.send(topic, partition, key, value);
            } else {
                kafkaTemplate.send(topic, partition, null, value);
            }
        } else {
            if (StringUtils.isNotEmpty(key)) {
                kafkaTemplate.send(topic, key, value);
            } else {
                kafkaTemplate.send(topic, value);
            }
        }
    }
}

5. Example code: Consumer consuming messages

Compared with the Producer, the Consumer's logic is more complex, because it involves the concept of Consumer Groups.

In the versions used here, @KafkaListener's groupId parameter sets the Consumer Group, and its id parameter labels the Consumer.

For convenient logging, define a simple abstract class:

@Slf4j
public abstract class AbstractConsumer {
    /**
     * Log a received record
     */
    public void logRecord(String name, ConsumerRecord<?, ?> record) {
        log.info("<<<------ " + name + " : topic={}, [partition={}], [key={}], value={}, offset={}", record.topic(), record.partition(), record.key(), record.value(), record.offset());
        log.info("{} -- received message: {}", name, record.value());
    }
}

5.1. Consumer Group with only one Consumer

The Consumer Group below has only one Consumer.

/**
 * <p>Mail - 3 partitions - Consumer Group with only one Consumer</p>
 *
 * @author hanchao
 */
@Component
public class MailConsumer extends AbstractConsumer {

    @KafkaListener(id = KafkaClientDemo.CONSUMER_MAIL, groupId = KafkaClientDemo.CONSUMER_GROUP_MAIL_1, topics = {KafkaClientDemo.TOPIC_MAIL})
    public void listenTopicForSms0(ConsumerRecord<?, ?> record) {
        logRecord(KafkaClientDemo.CONSUMER_MAIL, record);
    }
}

Test code

    /**
     * Wait so that messages are fully processed before the test exits
     */
    @After
    public void tearDown() throws Exception {
        Thread.sleep(5000);
    }

    /**
     * A simple wrapper around an HTTP GET test request
     */
    public void simpleGet(String url) throws Exception {
        mockMvc.perform(MockMvcRequestBuilders.get(url))
                .andExpect(MockMvcResultMatchers.status().isOk())
                .andDo(MockMvcResultHandlers.print())
                .andReturn().getResponse().getContentAsString();
    }
    
    /**
     * Simplest case: one Consumer in the group
     * producer: partition and key are not passed
     * consumer: no partitions pinned
     */
    @Test
    public void testForSimple() throws Exception {
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&value=hello&batch=5");
    }

Test results

  • If a Consumer Group has only one Consumer, that Consumer consumes the messages of every Partition.
  • The 5 messages produced were distributed, effectively at random, across the 3 partitions.
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello0 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello1 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello2 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello3 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello4 
 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=1], [key=null], value=hello2, offset=11147 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11157 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello3, offset=11158 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello1, offset=21147 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello4, offset=21148 

5.2. Consumer Group with only one Consumer - sending to a specified partition

In the example above, neither partition nor key was passed, so the messages ended up spread across the partitions.

Now, specify the partition explicitly:

    /**
     * Dispatch to specified partitions
     */
    @Test
    public void testForPartition() throws Exception {
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=0&value=hello&batch=1");
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=1&value=hello&batch=1");
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=2&value=hello&batch=1");
    }

**Test results:** as expected, the messages were stored in partitions 0, 1, and 2.

 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=0], [key=null], value=hello0 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=1], [key=null], value=hello0 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=2], [key=null], value=hello0 
 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11159 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11148 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello0, offset=21149 

5.3. Consumer Group with only one Consumer - sending with a specified key

What if no partition is specified, only a key? See the code below:

    /**
     * Dispatch by key
     * The key chooses the partition a message is stored in; it may be null.
     * With a non-null key, Kafka takes the key's hash modulo the partition count to pick the storage partition.
     * With key = null, Kafka first reads a cached partition number; if one is present, the message goes to that partition, otherwise a partition is chosen at random and cached for subsequent use.
     */
    @Test
    public void testForKey() throws Exception {
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&key=9&value=hello&batch=1");
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&key=10&value=hello&batch=1");
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&key=11&value=hello&batch=1");
    }

Test results

The key's role is to choose the partition a message is stored in; the key may be null.

With a non-null key, Kafka takes the key's hash modulo the partition count to decide which partition stores the message.

With key = null, Kafka first reads a cached partition number; if one is present, the message is stored in that partition, otherwise a partition is chosen at random, used for the message, and cached for the next send.

 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=9], value=hello0 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=10], value=hello0 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=11], value=hello0 

 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=9], value=hello0, offset=21150 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=1], [key=10], value=hello0, offset=11149 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=11], value=hello0, offset=11160 

5.4. Consumer Group with multiple Consumers - manually assigned partitions

The Consumer Group below has two Consumers: Consumer0 is assigned the messages of Partition 0, and Consumer1 the messages of Partitions 1 and 2.

/**
 * <p>Mail - 3 partitions - per-partition consumption</p>
 *
 * @author hanchao
 */
@Component
public class MailPartitionConsumer extends AbstractConsumer {
    /**
     * Per-partition consumption can speed up message processing
     * This Consumer consumes only partition 0
     */
    @KafkaListener(id = CONSUMER_MAIL_PARTITION_0, groupId = CONSUMER_GROUP_MAIL_2, topicPartitions = {@TopicPartition(topic = TOPIC_MAIL, partitions = "0")})
    public void listenTopicForSms0(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_PARTITION_0, record);
    }

    /**
     * Per-partition consumption can speed up message processing
     * This Consumer consumes partitions 1 and 2
     */
    @KafkaListener(id = CONSUMER_MAIL_PARTITION_12, groupId = CONSUMER_GROUP_MAIL_2, topicPartitions = {@TopicPartition(topic = TOPIC_MAIL, partitions = {"1", "2"})})
    public void listenTopicForSms1(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_PARTITION_12, record);
    }
}

Test code

    /**
     * Dispatch to specified partitions
     */
    @Test
    public void testForPartition() throws Exception {
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=0&value=hello&batch=1");
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=1&value=hello&batch=1");
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=2&value=hello&batch=1");
    }

Test results

  • MailConsumer-P0 indeed consumed only the partition=0 message, and MailConsumer-P1,P2 consumed the partition=1 and partition=2 messages.
  • The benefit of per-partition consumption: a single Consumer receives messages serially, so multiple Consumers receiving from multiple partitions concurrently speeds up overall message reception.
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=0], [key=null], value=hello0 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=1], [key=null], value=hello0 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=2], [key=null], value=hello0 

 INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11152 
 INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11142 
 INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=2], [key=null], value=hello0, offset=21142 

Important note

What happens if the Topic has 3 partitions but the code assigns only 2 of them?

Change the partitions parameter of the @KafkaListener with id = CONSUMER_MAIL_PARTITION_12 from {"1", "2"} to {"1"}; the modified code is:

    /**
     * Per-partition consumption can speed up message processing
     * This Consumer consumes only partition 1
     */
    @KafkaListener(id = CONSUMER_MAIL_PARTITION_12, groupId = CONSUMER_GROUP_MAIL_2, topicPartitions = {@TopicPartition(topic = TOPIC_MAIL, partitions = {"1"})})
    public void listenTopicForSms1(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_PARTITION_12, record);
    }

Run it again; the result: the partition=2 message is never consumed!

 INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11154 
 INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11144 

Therefore:

  • Manually assigning partitions risks leaving some partitions' data unconsumed; manual assignment is not recommended. Prefer the automatic approach in the next section.
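Nothing in the manual-assignment style checks that every partition is covered, which is exactly how partition 2 was lost above. A small stdlib-only sketch of such a coverage check (a hypothetical helper, not part of spring-kafka):

```java
import java.util.*;

public class CoverageCheck {
    /**
     * Returns the partitions of a topic that no listener has been assigned.
     * With manual @TopicPartition assignment, nothing verifies this is empty.
     */
    static Set<Integer> uncovered(int numPartitions, List<Set<Integer>> assignments) {
        Set<Integer> all = new TreeSet<>();
        for (int p = 0; p < numPartitions; p++) {
            all.add(p);
        }
        for (Set<Integer> assigned : assignments) {
            all.removeAll(assigned);
        }
        return all;
    }

    public static void main(String[] args) {
        // Consumer P0 takes {0}; the modified "P1,P2" listener now takes only {1}:
        System.out.println(uncovered(3, Arrays.asList(
                new HashSet<>(Arrays.asList(0)),
                new HashSet<>(Arrays.asList(1))))); // prints [2]: never consumed
    }
}
```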

5.5. Consumer Group with multiple Consumers + automatic partition assignment

Configuring automatic partition assignment is simple: drop the partition attributes and keep only the topic configuration:

/**
 * <p>Mail - 3 partitions - automatic partition assignment</p>
 *
 * @author hanchao
 */
@Component
public class MailPartitionConsumer extends AbstractConsumer {
    /**
     * Automatic assignment: Kafka decides which partitions this Consumer receives
     */
    @KafkaListener(id = CONSUMER_MAIL_PARTITION_0, groupId = CONSUMER_GROUP_MAIL_2, topicPattern = TOPIC_MAIL)
    public void listenTopicForSms0(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_PARTITION_0, record);
    }

    /**
     * Automatic assignment: Kafka decides which partitions this Consumer receives
     */
    @KafkaListener(id = CONSUMER_MAIL_PARTITION_12, groupId = CONSUMER_GROUP_MAIL_2, topicPattern = TOPIC_MAIL)
    public void listenTopicForSms1(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_PARTITION_12, record);
    }
}

Running again gives:

  • Kafka automatically distributes the messages the Consumer Group receives among the Consumers inside it.
  • Messages from multiple Partitions may be consumed by a single Consumer.
  • Messages from a single Partition are consumed by only one Consumer; they cannot be consumed by multiple Consumers within the same Consumer Group.
 INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11156 
 INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11146 
 INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=2], [key=null], value=hello0, offset=21146 

5.6. Consumer Group with multiple Consumers + more Consumers than partitions

In the scenario above, the topic had 3 partitions and the group had 2 consumers. What if the number of Consumers exceeds the number of partitions?

The Consumer Group below has 4 Consumers:

/**
 * <p>Mail - 3 partitions - more Consumers than partitions</p>
 *
 * @author hanchao
 */
@Component
public class MailMultiConsumer extends AbstractConsumer {
    /**
     * If a Consumer Group has more Consumers than partitions, then during stable
     * operation some Consumers will never get to consume any messages
     */
    @KafkaListener(id = CONSUMER_MAIL_MULTI_0, groupId = CONSUMER_GROUP_MAIL_3, topics = {TOPIC_MAIL})
    public void listenTopic0(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_MULTI_0, record);
    }

    @KafkaListener(id = CONSUMER_MAIL_MULTI_1, groupId = CONSUMER_GROUP_MAIL_3, topics = {TOPIC_MAIL})
    public void listenTopic1(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_MULTI_1, record);
    }

    @KafkaListener(id = CONSUMER_MAIL_MULTI_2, groupId = CONSUMER_GROUP_MAIL_3, topics = {TOPIC_MAIL})
    public void listenTopic2(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_MULTI_2, record);
    }

    @KafkaListener(id = CONSUMER_MAIL_MULTI_3, groupId = CONSUMER_GROUP_MAIL_3, topics = {TOPIC_MAIL})
    public void listenTopic3(ConsumerRecord<?, ?> record) {
        logRecord(CONSUMER_MAIL_MULTI_3, record);
    }
}

Test results

  • On every run, there is always one Consumer that receives no messages.
  • In other words: if a Topic has fewer partitions than the Consumer Group has Consumers, some Consumers will never receive messages from that Topic.
 INFO - <<<------ MailConsumer-M3 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11156 
 INFO - <<<------ MailConsumer-M1 : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11146 
 INFO - <<<------ MailConsumer-M2 : topic=topic-hc-mail, [partition=2], [key=null], value=hello0, offset=21146 
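The idle-consumer effect can be illustrated with a small stdlib-only sketch. Kafka's real assignment is performed by its configured assignor (range assignment by default), so the exact mapping differs; this round-robin distribution only shows why a fourth consumer of three partitions ends up empty-handed:

```java
import java.util.*;

public class AssignmentSketch {
    /**
     * Simplified round-robin assignment: each partition goes to exactly one
     * consumer; with more consumers than partitions, some get nothing.
     */
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String consumer : consumers) {
            out.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            out.get(consumers.get(p % consumers.size())).add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        // 4 consumers, 3 partitions: M3 is left with no partition at all
        System.out.println(assign(Arrays.asList("M0", "M1", "M2", "M3"), 3));
        // prints {M0=[0], M1=[1], M2=[2], M3=[]}
    }
}
```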

5.7. Multiple Consumer Groups

The scenarios above each involved a single Consumer Group; now test the three Consumer Groups above at the same time:

    @Test
    public void testForSimple() throws Exception {
        simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&value=hello&batch=5");
    }

Test results (the log order has been rearranged for readability):

  • The Producer produced 5 messages in total, and each Consumer Group received all 5 of them.
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello0 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello1 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello2 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello3 
 INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello4 

 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=1], [key=null], value=hello2, offset=11147 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11157 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello3, offset=11158 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello1, offset=21147 
 INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello4, offset=21148 

 INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=2], [key=null], value=hello1, offset=21147 
 INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=2], [key=null], value=hello4, offset=21148 
 INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=1], [key=null], value=hello2, offset=11147 
 INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11157 
 INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=0], [key=null], value=hello3, offset=11158 

 INFO - <<<------ MailConsumer-M1 : topic=topic-hc-mail, [partition=1], [key=null], value=hello2, offset=11147 
 INFO - <<<------ MailConsumer-M2 : topic=topic-hc-mail, [partition=2], [key=null], value=hello1, offset=21147 
 INFO - <<<------ MailConsumer-M2 : topic=topic-hc-mail, [partition=2], [key=null], value=hello4, offset=21148 
 INFO - <<<------ MailConsumer-M3 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11157 
 INFO - <<<------ MailConsumer-M3 : topic=topic-hc-mail, [partition=0], [key=null], value=hello3, offset=11158 

5.8. Chat room example

The examples above are rather dry, so let's end with a more vivid one.

Chat room scenario:

  • The chat room has three people: Jack, Paul, and Lora.
  • Jack sends: 新人报到,多多指教! ("Newcomer here, please be kind!")
  • Lora sends: 欢迎欢迎!!! ("Welcome, welcome!!!")
  • Paul sends: 欢迎!另:入群请改名,谢谢! ("Welcome! Also: please change your nickname after joining, thanks!")

This scenario is essentially already implemented above, so without further commentary, here are the code and the results.

Consumer code

/**
 * <p>Chat room consumer</p>
 *
 * @author hanchao
 */
@Component
public class ChatConsumerJack extends AbstractConsumer {

    /**
     * Chatter: Jack
     */
    @KafkaListener(id = PERSON_JACK, groupId = PERSON_JACK, topics = {TOPIC_CHAT_ROOM})
    public void listenTopic0(ConsumerRecord<?, ?> record) {
        logRecord(PERSON_JACK, record);
    }
}
/**
 * <p>Chat room consumer</p>
 *
 * @author hanchao
 */
@Component
public class ChatConsumerLora extends AbstractConsumer {

    /**
     * Chatter: Lora
     */
    @KafkaListener(id = PERSON_LORA, groupId = PERSON_LORA, topics = {TOPIC_CHAT_ROOM})
    public void listenTopic1(ConsumerRecord<?, ?> record) {
        logRecord(PERSON_LORA, record);
    }
}
/**
 * <p>Chat room consumer</p>
 *
 * @author hanchao
 */
@Component
public class ChatConsumerPaul extends AbstractConsumer {

    /**
     * Chatter: Paul
     */
    @KafkaListener(id = PERSON_PAUL, groupId = PERSON_PAUL, topics = {TOPIC_CHAT_ROOM})
    public void listenTopic2(ConsumerRecord<?, ?> record) {
        logRecord(PERSON_PAUL, record);
    }
}

Test code

    /**
     * Chat room
     */
    @Test
    public void testForChatRoom() throws Exception {
        simpleGet("/kafka/batch-send?topic=" + TOPIC_CHAT_ROOM + "&producer=" + PERSON_JACK + "&value=" + "新人报到,多多指教!");

        Thread.sleep(1000);
        simpleGet("/kafka/batch-send?topic=" + TOPIC_CHAT_ROOM + "&producer=" + PERSON_LORA + "&value=" + "欢迎欢迎!!!");

        Thread.sleep(1000);
        simpleGet("/kafka/batch-send?topic=" + TOPIC_CHAT_ROOM + "&producer=" + PERSON_PAUL + "&value=" + "欢迎!另:入群请改名,谢谢!");
    }

Test results

 INFO - Jack -- sent message: 新人报到,多多指教!0 

 INFO - Jack -- received message: 新人报到,多多指教!0 
 INFO - Paul -- received message: 新人报到,多多指教!0 
 INFO - Lora -- received message: 新人报到,多多指教!0 

 INFO - Lora -- sent message: 欢迎欢迎!!!0 

 INFO - Jack -- received message: 欢迎欢迎!!!0 
 INFO - Lora -- received message: 欢迎欢迎!!!0 
 INFO - Paul -- received message: 欢迎欢迎!!!0 

 INFO - Paul -- sent message: 欢迎!另:入群请改名,谢谢!0 

 INFO - Lora -- received message: 欢迎!另:入群请改名,谢谢!0 
 INFO - Jack -- received message: 欢迎!另:入群请改名,谢谢!0 
 INFO - Paul -- received message: 欢迎!另:入群请改名,谢谢!0 