目录

1、RocketMQ的存储架构

1.1存储特点

1.2为什么要这样设计?

 2、CommitLog文件

 3、ConsumeQueue文件

4、消息存储方式

5、RocketMQ消息存储结构类型及缺点

6、RocketMQ消息存储架构深入分析

7、PageCache与Mmap内存映射

8、RocketMQ文件存储模型层次结构

9、RocketMQ文件存储模型层次结构 

10、总结


1、RocketMQ的存储架构

1.1存储特点

  • 消息主体以及元数据都存储在CommitLog当中
  • Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
  • 每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。

1.2为什么要这样设计?

rocketMQ的设计理念很大程度借鉴了kafka,所以有必要介绍下kafka的存储结构设计:

存储特点:和RocketMQ类似,每个Topic有多个partition(queue),kafka的每个partition都是一个独立的物理文件,消息直接从里面读写。
根据之前阿里中间件团队的测试,一旦kafka中Topic的partitoin数量过多,队列文件会过多,会给磁盘的IO读写造成很大的压力,造成tps迅速下降。所以RocketMQ进行了上述这样设计,consumerQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。
那么RocketMQ这样处理有什么优缺点?

  • 优点:1、队列轻量化,单个队列数据量非常少。2、对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。
  • 缺点:写虽然完全是顺序写,但是读却变成了完全的随机读。读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度。

以上缺点如何克服:

  • 随机读,尽可能让读命中page cache,减少IO读操作,所以内存越大越好。如果系统中堆积的消息过多,读数据要访问磁盘会不会由于随机读导致系统性能急剧下降,答案是否定的。
  • 访问page cache 时,即使只访问1k的消息,系统也会提前预读出更多数据,在下次读时,就可能命中内存。
  • 随机访问Commit Log磁盘数据,系统IO调度算法设置为NOOP方式,会在一定程度上将完全的随机读变成顺序跳跃方式,而顺序跳跃方式读较完全的随机读性能会高5倍以上。
  • 另外4k的消息在完全随机访问情况下,仍然可以达到8K次每秒以上的读性能。
  • 由于Consume Queue存储数据量极少,而且是顺序读,在PAGECACHE预读作用下,Consume Queue的读性能几乎与内存一致,即使堆积情况下。所以可认为Consume Queue完全不会阻碍读性能。
  • Commit Log中存储了所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以只要有Commit Log在,Consume Queue即使数据丢失,仍然可以恢复出来。

 2、CommitLog文件

要想知道RocketMQ如何存储消息,我们先看看CommitLog。在RocketMQ中,所有topic的消息都存储在一个称为CommitLog的文件中,该文件默认最大为1GB,超过1GB后会轮到下一个CommitLog文件。通过CommitLog,RocketMQ将所有消息存储在一起,以顺序IO的方式写入磁盘,充分利用了磁盘顺序写减少了IO争用提高数据存储的性能,消息在CommitLog中的存储格式如下:

  • 4字节表示消息的长度,消息的长度是整个消息体所占用的字节数的大小
  • 4字节的魔数,是固定值,有MESSAGE_MAGIC_CODE和BLANK_MAGIC_CODE
  • 4字节的CRC,是消息体的校验码,用于防止网络、硬件等故障导致数据与发送时不一样带来的问题
  • 4字节的queueId,表示消息发到了哪个MessageQueue(逻辑上相当于kafka的partition)
  • 4字节的flag,flag是创建Message对象时由生产者通过构造器设定的flag值
  • 8字节的queueOffset,表示在queue中的偏移量
  • 8字节的physicalPosition,表示在存储文件中的偏移量
  • 4字节sysFlag,是生产者相关的信息标识,具体生产逻辑可以看相关代码
  • 8字节消息创建时间
  • 8字节消息生产者的host
  • 8字节消息存储时间
  • 8字节消息存储的机器的host
  • 4字节表示重复消费次数
  • 8字节消息事务相关偏移量
  • 4字节表示消息体的长度
  • 消息休,不是固定长度,和前面的4字节的消息体长度值相等
  • 1字节表示topic的长度,因此topc的长度最多不能超过127个字节,超过的话存储会出错(有前置校验)
  • Topic,存储topic,因为topic不是固定长度,所以这里所占的字节是不固定的,和前一个表示topic长度的字节的值相等
  • 2字节properties的长度,properties是创建消息时添加到消息中的,因此,添加在消息中的poperties不能太多太大,所有的properties的kv对在拼接成string后,所占的字节数不能超过2^15-1
  • Properties的内容,也不是固定长度,和前面的2字节properties长度的值相同

 3、ConsumeQueue文件

一个ConsumeQueue表示一个topic的一个queue,类似于kafka的一个partition,但是rocketmq在消息存储上与kafka有着非常大的不同,RocketMQ的ConsumeQueue中不存储具体的消息,具体的消息由CommitLog存储,ConsumeQueue中只存储路由到该queue中的消息在CommitLog中的offset,消息的大小以及消息所属的tag的hash(tagCode),一共只占20个字节,整个数据包如下:

4、消息存储方式

前文已经描述过,RocketMQ的消息存储由CommitLog和ConsumeQueue两部分组成,其中CommitLog用于存储原始的消息,而ConsumeQueue用于存储投递到某一个queue中的消息的位置信息,消息的存储如下图所示:

消费者在读取消息时,先读取ConsumeQueue,再通过ConsumeQueue中的位置信息读取CommitLog,得到原始的消息。

5、RocketMQ消息存储结构类型及缺点

上图即为RocketMQ的消息存储整体架构,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。而Kafka采用的是独立型的存储结构,每个队列一个文件。这里小编认为,RocketMQ采用混合型存储结构的缺点在于,会存在较多的随机读操作,因此读的效率偏低。同时消费消息需要依赖ConsumeQueue,构建该逻辑消费队列需要一定开销。

6、RocketMQ消息存储架构深入分析

从上面的整体架构图中可见,RocketMQ的混合型存储结构针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息,至于消费的时间可以稍微滞后一些也没有太大的关系。退一步地讲,即使Consumer端第一次没法拉取到待消费的消息,Broker服务端也能够通过长轮询机制等待一定时间延迟后再次发起拉取消息的请求。
这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据(ps:对于该服务线程在消息消费篇幅也有过介绍,不清楚的童鞋可以跳至消息消费篇幅再理解下)。然后,Consumer即可根据ConsumerQueue来查找待消费的消息了。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。而IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法(ps:这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程)。

7、PageCache与Mmap内存映射

这里有必要先稍微简单地介绍下page cache的概念。系统的所有文件I/O请求,操作系统都是通过page cache机制实现的。对于操作系统来说,磁盘文件都是由一系列的数据块顺序组成,数据块的大小由操作系统本身而决定,x86的linux中一个标准页面大小是4KB。
操作系统内核在处理文件I/O请求时,首先到page cache中查找(page cache中的每一个数据块都设置了文件以及偏移量地址信息),如果未命中,则启动磁盘I/O,将磁盘文件中的数据块加载到page cache中的一个空闲块,然后再copy到用户缓冲区中。
page cache本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会读入紧随其后的少数几个页面。因此,想要提高page cache的命中率(尽量让访问的页在物理内存中),从硬件的角度来说肯定是物理内存越大越好。从操作系统层面来说,访问page cache时,即使只访问1k的消息,系统也会提前预读取更多的数据,在下次读取消息时, 就很可能可以命中内存。
在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue的读性能会比较高近乎内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Noop”(此时块存储采用SSD的话),随机读的性能也会有所提升。
另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型直接将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了)。

8、RocketMQ文件存储模型层次结构

9、RocketMQ文件存储模型层次结构 

RocketMQ文件存储模型层次结构如上图所示,根据类别和作用从概念模型上大致可以划分为5层,下面将从各个层次分别进行分析和阐述:

  1. RocketMQ业务处理器层:Broker端对消息进行读取和写入的业务逻辑入口,这一层主要包含了业务逻辑相关处理操作(根据解析RemotingCommand中的RequestCode来区分具体的业务操作类型,进而执行不同的业务处理流程),比如前置的检查和校验步骤、构造MessageExtBrokerInner对象、decode反序列化、构造Response返回对象等;
  2. RocketMQ数据存储组件层;该层主要是RocketMQ的存储核心类—DefaultMessageStore,其为RocketMQ消息数据文件的访问入口,通过该类的“putMessage()”和“getMessage()”方法完成对CommitLog消息存储的日志数据文件进行读写操作(具体的读写访问操作还是依赖下一层中CommitLog对象模型提供的方法);另外,在该组件初始化时候,还会启动很多存储相关的后台服务线程,包括AllocateMappedFileService(MappedFile预分配服务线程)、ReputMessageService(回放存储消息服务线程)、HAService(Broker主从同步高可用服务线程)、StoreStatsService(消息存储统计服务线程)、IndexService(索引文件服务线程)等;
  3. RocketMQ存储逻辑对象层:该层主要包含了RocketMQ数据文件存储直接相关的三个模型类IndexFile、ConsumerQueue和CommitLog。IndexFile为索引数据文件提供访问服务,ConsumerQueue为逻辑消息队列提供访问服务,CommitLog则为消息存储的日志数据文件提供访问服务。这三个模型类也是构成了RocketMQ存储层的整体结构(对于这三个模型类的深入分析将放在后续篇幅中);
  4. 封装的文件内存映射层:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel两种方式完成数据文件的读写。其中,采用MappedByteBuffer这种内存映射磁盘文件的方式完成对大文件的读写,在RocketMQ中将该类封装成MappedFile类。这里限制的问题在上面已经讲过;对于每类大文件(IndexFile/ConsumerQueue/CommitLog),在存储时分隔成多个固定大小的文件(单个IndexFile文件大小约为400M、单个ConsumerQueue文件大小约5.72M、单个CommitLog文件大小为1G),其中每个分隔文件的文件名为前面所有文件的字节大小数+1,即为文件的起始偏移量,从而实现了整个大文件的串联。这里,每一种类的单个文件均由MappedFile类提供读写操作服务(其中,MappedFile类提供了顺序写/随机读、内存数据刷盘、内存清理等和文件相关的服务);
  5. 磁盘存储层:主要指的是部署RocketMQ服务器所用的磁盘。这里,需要考虑不同磁盘类型(如SSD或者普通的HDD)特性以及磁盘的性能参数(如IOPS、吞吐量和访问时延等指标)对顺序写/随机读操作带来的影响(ps:小编建议在正式业务上线之前做好多轮的性能压测,具体用压测的结果来评测);


10、总结


RocketMQ消息存储部分的内容与其他所有篇幅(RocketMQ的Remoting通信、普通消息发送和消息消费部分)相比是最为复杂的,需要读者反复多看源码并多次对消息读和写进行Debug(可以通过在Broker端的SendMessageProcessor/PullMessageProcesssor/QueryMessaageProcessor几个业务处理器入口,在其重要方法中打印相关重要属性值的方式或者一步步地Debug代码,来仔细研究下其中的存储过程),反复几次后才可以对消息存储这部分有一个较为深刻的理解,同时也有助于提高对RocketMQ的整体理解。 

 原文:

RocketMQ消息存储原理总结(一)

RocketMQ架构原理解析(一):整体架构

RocketMQ架构原理解析(二):消息存储

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐