1 MQ 消息队列

MQ全称为Message Queue,即消息队列;RabbitMQ (Messaging that just works — RabbitMQ)由erlang语言开发,基于AMQP协议实现的消息队列; 

思考:为什么需要用到消息队列呢?它有什么好处呢?

传统模式

  • 账号注册完成之后,发送短信,再发送邮件;整个流程都是主线程执行,一个花了150ms。
  • 缺点:代码臃肿、执行时间相对长、如果后期新增发微信,则不好扩展。

多线程模式

  • 执行完成主业务(账号注册),则通过创建子线程来处理次要业务。它只需要花50ms即可给用户响应。
  • 优点:响应速度比较快
  • 缺点:1)代码还是臃肿;2)需要扩展性不好,比如后期新增发送微信功能还得改源代码;3)需要实现容错比较复杂,比如子线程处理错误,则怎么实现补偿机制;4)子线程和主业务代码在同一台服务器,如果并发量很高的情况,其实性能还是受限。

消息队列模式

  • 执行完成主业务(账号注册),则发送消息给消息队列,由消息队列的消费端监听并处理后续业务。它只需要花50ms即可给用户响应。
  • 优点:1)响应速度比较快;2)业务彻底解耦,编码变的更加简单;3)由于服务都是独立部署在不同服务器,所以性能更加的高。4)需求扩展性好,比如后期新增发送微信功能,则只需要创建一个微信服务监听消息队列即可。5)可以灵活的处理容错性。
  • 缺点:架构变的更加复杂了;服务数量比较多。

常见的其它消息队列 :ActiveMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis(也可做消息队列)

特性ActiveMQRabbitMQRocketMQKafka
语言JavaErlangJavaScala
吞吐量万级万级10万级10万级
速度毫秒微秒毫秒毫秒
集群主从架构主从架构分布式架构分布式架构
场景---大数据实时处理
  • 性能排序:Kafka>RocketMQ>RabbitMQ>ActiveMQ
  • 部署难度:ActiveMQ<RabbitMQ<Kafka<RocketMQ

消息队列的优势:

1)解耦:系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

 2)异步:如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能

 3)削峰:如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。

2 RocketMQ

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。

NameServer:主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。

Broker:负责存储消息,转发消息。单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer。

2.1 环境安装

快速开始 | RocketMQ

1)单集群启动

# 启动NameServer

nohup sh bin/mqnamesrv &

# 验证Name Server 是否启动成功

tail -f ~/logs/rocketmqlogs/namesrv.log

NameServer成功启动后,我们启动Broker和Proxy。

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

tail -f ~/logs/rocketmqlogs/proxy.log 

关闭服务器

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

2)管控台

https://github.com/apache/rocketmq-dashboard

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

2.2 rocketmq

2.2.1 生产者与消费者

1)生产者

public class Send {
    public static void main(String[] args) throws Exception{

        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producer-demo");
        //连接nameserver
        defaultMQProducer.setNamesrvAddr("192.168.67.5:9876");
        //启动生产者
        defaultMQProducer.start();
        //设置消息发送的目的地
        String topic = "producer-demo";
        //发送消息
        for(int i=0 ; i<10 ; i++){
            Message msg = new Message(topic,("rocketmq消息 " + i).getBytes(StandardCharsets.UTF_8));
	  //发送到broker
            SendResult send = defaultMQProducer.send(msg);
            System.out.println(i + "发送状态:" + send.getSendStatus());
        }
        //关闭资源
        defaultMQProducer.shutdown();
    }

2)消费者

public class Custom {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer01");
        defaultMQPushConsumer.setNamesrvAddr("192.168.67.5:9876");
        //设置订阅主题
        defaultMQPushConsumer.subscribe("producer-demo","*");
        //设置消息监听器
        defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    String s = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                    System.out.println("thread: " + Thread.currentThread() + ",content: " + s );
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        defaultMQPushConsumer.start();
    }
}

2.2.2 消息消费模式

(1)集群模式

一个ConsumerGroup中的Consumer实例平均分摊消费消息。

(2)广播模式

一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次。

2.3 SpringBoot配置

2.3.1 生产者

2.3.2 消费者

@Component
@RocketMQMessageListener(consumerGroup = "tConBootGroup",topic = "TestTopic")
public class TestTopicListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        String mess = new String(messageExt.getBody(), Charset.defaultCharset());
        System.out.println("接受消息:" + mess);
    }
}

2.3.3 异步消息发送

2.3.4 负载均衡测试

2.3.5 顺序消息发送

2.3.6 TAG过滤

2.3.7 SQL92过滤

@RocketMQMessageListener(consumerGroup = "tConBootGroup",topic = "TestTopic",selectorType = SelectorType.SQL92,selectorExpression = "age >= 96 and grade >95")

3 ActiveMQ

ActiveMQ主要有两种模式的消息,分别是Queue(队列)和Topic(订阅)。

  • Queue,每个消费者消费的数据不一样,一个消息只能被一个消费者消费
  • Topic,每个消费者消费的数据是一样的

3.1 安装

#1.解压
tar -zxvf apache-activemq-5.12.0-bin.tar.gz

#2.启动
cd apache-activemq-5.12/bin
./activemq start

#3.停止
./activemq stop

#4.查看状态
./activemq status

ActiveMQ有两个核心端口分别是8161(后台系统的端口),61616(客户端通讯端口)

启动完ActiveMQ之后,可以在浏览器登录后台系统:

http://ip地址:8161,账号和密码都是admin

3.2 消息提供者

第一步:导入坐标

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

第二步:配置文件

spring.activemq.broker-url=tcp://192.168.1.8:61616
spring.activemq.password=admin
spring.activemq.user=admin

第三步:发送Queue消息

@Component
public class Producer {  
	@Autowired
	private JmsMessagingTemplate jmt;
   
	public void send(String content) {
        //注意:项目启动的时候,会在后台自动创建queue名称
		jmt.convertAndSend(new ActiveMQQueue("队列名称"),content);
	}   
}

第四步:发送Topic消息

@Component
public class Producer {
	@Autowired
	private JmsMessagingTemplate jmt;
   
	public void send(String content) {
        //注意:项目启动的时候,会在后台自动创建topic名称
		jmt.convertAndSend(new ActiveMQTopic("sample.topic"),content);
	}
}

提示:SpringBoot默认发送Queue消息

#如果为True,则是Topic;如果是false或者默认,则是queue
spring.jms.pub-sub-domain=true

3.3 消息消费者

第一步:导入坐标

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

第二步:配置文件

spring.activemq.broker-url=tcp://192.168.1.8:61616
spring.activemq.password=admin
spring.activemq.user=admin

第三步:监听消息

@Component
public class Consumer {
    @JmsListener(destination = "队列名称")
    public void receiveQueue(String text) {
        System.out.println(text);
    }
}

提示:SpringBoot可以同时监听Queue和Topic消息,不需要任何配置

注意:

消息队列中间件宕机了,重启之后会不会丢失数据呢?

  • 首先,ActiveMQ默认有三种持久化方式,分别是内存持久化、日志持久化和JDBC持久化,内存持久化是不安全的;日志和JDBC则是安全的;ActiveMQ默认是日志持久化。

  • Queue重启之后不会丢失数据,直到消费被消费成功才被删除;Topic无法再消费之前的消息,所以重启会丢失。

消息队列会不会出现重复消息的问题?什么情况会导致重复消费?

  • 正常情况下,多个消费端监听同一个queue,同一条消息不会被多个消费端同时消费,只有一个消费端能消费成功。

  • 但是,如果由于网络原因,消息队列可能无法及时收到消费端的应答,会重发消息,这样的话消费端相当于消费了两次。

  • 解决方案:消费端每次接受消息的时候,先根据msgId去Redis查询是否存在;如果存在则不处理;如果不存在,则把消息ID存储到Redis。

什么是死信队列?它有什么作用?

  • 消息被消费失败的时候,则消息队列默认重发6次,如果还是失败则把该消息加入到死信队列里面;或者消息过期也会被加入死信队列。
  • 默认的私信队列是ActiveMQ.DLQ,一般为了方便管理,针对每个队列都创建一个相应的死信队列。
  • 为了数据的一致性,可以对死信队列进行监听,进行数据的补偿处理
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐