springboot整合rocketMq、rocketMq批量发送、消费、rocketMq手动发送和监听
RocketMQ 是一款高性能、高吞吐量、低延迟的消息中间件。由阿里出品,后来捐赠给开源组织Apache。一般用于流量削峰填谷、异步通信、系统之间异步解耦、顺序消息、定时消息、事务消息等场景。RocketMQ5.0还支持类使用kafka的流处理。
·
1. 简介
RocketMQ 是一款高性能、高吞吐量、低延迟的消息中间件。由阿里出品,后来捐赠给开源组织Apache。一般用于流量削峰填谷、异步通信、系统之间异步解耦、顺序消息、定时消息、事务消息等场景。RocketMQ5.0还支持类使用kafka的流处理。
2. 基本概念说明
- NameServer: NameServer是RocketMQ的路由中心,用于Broker服务的注册和发现
- Broker: Broker主要负责消息的存储、投递和查询以及服务高可用保证。
- topic: 主题是 Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。大白话就是生产者发送消息后消息存储的位置和消费者要消费的消息存在容器里,这个容器官方取了名字叫topic。
- 队列: 一个Topic下会由一到多个队列来存储消息。大白话就是,topic的容器比较大表示一类消息,队列是在这一类消息的基础上做了区分,队列一般配合tag使用。
- Tag: 消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
- 生产者: 消息的生产者
- 生产者组: 代表着一群topic相同的Producer。即一个生产者组是同一类Producer的组合,主要用于事务消息。如果Producer是TransactionMQProducer,则发送的是事务消息。如果节点1发送完消息后,消息存储到broker的Half Message Queue中,还未存储到目标topic的queue中时,此时节点1崩溃,则可以通过同一Group下的节点2进行二阶段提交,或回溯。
- Message: 生产者向Topic发送并最终传送给消费者的数据消息的载体。
消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。 - Message Key: 消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。
- Message ID: 消息的全局唯一标识,由消息队列RocketMQ系统自动生成,唯一标识某条消息。
- 普通消息: 发送的一般消息
- 顺序消息: 对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
- 延迟消息: 延迟消息发送是指消息发送到Apache RocketMQ后,延迟一定时间后才投递到Consumer进行消费。
- 消费者: 消息的消费者
- Consumer Group: 消费者的分组,同一个消费者组内的消费者,可以消费同一个topic或tag消息。
- 订阅: 消费者组和topic/tag建立关联关系。注意同一个消费者组,不能订阅不同topic,否则会有问题。
- 死信队列: 死信队列用于处理无法被正常消费的消息。当一条消息消费失败后,RocketMQ会自动进行消息重试;达到最大重试次数后依然消费失败,则RocketMQ会将消息发送到对应的死信队列(一般是%DLQ%消费者组ID)。
3. springboot整合
3.1 说明
已有springboot项目且已经能启动;本文基于RocketMQ2.0.4;
3.2 导包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>2.2.2</version>
</dependency>
3.3 application配置文件添加
rocketmq:
name-server: 192.168.1.224:9876 # 自己的访问地址,如果是集群,集群之间可以使用;分割如ip:端口;ip:端口;ip:端口
producer:
group: test_procuct_group # 生产者组group,事务消息重试需要
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
3.4 生产者
@Component
public class MQProducerService {
// 建议同一类业务用一个TOPIC,不同的小类型可以用tag区分
private static final String TOPIC = "TEST_TOPIC";
private static final String TAG = "TEST_TAG";
//超时时间,重试需要
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送单向消息
* 只负责发送消息,不等待应答,不关心发送结果,如日志
*/
public void sendOneWayMsg(String message) {
rocketMQTemplate.sendOneWay(TOPIC, MessageBuilder.withPayload(message).build());
}
/**
* 普通发送 message可以是对象
*/
public void send(String message) {
// 包含tag,=,包含tag的方式都是 TOPIC+":"+TAG 格式
rocketMQTemplate.send(TOPIC + ":" + TAG, MessageBuilder.withPayload(message).build());
//不包含tag
rocketMQTemplate.send(TOPIC, MessageBuilder.withPayload(message).build());
}
/**
* 发送同步消息
* message也可以是对象,sendResult为返回的发送结果
*/
public SendResult sendMsg(String message) {
// 等待结果返回,此处会阻塞直到结果返回
SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, MessageBuilder.withPayload(message).build());
// 真实业务,请不要使用此种日志打印方式,会影响线上的性能
System.out.println(sendResult);
return sendResult;
}
/**
* 发送异步消息
* message也可以是对象,结果回调
*/
public void sendAsyncMsg(String message) {
rocketMQTemplate.asyncSend(TOPIC, MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
/*
* 处理消息发送成功逻辑,
* SendResult: 结果有两种状态,如果业务需要区分请判断
* /
}
@Override
public void onException(Throwable throwable) {
// 处理消息发送异常逻辑
}
});
}
/**
* 发送延时消息
* delayLevel=0表示不延迟;
* 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendDelayMsg(String message, int delayLevel) {
rocketMQTemplate.syncSend(TOPIC, MessageBuilder.withPayload(message).build(), messageTimeOut, delayLevel);
}
}
3.5消费者
/**
* topic = 生产者的topic
* selectorExpression 生产者的tag
* consumerGroup 消费者组
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "TEST_TOPIC", selectorExpression = "TAG", consumerGroup = "test_consumer_group")
// 泛型为发送的消息类型
public class MQConsumerService implements RocketMQListener<String> {
// 监听到消息就会执行此方法
@Override
public void onMessage(String message) {
log.info("监听到消息:message={}", message);
}
}
4. 手动发送 & 批量发送
说明: RocketMQ批量发送实现方式主要通过自身提供的 批量接口。有两种方式, 一般使用DefaultMQProducer 发送。通过看源码得知,最终将List 包装为继承了Message的MessageBatch,由此推出使用RocketMQTemplate 发送参数传递为在限制内的MessageBatch也是可以实现批量发送的。下文采用DefaultMQProducer演示。
4.1 生产者
package com.yzd.rpa.server.web;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
//导入 RocketMqListSplitter
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
@Component
public class MQBatchProducerService {
// 建议同一类业务用一个TOPIC,不同的小类型可以用tag区分
private static final String TOPIC = "TEST_TOPIC";
private static final String TAG = "TEST_TAG";
public void sendBatch(List<String> messages) {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
//自己的访问地址,如果是集群,集群之间可以使用;分割如ip:端口;ip:端口;ip:端口
defaultMQProducer.setNamesrvAddr("192.168.1.224:9876");
// 生产者组group
defaultMQProducer.setProducerGroup("test_procuct_group");
// 消息超时时间
defaultMQProducer.setSendMsgTimeout(3000);
//同步发送重试次数
defaultMQProducer.setRetryTimesWhenSendFailed(1);
//异步发送重试次数
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(1);
//最大消息大小 默认4M
defaultMQProducer.setMaxMessageSize(1024 * 1024 * 4);
//消息压缩阈值 默认4k
defaultMQProducer.setCompressMsgBodyOverHowmuch(1024 * 4);
try {
defaultMQProducer.start();
} catch (Exception e) {
throw new RuntimeException("启动生产者失败!");
}
//消息封装为 Message
List<Message> messageList = messages.stream().map(item -> {
Message message = new Message();
message.setTopic(TOPIC);
message.setTags(TAG);
message.setBody(item.getBytes(StandardCharsets.UTF_8));
return message;
}).collect(Collectors.toList());
//发送消息 RocketMQ 最大默认大小4M
RocketMqListSplitter rocketMqMapSplitter = new RocketMqListSplitter(messages, 1024 * 1024 * 4);
while (rocketMqMapSplitter.hasNext()) {
List<Message> next = rocketMqMapSplitter.next();
try {
defaultMQProducer.send(next, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//发送成功回调
}
@Override
public void onException(Throwable e) {
//发送失败回调
}
});
} catch (Exception e) {
//异常回调
}
}
//关闭资源
defaultMQProducer.shutdown();
}
}
4.2 消息切分类
//包名
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class RocketMqListSplitter implements Iterator<List<Message>> {
private final List<Message> messages;
private final int maxLimit;
// 要进行批量发送消息的小集合起始索引
private int currIndex;
public RocketMqListSplitter(List<Message> messages, Integer maxLimit) {
this.messages = messages;
if (maxLimit == null) {
// 默认每次最大发送4M
this.maxLimit = 1024 * 1024 * 4;
} else {
this.maxLimit = maxLimit;
}
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
// 记录当前要发送的这一小批次消息列表的大小
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前遍历的消息
Message message = messages.get(nextIndex);
//设置唯一ID,需要先设置再计算大小
MessageClientIDSetter.setUniqID(message);
// 统计当前遍历的message的大小, 使用mq自带的计算方式,不然大小计算可能错误
int tmpSize = MessageDecoder.encodeMessage(message).length;
// 判断当前消息本身是否大于限制
if (tmpSize > maxLimit) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > maxLimit) {
break;
} else {
totalSize += tmpSize;
}
}
// end-for
// 获取当前messages列表的子集合[currIndex, nextIndex)
List<Message> subList = messages.subList(currIndex, nextIndex);
// 下次遍历的开始索引
currIndex = nextIndex;
return subList;
}
}
5. 手动监听&批量消费
说明: 1. 手动拉取消费,此种方式不方便;2. 借助spring使用监听
5.1 手动监听
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
public class MQBatchConsumerService {
// 建议同一类业务用一个TOPIC,不同的小类型可以用tag区分
private static final String TOPIC = "TEST_TOPIC";
private static final String TAG = "TEST_TAG";
public static void main(String[] args) throws Exception {
//消费者组
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("test_procuct_group");
//自己的访问地址,如果是集群,集群之间可以使用;分割如ip:端口;ip:端口;ip:端口
litePullConsumer.setNamesrvAddr("192.168.1.224:9876");
//设置订阅的topic
litePullConsumer.subscribe(TOPIC, TAG);
//每次拉取的数量
litePullConsumer.setPullBatchSize(20);
//启动监听
litePullConsumer.start();
try {
//监听
while (true) {
List<MessageExt> messageExts = litePullConsumer.poll();
//睡眠
TimeUnit.SECONDS.sleep(5);
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
// 包名
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class MQBatchConsumer {
// 建议同一类业务用一个TOPIC,不同的小类型可以用tag区分
private static final String TOPIC = "TEST_TOPIC";
private static final String TAG = "TEST_TAG";
public static void main(String[] args) throws Exception {
MQBatchConsumer batchConsumer = new MQBatchConsumer();
batchConsumer.listenerDefaultMQPushConsumer();
}
public void listenerDefaultMQPushConsumer() {
DefaultMQPushConsumer mqPullConsumer = new DefaultMQPushConsumer(consumerGroup);
//自己的访问地址,如果是集群,集群之间可以使用;分割如ip:端口;ip:端口;ip:端口
mqPullConsumer.setNamesrvAddr("192.168.1.224:9876");
mqPullConsumer.setConsumerGroup("test_procuct_group");
mqPullConsumer.setPullBatchSize(10);
try {
mqPullConsumer.subscribe(TOPIC, TAG);
mqPullConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
List<String> contents = msgs.stream().map(item -> new String(item.getBody(), StandardCharsets.UTF_8)).collect(Collectors.toList());
//业务处理
System.out.println(contents);
} catch (Exception e) {
}
});
mqPullConsumer.start();
/*
* 监听的topic需要已经存在,
* 如果不存在不会报错
* 可以使用 mqPullConsumer.fetchSubscribeMessageQueues(topic); 获取topic信息,如果topic不存在会报错
* /
//睡眠
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
throw new RuntimeException("服务连接失败");
mqPullConsumer.shutdown();
}
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)