RabbitMQ消息中间件组件,目前比较流行的消息中间件有:RabbitMQ、RocketMQ、ActiveMQ、Kafka等。

我的代码中用的是RabbitMQ,先介绍几个概念:

      一:消息队列的特性如下:

  • 异步性,将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了同步等待的时间。
  • 松耦合,消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。
  • 分布式,通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性(当然消息队列本身也可以做成分布式集群)。
  • 可靠性,消息队列一般会把接收到的消息存储到本地硬盘上(当消息被处理完之后,存储信息根据不同的消息队列实现,有可能将其删除),这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。                                                                        

    二:AMQP(Advanced Message Queuing Protocol)高级消息队列协议

  • 是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

    AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。                                                                                                                                                   三:消息发送过程中的几个对象

  • 发消息者 send

  • 队列  queue

  • 收消息者 receive

  • 交换器 Exchange

  • 路由  routing

  •  

  • enter image description here

  • 发送者------交换器(路由并且过滤消息)-------队列(队列存储并发送消息)------接收者

  • 虚拟主机、交换机、队列和绑定。

  • 一个虚拟主机持有一组交换机、队列、绑定。RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止 A 组访问 B 组的交换机/队列/绑定,必须为 A 和 B 分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。

  • 交换器常用的规则:topic(最灵活。根据路由键可以模糊匹配)fanout(广播形式的)                                                                 一下是我的测试代码。都测试通过!

  • <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • @Configuration
    public class RabbitConfig {
    
        @Bean
        public Queue helloQueue() {
            return new Queue("hello");
        }
    
        @Bean
        public Queue neoQueue() {
            return new Queue("neo");
        }
    
        @Bean
        public Queue objectQueue() {
            return new Queue("object");
        }
    
    
    }

      定义三个队列名称分别为hello.neo.object

  • 发送者

  • @Component
    public class HelloSend {
        public static final Logger logger = LoggerFactory.getLogger(HelloSend.class);
        @Autowired
        private AmqpTemplate rabbitTemplate;
        public void send(){
            String message = "hello "+new Date();
            logger.info("message:"+message);
            rabbitTemplate.convertAndSend("hello",message);
            logger.info("队列发送成功");
        }
    }
  • 接受者   监听hello队列
  • @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
        }
    }
  • 以上是最简单的直接向队列里面发送信息。接收者监听队列并接受信息                                                                                     TOPIC模式的信息推送(分红色的代码前后对应。如果不对应需要在队列上添加(name="")指明调用的具体队列名称
  • package com.neo.rabbitmq.send.topic;
    
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * topic交换机模式配置类: 描述信息
     *
     * @author liyy
     * @date 2018-07-18 18:36
     */
    @Configuration
    public class TopicRabbitConfig {
        public static final String message = "topic.message";
    
        public static final String messages = "topic.messages";
    
        /**
         * 定义消息队列1
         * @return
         */
        @Bean(name = "queueMessage")
        public Queue messageQueue(){
            return new Queue(TopicRabbitConfig.message);
        }
    
        /**
         * 定义消息队列2
         * @return
         */
        @Bean(name = "queueMessages")
        public Queue messagesQueue(){
            return new Queue(TopicRabbitConfig.messages);
        }
    
        /**
         * 定义交换机
         */
        @Bean
        public TopicExchange exchange(){
            return new TopicExchange("topicExchange");
        }
    
        /**
         * 绑定消息队列到交换机,路由key:topic.message
         * @return
         */
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }
    
    
        /**
         * 绑定消息队列到交换机,路由key:topic.#
         * @return
         */
        @Bean
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange){
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
        }
    
    }
    
  • package com.neo.rabbitmq.send.topic;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * : 描述信息
     *
     * @author liyy
     * @date 2018-07-18 18:52
     */
    @Component
    public class TopicSend {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send1(){
            String context = "hi, i am message 1";
    
            amqpTemplate.convertAndSend("topicExchange","topic.message",context);
        }
    
    
        public void send2(){
            String context = "hi, i am message 2";
    
            amqpTemplate.convertAndSend("topicExchange","topic.messages",context);//交换机、路邮键
        }
    
    }
    
  • package com.neo.rabbit.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "topic.message")
    public class TopicReceiver {
    
        @RabbitHandler
        public void process(String message) {
            System.out.println("Topic Receiver1  : " + message);
        }
    
    }
    
  • package com.neo.rabbit.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "topic.messages")
    public class TopicReceiver2 {
    
        @RabbitHandler
        public void process(String message) {
            System.out.println("Topic Receiver2  : " + message);
        }
    
    }                                                                                                                                       

send1的测试结果:

2018-07-19 13:51:12.325  INFO 37968 --- [cTaskExecutor-1] c.n.rabbitmq.receive.topic.TopicReceive  : Topic Receiver1  hi, i am message 1
2018-07-19 13:51:12.332  INFO 37968 --- [cTaskExecutor-1] c.n.r.receive.topic.TopicReceive2        : Topic Receiver2  hi, i am message 1

send2的测试结果:

2018-07-19 13:50:02.416  INFO 51108 --- [cTaskExecutor-1] c.n.r.receive.topic.TopicReceive2        : Topic Receiver2  hi, i am message 2

 

  • 最后再测一个广播模式的交换器
  • package com.neo.rabbitmq.send.fanout;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 广播模式: 描述信息
     *
     * @author liyy
     * @date 2018-07-19 10:44
     */
    @Configuration
    public class FanoutRabbitConfig {
        public static String  queueA = "fanoutqueueA";
    
        public static String  queueB = "fanoutqueueB";
    
        public static String  queueC = "fanoutqueueC";
    
        @Bean(name = "queueA")
        public Queue queueA(){
            return new Queue(FanoutRabbitConfig.queueA);
        }
    
        @Bean(name = "queueB")
        public Queue queueB(){
            return new Queue(FanoutRabbitConfig.queueB);
        }
    
        @Bean(name = "queueC")
        public Queue queueC(){
            return new Queue(FanoutRabbitConfig.queueC);
        }
    
        /**
         * 定义广播模式的交换机
         */
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("exchange");
        }
    
        /**
         * 绑定交换机到队列
         */
        @Bean
        Binding bindingExchangeA(Queue queueA, FanoutExchange exchange){
            return BindingBuilder.bind(queueA).to(exchange);
        }
        /**
         * 绑定交换机到队列
         */
        @Bean
        Binding bindingExchangeB(Queue queueB, FanoutExchange exchange){
            return BindingBuilder.bind(queueB).to(exchange);
        }
        /**
         * 绑定交换机到队列
         */
        @Bean
        Binding bindingExchangeC(Queue queueC, FanoutExchange exchange){
            return BindingBuilder.bind(queueC).to(exchange);
        }
    
    }
    
  • package com.neo.rabbitmq.send.fanout;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * : 描述信息
     *
     * @author liyy
     * @date 2018-07-18 18:52
     */
    @Component
    public class FanoutSend {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(){
            String context = "hi, i am message 1";
            //routing key设置为空串
            amqpTemplate.convertAndSend("fanoutExchange","",context);
        }
    
    
    }
    
  • package com.neo.rabbitmq.receive.fanout;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 接受者: 描述信息监听队列,可以接受对象
     *
     * @author liyy
     * @date 2018-07-18 17:25
     */
    @Component
    @RabbitListener(queues = "fanoutqueueA")
    public class FanoutReceive1 {
        public static final Logger logger = LoggerFactory.getLogger(FanoutReceive1.class);
    
        @RabbitHandler
        public void process(String message){
            logger.info("fanoutA Receiver1  "+message);
        }
    }
    
  • package com.neo.rabbitmq.receive.fanout;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 接受者: 描述信息监听队列,可以接受对象
     *
     * @author liyy
     * @date 2018-07-18 17:25
     */
    @Component
    @RabbitListener(queues = "fanoutqueueB")
    public class FanoutReceive2 {
        public static final Logger logger = LoggerFactory.getLogger(FanoutReceive2.class);
    
        @RabbitHandler
        public void process(String message){
            logger.info("fanoutB Receiver2  "+message);
        }
    }
    
  • package com.neo.rabbitmq.receive.fanout;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 接受者: 描述信息监听队列,可以接受对象
     *
     * @author liyy
     * @date 2018-07-18 17:25
     */
    @Component
    @RabbitListener(queues = "fanoutqueueC")
    public class FanoutReceive3 {
        public static final Logger logger = LoggerFactory.getLogger(FanoutReceive3.class);
    
        @RabbitHandler
        public void process(String message){
            logger.info("fanoutC Receiver3  "+message);
        }
    }
  • 测试结果:
  • 2018-07-19 11:34:09.680  INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive3      : fanoutC Receiver3  hi, i am message 1
    2018-07-19 11:34:09.680  INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive2      : fanoutB Receiver2  hi, i am message 1
    2018-07-19 11:34:09.680  INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive1      : fanoutA Receiver1  hi, i am message 1 
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐