消息中间件——RocketMQ是什么?
RocketMQ是阿里开发的消息中间件,吸取了RabbitMQ和Kafka的优点:并发高,功能丰富,适用场景广。双十一里可以承受数亿级的高并发,主要是它的功劳。一、RocketMQ内部结构模型和概念上图就是rocketMQ的架构图,需要注意的是:每一个组件都是集群形式,因为它被开发创造的时候,就是为了解决大规模数据的生产环境下的消息发送。所以集群形式部署才是它最适合的用法。RocketMQ由以下这
RocketMQ是阿里开发的消息中间件,吸取了RabbitMQ和Kafka的优点:并发高,功能丰富,适用场景广。双十一里可以承受数亿级的高并发,主要是它的功劳。
一、RocketMQ内部结构模型和概念
上图就是rocketMQ的架构图,需要注意的是:每一个组件都是集群形式,因为它被开发创造的时候,就是为了解决大规模数据的生产环境下的消息发送。所以集群形式部署才是它最适合的用法。
RocketMQ由以下这几个组件组成
- NameServer : 路由消息的提供者,他管理着broker的信息,相当于zookeeper的功能。
- Broker:实际处理消息存储、转发等服务的核心组件 (实际干活的)。
- Producer:消息生产者集群。通常是业务系统中的一个功能模块。
- Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。
以上4个概念,可以理解为是物理上有区别的概念,还有一些逻辑概念:
-
Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息
订阅的基本单位。 -
Group
Producer Group具有相同角色的生产者组合在一起 或者 Consumer Group具有相同角色的消费者组合在一起。组与组之间可以是不同的业务逻辑,彼此不影响。 -
MessageQueue
同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。 -
Message
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。
二、RocketMQ安装和配置
因为RocketMQ的源码是Java编写的,所以它是一个Java项目,在启动它之前,我们要求必须有jdk的环境。具体怎么安装jdk我就不在这里讲解了。我这里用的CentOS7的Linux系统:
1.首先确保已经安装了jdk
2. 下载RocketMQ安装包
执行安装命令:
wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
3.解压安装包
解压命令
unzip rocketmq-all-4.7.1-bin-release.zip
4.配置环境变量
cd到解压的文件后,输入pwd查看文件目录
然后把目录配置到环境变量中:
# 编辑环境变量文件
vi ~/.bash_profile
修改完环境变量文件后,执行刷新命令,让环境变量生效。
source ~/.bash_profile
5.启动NameServer
用静默启动的方式启动NameServer服务
nohup bin/mqnamesrv &
启动成功后,用jps命令,查看是否有 NameservStartup程序,表示启动成功。
6.启动Broker
也以静默启动的方式启动runbroker.sh
nohup ./mqbroker &
启动后,jps指令可以看到一个BrokerStartup进程。
至此,我们启动RocketMQ就完成了,其实就是启动了两个项目,对应我们上面将的组件:一个是NameServer 和 Broker
三、springBoot入门实战
创建一个空的springBoot项目。
1.在pom文件中导入依赖包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
2.在配置文件中配置地址信息
在这个配置文件:application.properties 配置RocketMQ的ID地址和端口 以及 分组名称
rocketmq.name-server=192.168.69.128:9876
rocketmq.producer.group=springBootGroup
3. 创建生产者Producer
创建MQTestController文件,用户对外提供的接口,可以调取发送消息。
@RestController
@RequestMapping("/MQTest")
public class MQTestController {
private final String topic = "TestTopic";
@Resource
private SpringProducer producer;
@RequestMapping("/sendMessage")
public String sendMessage(String message){
producer.sendMessage(topic,message);
return "消息发送完成";
}
}
创建 生产者类 SpringProducer
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
}
4. 创建消费者Consumer
创建消费者类SpringConsumer,去监听消息。
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("===我是消费者===收到了消息 : "+ message);
}
}
启动springboot项目的入口类
启动main方法,我们的生产者类和消费者类都会加载到spring容器里,并启动。
@SpringBootApplication
public class RocketMQSBApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQSBApplication.class,args);
}
}
调用api接口,去发送消息,看效果
启动项目后,默认是8080端口。我们调用Controller里配好的接口地址,并加上message消息。
可以看到控制台里,消费者已经把这条消息消费并打印出来了:
这就是非常简单的一次Java通过springBoot使用RocketMQ的实战操作。
更多推荐
所有评论(0)