我们知道Broker 是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。因此,RocketMQ的所有消息数据都是存放在Broker上的。本文讲解Broker存储机制。

rocketmq事务消息的应用场景(Broker中消息存储机制)(1)

欢迎关注《Apache RocketMQ 深入浅出》系列文章,架构师将循序渐进地讲解Apache RocketMQ的开发实践。

1. Apache RocketMQ 入门介绍和整体架构图

2. 介绍新版RocketMQ v4.9.3 下载、安装、配置的完成过程

3. 启动和停止RocketMQ服务进程、测试消息的发送和消费

4. Spring Boot集成RocketMQ :生产者和消费者开发入门实践

5. RocketMQ 可视化管理界面Dashboard的搭建和使用

6. 了解Apache RocketMQ 中的消息类型和消费模式

7. Spring Boot 集成RocketMQ:使用rocketmq-spring-boot-starter 生产和消费消息

8. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送同步、异步和单向消息

9. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送顺序消息

10. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送延时消息

11. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(1)

12. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(2)

13. RocketMQ 消费模式:集群消费模式和广播消费模式

14. RocketMQ消费端Push和Pull两种消费方式:拉模式开发示例

15. RocketMQ消费端Push和Pull两种消费方式:推模式开发示例

16. RocketMQ中高级特性-消息过滤和标签Tag开发实战(1)

17. RocketMQ中高级特性-消息过滤和标签Tag开发实战(2)

18. RocketMQ 中MessageExt 详解和开发实战

19. RocketMQ 消息重试机制开发实战

.....

一、Broker存储目录结构

Broker 在安装启动后会自动生成若干存储文件,如图所示:

rocketmq事务消息的应用场景(Broker中消息存储机制)(2)

commitlog :文件名是一个20个字符,代表该文件存储的起始偏移量,文件大小通过MappedFileSizeCommitLog配置。

consumequeue :目录中包含该Broker 上所有的Topic 对应的消费队列文件信息。每个消费队列其实是commitlog 的一个索引,用来作为消费者拉取消息,更新点位使用。

index :按照消息key创建的hash索引,文件名是创建时的时间戳命名的。

config :保存了当前Broker中的全部的Topic、订阅关系和消费进度,这些数据会定时的从内存中持久化到磁盘,以便宕机后恢复。

abort :Broker是否异常关闭的标志,正常启动会删除该文件。

checkpoint :broker最后一次正常运行的状态,保存了最后一次刷盘时间,最后一次正确索引的时间。

二、Broker 消息存储架构图

我们先看看RocketMQ官方文档中的Broker消息存储架构图,如图所示。

rocketmq事务消息的应用场景(Broker中消息存储机制)(3)

Broker 通过CommitLog、ConsumeQueue、IndexFile 等来组织存储消息。

三、CommitLog、ConsumeQueue、IndexFile

CommitLog是消息存放的物理文件,是消息主体以及元数据的存储主体。

每台broker上的commitlog被本机所有的queue共享,不做任何区分。用于存储Producer端写入的消息主体内容,消息内容不是定长的,文件顺序写,随机读。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。

消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

rocketmq事务消息的应用场景(Broker中消息存储机制)(4)

其实CommitLog 只有一个文件,为了方便保存和读写被切分为多个子文件。

ConsumeQueue是消息的逻辑消费队列,相当于字典的目录,引入的目的主要是提高消息消费的性能。由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的,Consumer即可根据ConsumeQueue来查找待消费的消息。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,每个topic下的每个queue都有一个对应的consumequeue文件。

具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

rocketmq事务消息的应用场景(Broker中消息存储机制)(5)

consumequeue文件存储单元格式:

同时,consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法,如果一个消息包含key值的话,会使用IndexFile存储消息索引。Index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。

如下图所示为IndexFile文件结构:

rocketmq事务消息的应用场景(Broker中消息存储机制)(6)

索引文件根据key查找对应消息主要流程:

四、Broker如何保证高效存储?

内存映射机制与高效写磁盘。RocketMQ 在存储设计中通过内存映射、顺序写文件等方式实现了高吞吐。

RocketMQ 的基本数据结构:

org.apache.rocketmq.store.CommitLog:RocketMQ 对存储消息的物理文件的抽象实现,也就是物理 CommitLog 文件的具体实现。

org.apache.rocketmq.store.MappedFile:CommitLog 文件在内存中的映射文件,映射文件同时具有内存的写入速度和磁盘一样可靠的持久化方式。

org.apache.rocketmq.store.MappedFileQueue:映射文件队列中有全部的 CommitLog 映射文件,第一个映射文件为最先过期的文件,最后一个文件是最后过期的文件,最新的消息总是写入最后一个映射文件中。

CommitLog、MappedFile、MappedFileQueue 与物理 CommitLog 文件的关系如下:

rocketmq事务消息的应用场景(Broker中消息存储机制)(7)

每个 MappedFileQueue 包含多个 MappedFile,就是真实的物理 CommitLog文件,Java 通过 java.nio.MappedByteBuffer 来实现文件的内存映射,即文件读写都是通过 MappedByteBuffer(其实是 Page Cache)来操作的。

MappedByteBuffer 实现了零拷贝技术,即Java 进程映射到内核态内存,原来内核态内存与用户态内存的互相拷贝过程就消失了。

在消息系统中,用户关心的往往都是最新的数据。理论上,基本的操作都在Page Cache 中,Page Cache的操作速度和内存基本持平,所以速度非常快。

五、文件刷盘机制

RocketMQ 首先将消息数据写入到操作系统 Page Cache,然后定时将数据刷入磁盘,也就是刷盘。

RocketMQ 支持2种刷盘方式,在 Broker 启动时:

同步刷盘服务:在 Broker 存储消息到 Page Cache 后,同步将 Page Cache 刷到磁盘,再返回客户端消息并写入结果,具体过程如下所示:

rocketmq事务消息的应用场景(Broker中消息存储机制)(8)

异步刷盘服务:在 Broker 存储消息到 Page Cache 后,立即返回客户端写入结果,然后异步刷盘服务将 Page Cache 异步刷盘到磁盘。

rocketmq事务消息的应用场景(Broker中消息存储机制)(9)

,