springboot集成rabbitMQ实现消息的推送
RabbitMQ消息中间件组件,目前比较流行的消息中间件有:RabbitMQ、RocketMQ、ActiveMQ、Kafka等。我的代码中用的是RabbitMQ,先介绍几个概念: 一:消息队列的特性如下:异步性,将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了同步等待的时间。松耦合,消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心...
RabbitMQ消息中间件组件,目前比较流行的消息中间件有:RabbitMQ、RocketMQ、ActiveMQ、Kafka等。
我的代码中用的是RabbitMQ,先介绍几个概念:
一:消息队列的特性如下:
- 异步性,将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了同步等待的时间。
- 松耦合,消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。
- 分布式,通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性(当然消息队列本身也可以做成分布式集群)。
- 可靠性,消息队列一般会把接收到的消息存储到本地硬盘上(当消息被处理完之后,存储信息根据不同的消息队列实现,有可能将其删除),这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。
二:AMQP(Advanced Message Queuing Protocol)高级消息队列协议,
-
是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 三:消息发送过程中的几个对象
-
发消息者 send
-
队列 queue
-
收消息者 receive
-
交换器 Exchange
-
路由 routing
-
-
发送者------交换器(路由并且过滤消息)-------队列(队列存储并发送消息)------接收者
-
虚拟主机、交换机、队列和绑定。
-
一个虚拟主机持有一组交换机、队列、绑定。RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止 A 组访问 B 组的交换机/队列/绑定,必须为 A 和 B 分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。
-
交换器常用的规则:topic(最灵活。根据路由键可以模糊匹配)fanout(广播形式的) 一下是我的测试代码。都测试通过!
-
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
@Configuration public class RabbitConfig { @Bean public Queue helloQueue() { return new Queue("hello"); } @Bean public Queue neoQueue() { return new Queue("neo"); } @Bean public Queue objectQueue() { return new Queue("object"); } }
定义三个队列名称分别为hello.neo.object
-
发送者
-
@Component public class HelloSend { public static final Logger logger = LoggerFactory.getLogger(HelloSend.class); @Autowired private AmqpTemplate rabbitTemplate; public void send(){ String message = "hello "+new Date(); logger.info("message:"+message); rabbitTemplate.convertAndSend("hello",message); logger.info("队列发送成功"); } }
- 接受者 监听hello队列
-
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
- 以上是最简单的直接向队列里面发送信息。接收者监听队列并接受信息 TOPIC模式的信息推送(分红色的代码前后对应。如果不对应需要在队列上添加(name="")指明调用的具体队列名称
-
package com.neo.rabbitmq.send.topic; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * topic交换机模式配置类: 描述信息 * * @author liyy * @date 2018-07-18 18:36 */ @Configuration public class TopicRabbitConfig { public static final String message = "topic.message"; public static final String messages = "topic.messages"; /** * 定义消息队列1 * @return */ @Bean(name = "queueMessage") public Queue messageQueue(){ return new Queue(TopicRabbitConfig.message); } /** * 定义消息队列2 * @return */ @Bean(name = "queueMessages") public Queue messagesQueue(){ return new Queue(TopicRabbitConfig.messages); } /** * 定义交换机 */ @Bean public TopicExchange exchange(){ return new TopicExchange("topicExchange"); } /** * 绑定消息队列到交换机,路由key:topic.message * @return */ @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } /** * 绑定消息队列到交换机,路由key:topic.# * @return */ @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange){ return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
-
package com.neo.rabbitmq.send.topic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * : 描述信息 * * @author liyy * @date 2018-07-18 18:52 */ @Component public class TopicSend { @Autowired private AmqpTemplate amqpTemplate; public void send1(){ String context = "hi, i am message 1"; amqpTemplate.convertAndSend("topicExchange","topic.message",context); } public void send2(){ String context = "hi, i am message 2"; amqpTemplate.convertAndSend("topicExchange","topic.messages",context);//交换机、路邮键 } }
-
package com.neo.rabbit.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "topic.message") public class TopicReceiver { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver1 : " + message); } }
-
package com.neo.rabbit.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "topic.messages") public class TopicReceiver2 { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver2 : " + message); } }
send1的测试结果:
2018-07-19 13:51:12.325 INFO 37968 --- [cTaskExecutor-1] c.n.rabbitmq.receive.topic.TopicReceive : Topic Receiver1 hi, i am message 1
2018-07-19 13:51:12.332 INFO 37968 --- [cTaskExecutor-1] c.n.r.receive.topic.TopicReceive2 : Topic Receiver2 hi, i am message 1
send2的测试结果:
2018-07-19 13:50:02.416 INFO 51108 --- [cTaskExecutor-1] c.n.r.receive.topic.TopicReceive2 : Topic Receiver2 hi, i am message 2
- 最后再测一个广播模式的交换器
-
package com.neo.rabbitmq.send.fanout; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 广播模式: 描述信息 * * @author liyy * @date 2018-07-19 10:44 */ @Configuration public class FanoutRabbitConfig { public static String queueA = "fanoutqueueA"; public static String queueB = "fanoutqueueB"; public static String queueC = "fanoutqueueC"; @Bean(name = "queueA") public Queue queueA(){ return new Queue(FanoutRabbitConfig.queueA); } @Bean(name = "queueB") public Queue queueB(){ return new Queue(FanoutRabbitConfig.queueB); } @Bean(name = "queueC") public Queue queueC(){ return new Queue(FanoutRabbitConfig.queueC); } /** * 定义广播模式的交换机 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("exchange"); } /** * 绑定交换机到队列 */ @Bean Binding bindingExchangeA(Queue queueA, FanoutExchange exchange){ return BindingBuilder.bind(queueA).to(exchange); } /** * 绑定交换机到队列 */ @Bean Binding bindingExchangeB(Queue queueB, FanoutExchange exchange){ return BindingBuilder.bind(queueB).to(exchange); } /** * 绑定交换机到队列 */ @Bean Binding bindingExchangeC(Queue queueC, FanoutExchange exchange){ return BindingBuilder.bind(queueC).to(exchange); } }
-
package com.neo.rabbitmq.send.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * : 描述信息 * * @author liyy * @date 2018-07-18 18:52 */ @Component public class FanoutSend { @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hi, i am message 1"; //routing key设置为空串 amqpTemplate.convertAndSend("fanoutExchange","",context); } }
-
package com.neo.rabbitmq.receive.fanout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 接受者: 描述信息监听队列,可以接受对象 * * @author liyy * @date 2018-07-18 17:25 */ @Component @RabbitListener(queues = "fanoutqueueA") public class FanoutReceive1 { public static final Logger logger = LoggerFactory.getLogger(FanoutReceive1.class); @RabbitHandler public void process(String message){ logger.info("fanoutA Receiver1 "+message); } }
-
package com.neo.rabbitmq.receive.fanout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 接受者: 描述信息监听队列,可以接受对象 * * @author liyy * @date 2018-07-18 17:25 */ @Component @RabbitListener(queues = "fanoutqueueB") public class FanoutReceive2 { public static final Logger logger = LoggerFactory.getLogger(FanoutReceive2.class); @RabbitHandler public void process(String message){ logger.info("fanoutB Receiver2 "+message); } }
-
package com.neo.rabbitmq.receive.fanout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 接受者: 描述信息监听队列,可以接受对象 * * @author liyy * @date 2018-07-18 17:25 */ @Component @RabbitListener(queues = "fanoutqueueC") public class FanoutReceive3 { public static final Logger logger = LoggerFactory.getLogger(FanoutReceive3.class); @RabbitHandler public void process(String message){ logger.info("fanoutC Receiver3 "+message); } }
- 测试结果:
- 2018-07-19 11:34:09.680 INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive3 : fanoutC Receiver3 hi, i am message 1
2018-07-19 11:34:09.680 INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive2 : fanoutB Receiver2 hi, i am message 1
2018-07-19 11:34:09.680 INFO 55432 --- [cTaskExecutor-1] c.n.r.receive.fanout.FanoutReceive1 : fanoutA Receiver1 hi, i am message 1
更多推荐
所有评论(0)