学习使用RocketMQ
官方说明:随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。看到这里可以很清楚的知道RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。
学习使用RocketMQ
一、 环境搭建
考虑到大部分Java开发者还是习惯于在windows环境下开发,因此本篇讲解如何在windows环境下搭建一个单机开发环境。
1. 基于windows的环境搭建
1.1 准备工作
Windows必须先安装64bit的JDK1.8或以上版本。
从RockitMQ官网 http://rocketmq.apache.org/dowloading/releases/ 下载最新的release包。我这里下载的版本是v4.4.0
解压到本地目录。
目录结构
上图是rocketmq-all-4.4.0-bin-release.zip 包解压后的目录结构。bin目录下存放可运行的脚本。
1.2 RocketMQ基本结构
如下图:
RocketMQ基本结构
如上图所示,一个正常工作的RocketMQ包括四个部分。
- NameServer :基于高可用设计产生的,用于服务发现和路由。正式应用时通常采用集群部署。
- Broker:实现队列机制,负责消息存储和转发。正式应用时也采用集群部署。
- Producer:消息生产者,生成消息并发送到RocketMQ中,生产者通常是我们自己实现的应用程序。
- Consumer:消息消费者,从RocketMQ中接收消息并进行业务处理。这部分也通常是我们自己实现的。
这里并不打算深入讲解RocketMQ的架构和特性,因为我觉得,针对于初学者,太早地深入知识细节,会让人感到迷惘,学习起来吃力。为了避免浪费时间,以上的知识己经能满足本文后面要实践的内容。随着实践我们会慢慢加深对RocketMQ的理解,再回过头来一点点深入了解。
以上的知识都来源于RocketMQ官网的参考文档,需要更多细节的同学, 可以打开
http://rocketmq.apache.org/docs/quick-start/ 自行阅读。
1.3 Windows环境下启动最小应用
从上面的图可以了解到,RocketMQ自身分为 NameServer 和 Broker 两个部分,因此,用作本机开发调试用的最小应用,应该分别启动一个NameServer和一个Broker节点。
RocketMQ默认提供了 windows环境 和 linux环境 下的启动脚本。脚本位于bin目录下,windows的脚本以.cmd为文件名后缀,linux环境的脚本以.sh为文件名后缀。
本篇操作细节部分只专注于讲windows环境下的脚本,linux环境下的脚本请直接参考官网原文。
不过,通常情况下,windows下的脚本双击启动时,都是窗口一闪而过,启动失败。下面的内容就帮大家解决这些问题。
第一步,配置 JAVA_HOME 和 ROCKETMQ_HOME 环境变量
JAVA_HOME 的配置已经是老生常谈,这里不再赘述,不懂的话请自行百度。
ROCKETMQ_HOME 应指向解压后的Readme.md文件所在目录。
如上面的第一张图,我的 ROCKETMQ_HOME 应配置为 E:\rocketMq
第二步,启动 NameServer
NameServer的启动脚本是bin目录下的mqnamesrv.cmd。
上文讲过,即使配置好了ROCKETMQ_HOME环境变量,mqnamesrv.cmd的启动通常也以失败告终。
阅读mqnamesrv.cmd脚本,发现其实际上是调用了runserver.cmd脚本来实现启动的动作。
而在runserver.cmd脚本,java的默认启动参数中,启动时堆内存的大小为2g,老旧一点的机器上根本没有这么多空闲内存。
因此,用编辑器修改一下runserver.cmd脚本。将原来的内存参数注释掉(cmd脚本使用rem关键字):
rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改为:
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
启动方式一:直接双击mqnamesrv.cmd脚本启动NameServer。
启动方式二:使用cmd命令启动,首先进入rocketMq的安装目录,再进入bin目录,执行‘start mqnamesrv.cmd’,启动NameServer
NameServer启动显示
看到 The Name Server boot success 字样,表示NameServer己启动成功。
windows环境下,可以在目录%USERPROFILE%\logs\rocketmqlogs下找到NameServer的启动日志。文件名为namesrv.log。
第三步,启动 Broker
Broker的启动脚本是mqbroker.cmd。
与mqnamesrv.cmd脚本类似,mqbroker.cmd是调用runbroker.cmd脚本启动Broker的。
同样的,优化一下runbroker.cmd的启动内存
- rem set “JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g”
- set “JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m”
此外,Broker脚本启动之前要指定 NameServer的地址。
NameServer默认启动端口是9876,这点可以从NameServer的启动日志中找到记录。
启动方式一:修改mqbroker.cmd脚本,增加NameServer的地址。
1.rem 添加此行,指定NameServer的地址
2.set "NAMESRV_ADDR=localhost:9876"
3.rem 在此行之前添加NameServer的地址
4.call "%ROCKETMQ_HOME%\bin\runbroker.cmd" org.apache.rocketmq.broker.BrokerStartup %*
双击mqbroker.cmd脚本启动Broker。
启动方式二:不 修改mqbroker.cmd脚本,直接使用cmd命令启动,首先跟启动NameServer一样先进入rocketmq安装目录的bin目录下面,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’启动broker
Broker启动成功
看到 The broker … boot success 字样,表示Broker己启动成功,使用命令启动的界面没有内容显示
与NameServer类似,可以在目录%USERPROFILE%\logs\rocketmqlogs下找到Broker的启动日志。文件名为broker.log。
1.4 验证RocketMQ功能
RocketMQ自带了推送与接收消息的脚本tools.cmd,用来验证RocketMQ的功能是否正常。
tool.cmd脚本需要带参数执行,无法用简单的双击方式启动。因此,我们打开一个cmd窗口,并跳转到bin目录下。
打开cmd窗口并跳转到bin目录下
启动消费者
与mqbroker.cmd脚本类似,启动tool.cmd命令之前我们要指定NameServer地址。
这里我们采用命令方式指定,并启动消费者。依次执行如下命令:
set NAMESRV_ADDR=127.0.0.1:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
启动消费者成功
启动生产者
再打开一个cmd窗口,依次执行如下命令:
set NAMESRV_ADDR=127.0.0.1:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer
生产者启动命令
启动成功后,生产者会发送1000个消息,然后自动退出。
生产者发送消息并退出
此时,在消费者界面按下Ctrl + C,就会收到刚刚生产者发出的消息。
消费者接收消息
至此,RocketMQ最小应用己经可以正常工作,能满足我们开发环境下调试代码的需求。
1.5 RocketMQ控制台
我们实际开发中不愿意总是去编写命令查询,管理,定位rocketmq的消息,我们希望rabbitmq一样的控制台直接打开消息。
下载rocketMq控制台源码:https://github.com/apache/rocketmq-externals.git
下载完后解压到本地电脑
可以看出控制台是使用springboot开发的,最终的控制台项目是 rocketmq-console,由于该项目端口号默认使用的是8080可能会冲突,我们需要修改端口号
进入rocketmq-console项目
找到项目的配置文件,将端口号修改为8088,并且知道rocketmq的nameserver路径为本机
修改保存后我们编辑改项目,使用cmd命令进入rocketmq-console项目,执行mvn clean package -Dmaven.test.skip=true 命令
编译打包成功后在rocketmq-console项目下面就会出现一个target文件夹,进入该文件夹 就会看到我们打包好的jar包
我们直接运行该jar包即可,使用cmd命令进入到该目录,然后执行命令 java -jar rocketmq-console-ng-1.0.0.jar 或者 mvn spring-boot:run 命令
使用浏览器输入 http://127.0.0.1:8088/ 即可进入rocketmq控制台
我们看到的红色的就是消息总数,由于我们执行了 rocketmq自带的验证功能,发送了1000条消息
windows环境,将下面代码,复制到bat中,用于一键启动MqNameServer
、MqBroker
、测试consumer
、测试producer
## 进入bin目录,打开cmd命令窗口,依次执行
mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
tools.cmd org.apache.rocketmq.example.quickstart.Producer
2.基于docker的环境搭建
待完善
二、Springboot集成RocketMQ
什么是RocketMQ?
官方说明:
随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。
看到这里可以很清楚的知道RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。
具有以下特性:
* 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
* 能够保证严格的消息顺序,在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
* 提供丰富的消息拉取模式,支持拉(pull)和推(push)两种消息模式
* 单一队列百万消息的堆积能力,亿级消息堆积能力
* 支持多种消息协议,如 JMS、MQTT 等
* 分布式高可用的部署架构,满足至少一次消息传递语义
为什么选择RocketMQ消息队列?
-
首先RocketMQ是阿里巴巴自研出来的,也已开源。其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂欢节零点千万级 TPS、万亿级数据洪峰,创造了全球最大的业务消息并发以及流转纪录(日志类消息除外);
-
在始终保证高性能前提下,支持亿级消息堆积,不影响集群的正常服务,在削峰填谷(蓄洪)、微服务解耦的场景下尤为重要;这,就能说明RocketMQ的强大。
RocketMQ的特点和优势(可跳过看三的整合代码)
削峰填谷(主要解决诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,海量消息堆积能力强)
异步解耦(高可用松耦合架构设计,对高依赖的项目之间进行解耦,当下游系统出现宕机,不会影响上游系统的正常运行,或者雪崩)
转存失败 重新上传 取消
顺序消息(顺序消息即保证消息的先进先出,比如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等)
分布式事务消息(确保数据的最终一致性,大量引入 MQ 的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性,减少系统间的交互)
项目基于之前搭建 SpringCloud搭建Nacos项目 增加RocketMQ功能,项目搭建参考 springcloud集成nacos的配置中心,注册中心_lockie的博客-CSDN博客
当前项目环境版本为:
SpringBoot 2.2.2.RELEASE
RocketMQ 4.7.0
生产者项目,消费者项目都增加配置文件
org.apache.rocketmq rocketmq-client 4.7.0MQ生产者配置
mq生产者项目 boot-order-service 端口号 8802
配置文件配置:
spring.application.name=boot-order-service
server.port=8802
# nacos配置地址
nacos.config.server-addr=127.0.0.1:8848
# nacos注册地址
nacos.discovery.server-addr=127.0.0.1:8848
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=3000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
新增一个 MQProducerConfigure 配置类,用来初始化MQ生产者
package com.lockie.cloudorder.rocketmq;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: lockie
* @Date: 2020/4/21 10:28
* @Description: mq生产者配置
*/
@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);
private String groupName;
private String namesrvAddr;
// 消息最大值
private Integer maxMessageSize;
// 消息发送超时时间
private Integer sendMsgTimeOut;
// 失败重试次数
private Integer retryTimesWhenSendFailed;
/**
* mq 生成者配置
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
public DefaultMQProducer defaultProducer() throws MQClientException {
LOGGER.info("defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();
LOGGER.info("rocketmq producer server 开启成功----------------------------------");
return producer;
}
}
MQ消费者配置
mq消费者项目 boot-user-service 端口号 8801
增加配置参数
spring.application.name=boot-user-service
server.port=8801
# nacos配置地址
nacos.config.server-addr=127.0.0.1:8848
# nacos注册地址
nacos.discovery.server-addr=127.0.0.1:8848
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
# 是否开启自动配置
rocketmq.consumer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.consumer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
# 消费者订阅的主题topic和tags(*标识订阅该主题下所有的tags),格式: topic~tag1||tag2||tags3;
rocketmq.consumer.topics=TestTopic~TestTag;TestTopic~HelloTag;HelloTopic~HelloTag;MyTopic~*
# 消费者线程数据量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# 设置一次消费信心的条数,默认1
rocketmq.consumer.consumeMessageBatchMaxSize=1
新建一个MQConsumerConfigure 类用来初始化MQ消费者
package com.lockie.bootuser.rocketmq;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: lockie
* @Date: 2020/4/21 10:28
* @Description: mq消费者配置
*/
@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfigure.class);
private String groupName;
private String namesrvAddr;
private String topics;
// 消费者线程数据量
private Integer consumeThreadMin;
private Integer consumeThreadMax;
private Integer consumeMessageBatchMaxSize;
@Autowired
private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
/**
* mq 消费者配置
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")
public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
LOGGER.info("defaultConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
// 设置监听
consumer.registerMessageListener(consumeMsgListenerProcessor);
/**
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
try {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
String[] topicArr = topics.split(";");
for (String topic : topicArr) {
String[] tagArr = topic.split("~");
consumer.subscribe(tagArr[0], tagArr[1]);
}
consumer.start();
LOGGER.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr);
} catch (MQClientException e) {
LOGGER.error("consumer 创建失败!");
}
return consumer;
}
}
这个只是初始化操作,实际对消费者对消息处理放在 consumer.registerMessageListener(consumeMsgListenerProcessor); 这个监听类里面了,实际接收消息,处理消息都放在监听类里
新建一个监听类处理消息
package com.lockie.bootuser.rocketmq;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* @author: lockie
* @Date: 2020/4/21 11:05
* @Description: 消费者监听
*/
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
/**
* 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
* @param msgList
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty(msgList)) {
LOGGER.info("MQ接收消息为空,直接返回成功");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgList.get(0);
LOGGER.info("MQ接收到的消息为:" + messageExt.toString());
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
LOGGER.info("MQ消息topic={}, tags={}, 消息内容={}", topic,tags,body);
} catch (Exception e) {
LOGGER.error("获取MQ消息内容异常{}",e);
}
// TODO 处理业务逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
测试消息发送接收
生产者 boot-order-service 新建一个controller,再新建一个send方法,发送消息
package com.lockie.cloudorder.rocketmq;
import com.lockie.cloudorder.model.Results;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: lockie
* @Date: 2020/4/21 11:17
* @Description:
*/
@RestController
@RequestMapping("/mqProducer")
public class MQProducerController {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerController.class);
@Autowired
DefaultMQProducer defaultMQProducer;
/**
* 发送简单的MQ消息
* @param msg
* @return
*/
@GetMapping("/send")
public Results send(String msg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
if (StringUtils.isEmpty(msg)) {
return new Results().succeed();
}
LOGGER.info("发送MQ消息内容:" + msg);
Message sendMsg = new Message("TestTopic", "TestTag", msg.getBytes());
// 默认3秒超时
SendResult sendResult = defaultMQProducer.send(sendMsg);
LOGGER.info("消息发送响应:" + sendResult.toString());
return new Results().succeed(sendResult);
}
}
浏览器请求发送send接口 http://127.0.0.1:8802/mqProducer/send?msg=hello
修改topic和tags为MyTopic,MyTags,再发送一次
我们进入rocketmq控制台查看
更多推荐
所有评论(0)