我们知道Broker 是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。因此,RocketMQ的所有消息数据都是存放在Broker上的。本文讲解Broker存储机制。
欢迎关注《Apache RocketMQ 深入浅出》系列文章,架构师将循序渐进地讲解Apache RocketMQ的开发实践。
1. Apache RocketMQ 入门介绍和整体架构图
2. 介绍新版RocketMQ v4.9.3 下载、安装、配置的完成过程
3. 启动和停止RocketMQ服务进程、测试消息的发送和消费
4. Spring Boot集成RocketMQ :生产者和消费者开发入门实践
5. RocketMQ 可视化管理界面Dashboard的搭建和使用
6. 了解Apache RocketMQ 中的消息类型和消费模式
7. Spring Boot 集成RocketMQ:使用rocketmq-spring-boot-starter 生产和消费消息
8. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送同步、异步和单向消息
9. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送顺序消息
10. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送延时消息
11. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(1)
12. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(2)
13. RocketMQ 消费模式:集群消费模式和广播消费模式
14. RocketMQ消费端Push和Pull两种消费方式:拉模式开发示例
15. RocketMQ消费端Push和Pull两种消费方式:推模式开发示例
16. RocketMQ中高级特性-消息过滤和标签Tag开发实战(1)
17. RocketMQ中高级特性-消息过滤和标签Tag开发实战(2)
18. RocketMQ 中MessageExt 详解和开发实战
19. RocketMQ 消息重试机制开发实战
.....
一、Broker存储目录结构Broker 在安装启动后会自动生成若干存储文件,如图所示:
commitlog :文件名是一个20个字符,代表该文件存储的起始偏移量,文件大小通过MappedFileSizeCommitLog配置。
consumequeue :目录中包含该Broker 上所有的Topic 对应的消费队列文件信息。每个消费队列其实是commitlog 的一个索引,用来作为消费者拉取消息,更新点位使用。
index :按照消息key创建的hash索引,文件名是创建时的时间戳命名的。
config :保存了当前Broker中的全部的Topic、订阅关系和消费进度,这些数据会定时的从内存中持久化到磁盘,以便宕机后恢复。
abort :Broker是否异常关闭的标志,正常启动会删除该文件。
checkpoint :broker最后一次正常运行的状态,保存了最后一次刷盘时间,最后一次正确索引的时间。
二、Broker 消息存储架构图我们先看看RocketMQ官方文档中的Broker消息存储架构图,如图所示。
Broker 通过CommitLog、ConsumeQueue、IndexFile 等来组织存储消息。
三、CommitLog、ConsumeQueue、IndexFileCommitLog是消息存放的物理文件,是消息主体以及元数据的存储主体。
每台broker上的commitlog被本机所有的queue共享,不做任何区分。用于存储Producer端写入的消息主体内容,消息内容不是定长的,文件顺序写,随机读。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
其实CommitLog 只有一个文件,为了方便保存和读写被切分为多个子文件。
ConsumeQueue是消息的逻辑消费队列,相当于字典的目录,引入的目的主要是提高消息消费的性能。由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的,Consumer即可根据ConsumeQueue来查找待消费的消息。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,每个topic下的每个queue都有一个对应的consumequeue文件。
具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
consumequeue文件存储单元格式:
- CommitLogOffset:是指这条消息在Commit Log文件中的起始物理偏移量。
- msgSize:存储中消息的大小。
- tagsCode:消息Tag的HashCode值。主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)。
同时,consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。
IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法,如果一个消息包含key值的话,会使用IndexFile存储消息索引。Index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。
如下图所示为IndexFile文件结构:
索引文件根据key查找对应消息主要流程:
- 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
- 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)
- 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
内存映射机制与高效写磁盘。RocketMQ 在存储设计中通过内存映射、顺序写文件等方式实现了高吞吐。
RocketMQ 的基本数据结构:
org.apache.rocketmq.store.CommitLog:RocketMQ 对存储消息的物理文件的抽象实现,也就是物理 CommitLog 文件的具体实现。
org.apache.rocketmq.store.MappedFile:CommitLog 文件在内存中的映射文件,映射文件同时具有内存的写入速度和磁盘一样可靠的持久化方式。
org.apache.rocketmq.store.MappedFileQueue:映射文件队列中有全部的 CommitLog 映射文件,第一个映射文件为最先过期的文件,最后一个文件是最后过期的文件,最新的消息总是写入最后一个映射文件中。
CommitLog、MappedFile、MappedFileQueue 与物理 CommitLog 文件的关系如下:
每个 MappedFileQueue 包含多个 MappedFile,就是真实的物理 CommitLog文件,Java 通过 java.nio.MappedByteBuffer 来实现文件的内存映射,即文件读写都是通过 MappedByteBuffer(其实是 Page Cache)来操作的。
MappedByteBuffer 实现了零拷贝技术,即Java 进程映射到内核态内存,原来内核态内存与用户态内存的互相拷贝过程就消失了。
在消息系统中,用户关心的往往都是最新的数据。理论上,基本的操作都在Page Cache 中,Page Cache的操作速度和内存基本持平,所以速度非常快。
五、文件刷盘机制RocketMQ 首先将消息数据写入到操作系统 Page Cache,然后定时将数据刷入磁盘,也就是刷盘。
RocketMQ 支持2种刷盘方式,在 Broker 启动时:
- 配置 flushDiskType = SYNC_FLUSH 表示同步刷盘
- 配置 flushDiskType = ASYNC_FLUSH 表示异步刷盘
同步刷盘服务:在 Broker 存储消息到 Page Cache 后,同步将 Page Cache 刷到磁盘,再返回客户端消息并写入结果,具体过程如下所示:
异步刷盘服务:在 Broker 存储消息到 Page Cache 后,立即返回客户端写入结果,然后异步刷盘服务将 Page Cache 异步刷盘到磁盘。
,