MQ: kafka的Java接入与入门示例(topic增删改查,Producer多参发送,Consumer多分区接受)
本文主要通过实际编码来对《MQ: 一张图读懂kafka工作原理》提到的部分原理进行验证与实现。相关文章参考:MQ: 消息队列常见应用场景及主流消息队列ActiveMQ、RabbitMQ、RocketMQ和Kafka的简单对比MQ: 一张图读懂kafka工作原理1.版本说明后续代码依赖于以下版本,其他版本不保证代码可用:kafka 服务版本:2.11-1.0.1kafka-cli...
本文主要通过实际编码来对《MQ: 一张图读懂kafka工作原理》提到的部分原理进行验证与实现。
相关文章参考:
1.版本说明
后续代码依赖于以下版本,其他版本不保证代码可用:
kafka
服务版本:2.11-1.0.1kafka-clients.jar
版本:2.2.0spring-kafka.jar
版本:1.3.5.RELEASEspring-boot
版本:1.5.10.RELEASE
2.kafka接入
pom.xml
先引入kafka的spring依赖包,这个包提供Producer和Consumer相关的操作。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
如果想进行Topic、Partition相关的操作,则引入下面的包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
application.properties
只给出最基本配置,如需调优,请自行增加其他配置。
spring.kafka.bootstrap-servers=127.0.0.1:9092
3.示例代码:Topic的增删改查
Toplic的增删改查需要通过AdminClient
操作,主要依赖kafka-clients.jar
。
3.1.获取kafka管理客户端
/**
* 获取kafka管理客户端
*/
private static AdminClient getKafkaAdminClient() {
Map<String, Object> props = new HashMap<>(1);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.126.153.155:9092");
return KafkaAdminClient.create(props);
}
3.2.获取全部topic名称
/**
* 获取全部topic名称
*/
private static Collection<String> getAllTopic(AdminClient client) throws InterruptedException, ExecutionException {
return client.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
}
3.3.显示指定Topic的详细配置
/**
* 显示指定topic的信息
*/
private static void showTopicInfo(AdminClient client, Collection<String> topics, List<String> wantedTopicList) throws InterruptedException, ExecutionException {
client.describeTopics(topics).all().get().forEach((topic, description) -> {
if (wantedTopicList.contains(topic)) {
log.info("==== Topic {} Begin ====", topic);
for (TopicPartitionInfo partition : description.partitions()) {
log.info(partition.toString());
}
log.info("==== Topic {} End ====", topic);
}
});
}
3.4.新建Topic
可以一次性创建多个Topic,每个topic需要指定名称、Partition数量和Replicas数量。
Replicas数量不能超过broker数量。本文使用的kafka只有一个broker。
//创建topic:副本数不能超过broker数量
client.createTopics(Lists.newArrayList(
//聊天室 3分区
new NewTopic(TOPIC_CHAT_ROOM, 3, (short) 1),
//邮件 3分区
new NewTopic(TOPIC_MAIL, 3, (short) 1),
//短信 1分区
new NewTopic(TOPIC_SMS, 1, (short) 1)
));
3.5.删除Topic
可以一次性删除多个Topic。
client.deleteTopics(Lists.newArrayList("topic-send-sms","topic-send-mail"));
3.6.修改Topic
无法已经存在的Topic的分区数量等配置,只能删掉之后重建。
3.7.测试代码与运行结果
/**
* 聊天室 3分区
*/
public static final String TOPIC_CHAT_ROOM = "topic-hc-chat-room";
public static final String PERSON_LORA = "Lora";
public static final String PERSON_JACK = "Jack";
public static final String PERSON_PAUL = "Paul";
/**
* 邮件 3分区
*/
public static final String TOPIC_MAIL = "topic-hc-mail";
public static final String CONSUMER_GROUP_MAIL_1 = "MailConsumer-Group-1";
public static final String CONSUMER_MAIL = "MailConsumer-ALL";
public static final String CONSUMER_GROUP_MAIL_2 = "MailConsumer-Group-2";
public static final String CONSUMER_MAIL_PARTITION_0 = "MailConsumer-P0";
public static final String CONSUMER_MAIL_PARTITION_12 = "MailConsumer-P1,P2";
public static final String CONSUMER_GROUP_MAIL_3 = "MailConsumer-Group-3";
public static final String CONSUMER_MAIL_MULTI_0 = "MailConsumer-M0";
public static final String CONSUMER_MAIL_MULTI_1 = "MailConsumer-M1";
public static final String CONSUMER_MAIL_MULTI_2 = "MailConsumer-M2";
public static final String CONSUMER_MAIL_MULTI_3 = "MailConsumer-M3";
/**
* 短信 1分区
*/
public static final String TOPIC_SMS = "topic-hc-sms";
public static final String CONSUMER_SMS = "SmsConsumer";
/**
* 显示、创建、删除topic
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
//获取kafka管理客户端
AdminClient client = getKafkaAdminClient();
//查询全部topic
Collection<String> topics = getAllTopic(client);
//创建topic:副本数不能超过broker数量
client.createTopics(Lists.newArrayList(
//聊天室 3分区
new NewTopic(TOPIC_CHAT_ROOM, 3, (short) 1),
//邮件 3分区
new NewTopic(TOPIC_MAIL, 3, (short) 1),
//短信 1分区
new NewTopic(TOPIC_SMS, 1, (short) 1)
));
//查询topic详情
List<String> wantedTopicList = Lists.newArrayList(TOPIC_CHAT_ROOM, TOPIC_MAIL, TOPIC_SMS);
showTopicInfo(client, topics, wantedTopicList);
//删除topic:想要修改topic的配置如分区等需要删掉重建
client.deleteTopics(Lists.newArrayList("topic-send-sms","topic-send-mail"));
}
运行结果
运行结果显示了Topic的Partition、Leader、Replicas和isr配置。
INFO - Kafka version: 2.2.0
INFO - Kafka commitId: 05fcfde8f69b0349
INFO - ==== Topic topic-hc-chat-room Begin ====
INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - (partition=1, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - (partition=2, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - ==== Topic topic-hc-chat-room End ====
INFO - ==== Topic topic-hc-mail Begin ====
INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - (partition=1, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - (partition=2, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - ==== Topic topic-hc-mail End ====
INFO - ==== Topic topic-hc-sms Begin ====
INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - ==== Topic topic-hc-sms End ====
4.示例代码:Producer发送消息
发送端的逻辑比较清晰,只需要主要发送时传递的必填与选填参数即可。
参考上一篇文章,消息发送参数:topic必填、partition选填、key选填、message必填。
根据这些参数,将消息发送至哪个Topic-Partition的路由规则,还是去参考上一篇文章。
下面通过一个API来展示:
/**
* <p>生产者</P>
*
* @author hanchao
*/
@Slf4j
@RestController
public class ProducerController {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送消息
*/
@GetMapping("/kafka/batch-send")
public boolean batchSendMessage(@RequestParam(required = false) String producer,
@RequestParam String topic,
@RequestParam(required = false) Integer partition,
@RequestParam(required = false) String key,
@RequestParam String value,
@RequestParam(required = false) Integer batch) {
producer = Objects.isNull(producer) ? "Message Producer" : producer;
batch = Objects.isNull(batch) ? 1 : batch;
for (int i = 0; i < batch; i++) {
sendMessage(producer, topic, partition, key, value + i);
}
return true;
}
/**
* 发送消息
*/
private void sendMessage(String producer, String topic, Integer partition, String key, String value) {
log.info("======>>> " + producer + ": topic={}, [partition={}], [key={}], value={}", topic, partition, key, value);
// log.info("{}--发送消息:{}", producer, value);
if (Objects.nonNull(partition)) {
if (StringUtils.isNotEmpty(key)) {
kafkaTemplate.send(topic, partition, key, value);
} else {
kafkaTemplate.send(topic, partition, null, value);
}
} else {
if (StringUtils.isNotEmpty(key)) {
kafkaTemplate.send(topic, key, value);
} else {
kafkaTemplate.send(topic, value);
}
}
}
}
5.示例代码:Consumer消费消息
相对于Producer
,Consumer
的逻辑相对复杂,因为涉及Consumer Group
的概念。
在本人的开发版本中,通过注解@KafkaListener
的groupId
参数设置Consumer Group
,通过id
参数标记Consumer
。
为了方便打印日志,简单定义一个抽象类:
@Slf4j
public abstract class AbstractConsumer {
/**
* 打印消息
*/
public void logRecord(String name, ConsumerRecord<?, ?> record) {
log.info("<<<------ " + name + " : topic={}, [partition={}], [key={}], value={}, offset={}",record.topic(), record.partition(), record.key(), record.value(), record.offset());
log.info("{}--收到消息:{}", name, record.value());
}
}
5.1.Consumer Group只有1个Consumer
下面的Consumer Group只有1个Consumer。
/**
* <p>邮件-3个分区-Consumer Group只有1个Consumer</P>
*
* @author hanchao
*/
@Component
public class MailConsumer extends AbstractConsumer {
@KafkaListener(id = KafkaClientDemo.CONSUMER_MAIL, groupId = KafkaClientDemo.CONSUMER_GROUP_MAIL_1, topics = {KafkaClientDemo.TOPIC_MAIL})
public void listenTopicForSms0(ConsumerRecord<?, ?> record) {
logRecord(KafkaClientDemo.CONSUMER_MAIL, record);
}
}
测试代码
/**
* 保证消息被处理完
*/
@After
public void tearDown() throws Exception {
Thread.sleep(5000);
}
/**
* http get 请求 test 的简单封装
*/
public void simpleGet(String url) throws Exception {
mockMvc.perform(MockMvcRequestBuilders.get(url))
.andExpect(MockMvcResultMatchers.status().isOk())
.andDo(MockMvcResultHandlers.print())
.andReturn().getResponse().getContentAsString();
}
/**
* 短信,一个分区,一个Consumer
* producer 无需传参 partition和key
* consumer 无需指定group
*/
@Test
public void testForSimple() throws Exception {
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&value=hello&batch=5");
}
测试结果
- 若Consumer-Group只有1个Consumer,则这个Partition中的所有消息都被这个Consumer消费。
- Producer生产的 5条消息,被随机分配给了3个partition。
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello0
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello1
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello2
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello3
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello4
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=1], [key=null], value=hello2, offset=11147
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11157
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello3, offset=11158
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello1, offset=21147
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello4, offset=21148
5.2.Consumer Group只有1个Consumer - 指定partition发送
上面的例子中,partition和key未传参,最终的消息随机分布在partition中。
下面,指定partition的值:
/**
* 指定分区进行分发
*/
@Test
public void testForPartition() throws Exception {
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=0&value=hello&batch=1");
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=1&value=hello&batch=1");
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=2&value=hello&batch=1");
}
**测试结果:**消息按照设想,分别被存放到了分区0、1、2 。
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=0], [key=null], value=hello0
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=1], [key=null], value=hello0
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=2], [key=null], value=hello0
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11159
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11148
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello0, offset=21149
5.3.Consumer Group只有1个Consumer - 指定keyf发送
如果不指定partition,只是指定key呢?看下面的代码:
/**
* 指定key进行分发
* key的作用是为消息选择存储分区,key可以为空。
* 当指定key且不为空的时候,kafka是根据key的hash值与分区数取模来决定数据存储到那个分区。
* 当key=null时,kafka是先从缓存中取分区号,然后判断缓存的值是否为空,如果不为空,就将消息存到这个分区,否则随机选择一个分区进行存储,并将分区号缓存起来,供下次使用。
*/
@Test
public void testForKey() throws Exception {
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&key=9&value=hello&batch=1");
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&key=10&value=hello&batch=1");
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&key=11&value=hello&batch=1");
}
测试结果
key的作用是为消息选择存储分区,key可以为空。
当指定key且不为空的时候,kafka是根据key的hash值与分区数取模来决定数据存储到那个分区。
当key=null时,kafka是先从缓存中取分区号,然后判断缓存的值是否为空,如果不为空,就将消息存到这个分区,否则随机选择一个分区进行存储,并将分区号缓存起来,供下次使用。
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=9], value=hello0
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=10], value=hello0
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=11], value=hello0
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=9], value=hello0, offset=21150
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=1], [key=10], value=hello0, offset=11149
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=11], value=hello0, offset=11160
5.4.Consumer Group有多个Consumer - 手动指定分区消费
下面的Consumer Group有2个Consumer,其中Consumer0指定接收Partition0的消息,Consumer1指定接收Partition1和2的消息。
/**
* <p>邮件-3个分区-分区消费</P>
*
* @author hanchao
*/
@Component
public class MailPartitionConsumer extends AbstractConsumer {
/**
* 分区消费能够加快消息消费速度
* 此Consumer只消费分区0的数据
*/
@KafkaListener(id = CONSUMER_MAIL_PARTITION_0, groupId = CONSUMER_GROUP_MAIL_2, topicPartitions = {@TopicPartition(topic = TOPIC_MAIL, partitions = "0")})
public void listenTopicForSms0(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_PARTITION_0, record);
}
/**
* 分区消费能够加快消息消费速度
* 此Consumer消费分区1,2的数据
*/
@KafkaListener(id = CONSUMER_MAIL_PARTITION_12, groupId = CONSUMER_GROUP_MAIL_2, topicPartitions = {@TopicPartition(topic = TOPIC_MAIL, partitions = {"1", "2"})})
public void listenTopicForSms1(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_PARTITION_12, record);
}
}
测试代码:
/**
* 指定分区进行分发
*/
@Test
public void testForPartition() throws Exception {
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=0&value=hello&batch=1");
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=1&value=hello&batch=1");
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&partition=2&value=hello&batch=1");
}
测试结果
- MailConsumer-P0确实只消费了partition=0的消息, MailConsumer-P1,P2消费了partition=1,2的消息。
- 分区消费的好处是:同一个Consumer接收消息时串行的,多个Consumer同时接收多个分区的消息,能够加快消息接收速度。
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=0], [key=null], value=hello0
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=1], [key=null], value=hello0
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=2], [key=null], value=hello0
INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11152
INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11142
INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=2], [key=null], value=hello0, offset=21142
特别注意
如果Topic共3个分区,却在编码时,只指定了2个分区会怎样?
将 id = CONSUMER_MAIL_PARTITION_12 的partition参数由{“1”, “2”}修改为{“1”},修改之后代码如下:
/**
* 分区消费能够加快消息消费速度
* 此Consumer消费分区1的数据
*/
@KafkaListener(id = CONSUMER_MAIL_PARTITION_12, groupId = CONSUMER_GROUP_MAIL_2, topicPartitions = {@TopicPartition(topic = TOPIC_MAIL, partitions = {"1"})})
public void listenTopicForSms1(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_PARTITION_12, record);
}
再次运行,测试结果如下:partition=2的消息没有被消费!!!
INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11154
INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11144
所以:
手动设置partition可能会导致部分分区的数据被遗忘
,不建议手动设置,推荐下一节的自动配置方式。
5.5.Consumer Group有多个Consumer + 自动分区消费
自动分区消费配置方式也很简单,只需要去掉partition属性,只保留topic配置即可:
/**
* <p>邮件-3个分区-分区消费</P>
*
* @author hanchao
*/
@Component
public class MailPartitionConsumer extends AbstractConsumer {
/**
* 分区消费能够加快消息消费速度
* 此Consumer只消费分区0的数据
*/
@KafkaListener(id = CONSUMER_MAIL_PARTITION_0, groupId = CONSUMER_GROUP_MAIL_2, topicPattern = TOPIC_MAIL)
public void listenTopicForSms0(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_PARTITION_0, record);
}
/**
* 分区消费能够加快消息消费速度
* 此Consumer消费分区1的数据
*/
@KafkaListener(id = CONSUMER_MAIL_PARTITION_12, groupId = CONSUMER_GROUP_MAIL_2, topicPattern = TOPIC_MAIL)
public void listenTopicForSms1(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_PARTITION_12, record);
}
}
再次运行结果:
- kafka自动将Consumer Group接受的消息分配给其内的Consumer们。
- 多个Partition的消息可以被一个Consumer消费。
- 单个Partition的消息只能被其中一个Consumer消费,不能被Consumer-Group内的多个Consumer消费。
INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11156
INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11146
INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=2], [key=null], value=hello0, offset=21146
5.6.Consumer Group有多个Consumer + Consumer过多
上面场景中,消息partition=3,而consumer=2。如果Consumer数量大于partition数量呢?
下面的Consumer Group中,存在4个Consumer:
/**
* <p>邮件-3个分区-Consumer数量大于分区数量</P>
*
* @author hanchao
*/
@Component
public class MailMultiConsumer extends AbstractConsumer {
/**
* 如果Consumer-Group组中的Consumer数量多于分区数量,则在服务稳定运行期间,会有Consumer永远无法消费消息
*/
@KafkaListener(id = CONSUMER_MAIL_MULTI_0, groupId = CONSUMER_GROUP_MAIL_3, topics = {TOPIC_MAIL})
public void listenTopic0(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_MULTI_0, record);
}
@KafkaListener(id = CONSUMER_MAIL_MULTI_1, groupId = CONSUMER_GROUP_MAIL_3, topics = {TOPIC_MAIL})
public void listenTopic1(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_MULTI_1, record);
}
@KafkaListener(id = CONSUMER_MAIL_MULTI_2, groupId = CONSUMER_GROUP_MAIL_3, topics = {TOPIC_MAIL})
public void listenTopic2(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_MULTI_2, record);
}
@KafkaListener(id = CONSUMER_MAIL_MULTI_3, groupId = CONSUMER_GROUP_MAIL_3, topics = {TOPIC_MAIL})
public void listenTopic3(ConsumerRecord<?, ?> record) {
logRecord(CONSUMER_MAIL_MULTI_3, record);
}
}
测试结果
- 每次运行,总有1个分区不会获取消息。
- 也就是说:若单个Topic的分区数量小于Consumer-Group内的Consumer个数,则会存在Consumer接受不到这个Topic的消息。
INFO - <<<------ MailConsumer-M3 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11156
INFO - <<<------ MailConsumer-M1 : topic=topic-hc-mail, [partition=1], [key=null], value=hello0, offset=11146
INFO - <<<------ MailConsumer-M2 : topic=topic-hc-mail, [partition=2], [key=null], value=hello0, offset=21146
5.7.多Consumer Group
上面的场景都是针对单个Consumer Group,现在对上述3个Consumer Group进行同时测试:
@Test
public void testForSimple() throws Exception {
simpleGet("/kafka/batch-send?topic=" + TOPIC_MAIL + "&value=hello&batch=5");
}
测试结果(为了方便查看,测试结果的日志顺序进行了调整):
- Producer共计生产了5条消息,每个Consumer-Group分别收到了5条消息。
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello0
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello1
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello2
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello3
INFO - ======>>> Message Producer: topic=topic-hc-mail, [partition=null], [key=null], value=hello4
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=1], [key=null], value=hello2, offset=11147
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11157
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=0], [key=null], value=hello3, offset=11158
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello1, offset=21147
INFO - <<<------ MailConsumer-ALL : topic=topic-hc-mail, [partition=2], [key=null], value=hello4, offset=21148
INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=2], [key=null], value=hello1, offset=21147
INFO - <<<------ MailConsumer-P0 : topic=topic-hc-mail, [partition=2], [key=null], value=hello4, offset=21148
INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=1], [key=null], value=hello2, offset=11147
INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11157
INFO - <<<------ MailConsumer-P1,P2 : topic=topic-hc-mail, [partition=0], [key=null], value=hello3, offset=11158
INFO - <<<------ MailConsumer-M1 : topic=topic-hc-mail, [partition=1], [key=null], value=hello2, offset=11147
INFO - <<<------ MailConsumer-M2 : topic=topic-hc-mail, [partition=2], [key=null], value=hello1, offset=21147
INFO - <<<------ MailConsumer-M2 : topic=topic-hc-mail, [partition=2], [key=null], value=hello4, offset=21148
INFO - <<<------ MailConsumer-M3 : topic=topic-hc-mail, [partition=0], [key=null], value=hello0, offset=11157
INFO - <<<------ MailConsumer-M3 : topic=topic-hc-mail, [partition=0], [key=null], value=hello3, offset=11158
5.8.聊天室示例
上面的例子挺枯燥的,最后来个更形象的例子。
聊天室场景:
- 聊天室有3个人:Jack、Paul和Lora。
- Jack发送消息:新人报到,多多指教!
- Lora发送消息:欢迎欢迎!!!
- Paul发送消息:欢迎!另:入群请改名,谢谢!
这个场景其实上面已经基本实现了,这里就不细说了,直接给代码和结果 。
消费者代码
/**
* <p>聊天室消费者</P>
*
* @author hanchao
*/
@Component
public class ChatConsumerPaul extends AbstractConsumer {
/**
* 聊天者:Paul
*/
@KafkaListener(id = PERSON_PAUL, groupId = PERSON_PAUL, topics = {TOPIC_CHAT_ROOM})
public void listenTopic2(ConsumerRecord<?, ?> record) {
logRecord(PERSON_PAUL, record);
}
}
/**
* <p>聊天室消费者</P>
*
* @author hanchao
*/
@Component
public class ChatConsumerLora extends AbstractConsumer {
/**
* 聊天者:Lora
*/
@KafkaListener(id = PERSON_LORA, groupId = PERSON_LORA, topics = {TOPIC_CHAT_ROOM})
public void listenTopic1(ConsumerRecord<?, ?> record) {
logRecord(PERSON_LORA, record);
}
}
/**
* <p>聊天室消费者</P>
*
* @author hanchao
*/
@Component
public class ChatConsumerPaul extends AbstractConsumer {
/**
* 聊天者:Paul
*/
@KafkaListener(id = PERSON_PAUL, groupId = PERSON_PAUL, topics = {TOPIC_CHAT_ROOM})
public void listenTopic2(ConsumerRecord<?, ?> record) {
logRecord(PERSON_PAUL, record);
}
}
测试代码
/**
* 聊天室
*/
@Test
public void testForChatRoom() throws Exception {
simpleGet("/kafka/batch-send?topic=" + TOPIC_CHAT_ROOM + "&producer=" + PERSON_JACK + "&value=" + "新人报到,多多指教!");
Thread.sleep(1000);
simpleGet("/kafka/batch-send?topic=" + TOPIC_CHAT_ROOM + "&producer=" + PERSON_LORA + "&value=" + "欢迎欢迎!!!");
Thread.sleep(1000);
simpleGet("/kafka/batch-send?topic=" + TOPIC_CHAT_ROOM + "&producer=" + PERSON_PAUL + "&value=" + "欢迎!另:入群请改名,谢谢!");
}
测试结果
INFO - Jack--发送消息:新人报到,多多指教!0
INFO - Jack--收到消息:新人报到,多多指教!0
INFO - Paul--收到消息:新人报到,多多指教!0
INFO - Lora--收到消息:新人报到,多多指教!0
INFO - Lora--发送消息:欢迎欢迎!!!0
INFO - Jack--收到消息:欢迎欢迎!!!0
INFO - Lora--收到消息:欢迎欢迎!!!0
INFO - Paul--收到消息:欢迎欢迎!!!0
INFO - Paul--发送消息:欢迎!另:入群请改名,谢谢!0
INFO - Lora--收到消息:欢迎!另:入群请改名,谢谢!0
INFO - Jack--收到消息:欢迎!另:入群请改名,谢谢!0
INFO - Paul--收到消息:欢迎!另:入群请改名,谢谢!0
更多推荐
所有评论(0)