RabbitMQ基本概念讲解及其三种队列模式(Direct,Fanout,Topic)的使用方法
1. RabbitMQ简介消息队列分为很多种,常用的一般分为ActiveMQ,RabbitMQ,Kafka,这三个依次能处理更高数据量的任务,并且安全度也会降低,可能会出现数据丢失,但是,这三者的目的都是一致的,为了解耦,异步信息,流量削峰等问题实现高性能,高可用,可伸缩和最终一致性。RabbitMq的主要特点如下:可靠性:RMQ可以使用其持久化,传输确认,发布确认来保证可靠性。灵活路由...
1. RabbitMQ简介
消息队列分为很多种,常用的一般分为ActiveMQ,RabbitMQ,Kafka,这三个依次能处理更高数据量的任务,并且安全度也会降低,可能会出现数据丢失,但是,这三者的目的都是一致的,为了解耦,异步信息,流量削峰等问题实现高性能,高可用,可伸缩和最终一致性。
RabbitMq的主要特点如下:
- 可靠性:RMQ可以使用其持久化,传输确认,发布确认来保证可靠性。
- 灵活路由:在消息进入队列之前,通过Exchange来路由消息。
- 集群:可以做成分布式的。
- 高可用:保证分布式的HA
- 多种协议:支持多种消息队列协议,比如mqtt
- 多语言客户端:web端可以支持很多门语言,如java,.NET,Ruby
- 管理页面:有web管理界面
- 跟踪机制:如果消息异常,RabbitMq提供了消息跟踪机制
- 插件机制:可自定义开发
架构图:
重要概念:
- RabbitMq Server:也叫broker server。任务就是维护一条从producer到consumer的路线,保证数据能够按照指定的防止进行传输。
- Producer:消息生产者,消息的发送方,将消息发送到Exchange
- Consumer:消息消费者,RabbitMq会将Queue中的消息传递给消费者
- Exchange:交换器,有direct,fanout,topic,headers四种类型,每种拥有不同的路由规则,本身不存储数据。
- Queue:队列,在rmq内部负责存储消息。消息消费者通过订阅队列来获取消息,Rmq中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,然后消费者再去利用。多对多关系。
- RoutingKey:路由规则,通过指定路由规则指定消息流向哪里。
安装:
至于安装就不说了。
我是使用的3.7.4
2.RabbitMq的三种模式
(1). 直接模式(Direct)
当我们在单节点进行消息传递的时候,就可以使用这种模式。
在这种情况下,我们需要指定好RouteKey对应的那个指定的Queue,就可以了。
使用思路:
- 一般情况下exchange使用RMQ自带的defauly Exchange。
- 单节点,不需要让Exchange进行任何bind操作
- 消息传递,指定RouteKey,不然没法指向Queue
- 如果vhost不存在RouteKey指定的队列名,就会被抛弃。
案例实现:
如上面所说的,在写代码前,我们现在web端,创建一个队列。
然后Add Queue
然后Exchange就不用管了,因为是默认的。
然后编写代码,编写代码分成两个部分来体现demo
- 生产者
- 消费者
- 首先来搭建环境:
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
- 配置
server:
port: 8099
spring:
rabbitmq:
host: 127.0.0.1
- 编写生产者
在生产方面:
我们可以先看看上面的架构图,首先,我们将数据产生,然后进去exchange,利用你指定的routekey去到达指定的Queue。
首先我们来到web界面,先创建好队列的routeKey,方便我们去一会进行指定。
然后编写我们的生产者,在convertAndSend里指定好我们的routeKey,运行就行了
@RunWith(SpringRunner.class)//替代Junit测试环境
@SpringBootTest(classes = RabbitApplication.class)//指定程序的入口
public class ProductTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("DirectTestQueue","test1");
}
}
然后回到我们的web端,可以看见,我们的test1被阻塞在了队列。
所以我们还需要一个消费者,将队列里的信息,进行某种消费。
我们需要两个注解,让rabbitMq知道,这是消费者方法
@RabbitListener(queues = "DirectTestQueue")//声明为消费者
@Component//添加进去容器
public class CustomerOne {
@RabbitHandler
public void getMsg(String msg) {
System.out.println("直接模式消息消费:"+msg);
}
}
然后运行之后,我们可以发现,消息确实被消费处理掉了,阻塞的信息也消失了。
(2). 分列模式(Fanout)
- 什么是分列模式
也就是当我们需要将信息一次发给多个队列的时候,我们就需要这种模式,如图:
- 创建Queue
然后我们就来到web界面,创建多个queue,然后去完成个demo
- 创建Exchange并对接好创建的Queue
然后这个时候,由于我们要运送到不同Queue,所以肯定去运送信息的Exchange也不能只是一个了,所以比起刚才,我们还要额外创建非default的Exchange,然后将创建出的Exchange去指定好对应的Queue
点进去FanoutExchange1,然后去给它对接两个Queue,因为我们没有匹配规则,所以先不写RoutingQueue
然后去编写分列模式代码即可
首先我们把启动方法写好,因为我们没有匹配规则,所以RouteKey为null就可以了,只是选好Exchange的名字就行
/**
* 分列模式
*/
@Test
public void sengMsg2() {
rabbitTemplate.convertAndSend("FanoutExchange1","","分列模式测试");
}
然后写上具体对于消息的消费逻辑就行了,可刚才差不多。
@RabbitListener(queues = "FanoutTest1")//声明为消费者
@Component//添加进去容器
public class CustomerOne {
@RabbitHandler
public void getMsg(String msg) {
System.out.println("FanoutTest1分列模式消息消费:"+msg);
}
}
@RabbitListener(queues = "FanoutTest2")//声明为消费者
@Component//添加进去容器
public class CustomerOne {
@RabbitHandler
public void getMsg(String msg) {
System.out.println("FanoutTest2分列模式消息消费:"+msg);
}
}
最后运行代码
(3). 主题模式(Topic)
首先简单说一下主题模式的概念,刚才我们说到了分列模式,意思就是没有routeKey,而主题模式相当于分列模式的加强版,也就是有匹配规则的分列算法。
然后我们在web端对主题模式的Exchange与Queue进行一下安排。
其实和之前的分列模式是差不多的,只是增加了routingKey而已
Exchange声明完了,Queue创建完了,二者绑定完了,下面就该进行代码的Demo实现了
生产者方法
/**
* 主题模式
*/
@Test
public void sendMsg3() {
rabbitTemplate.convertAndSend("TopicExchange1","good.abc","主题模式测试");
}
三个消费者
@RabbitListener(queues = "TopicTest1")//声明为消费者
@Component//添加进去容器
public class CustomerOne {
@RabbitHandler
public void getMsg(String msg) {
System.out.println("good.#分列模式消息消费:"+msg);
}
}
@RabbitListener(queues = "TopicTest2")//声明为消费者
@Component//添加进去容器
public class CustomerTwo {
@RabbitHandler
public void getMsg(String msg) {
System.out.println("#.log分列模式消息消费:"+msg);
}
}
@RabbitListener(queues = "TopicTest3")//声明为消费者
@Component//添加进去容器
public class CustomerThree {
@RabbitHandler
public void getMsg(String msg) {
System.out.println("good.log分列模式消息消费:"+msg);
}
}
然后启动运行,我们发现,它确实进行了filter的过滤操作。。
更多推荐
所有评论(0)