前言

最近公司在使用MQ消息队列,作为一个强大的消息存储中间件,我主要用于远程数据传送和回调,消息队列在互联网技术存储方面使用如此广泛,对于面试后端技术的面试官都会在消息队列的使用和原理方面无死角的盘问,于是我想根据个人的看法写了这篇面试必备的消息队列。

在实际开发过程中,我们使用比较高的消息队列中间件有哪些?而我在工作当中所用过的消息队列中间件主要有 RocketMQ、Kafka和RabbitMQ,而我主要讲的是RocketMQ消息队列中间件,因为公司使用的都是阿里系的技术框架组件等等,而且RocketMQ也是经过双十一的验证的,还是有点牛逼的哈。

正文

RocketMQ简介

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

如果需要看RocketMQ的源码:https://github.com/apache/rocketmq

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

下面解释一下核心模块功能的作用:

broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息
client:包含producer端和consumer端,发送消息和接收消息的过程。
namesrv:NameServer,类似SOA服务的注册中心,这里保存着消息的TopicName,队列等运行时的meta信息。一般系统分dataNode和nameNode,这里是nameNode。
common:通用的常量枚举、基类方法或者数据结构,按描述的目标来分包通俗易懂。包名有:admin,consumer,filter,hook,message等。
remoting:用Netty4写的客户端和服务端,fastjson做的序列化,自定义二进制协议。
store:消息、索引存储等。
filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!(一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件)。
rocketmq-tools:命令行工具。

在这里插入图片描述

RocketMq的组成部分:
NameServer、Producer、Consumer和Broker的四大核心功能。从流程图中我们可以看出RocketMQ都是集群部署的,这是他吞吐量大,高可用的原因之一,支持单 master 模式、多master模式、多master多slave异步复制模式、多 master多slave同步双写模式,下面解释一下这几种模式:

单 master 模式: 也就是只有一个 master 节点,如果master节点挂掉了,会导致整个服务不可用,不宜生产环境使用,只适合个人学习使用。

多 master 模式:集群中不存在Slave节点,集群中全部节点都是Master节点。优势是配置简单,单个Master宕机或重启对应用没有影响。可是若是某个节点的Master宕机以后,该节点上未被消费的消息在节点恢复以前没法订阅和消费,实时性受到影响。

多 master 多 slave 异步复制模式在多master模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式,在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样,但是使用异步复制的同步方式有可能会有消息丢失的问题。

多 master 多 slave 同步双写模式:同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
优点:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。

下面讲解各部分组成结构功能:

Name Server:

一个几乎无状态节点的注册中心,比zookeeper更轻量级,更好用,可集群部署,节点之间无任务信息同步,负责维护Producer和Consumer的配置信息、状态信息,并且协调各个角色的协同执行。

NameServer的作用主要是维持心跳和提供Topic和Broker的关系数据,它的流程是Broker向 NameServer发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多,会导致一次心跳中,就Topic的数据就几十M,网络情况差的话, 网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败,每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。

RocketMQ 以前使用的是ZooKeeper ,后来改为了自己实现的NameServer,这个后面讲为什么不用了。

Producer:

Producer消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟.

Producer 发送消息,RocketMQ 提供了三种模式。

同步发送:

Producer 向 Broker发送消息,阻塞当前线程等待 Broker发回响应之后才发下一个数据包。一般用于重要通知消息

异步发送:

Producer 首先构建一个向Broker发送消息的任务,把该任务提交给线程池,不等Broker发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景。

OneWay 发送:

单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景。

Producer如果发送消息呢?比如Producer轮询某Topic下的所有队列的方式来实现发送方的负载均衡

在这里插入图片描述

Broker:

Broker主要负责消息的存储、投递和查询以及服务高可用保证
实现Broker的五大模块:在这里插入图片描述
1、Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
2、Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
3、Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
4、HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
5、Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
这几个模块知道就好,就不分析了,如果小伙伴们想的话可以自己动手去了解一下。

Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,Broker负责消息存储,支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。而且Broker底层的通信和连接都是基于Netty实现的。

Consumer:

Consumer与NameServer集群中的其中一个节点建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

我们知道RocketMQ在消费端有push和pull两种模式,pull模式需要我们手动调用Consumer拉消息,而push模式则只需要我们提供一个listener即可实现对消息的监听,实际RocketMQ的push模式是基于pull模式实现的,它没有实现真正的push,两种模式如下:

pull:主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程。

push:封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。

订阅消息

在这里插入图片描述

接下来是RocketMQ的消息领域模型图:

RocketMQ的消息领域模型图

Message:

一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址,一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。

Topic:

Topic(主题)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。

Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有N个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。

一个 Topic 也可以被N个消费者订阅。

Tag:

可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。

Group:

分组,一个组可以订阅多个Topic,分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的

Queue:

在Kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。

Message Queue:

Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列。

一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。

消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。

Offset:

在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。
也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。

结合部署架构图,完整的工作流程如下:

启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

RocketMQ的应用场景
1.削峰填谷:

用户发起大量的请求到需要进行秒杀商品的业务处理系统,秒杀业务的处理系统按照逻辑处理将满足秒杀条件的请求发送至消息队列 RocketMQ,下游的通知系统订阅消息队列 RocketMQ 的秒杀相关消息。再将秒杀成功的消息发送到相应用户,用户收到秒杀成功的通知。

2.异步解耦:

通过上、下游业务系统的松耦合设计,比如:交易系统的下游子系统(如积分等)出现不可用甚至宕机,都不会影响到核心交易系统的正常运转。

3.顺序消息:

与FIFO原理类似,MQ提供的顺序消息即保证消息的先进先出,可以应用于交易系统中的订单创建、支付、退款等流程。

4.分布式事务消息:

比如阿里的交易系统、支付红包等场景需要确保数据的最终一致性,需要引入 MQ 的分布式事务,既实现了系统之间的解耦,又可以保证最终的数据一致性。

RocketMQ为什么不使用ZooKeeper?

首先,讲解RocketMQ为什么不使用ZooKeeper之前需要有个参照的对象,而KafKa就是使用ZookKeeper,那RocketMQ和KafKa有什么不同呢,为什么RocketMQ不使用ZooKeeper。而KafKa使用呢。下面我们就来简单的说一下,看图通俗易懂就好了。

KafKa架构图:

在这里插入图片描述
在Kafka中,是1个topic有多个partition分区,每个partition有1个master或者多个slave,每台机器既是Master,也是Slave。

RocketMQ的架构图

在这里插入图片描述
不同于Kafka的,1台机器可以同时是Master和Slave。在RocketMQ里面,1台机器要么是Master,要么是Slave,在初始配置时候就已经实现了。

非常简单的就是:
1、在KafKa里面,Maser/Slave是选举出来的,而RocketMQ不需要选举。

Kafka:

在Kafka里面,Master/Slave的选举,有2步:
第1步:先通过ZooKeeper在所有机器中,选举出一个KafKaController。
第2步:再由这个Controller,决定每个partition的Master是谁,Slave是谁。
这里的Master/Slave是动态的,也就是说:当Master挂了之后,会有1个Slave切换成Master。

RocketMQ:

RocketMQ中不需要选举,Master/Slave的角色也是固定的。当一个Master挂了之后,你可以写到其他Master上,但不会说一个Slave切换成Master。

最后说一下为了预防服务宕机以后数据的丢失,Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中,有4种解决得方案:

首先不管是同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH, ASYNC_FLUSH中的一个。
刷盘:

同步刷盘:

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻
通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

同步刷盘流程图:
在这里插入图片描述

异步刷盘:

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入。

异步刷盘刷盘流程图,主要有两种方式:
1、异步刷盘未开启堆外缓存
在这里插入图片描述

2、异步刷盘开启堆外缓存
在这里插入图片描述
异步刷盘未开启transientStorePoolEnable时,消息追加到mappedByteBuffer中,异步线程刷调用mappedByteBuffer.force落盘;异步刷盘开启transientStorePoolEnable时,消息写入wrtieBuffer中,异步线程将消息提交到fileChannel,然后异步线程调用fileChannel.force落盘。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐