spring-boot 2.3.x 整合rocketmq
spring-boot 2.3.x整合rocketmq文章目录spring-boot 2.3.x整合rocketmq1、rocketMq的安装(docker形式)1.1、docker-compose.yml2、构建一个rocketMq的项目.1、定义yml配置文件,关于kafka的参数的详细定义,可以参考3.2、生产者测试代码3.3、定义消费端代码3.4、主启动类本地项目的基础环境环境版本jdk1
spring-boot 2.3.x 整合rocketmq
文章目录
本地项目的基础环境
环境 | 版本 |
---|---|
jdk | 1.8.0_201 |
maven | 3.6.0 |
Spring-boot | 2.3.3.RELEASE |
1、rocketMq的安装(docker形式)
这里使用docker,做一个快速的单机版本安装,需要更详细的其他形式的安装,可以查看其他的相关资料或者官网地址
https://github.com/apache/rocketmq-docker
《docker环境下安装rockermq以及rockermq-console》
1.1、docker-compose.yml
version: '3'
services:
namesrv:
image: apacherocketmq/rocketmq
container_name: namesrv
ports:
- 9876:9876
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
command: sh mqnamesrv
broker:
image: apacherocketmq/rocketmq
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./data/broker/broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf
command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
depends_on:
- namesrv
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
depends_on:
- namesrv
2、构建一个rocketMq的项目
主要是导入rocketmq-spring-boot-starter
的包,以及spring-boot-starter-web
的包;
导入web的包,是再消费的时候,没有守护线程,程序启动后,就会自动退出,导入web包后,tomcat容器启动,消费的线程就不退出了;
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.badger</groupId>
<artifactId>badger-spring-boot-rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>badger-spring-boot-rocketmq</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.1、定义yml配置文件
rocketmq:
name-server: localhost:9876
producer:
group: test-producer-group
org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.class
具体更可以参看配置类
部分详细配置如下:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv
# Producer 配置项
producer:
group: test-producer-group # 生产者分组
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
secret-key: # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
# Consumer 配置项
consumer:
listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
test-consumer-group:
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
3.2、生产者测试代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { RocketmqApplicaltion.class })
public class TestApp {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void syncSend() {
for (int i = 0; i < 100; i++) {
// 同步消息
Message<String> bashMessage = new GenericMessage<String>("test_producer" + i);
SendResult syncSend = rocketMQTemplate.syncSend("test_producer", bashMessage);
System.out.println(syncSend);
}
}
@Test
public void asyncSend() {
for (int i = 0; i < 100; i++) {
// 异步消息
Message<String> message = new GenericMessage<String>("test_producer" + i);
rocketMQTemplate.asyncSend("test_producer", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败");
}
});
}
}
@Test
public void sendOneWay() {
for (int i = 0; i < 100; i++) {
// 单向发送消息
Message<String> message = new GenericMessage<String>("test_producer" + i);
rocketMQTemplate.sendOneWay("test_producer", message);
System.out.println("只发送一次");
}
}
@Test
public void syncSendOrder() {
// 发送有序消息
String[] tags = new String[] { "TagA", "TagC", "TagD" };
for (int i = 0; i < 10; i++) {
// 加个时间前缀
Message<String> message = new GenericMessage<String>("我是顺序消息" + i);
SendResult sendResult = rocketMQTemplate.syncSendOrderly("test_producer:" + tags[i % tags.length], message,
i + "");
System.out.println(sendResult);
}
}
}
注意:
1、跟原生rocketmq Api对比,rocketMQ start 的做了二次的封装,把同步异步发送消息,用方法名称做了区别;相同的是,无论是原生的api还是二次封装的api,异步调用的时候,回调是在参数体里的,毕竟异步发送需要等待回调,而同步发送可以只有回调。
2、顺序消息:严格顺序消息模式下,消费者收到的所有消息均是有顺序的
发送消息的时候,消息被存储在MessageQueue
队列里的,默认的时候,是4个队列;为了保证消息的顺序,是需要把相同业务的数据按照顺序写入对应的队列中,单个队列下,数据是严格有序的;
rocketMQ start 对原生api做了二次封装,提供了默认的MessageQueue
选择器,用的字符串的hash算法实现的,如果不满足实际需求,需要重写选择器。
3、继承RocketMQListener
接口,泛型类型写消息生产者Message<T>
泛型一致;会有默认转换器,把二进制数据转成对应的泛型实体;
4、注解@RocketMQMessageListener
标注的RocketMQListener
接口实现中,默认使用的Push模式的consumer,
如果需要使用pull模式的,则需要单独使用模板类RocketMQTemplate
,并且需要再yaml配置文件中,写上相应配置;详细可以参考源码 org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
throws MQClientException {
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
String selectorExpression = consumerConfig.getSelectorExpression();
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
int pullBatchSize = consumerConfig.getPullBatchSize();
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
return litePullConsumer;
}
3.3、定义消费端代码
@Component
@RocketMQMessageListener(topic = "test_producer", consumerGroup = "test_consumer-group")
public class DemoConsumer implements RocketMQListener<String> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(String message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
注意:
1、消费端的注解@RocketMQMessageListener
属性中consumerGroup
,多个消费端(消费集群)消费同一个topic的时候,需要定义成一致;
2、消费端消费的时候,是会多线程的形式消费topic里的4个MessageQueue的,如果要消费顺序消息,需要指定属性consumeMode
为ConsumeMode.ORDERLY
,表示同步消费;
3.4、主启动类
@SpringBootApplication
public class RocketmqApplicaltion {
public static void main(String[] args) throws Exception {
SpringApplication.run(RocketmqApplicaltion.class, args);
}
}
详细代码也可以参看《码云》
更多推荐
所有评论(0)