rabbitmq项目实战
很久之前就想写了,消息队列不管是在分布式项目,还是单体项目中,都是使用率很高的消息中间件,先简单介绍一下市面上常用到的消息队列,然后说一下rabbitmq的使用消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题常用的有:RabbitMQ,ActiveMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ,每一种mq都有自己的使用场景,一般都是在web应用或者
很久之前就想写了,消息队列不管是在分布式项目,还是单体项目中,都是使用率很高的消息中间件,先简单介绍一下市面上常用到的消息队列,然后说一下rabbitmq的使用
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题
常用的有:RabbitMQ,ActiveMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ,每一种mq都有自己的使用场景,一般都是在web应用或者安卓应用使用
rabbitmq使用 框架使用spring cloud
1 创建生产者并导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package com.pactera.business.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.UnsupportedEncodingException;
@Configuration
public class RabbitConfig {
/**
* 所有的消息发送都会转换成JSON格式发到交换机
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息是否必须路由到一个队列中,配合Return使用
rabbitTemplate.setMandatory(true);
// 消息返回, yml需要配置 publisher-returns: true
// 为RabbitTemplate设置ReturnCallback
//主要是两个回调,一个是confirm回调,一个是return回调,这两个有什么不同呢?
//经检验得知,如果推送去一个不存在交换机上,那么就会触发confirm回调;如果推送去一个存在的交换机,但对应的路由键(或者说队列)不存在,则会触发return回调。
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
try {
System.out.println("--------收到无法路由回发的消息--------");
System.out.println("ReturnCallback: " + "消息:" + message);
System.out.println("ReturnCallback: " + "回应码:" + replyCode);
System.out.println("ReturnCallback: " + "回应信息:" + replyText);
System.out.println("ReturnCallback: " + "交换机:" + exchange);
System.out.println("ReturnCallback: " + "路由键:" + routingKey);
System.out.println("properties:" + message.getMessageProperties());
System.out.println("body:" + new String(message.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
});
// Confirm异步确认,收到服务端的ACK以后会调用
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("--------收到服务端异步确认--------");
System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);
System.out.println("ConfirmCallback: " + "确认情况:" + ack);
System.out.println("ConfirmCallback: " + "原因:" + cause);
}
});
return rabbitTemplate;
}
}
package com.pactera.business.rabbit;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@ConfigurationProperties(prefix = "mq")
public class RabbitSender {
@Value("${mq.directexchange}")
private String directExchange;
// 自定义的模板,所有的消息都会转换成JSON发送
@Autowired
AmqpTemplate amqpTemplate;
public void send(Map<String, Object> map, String directRoutingKey) {
// 发送JSON字符串
ObjectMapper mapper = new ObjectMapper();
String sycMsg = null;
try {
sycMsg = mapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
System.out.println(sycMsg);
amqpTemplate.convertAndSend(directExchange, directRoutingKey, sycMsg);
}
}
调用生产者消息队列产生一条队列发送给消费者
rabbitSender.send(paramMap, app.getAppKey());
2 配置rabbitmq文件
//application.yml 配置
mq:
directexchange: PARK_CLOUD_TO_APP_EXCHANGE
spacequeue: PARK_SPACE_QUEUE
directroutingkey: park-space
//application-dev.yml
// 生产者
rabbitmq:
username: admin
password: admin
host: 192.168.9.58
port: 5672
virtual-host: park-dev
// 消费者
rabbitmq:
username: admin
password: admin
host: 192.168.9.58
port: 5672
virtual-host: park-dev
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual
retry:
enabled: true
max-attempts: 3
initial-interval: 5000
direct:
acknowledge-mode: manual
3 创建消费者
package com.pactera.business.mq;
import com.pactera.business.task.DataSynTask;
import com.pactera.utlis.JsonUtils;
import com.pactera.utlis.LoggerHelper;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
@PropertySource("classpath:application.yml")
public class FirstConsumer {
@Autowired
private DataSynTask dataSynTask;
@RabbitHandler
@RabbitListener(queues = "${mq.energyqueue}", containerFactory =
"rabbitListenerContainerFactory")
public void process(String sycMsg, Channel channel, Message message) throws IOException {
System.out.println("energy Queue received msg : " + sycMsg);
try {
Map<String, Object> paramMap = JsonUtils.JsonToMap2(sycMsg);
// 消费生产者队列里的消息信息
dataSynTask.sycUserOrg(paramMap, 2);
System.out.println("energy Queue 消费消息--成功 : " + sycMsg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
LoggerHelper.info(this.getClass(), "消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
LoggerHelper.info(this.getClass(), "消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
package com.pactera.business.mq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import java.util.HashMap;
@Configuration
@PropertySource("classpath:application.yml")
public class RabbitConfig {
@Value("${mq.energyqueue}")
private String energyqueue;
@Value("${mq.directexchange}")
private String directExchange;
@Value("${mq.directroutingkey}")
private String directroutingkey;
// 创建队列
@Bean("energyqueue")
public Queue getFirstQueue() {
//队列持久化
return new Queue(energyqueue, true);
}
// 创建交换机
@Bean("directExchange")
public DirectExchange getDirectExchange() {
//交换机持久化
return new DirectExchange(directExchange, true, false, new HashMap<>());
}
// 定义绑定关系
@Bean
public Binding bindFirst(@Qualifier("energyqueue") Queue queue, @Qualifier(
"directExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(directroutingkey);
}
/**
* 在消费端转换JSON消息
* 监听类都要加上containerFactory属性
*
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setAutoStartup(true);
return factory;
}
}
关键词:
Channel:是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Channel:的原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
Publisher:向交换器发布消息的客户端应用程序,即消息的生产者
Consumer:从消息队列中取得消息的客户端应用程序,即消息的消费者
Queue(队列)是RabbitMQ的内部对象,用于存储消息,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。(另外需要注意的是 生产者不是跟队列直接相连的,中间还有个exchange,这里省略了)
routing key:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。(Exchange type 就是 exchange的路由类型, binding key就是exchange与queue绑定时的一个key,相当于给这个绑定取个名字.这个名字是跟 routing key 相同或者包含的)
Binding:RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
Binding key:在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。 在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
Exchange:用来接收生产者发送的消息并将这些消息路由给服务器中的队列。(根据Exchange Type来决定 怎么分发到不同的Queue上)
Exchange Types:RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。另模式其实还有更多,但是常用的就这几种
fanout(广播模式):fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。即每个Queue都会收到相同的消息.
direct(完全匹配模式):direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
RPC:MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ中实现RPC的机制是:
1:客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
2:服务器端收到消息并处理
3:服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
4:客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理
更多推荐
所有评论(0)