消息中间件

MQ概念Message Queue

消息队列,通常在分布式集群中充当消息中间件,负责在多个工程和应用之间传递消息

MQ的产品

RabbitMq,ActiveMq,Kafka,RocketMq,Redis(消息订阅和发布-MQ:小型)

为什么需要MQ?

在原来的项目中,我们使用过HTTPClient进行系统间的通信;但是使用HTTPClient和WebService都是同步请求,被调用方没结束,那么调用方就会处于一个持续堵塞的状态,不能进行后续的工作,使用MQ作为消息中间件之后,就可以消除这种依赖,起到异步解耦的作用,调用方只需要将消息放入MQ中,消费方什么时候调用,调用多长时间都和调用方无关了;

什么是RabbitMQ?

RabbitMQ是一种实现了MQ设计理念的产品。像ActiveMQ(JMS),Kafka也是类似的产品。
RabbitMQ是一个基于ErLang语言和AMQP(Advanced Message
Queuing Protocol)传输协议开发的高并发的消息队列服务程序

RabbitMq的安装

1、创建目录
创建目录来存放rabbitmq的相关文件
cd /usr/local/
mkdir rabbitmq
cd rabbitmq
2、安装erlang
上传erlang 安装包,通过命令安装
rpm -ivh erlang-20.1.7-1.el6.x86_64.rpm
3、安装rabbitMQ,rabbitmq-server
上传我们准备好安装包 rpm -ivh
4、启动rabbitMQ服务

启动停止命令:
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
5、拷贝配置文件,并设置用户信息

cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
cd /etc/rabbitmq
mv rabbitmq.config.example rabbitmq.config
vim/etc/rabbitmq/rabbitmq.config

将{loopback_users, []}前面的注解“%%”去掉
6、开启web界面管理工具,并重启rabbitMQ

在当前目录执行即可
rabbitmq-plugins enable rabbitmq_management
7、修改防火墙配置文件:开放15672 和 5672端口
15672:网页访问mq管理系统需要的端口
5672:Java程序连接mq时用到的端口
8、通过浏览器访问管理界面
http://ip:15672

用户名密码为guest
#上传erlang 安装包,通过命令安装
rpm -ivh erlang-20.1.7-1.el6.x86_64.rpm

RabbitMq的工作模型

1.HelloWorld模型
P->消息队列->C
2.工作模型
P->消息队列->C1,C2…
多个消费者是轮询消费
3.发布订阅模式(重点)
X(路由/交换机)::起到一个消息分发的作用(默认:type=fanout)

		 ->消息队列1->C1
P->X->	{					
		 ->消息队列2->C2
		...

是开发中最基础的模型
4.Routing
X:路由器的类型不同,type=direct,有路由键的概念,发布的消息会根据某些路由的路由键有选择性的发布到指定的消息队列

		-路由键->消息队列1->C1
P->X->{	
		-路由键->消息队列2->C2
		...

5.Topic
X:路由器的类型不同,type=topic支持通配符*/#

		-路由键->消息队列1->C1
P->X->{	
		-路由键->消息队列2->C2
		...

6.RPC(了解)
在这里插入图片描述

RabbitMq的概念

Channel:管道,在RabbitMq中,通过连接可以获得管道,管道对象是操作一切的核心对象(队列,路由,绑定,消费,发送…)
Queue:队列,存放消息的对象,Queue可以进行消息的挤压
Exchange:交换机,消息分发的组件,交换机没有存储消息的能力

Channel中的常用方法
1.queueDeclare:声明一个队列(重要)

参数一:String queue,队列名称

参数二:Boolean durable,是否持久化
非持久化的队列在rabbitmq重启后会丢失,消息肯定也丢失

参数三:Boolean exclusive,是否为排他队列
排他队列只对创建这个队列的连接可见,一旦连接断开,排他队列自动删除(不管是否设置持久化),注意:排他队列是对当前连接可见,一个连接创建的多个队列对其中的排他队列是可见的;

参数四:Boolean autoDelete,是否自动删除
一旦有消费者绑定到队列上,如果消费者全部解绑,队列会自动删除

参数五:Map<Object,String> arguments,一些额外的属性配置

2.exchangeDeclare:声明一个路由(交换机)
常用参数

参数一:String exchange,路由名称

参数二:String type,路由的类型(fanout|direct|topic|headers)

其他不常用参数:

参数三:Boolean durable,是否持久化
非持久化的路由在rabbitmq重启后会丢失

参数四:Boolean autoDelete,是否自动删除
当一个路由如果绑定了一个或者多个队列或者路由,如果后面又将所有的绑定全部解除,则该路由就会自动删除

参数五:Boolean internel,是否为内置路由
一个内置路由,提供者无法直接向内置路由发送消息,内置路由只能接受其他路由发送的消息

参数六:Map<Object,String> arguments,一些额外的属性配置

3.queueBind:队列绑定到交换机

参数一:String queue,需要绑定的队列名称
参数二:String exechange,绑定到的交换机名称
参数三:String routingKey,路由键(fanout交换机不支持路由键)
参数四:Map<Object,String> arguments,一些额外的属性配置

RabbitMq进阶

在rabbitmq可以给消息和队列设置过期时间

#####1.给消息设置过期时间(重要)
a)通过队列的属性给队列中的消息设置过期时间(某一队列中所有消息)

Map<String,Object> map = new HashMap<>();
//该队列中的消息发布后五秒后会过期
map.put("x-message-ttl",5000);
queueDeclare("queueName1",false,false,false,map);

b)通过消息本身的属性设置过期时间(某一条消息)

AMQP.BasicProperties prop = new AMQP.BasicProperties()
AMQP.BasicProperties.Builder builder = prop.bulider();
//设置过期时间
builder.expiration("5000");
chanle.publishProvider("queueName1",builder.bulid(),"hello ttl".getBytes("utf-8"));
注意:

1.通过队列属性设置消息的过期时间,对队列中所有消息有效

2.通过消息本身的属性设置过期时间只对该消息本身有效

3.如果通过队列的属性设置消息的过期时间,则过期的消息会立刻被队列移除,因为这个时候过期的消息一定在队头,因此rabbitmq只需要定期扫描队头就可以了

4.如果通过消息本身的属性设置过期时间,当消息过期时,恰好在队头的话,就会立刻移除,如果在队中的话,不会被立刻移除,当它在队头的时候才会移除

注:RabbitMq出于性能考虑肯定不会扫描全部消息判断全部过期时间

5.当给队列的属性设置过期时间时,如果x-message-ttl设置为0,表示一个消息如果投递到队列中,如果该队列有消费者绑定,则会立刻消费该消息,如果没有消费者绑定,则消息会立刻丢失

2.给队列设置过期时间(意义不大)
Map<String,Object> map = new HashMap<>();
//设置队列的过期时间
map.put("x-expires",5000);
queueDeclare("queueName1",false,false,false,map);

当给一个队列设置过期时间后,如果在有效期内,没有消费者绑定该队列,同时也没有提供将消息发送到该队列,则该队列自动删除;
如果该队列中有积压的消息,则队列到期后也不会被删除;

队列的类型
1.死信队列:专用于接受死信消息
概念:

当某个消息变成了死信消息时.会发送给绑定在这个队列上的死信路由,如果还有一个队列绑定在死信路由上,则这个队列是死信队列

其实队列本身并没有分类,而是出于业务逻辑我们对于不同队列的作用不同进行了不同的称呼

死信队列可以被当做处理一些本来会被丢失的消息,有点类似于一个垃圾回收站,但是在某些应用场景下又不全是,比如订单回库的时候这些信息又是必不可少的

什么时候一个消息会变成死信消息?

1.消息过期,同时该消息是队头
2.队列已满,添加消息时,新消息并不会被拒绝,而是队头的消息就变成了死信消息出队

Map<String,Object> map = new HashMap<>();
//设置队列最大有多少个消息
map.put("x-max-length",2000);
//设置队列最大字节数
map.put("x-max-length-bytes",1024 * 1024);
channel.queueDeclare("queueName1",false,false,false,map);

3.消息被消费者拒绝,同时requeue设置为false

简单的死信路由
//1.创建一个死信路由
channel.exchangeDeclare("dx_exchange","fanout");
//2.创建一个死信队列
channel.queueDeclare("dx_queue",false,false,false,map);
//3.将死信队列绑定到死信路由
channel.queueBind("dx_queue","dx_exchange","");
//4.将死信路由设置给普通队列
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","dx_exchange");

//普通队列channel.queueDeclare("queue",false,false,false,map);
//普通路由
channel.exchangeDeclare("exchange","fanout");
//普通队列和普通路由绑定
channel.queueBind("queue","exchange","");
2.延迟队列
概念:

过了一个规定时间之后,消费者才能消费队列中的消息

rabbitmq本身没有提供延迟队列,我们通过死信队列+ttl失效时间实现延迟队列

可以把一个设置了失效时间的普通队列和其死信路由+死信队列看做一个整体,消费者消费这个死信队列实现了定时消费死信消息

绑定死信路由的普通队列的必须没有消费者,消费者在死信队列消费消息

延迟消息运用场景:

下单:通常用户下单后一定的付款时间,当付款时间到了之后,如果已经付款直接关闭订单,如果没有付款,订单关闭,交易失败,商品库存加回数据库;

消息持久化

消息持久化是消息安全性的重要保证
rabbitmq分为三部分:
1.路由持久化druable=true
2.队列持久化druable=true,队列持久化不意味着消息持久化
3.消息持久化:
方式一:

chanle.publishProvider("queueName1",MessageProperties.PERSIST_TEXT_PLAIN,"hello ttl".getBytes("utf-8"));

方式二:

AMQP.BasicProperties prop = new AMQP.BasicProperties()
AMQP.BasicProperties.Builder builder = prop.bulider().deliverMode(2).build();
chanle.publishProvider("queueName1",builder.bulid(),"hello".getBytes("utf-8"));
注意:

1.如果设置了消息持久化,没有设置队列持久化,是没有任何意义的
2.持久化消息,rabbit会写入硬盘,操作硬盘比操作内存会慢很多,用持久化消息会消耗rabbitmq的性能,因此如果对消息的可靠性没绝对的要求,最好还是不要设置持久化消息,用以提高服务器整体的吞吐量

思考:

如果一个rabbitmq的服务设置了路由持久化,队列持久化和消息持久化,能否保证数据的绝对可靠?
不能保证,有两个原因:
1.提供者提供消息的时候
提供者提供消息后可能rabbitmq来不及进行持久化就发生了宕机,此时提供者认为消息已经发送成功,所以消息丢失了
2.消费者消费消息的时候
当消费者来不及消费时就发生了宕机,仍然没有消费消息,但是rabbitmq磁盘中已经没有了该消息

综上,RabbitMq就有消息确认机制保证数据的强一致性

消息确认机制

1.生产者消息确认机制

概念:

当生产者将消息发送出去后,如何确保消息正常到MQ服务器?默认情况下,发送消息的操作是不会返回任何消息给生产者,确保消息的正确送达。如果消息在发送到MQ服务器途中就丢失了,则持久化也保证不了消息的安全性。为了解决这个问题,Rabbitmq引入了生产者确认机制

生产者消息确认机制的实现两种方法:

a)事务机制(重量级):

  • channel.txSelect();将当前管道设置成事务模式
  • channel.txCommit();提交事务,提交事务成功,可以确保消息一定到达MQ服务器
  • channel.txRollback();回滚事务
channel.txSelect();
try {
	channel.basicPublish("tx_exchange", "", null, "message".getBytes());
	channel.txCommit();
} catch (Exception e){
	e.printStackTrace();
	channel.txRollback();
	//消息的重试机制
}
注意:事务机制虽然可以确保消息的正确到达,但是对服务器性能耗损比较大,一般都不会使用这种方案

b)发送方确认机制(轻量级):
发送方确认机制(publisher confirm):
前面提到的事务机制,虽然可以保证消息的正确发送,但是严重影响rabbitmq的性能,因此引入了一种轻量级的消息确认机制 – publisher confirm

publisher confirm机制:
首先将管道设置为confirm模式,然后在该管道上发送的所有消息,都会被指派一个唯一ID(从1开始递增),当消息投递到队列上后,服务器会发送一个确认信息(携带消息的ID)给生产者,生产者就知道消息已经正确达到服务,如果因为服务器的问题导致消息丢失,服务器也会发送一个失败的消息给生产者,生产者就可以尝试进行重试逻辑。 这一整个过程可以是一个同步过程也可以是一个异步过程。

不推荐使用:同步模式

try {
	//将管道设置为confirm模式
    channel.confirmSelect();
    channel.basicPublish("tx_exchange", "", null, ("message").getBytes());
    if(channel.waitForConfirms()){
        System.out.println("消息正常发送!");
    } else {
        System.out.println("消息发送异常,进行重试逻辑!");
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

推荐使用:异步模式

//设置confirm模式
channel.confirmSelect();
//添加异步回调监听
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        //消息确认
        System.out.println("消息确认id:" + deliveryTag);
        //从缓存中剔除消息
        //单条消息确认失败
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        //消息异常
        System.out.println("消息失败id:" + deliveryTag);
        //引入重试逻辑
        //批处理消息确认失败
    }
});

for(int i = 0; i < 100000; i++){
    long id = channel.getNextPublishSeqNo();
    System.out.println("获得消息id:" + id) ;
    channel.basicPublish("tx_exchange", "", null, ("message" + i).getBytes());
    //将消息添加进缓存中
}

2.消费者消息确认和拒绝机制

//第一个参数表示消息标识
//第二个参数表示是否批量确认消息
channel.basicAsk(long id,Boolean multipt)
//第一个参数表示消息标识
//第二个参数表示拒绝消息后,消息是否重新放回队列
channel.basicReject(long id,Boolean requeue)

注意:如果requeue设为false,则该消息会变成死信消息,如果requeue设置为true则会重新投递到消费端,如果只有一个消费端就会变成(拒绝消费-重新投递)的一个死循环,通常如果需要将requeue设置为true切记保证至少有两个以上的消费方

消费者端设置消息的限制:
//当前消费者最多从rabbitmq服务器领取10条十条未确认的消息
//只有消息确认消费了rabbitmq才能再推一条给消费者
channel.basicQos(10);

channel.basicQos会限制当前管道上的消费者所能保持的最大未确认消息数量,如果有一个消息被确认,则队列会立刻再推送下一个消息到消费者端,依次类推

实际开发中根据服务器的性能进行适当的设置

消息确认简单Demo
Provider.java

public class Provider {

    private static TreeMap<Long, Object> treeMap = new TreeMap<>();

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //创建队列
        channel.queueDeclare("queue1", false, false, false, null);

        //创建路由
        channel.exchangeDeclare("exchange1", "fanout");

        //路由和队列绑定
        channel.queueBind("queue1", "exchange1", "");

        //将管道设置为事务模式
//        channel.txSelect();
//        try {
//            //发送消息 - 路由
//            channel.basicPublish("exchange1", "",
//                    null, "Hello!!".getBytes("utf-8"));
//            channel.txCommit();
//            System.out.println("消息正常发送!");
//        } catch (IOException e) {
//            channel.txRollback();
//            //引入消息的重新发送机制
//            System.out.println("消息发送失败,尝试重发!");
//        }

        //publish confirm

        // ----- 同步
//        channel.confirmSelect();//将管道设置为confirm模式
//        //发送消息 - 路由
//        channel.basicPublish("exchange1", "",
//                null, "Hello!!".getBytes("utf-8"));
//        //同步等待消息确认
//        //channel.waitForConfirms() 方法返回true表示消息发送成功 false表示消息发送失败
//        try {
//            if(channel.waitForConfirms()){
//                //发送成功
//            } else {
//                //发送失败
//                //引入重试机制
//            }
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }

        // ----- 异步
        //准备一个消息缓存集合

        channel.confirmSelect();//将管道设置为confirm模式
        //添加异步的回调监听
        channel.addConfirmListener(new ConfirmListener() {

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                //确定消息已经正常发送
                //deliveryTag - 确认消息发送成功的消息id
                //multiple - 是否为批量操作,如果为false,表示当前这条deliveryTag的消息发送成功,
                // 如果为true,表示当前这条deliveryTag的消息之前的所有消息发送成功

                if(!multiple){
                    //单挑确认
                    treeMap.remove(deliveryTag);
                } else {
                    //批量确认 - deliveryTag 5   1 2 3 4 5
                    treeMap = (TreeMap<Long, Object>) treeMap.tailMap(deliveryTag + 1);
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                //消息发送异常
                //deliveryTag - 确认消息发送异常的消息id
                //multiple - 是否为批量操作,如果为false,表示当前这条deliveryTag的消息发送失败,
                // 如果为true,表示当前这条deliveryTag的消息之前的所有消息发送失败

                if(!multiple){
                    //单条失败
                    //重试 - msg
                    Object msg = treeMap.get(deliveryTag);
                    //重试机制
                } else {
                    //批量失败  5    1 2 3 4 5
                    TreeMap<Long, Object> map = (TreeMap<Long, Object>) treeMap.headMap(deliveryTag);
                    //重试机制 - map
                }
            }
        });


            //本条消息发送的id
            long number = channel.getNextPublishSeqNo();
            //发送消息 - 路由
            channel.basicPublish("exchange1", "",
                    null, ("Hello World").getBytes("utf-8"));
            //发送完一个消息就将该消息缓存到treemap中
            treeMap.put(number, ("Hello World"));


        connection.close();
    }
}

Consumer.java

public class Consumer {

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        //创建队列
//        channel.queueDeclare("queue1", false, false, true, null);

        //限制消费的条数
        channel.basicQos(300);
        //当前消费者最多从rabbitmq服务器领取10条未确认的消息,一般有一条消息确认,rabbitmq再推一条给消费者
        channel.basicConsume("queue1", false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body, "utf-8"));
                //xxxxxx

                //手动确认
                if(!new String(body, "utf-8").equals("Hello World")){
                    //确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } else {
                    System.out.println("拒绝了这个消息~!!!");
                    //拒绝消息
                    channel.basicReject(envelope.getDeliveryTag(), true);
                }

            }
        });

    }
}
RabbitMq的应用场景
1服务间的异步通信

HttpClient

2.保持消息的消费顺序
3.定时任务处理

订单关闭时间

4.请求削峰

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全瘫痪。

SpringBoot整合RabbitMq

引入依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置:

spring:
  rabbitmq:
    host: 192.168.226.144
    port: 5672
    username: admin
    password: admin
    virtual-host: /admin_host
设置不同模式:
1.简单的消息模式

提供者

@Bean
public Queue getQueue(){
	return new Queue("SimpleQueue");
}

编写消息处理器

@Component
@RabbitListener(queues = "SimpleQueue")
public class MyRabbitHandler {

    @RabbitHandler
    public void handler(String message){
        System.out.println(message);
    }
}
2.Fanout交换机模式

提供者:

  • 配置交换机与队列绑定
@Configuration
public class Config {

    @Bean
    public Queue getQueueOne(){
        return new Queue("one");
    }

    @Bean
    public Queue getQueueTow(){
        return new Queue("tow");
    }

    @Bean
    public FanoutExchange getFanoutExchange(){
        return new FanoutExchange("fanout_exchange");
    }

    @Bean
    public Binding bindingExchangeOne(Queue getQueueOne, FanoutExchange getFanoutExchange){
        return BindingBuilder.bind(getQueueOne).to(getFanoutExchange);
    }

    @Bean
    public Binding bindingExchangeTow(Queue getQueueTow,FanoutExchange getFanoutExchange){
        return BindingBuilder.bind(getQueueTow).to(getFanoutExchange);
    }
}
  • 注入RabbitMQ模板对象
@Autowired
private RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend("fanout_exchange","","Hello RabbitMQ");

消费者

  • 配置交换机与队列绑定
与提供者想相同,解耦,不需要先提供提供者
  • 配置监听者
@Component
public class MyRabbitHandler {
   @RabbitHandler
   @RabbitListener(queues = "one")
   public void processOne(String str){
       System.out.println("队列one接收到对象:"+ str);
   }

   @RabbitHandler
   @RabbitListener(queues = "tow")
   public void processTow(String str){
       System.out.println("队列tow接收到对象:" + str);
   }
}

3.Topic交换机
提供者

  • 配置交换机与队列绑定
@Configuration
public class TopicRabbitConfig {

    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("topic_exchange");
    }

    @Bean
    public Queue getQueue(){
        return new Queue("topic_queue");
    }

    @Bean
    public Binding bindingExchange(Queue getQueue,TopicExchange getTopicExchange){
        return BindingBuilder.bind(getQueue).to(getTopicExchange).with("a.*");
    }

}
  • 注入RabbitMQ模板对象
@Autowired
private RabbitTemplate rabbitTemplate;

rabbitTemplate.convertAndSend("topic_exchange","a.nba","Hello RabbitMQ1");
rabbitTemplate.convertAndSend("topic_exchange","a.nba.kobe","Hello RabbitMQ2");

消费者

  • 配置监听者
 @RabbitHandler
 @RabbitListener(queues = "topic_queue")
 public void processTopic(String str){
     System.out.println("str:" + str);
 }
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐