1. 认识RabbitMQ

  1.1介绍RabbitMQ

RabbitMQ 是开源的高级消息队列协议(Advanced Message Queueing Protocol, AMQP) 的实现,用Erlang 语言编写,支持多种客户端。

RabbitMQ是目前应用相当广泛的消息中间件(其他同类的消息处理中间件有ActiveMQ、Kafka等)。在企业级应用、微服务应用中,RabbitMQ担当着十分重要的角色。例如,在业务服务模块中解耦、异步通信、高并发限流、超时业务、数据延迟处理等都可以使用RabbitMQ。

RabbitMQ的处理流程如图12-1所示

  

图 12-1

  1.2 使用场景

  (1)推送通知

  “发布/订阅”是RabbitMQ的重要功能。可以用"发布/订阅"功能来实现通知功能。消费者 (consumer) 一直监听RabbitMQ的数据。如果RabbitMQ有数据,则消费者会按照“先进先岀” 规则逐条进行消费。而生产者(producer)只需要将数据存入RabbitMQ。这样既降低了不同系统之间的耦合度,也确保了消息通知的及时性,且不影响系统的性能。

  "发布/订阅”功能支持三种模式:一对一、一对多、广播。这三种模式都可以根据规则选择分发的对象。众多消费者(consumer)可以根据规则选择是否接收这些数据,扩展性非常强。

  (2)异步任务

  后台系统接到任务后,将其分解成多个小任务,只要分别完成这些小任务,整个任务便可以完成。但是,如果某个小任务很费时,且延迟执行并不影响整个任务,则可以将该任务放入消息队列中去处理,以便加快请求响应时间。

  如果用户注册会员时有一项需求一发送验证邮件或短信验证码以完成验证,则可以使用 RabbitMQ的消息队列来实现,这样可以及时提醒用户操作已经成功。等待收到邮件或验证码,然后进行相应的确认,即完成验证。

  (3)多平台应用的通信

  RabbitMQ可以用于不同开发语言开发的应用间的通信(如Java开发的应用程序需要与C++ 开发的应用程序进行通信),实现企业应用集成。由于消息队列是无关平台和语言的,而且语义上也不是函数调用,因此RabbitMQ适合作为多个应用之间的松耦合的接口,且不需要发送方和接收方同时在线。

  不同语言的软件解耦,可以最大限度地减少程序之间的相互依赖,提高系统可用性及可扩展性, 同时还增加了消息的可靠传输和事务管理功能。

RabbitMQ提供两种事务模式:

  • AMQP事务模式。
  • Confirm事务模式。

  (4)消息延迟

  利用RabbitMQ消息队列演出功能,可以实现订单、支付过期定时取消功能。因为延迟队列存储延时消息,所以,当消息被发送以后,消费者不是立即拿到消息,而是等待指定时间后才拿到这个消息进行消费。

  当然,死信、计时器、定时任务也可以实现延退或定时功能,但是需要开发者去处理。

  要实现消息队列延迟功能,一般釆用官方提供的插件“rabbitmq_delayed_message_ exchange"来实现,但RabbitMQ版本必须是3.5.8版本以上才支持该插件。如果低于这个版本, 则可以利用“死信”来完成。

  (5)远程过程调用

  在实际的应用场景中,有时需要一些同步处理,以等待服务器端将消息处理完成后再进行下一 步处理,这相当于RPC ( Remote Procedure Call,远程过程调用)。RabbitMQ也支持RPC。

  1.3 特性

RabbitMQ具有以下特性。

  • 信息确认:RabbitMQ有以下两种应答模式。
    • 自动应答:当RabbitMQ把消息发送到接收端,接收端从队列接收消息时,会自动发送应答消息给服务器端。
    • 手动应答:需要开发人员手动调用方法告诉服务端已经收到。
  • 队列持久化:队列可以被持久化,但是否为持久化,要看持久化设置。
  • 信息持久化:设置properties.DeliveryMode值即可。默认值为1,代表不是持久的,2代表持久化。
  • 消息拒收:接收端可以拒收消息,而且在发送"reject”命令时,可以选择是否要把拒收的消息重新放回队列中。
  • 消息的QoS:在接收端设置的。发送端没有任何变化,接收端的代码也比较简单,只需要加上如 "channel.BasicQos(0,1, false);"的代码即可。

如果实际场景中对个别消息的丢失不是很敏感,则选用自动应答比较理想。

如果是一个消息都不能丢的场景,则需要选用手动应答,在正确处理完以后才应答。 如果选择了自动应答,那消息重发这个功能就没有了。

2. RabbitMQ的基本概念

  2.1 生产者、消费者和代理

RabbitMQ的角色有以下三种。

  • 生产者:消息的创建者,负责创建和推送数据到消息服务器。
  • 消费者:消息的接收方,用于处理数据和确认消息。
  • 代理:RabbitMQ本身,扮演“快递”的角色,本身不生产消息。

生产者和消费者并不属于RabbitMQ。RabbitMQ只是为生产者和消费者提供发送和接收消息的API。

  2.2 消息队列

Queue (队列)是RabbitMQ的内部对象,用于存储生产者的消息直到发送给消费者,也是消费者接收消息的地方。RabbitMQ中的消息也都只能存储在Queue中,多个消费者可以订阅同一 个 Queue。

Queue有以下一些重要的属性。

  • 持久性:如果启用,则队列将会在消息协商器(broker)重启前都有效。
  • 自动删除:如果启用,则队列将会在所有的消费者停止使用之后自动删除掉。
  • 惰性:如果没有声明队列,则应用程序调用队列时会导致异常,并不会主动声明。
  • 排他性:如果启用,则声明它的消费者才能使用。

  2.3 交换机

  Exchange (交换机)用于接收、分配消息。生产者先要指定一个“routing key”,然后将消息发送到交换机。这个"routing key"需要与"Exchange Type"及"binding key"联合使用才能最终生效,然后,交换机将消息路由到一个或多个Queue中,或丢弃。

  在虚拟主机的消息协商器(broker)中,每个Exchange都有睢一的名字。

  Exchange包含4种类型:direct、topic、fanout、headers。不同的类型代表绑定到队列的行为不同。

  (1)direct

  direct类型的行为是“先匹配,再投送”。在绑定队列时会设定一个routing key,只有在消息的routing key与队列匹配时,消息才会被交换机投送到绑定的队列中。允许一个队列通过一个固定的routing key (通常是队列的名字)进行绑定。Direct交换机将消息根据其routing key属性投递到包含对应key属性的绑定器上。

  Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式。它根据routing key 全文匹配去寻找队列。

  (2)topic

  按规则转发消息(最灵活)。主题交换机(topic exchange )转发消息主要根据通配符。队列和交换机的绑定会定义一种路由模式,通配符就要在这种路由模式和路由键之间匹配后,交换机才能转发消息。

  在这种交换机模式下,路由键必须是一串字符,用”.“隔开。

  路由模式必须包含一个星号“*”,主要用于匹配路由键指定位置的一个单词。

  topic还支持消息的routing key,用”*“或”#“的模式进行绑定。“*”匹配一个单词,“#” 匹配0个或多个单词。例如 “/binding key *.user.#”匹配 routing key 为“cn.user“和“us.user.db”, 但是不匹配“user.hello”

  (3)headers

  它根据应用程序消息的特定属性进行匹配,可以在binding key中标记消息为可选或必选。在队列与交换机绑定时,会设定一组键值对规则。消息中也包括一组键值对(headers属性),当这些键值対中有一对,或全部匹配时,消息被投送到对应队列。

  (4)fanout

  消息广播的模式,即将消息广播到所有绑定到它的队列中,而不考虑routing key的值(不管路由键或是路由模式)。如果配置了 routing key。则routing key依然会被忽略。

  2.4 绑定

  RabbitMQ中通过绑定(binding ),将Exchange与Queue关联起来。这样 RabbitMQ 就知道如何正确地将消息路由到指定的 Queue 了。

  在绑定 Exchange与 Queue时,—般会指定一个binding key。消赛者将消息发送给Exchange 时,一般会指定一个routing key。如果 binding key 与 routing key 相匹配,则消息将会被路由到对应的Queue中。

  绑定是生产者和消费者消息传递的连接。生产者发送消息到 Exchange,消费者从Queue接收消息,都是根据绑定来执行的。

  2.5 通道

  有些应用需要与AMQP代理建立多个连接。但同时开启多个TCP ( Transmission Control Protocol,传输控制协议)连接会消耗过多的系统资源,并使得防火堵的配置变得更加困难。“AMQP 0-9-1“协议用通道(channel)来处理多连接,可以把通道理解成“共享一个TCP连接的多个轻量化连接”。

  一个特定通道上的通信与其他通道上的通信是完全隔离的,因此,每个AMQP方法都需要携带一个通道号。这样客户端就可以指定此方法是为哪个通道准备的。

  2.6 消息确认

  消息确认(message acknowledgement )是指:当一个消息从队列中投递给消费者 (consumer)后,消费者会通知一下消息代理(broker),这个过程可以是自动的,也可以由处理消息的应用的开发者执行。当“消息确认”启用时,消息代理需要收到来自消费者的确认回执后, 才完全将消息从队列中删除。

  如果消息无法被成功路由,或被返给发送者并被丢弃,或消息代理执行了延期操作,则消息会 被放入一个“死信”队列中。此时,消息发送者可以选择某些参数来处理这些特殊情况。

3. RabbitMQ的六种工作模式

  3.1 简单模式

  生产者把消息放入队列,消费者获得消息,如图12-2所示。这个模式只有一个消费者、一个生产者、一个队列,只需要配置主机参数,其他参数使用默认值即可通信。

  

图 12-2

  3.2 工作队列模式

  这种模式出现了多个消费者,如图12-3所示。为了保证消费者之间的负载均衡和同步,需要在消息队列之间加上同步功能。

  工作队列(任务队列)背后的主要思想是:避免立即执行资源密集型任务(耗时),以便下一个任务执行时不用等待它完成。工作队列将任务封装为消息并将其发送到队列中。

  

图 12-3

  3.3 交换机模式

  实际上,前两种模式也使用了交换机,只是使用了采用默认设置的交换机。交换机参数是可以配置的,如果消息配置的交换机参数和RabbitMQ队列绑定(binding )的交换机名称相同,则转发,否则丢弃,如图12-4所示。

  

  图 12-4

  3.4 Routing 转发模式

  交换机要配置为direct类型,转发的规则变为检查队列的routing key值。如果routing key 值相同,则转发,否则丢弃,如图12-5所示。

  

图 12-5

  3.5 主题转发模式

  这种模式下交换机要配置为topic类型,routing key配置失效。发送到主题交换机的信息, 不能是任意routing key,它必须是一个单词的列表,用逗号分隔。特点是可以模糊匹配,匹配规则为:*(星号)可以代替一个词;#(#号)可以代替零个或更多的单词,其模式情况如图12-6 所示。

  

图 12-6

  3.6 RPC 模式

  这种模式主要使用在远程调用的场景下。如果一个应用程序需要另外一个应用程序来最终返回运行结果,那这个过程可能是比较耗时的操作,使用RPC模式是最合适的。其模式情况如图12-7 所示。

  

图 12-7

6种工作模式的主要特点如下。

  • 简单模式:只有一个生产者,一个消费者
  • 工作队列模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
  • 订阅模式:一个生产者发送的消息会被多个消费者获取。
  • 路由模式:发送消息到交换机,并且要指定路由key,消费者在将队列绑定到交换机时需要指定路由key。
  • topic模式:根据主题进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,”*“只匹配一个词。

4. 认识AmqpTemplate接口

  Spring AMQP提供了操作AMQP协议的模板类AmqpTemplate,用于发送和接收消息, 它定义发送和接收消息等操作,还提供了 RabbitTemplate用于实现AmqpTemplate接口, 而且还提供了错误拋岀类AmqpException,RabbitTemplate支持消息的确认与返回(默认禁用)

  4.1 发送消息

  (1)send方法

AmqpTemplate模板提供了 send方法用来发送消息,它有以下3个重载:

  • void send(Message message) throws AmqpException
  • void send(String routingKey, Message message) throws AmqpException
  • void send(String exchange, String routingKey, Message message)throws AmqpException

  (2)convertAndSend 方法

  AmqpTemplate模板还提供了 convertAndSend方法用来发送消息。convertAndSend 方法相当于简化了的send方法,可以自动处理消息的序列化。下面通过两个功能一样的代码来比较两者的区别:

@Test
void contextLoads() {
    Message message = MessageBuilder.withBody("body content".getBytes())
        .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
        .setMessageId("1")
        .setHeader("header","header")
        .build();
    amqpTemplate.send("QueueHello",message);
}
@Test
void send(){
    amqpTemplate.convertAndSend("QueueHello","body content");
}

  上面代码和下面代码的效果一样。

  4.2 接收消息

接收消息可以有两种方式。

  • 直接去查询获取消息,即调用receive方法。如果该方法没有获得消息,则直接返回null, 因为receive方法不阻塞。
  • 异步接收,通过注册一个Listener (监听器)来实现消息接收。接收消息需要指定队列 (Queue),或设置默认的队列。

  AmqpTemplate提供的直接获得消息的方法是receive

  另外,AmqpTemplate也提供了直接接收POJO (代替消息对象)的方法receiveAndConvert,并提供了各种的 Messageconverter 用来处理返回的Object (对象)。

  从 Spring-Rabbit 1.3 版本开始,AmqpTemplate 也提供了 receiveAndReply 方法来异步接收、处理及回复消息。

  4.3 异步接收消息

  Spring AMQP也提供了多种不同的方式来实现异步接收消息,比如常用的通过 MessageListener (消息监听器)的方式来实现。

   从Spring-rabbit 1.4版本开始,可使用注解@RabbitListener来异步接收消息,它更为简便。 使用方法见以下代码:

@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
    @RabbitHandler
    public void process(User user){
        System.out.println("Receiver object"+user);
    }
}

5. 在Spring Boot中集成RabbitMQ

  5.1 安装RabbitMQ

  RabbitMQ是用Erlang语言开发的。所以,需要先安装Erlang环境,再安装RabbitMQ。

    (1)下载 Erlang 环境和 RabbitMQ

      到Erlang官网下载Erlang环境。

      到 RabbitMQ 官网下载 RabbitMQ。

    (2)安装

      下载完成后,先单击Erlang安装文件进行安装,然后单击RabbitMQ安装文件进行安装。在安装过程中,按照提示一步一步操作即可。在RabbitMQ成功安装后,会自动启动服务器。

    (3)开启网页管理界面

      虽然可以在命令行管理RabbitMQ,但稍微麻烦。RabbitMQ提供了可视化的网页管理平台, 可以使用“rabbitmq-plugms.bat enable rabbitmq_management”命令开启网页管理界面。

  5.2 界面化管理RabbitMQ

  (1)概览

    在安装配置完成后,开启网页管理,然后可以通过"http://localhost:15672"进行查看和管理, 输入默认的用户名"guest"和密码"guest"进行登录。RabbitMQ的后台界面如图12-8所示。

    

图 12-8

  (2)管理交换机

  进入交换机管理页面后,单击“Add exchange (添加交换机)”按钮,弹出添加界面,可以看到列出了 RabbitMQ 默认的4种类型,由于笔者已经添加了消息延迟插件,所以会有 “x-delayed-message”类型,如图 12-9 所示。

  

图 12-9

  (3)管理管理员

消息中间件的安全配置也是必不可少的。在RabbitMQ中,可以通过命令行创建用户、设置密码、绑定角色。常用的命令如下。

  • rabbitmqctl.bat list_users:查看现有用户。
  • rabbitmqctl.bat add_user username password:新増用户。新増的用户只有用户名、密码,没有管理员、超级管理员等角色。
  • rabbitmqctl.bat set_user_tags username administrator:设置角色。角色分为 none、 management、policymaker、monitoring、administrator。
  • rabbitmqctl change_password userName newPassword:修改密码命令。
  • rabbitmqctl.bat delete_user username:删除用户命令。

还可以在开启RabbitMQ网页管理界面之后,用可视化界面进行操作,如图12-10所示。其 中“Tags”是管理员类型。

  在创建用户后,需要指定用户访问一个虚拟机(如图12-11所示),并且该用户只能访问该虚拟机下的队列和交换机。如果没有指定,则默认是"No access",而不是“/"(所有)。在一个 RabbitMQ服务器上可以运行多个vhost,以适应不同的业务需要。这样做既可以满足权限配置的要求,也可以避免不同业务之间队列、交换机的命名冲突问题,因为不同vhost之间是隔离的,权限设置可以细化到主题。

  

图 12-10 管理管理员

图 12-11 设置权限

  5.3 在 Spring Boot 中配置 RabbitMQ

  (1)添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  (2)配置 application.properties 文件

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /

6. 在SpringBoot中实现RabbitMQ的四种发送/接收模式

  6.1 实例:实现发送和接收队列

  (1)配置队列

@Bean
public Queue queue(){
    return new Queue("Queue1");
}

  (2)创建接收者

  注意,发送者和接收者的 Queue 名称必须一致,否则不能接收,见以下代码:

@Component
//监听QueueHello的消息队列
@RabbitListener(queues = "Queue1")
public class ReceiveA {
    //@RabbitHandler来实现具体消费
    @RabbitHandler
    public void QueueReceiver(String Queue1){
        System.out.println("Receive A:"+Queue1);
    }
}

  (3)创建发送者

@Component
public class SendA {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void send(String content){
        System.out.println("Sender:"+content);
        //使用AmqpTemplate将消息发送到消息队列中
        rabbitTemplate.convertAndSend("Queue1",content);
    }
}

  (4)测试发送和接收情况

@Test
void QueueSend(){
    int i = 2;
    for (int j = 0; j < i; j++) {
        String msg = "Queue1 msg"+j+new Date();
        try {
            sendA.send(msg);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

运行测试,可以看到控制台输岀如下结果:

Receive A:Queue1 msg0Sun Aug 14 11:16:45 CST 2022
Receive A:Queue1 msg1Sun Aug 14 11:16:45 CST 2022

上述信息表示发送成功,且接收成功。

如果是多个接收者,则会均匀地将消息发送到 N 个接收者中,并不是全部发送一遍, 也会和"一对多” 一样,接收端仍然会均匀地接收到消息。

  6.2 实现发送和接收对象

  (1)编辑配置类

package com.intehel.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
@Component
public class RabbitMQConfig {
    @Bean
    public Queue objectQueue(){
        return new Queue("object");
    }

}

  (2)编写接收类

@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
    @RabbitHandler
    public void process(User user){
        System.out.println("Receiver object"+user);
    }
}

  (3)编写发送类

@Component
public class ObjectSender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send(User user){
        System.out.println("Sender object"+user);
        this.amqpTemplate.convertAndSend("object",user);
    }
}

  (4)编写测试

@Test
void objectSend(){
    try {
        User user = new User();
        user.setId(1);
        user.setMsg("username");
        objectSender.send(user);
    }catch (Exception e){
        e.printStackTrace();
    }
}

  运行测试,可以看到控制台输岀如下结果:

Sender objectUser(id=1, msg=username)

Receiver objectUser(id=1, msg=username)

  6.3 实例:实现用接收器接收多个主题

  (1)配置topic

@Configuration
@Component
public class RabbitMQConfig {
    @Bean
    public Queue topicA(){
        return new Queue("topic.a");
    }
    @Bean
    public Queue topicB(){
        return new Queue("topic.b");
    }
    @Bean
    TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }
    @Bean
    Binding bingTopicA(Queue topicA,TopicExchange exchange){
        return BindingBuilder.bind(topicA).to(exchange).with("topic.a");
    }
    @Bean
    Binding bingTopicB(Queue topicB,TopicExchange exchange){
        return BindingBuilder.bind(topicB).to(exchange).with("topic.#");
    }
}

  (2)编写接收者A

@Component
@RabbitListener(queues = "topic.a")
public class TopicReceiveA {
    @RabbitHandler
    public void process(String message) {
        System.out.println("topicReceiveA: " + message);
    }
}

  (3)编写接收者B

@Component
@RabbitListener(queues = "topic.b")
public class TopicReceiveB {
    @RabbitHandler
    public void process(String message) {
        System.out.println("topicReceiveB: " + message);
    }
}

  (4)编写发送者

@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send(){
        String context = "topic";
        System.out.println("Sender: "+context);
        this.amqpTemplate.convertAndSend("topicExchange","topic.1",context);
    }
    public void sendToA(){
        String context = "topicToA";
        System.out.println("Sender: "+context);
        this.amqpTemplate.convertAndSend("topicExchange","topic.a",context);
    }
    public void sendToB(){
        String context = "topicToB";
        System.out.println("Sender: "+context);
        this.amqpTemplate.convertAndSend("topicExchange","topic.b",context);
    }
}

   (5)编写测试

@Test
public void topic(){
    topicSender.send();
}
@Test
public void topicA(){
    topicSender.sendToA();
}
@Test
public void topicB(){
    topicSender.sendToB();
}

   6.4 实现广播模式 

  (1)配置fanout

@Configuration
@Component
public class RabbitMQConfig {
    @Bean
    public Queue fanoutA(){
        return new Queue("fanout.A");
    }
    @Bean
    public Queue fanoutB(){
        return new Queue("fanout.B");
    }
    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }
    @Bean
    Binding bingFanoutA(Queue fanoutA,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutA).to(fanoutExchange);
    }
    @Bean
    Binding bingFanoutB(Queue fanoutB,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutB).to(fanoutExchange);
    }
}

  (2)编写发送者

@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send(){
        String context = "Fanout";
        System.out.println("Sender: " + context);
        this.amqpTemplate.convertAndSend("fanoutExchange", "",context);
    }
}

  (3)编写接收者A

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiveA {
    @RabbitHandler
    public void process(String message){
        System.out.println("fanout ReceiveA: " + message);
    }
}

  (4)编写接收者B

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiveB {
    @RabbitHandler
    public void process(String message){
        System.out.println("fanout ReceiveB: " + message);
    }
}

  (5)编写测试

@SpringBootTest
public class FanoutSendControllerTest {
    @Autowired
    private FanoutSender sender;
    public void fanoutSend(){
        sender.send();
    }
}

  运行测试,可以看到控制台输出如下结果:

  fanout ReceiveB: Fanout
  fanout ReceiveA: Fanout

  6.5 实例:实现消息队列延迟功能

  要实现这个功能,一般使用RabbitMQ的消息队列延迟功能,即采用官方提供的插件 "rabbitmq_delayed_message_exchange”来实现。但 RabbitMQ 版本必须是 3.5.8 以上才支持该插件,否则得用其“死信”功能。

  (1)安装延迟插件

  用rabbitmq-plugins list命令可以查看安装的插件。如果没有,则直接访问官网进行下载,下载完成后,将其解压到RabbitMQ的plugins目录。

  然后执行下面的命令进行安装:

  rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  (2)配置交换机

@Bean
public Queue queueDelay(){
    return new Queue("delay_queue_1");
}
@Bean
public CustomExchange delayExchange(){
    Map<String,Object> args = new HashMap<String,Object>();
    args.put("x-delayed-type","direct");
    return new CustomExchange("delayed_exchange","x-delayed-message",true,false,args);
}
@Bean
Binding bingDelayB(Queue queueDelay,CustomExchange delayExchange){
    return BindingBuilder.bind(queueDelay).to(delayExchange).with("delay_queue_1").noargs();
}

  这里要使用 CustomExchange,而不是 DirectExchange。CustomExchange 的类型必须是 x-delayed-message

  (3)实现消息发送

  这里设置消息延迟5s

@Service
public class CustomSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMsg(String queueName, String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息发送时间:"+sdf.format(new Date()));
        rabbitTemplate.convertAndSend("delayed_exchange", queueName, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //消息延迟5s
                message.getMessageProperties().setHeader("x-delay",5000);
                return message;
            }
        });
    }
}

  (4)实现消息接收

@Component
public class CustomReceiver {
    @RabbitListener(queues = "delay_queue_1")
    public void receive(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(sdf.format(new Date()));
        System.out.println("Received: 执行取消订单" + msg);
    }
}

  (5)测试发送延迟消息

@SpringBootTest
public class FanoutSendControllerTest {
    @Autowired
    private CustomSender customSender;
    @Test
    public void send(){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        customSender.sendMsg("delay_queue_1","支付超时,取消订单通知!");
    }
}

  运行测试,可以看到控制台输岀如下结果:

  消息发送时间:2022-08-14 14:48:41

  2022-08-14 14:48:46
  Received: 执行取消订单支付超时,取消订单通知!

  至此,消息队列延迟功能成功实现。在rabbitmq_delayed_message_exchange插件产生之前,我们大都是使用“死信”功能来达到延迟队列的效果。

  “死信”在创建Queue(队列)时,要声明“死信”队列。队列里的消息到一定时间没被消费, 就会变成死信转发到死信相应的Exchange或Queue中。

  延退消息是Exchange到Queue或其他Exchange的延迟。但如果消息延迟到期了,或消息不能被分配给其他的Exchange或Queue,则消息会被丢弃。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐