1. RabbitMQ简介

消息队列分为很多种,常用的一般分为ActiveMQ,RabbitMQ,Kafka,这三个依次能处理更高数据量的任务,并且安全度也会降低,可能会出现数据丢失,但是,这三者的目的都是一致的,为了解耦,异步信息,流量削峰等问题实现高性能,高可用,可伸缩和最终一致性

RabbitMq的主要特点如下:

  1. 可靠性:RMQ可以使用其持久化,传输确认,发布确认来保证可靠性。
  2. 灵活路由:在消息进入队列之前,通过Exchange来路由消息。
  3. 集群:可以做成分布式的。
  4. 高可用:保证分布式的HA
  5. 多种协议:支持多种消息队列协议,比如mqtt
  6. 多语言客户端:web端可以支持很多门语言,如java,.NET,Ruby
  7. 管理页面:有web管理界面
  8. 跟踪机制:如果消息异常,RabbitMq提供了消息跟踪机制
  9. 插件机制:可自定义开发

架构图:
在这里插入图片描述

重要概念:

  1. RabbitMq Server:也叫broker server。任务就是维护一条从producer到consumer的路线,保证数据能够按照指定的防止进行传输。
  2. Producer:消息生产者,消息的发送方,将消息发送到Exchange
  3. Consumer:消息消费者,RabbitMq会将Queue中的消息传递给消费者
  4. Exchange:交换器,有direct,fanout,topic,headers四种类型,每种拥有不同的路由规则,本身不存储数据
  5. Queue:队列,在rmq内部负责存储消息。消息消费者通过订阅队列来获取消息,Rmq中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,然后消费者再去利用。多对多关系。
  6. RoutingKey:路由规则,通过指定路由规则指定消息流向哪里。

安装:

至于安装就不说了。

我是使用的3.7.4

在这里插入图片描述

2.RabbitMq的三种模式

(1). 直接模式(Direct)

当我们在单节点进行消息传递的时候,就可以使用这种模式。

在这里插入图片描述

在这种情况下,我们需要指定好RouteKey对应的那个指定的Queue,就可以了。

使用思路

  1. 一般情况下exchange使用RMQ自带的defauly Exchange。
  2. 单节点,不需要让Exchange进行任何bind操作
  3. 消息传递,指定RouteKey,不然没法指向Queue
  4. 如果vhost不存在RouteKey指定的队列名,就会被抛弃。

案例实现

如上面所说的,在写代码前,我们现在web端,创建一个队列。

在这里插入图片描述
然后Add Queue
在这里插入图片描述
然后Exchange就不用管了,因为是默认的。

然后编写代码,编写代码分成两个部分来体现demo

  1. 生产者
  2. 消费者
  • 首先来搭建环境:

pom.xml

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
  • 配置
server:
  port: 8099
spring:
  rabbitmq:
    host: 127.0.0.1
  • 编写生产者

在生产方面:

我们可以先看看上面的架构图,首先,我们将数据产生,然后进去exchange,利用你指定的routekey去到达指定的Queue。

首先我们来到web界面,先创建好队列的routeKey,方便我们去一会进行指定。

在这里插入图片描述
然后编写我们的生产者,在convertAndSend里指定好我们的routeKey,运行就行了

@RunWith(SpringRunner.class)//替代Junit测试环境
@SpringBootTest(classes = RabbitApplication.class)//指定程序的入口
public class ProductTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsg(){
        rabbitTemplate.convertAndSend("DirectTestQueue","test1");
    }

}

然后回到我们的web端,可以看见,我们的test1被阻塞在了队列。
在这里插入图片描述
所以我们还需要一个消费者,将队列里的信息,进行某种消费。
我们需要两个注解,让rabbitMq知道,这是消费者方法

@RabbitListener(queues = "DirectTestQueue")//声明为消费者
@Component//添加进去容器
public class CustomerOne {

    @RabbitHandler
    public void getMsg(String msg) {
        System.out.println("直接模式消息消费:"+msg);
    }

}

然后运行之后,我们可以发现,消息确实被消费处理掉了,阻塞的信息也消失了。
在这里插入图片描述

(2). 分列模式(Fanout)
  • 什么是分列模式

也就是当我们需要将信息一次发给多个队列的时候,我们就需要这种模式,如图:

在这里插入图片描述

  • 创建Queue

然后我们就来到web界面,创建多个queue,然后去完成个demo

在这里插入图片描述

  • 创建Exchange并对接好创建的Queue

然后这个时候,由于我们要运送到不同Queue,所以肯定去运送信息的Exchange也不能只是一个了,所以比起刚才,我们还要额外创建非default的Exchange,然后将创建出的Exchange去指定好对应的Queue

在这里插入图片描述

在这里插入图片描述

点进去FanoutExchange1,然后去给它对接两个Queue,因为我们没有匹配规则,所以先不写RoutingQueue

在这里插入图片描述

然后去编写分列模式代码即可

首先我们把启动方法写好,因为我们没有匹配规则,所以RouteKey为null就可以了,只是选好Exchange的名字就行

/**
     * 分列模式
     */
    @Test
    public void sengMsg2() {
        rabbitTemplate.convertAndSend("FanoutExchange1","","分列模式测试");
    }

然后写上具体对于消息的消费逻辑就行了,可刚才差不多。

@RabbitListener(queues = "FanoutTest1")//声明为消费者
@Component//添加进去容器
public class CustomerOne {

    @RabbitHandler
    public void getMsg(String msg) {
        System.out.println("FanoutTest1分列模式消息消费:"+msg);
    }

}
@RabbitListener(queues = "FanoutTest2")//声明为消费者
@Component//添加进去容器
public class CustomerOne {

    @RabbitHandler
    public void getMsg(String msg) {
        System.out.println("FanoutTest2分列模式消息消费:"+msg);
    }

}

最后运行代码

在这里插入图片描述

在这里插入图片描述

(3). 主题模式(Topic)

首先简单说一下主题模式的概念,刚才我们说到了分列模式,意思就是没有routeKey,而主题模式相当于分列模式的加强版,也就是有匹配规则的分列算法。

在这里插入图片描述

然后我们在web端对主题模式的Exchange与Queue进行一下安排。

其实和之前的分列模式是差不多的,只是增加了routingKey而已

在这里插入图片描述

在这里插入图片描述

Exchange声明完了,Queue创建完了,二者绑定完了,下面就该进行代码的Demo实现了

生产者方法

/**
     * 主题模式
     */
    @Test
    public void sendMsg3() {
        rabbitTemplate.convertAndSend("TopicExchange1","good.abc","主题模式测试");
    }

三个消费者

@RabbitListener(queues = "TopicTest1")//声明为消费者
@Component//添加进去容器
public class CustomerOne {

    @RabbitHandler
    public void getMsg(String msg) {
        System.out.println("good.#分列模式消息消费:"+msg);
    }

}

@RabbitListener(queues = "TopicTest2")//声明为消费者
@Component//添加进去容器
public class CustomerTwo {

    @RabbitHandler
    public void getMsg(String msg) {
        System.out.println("#.log分列模式消息消费:"+msg);
    }

}

@RabbitListener(queues = "TopicTest3")//声明为消费者
@Component//添加进去容器
public class CustomerThree {

    @RabbitHandler
    public void getMsg(String msg) {
        System.out.println("good.log分列模式消息消费:"+msg);
    }

}

然后启动运行,我们发现,它确实进行了filter的过滤操作。。

在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐