消息中间件—RocketMQ 消息存储(一)


本文地址:http://www.6aiq.com/article/1563130479801
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出

文章摘要:MQ 分布式消息队列大致流程在于消息的一发一收一存,本篇将为大家主要介绍下 RocketMQ 存储部分的架构

消息存储是 MQ 消息队列中最为复杂和最为重要的一部分,所以小编也就放在 RocketMQ 系列篇幅中最后一部分来进行阐述和介绍。本文先从目前几种比较常用的 MQ 消息队列存储方式出发,为大家介绍 RocketMQ 选择磁盘文件存储的原因。然后,本文分别从 RocketMQ 的消息存储整体架构和 RocketMQ 文件存储模型层次结构两方面进行深入分析介绍。使得大家读完本文后对 RocketMQ 消息存储部分有一个大致的了解和认识。

这里先回顾往期 RocketMQ 技术分享的篇幅(如果有童鞋没有读过之前的文章,建议先好好读下之前小编写的篇幅或者其他网上相关的博客,把 RocketMQ 消息发送和消费部分的流程先大致搞明白):

消息中间件—RocketMQ 的 RPC 通信(一)
消息中间件—RocketMQ 的 RPC 通信(二)
消息中间件—RocketMQ 消息发送
消息中间件—RocketMQ 消息消费(一)
消息中间件—RocketMQ 消息消费(二)(push 模式实现)
消息中间件—RocketMQ 消息消费(三)(消息消费重试)

一、MQ 消息队列的一般存储方式

当前业界几款主流的 MQ 消息队列采用的存储方式主要有以下三种方式:

**(1)分布式 KV 存储:** 这类 MQ 一般会采用诸如 levelDB、RocksDB 和 Redis 来作为消息持久化的方式,由于分布式缓存的读写能力要优于 DB,所以在对消息的读写能力要求都不是比较高的情况下,采用这种方式倒也不失为一种可以替代的设计方案。消息存储于分布式 KV 需要解决的问题在于如何保证 MQ 整体的可靠性?

**(2)文件系统:** 目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机 / 物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。小编认为,消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署 MQ 机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

**(3)关系型数据库 DB:**Apache 下开源的另外一款 MQ—ActiveMQ(默认采用的 KahaDB 做消息存储)可选用 JDBC 的方式来做消息持久化,通过简单的 xml 配置信息即可实现 JDBC 消息存储。由于,普通关系型数据库(如 Mysql)在单表数据量达到千万级别的情况下,其 IO 读写性能往往会出现瓶颈。因此,如果要选型或者自研一款性能强劲、吞吐量大、消息堆积能力突出的 MQ 消息队列,那么小编并不推荐采用关系型数据库作为消息持久化的方案。在可靠性方面,该种方案非常依赖 DB,如果一旦 DB 出现故障,则 MQ 的消息就无法落盘存储会导致线上故障;

因此,综合上所述从存储效率来说, 文件系统 > 分布式 KV 存储 > 关系型数据库 DB,直接操作文件系统肯定是最快和最高效的,而关系型数据库 TPS 一般相比于分布式 KV 系统会更低一些(简略地说,关系型数据库本身也是一个需要读写文件 server,这时 MQ 作为 client 与其建立连接并发送待持久化的消息数据,同时又需要依赖 DB 的事务等,这一系列操作都比较消耗性能),所以如果追求高效的 IO 读写,那么选择操作文件系统会更加合适一些。但是如果从易于实现和快速集成来看,关系型数据库 DB> 分布式 KV 存储 > 文件系统,但是性能会下降很多。

另外,从消息中间件的本身定义来考虑,应该尽量减少对于外部第三方中间件的依赖。一般来说依赖的外部系统越多,也会使得本身的设计越复杂,所以小编个人的理解是采用文件系统作为消息存储的方式,更贴近消息中间件本身的定义。

二、RocketMQ 消息存储整体架构

0907c230e4e34bd79f2b94453836db30.png
RokcetMQ 存储设计架构.jpg

(1)RocketMQ 消息存储结构类型及缺点

上图即为 RocketMQ 的消息存储整体架构,RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。而 Kafka 采用的是独立型的存储结构,每个队列一个文件。这里小编认为,RocketMQ 采用混合型存储结构的缺点在于,会存在较多的随机读操作,因此读的效率偏低。同时消费消息需要依赖 ConsumeQueue,构建该逻辑消费队列需要一定开销。

(2)RocketMQ 消息存储架构深入分析

从上面的整体架构图中可见,RocketMQ 的混合型存储结构针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。只要消息被刷盘持久化至磁盘文件 CommitLog 中,那么 Producer 发送的消息就不会丢失。正因为如此,Consumer 也就肯定有机会去消费这条消息,至于消费的时间可以稍微滞后一些也没有太大的关系。退一步地讲,即使 Consumer 端第一次没法拉取到待消费的消息,Broker 服务端也能够通过长轮询机制等待一定时间延迟后再次发起拉取消息的请求。



这里,RocketMQ 的具体做法是,使用 Broker 端的后台服务线程—ReputMessageService 不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据(ps:对于该服务线程在消息消费篇幅也有过介绍,不清楚的童鞋可以跳至消息消费篇幅再理解下)。然后,Consumer 即可根据 ConsumerQueue 来查找待消费的消息了。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。而 IndexFile(索引文件)则只是为了消息查询提供了一种通过 key 或时间区间来查询消息的方法(ps:这种通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程)。

(3)PageCache 与 Mmap 内存映射

这里有必要先稍微简单地介绍下 page cache 的概念。系统的所有文件 I/O 请求,操作系统都是通过 page cache 机制实现的。对于操作系统来说,磁盘文件都是由一系列的数据块顺序组成,数据块的大小由操作系统本身而决定,x86 的 linux 中一个标准页面大小是 4KB。

操作系统内核在处理文件 I/O 请求时,首先到 page cache 中查找(page cache 中的每一个数据块都设置了文件以及偏移量地址信息),如果未命中,则启动磁盘 I/O,将磁盘文件中的数据块加载到 page cache 中的一个空闲块,然后再 copy 到用户缓冲区中。

page cache 本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会读入紧随其后的少数几个页面。因此,想要提高 page cache 的命中率(尽量让访问的页在物理内存中),从硬件的角度来说肯定是物理内存越大越好。从操作系统层面来说,访问 page cache 时,即使只访问 1k 的消息,系统也会提前预读取更多的数据,在下次读取消息时, 就很可能可以命中内存。

在 RocketMQ 中,ConsumeQueue 逻辑消费队列存储的数据较少,并且是顺序读取,在 page cache 机制的预读取作用下,Consume Queue 的读性能会比较高近乎内存,即使在有消息堆积情况下也不会影响性能。而对于 CommitLog 消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统 IO 调度算法,比如设置调度算法为“Noop”(此时块存储采用 SSD 的话),随机读的性能也会有所提升。

另外,RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作。其中,利用了 NIO 中的 FileChannel 模型直接将磁盘上的物理文件直接映射到用户态的内存地址中(这种 Mmap 的方式减少了传统 IO 将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了)。

三、RocketMQ 文件存储模型层次结构

b5169aef325f4ccd8d3e7f3d05f05d9f.png

RocketMQ 文件存储模型结构.jpg
RocketMQ 文件存储模型层次结构如上图所示,根据类别和作用从概念模型上大致可以划分为 5 层,下面将从各个层次分别进行分析和阐述:

(1)RocketMQ 业务处理器层:Broker 端对消息进行读取和写入的业务逻辑入口,这一层主要包含了业务逻辑相关处理操作(根据解析 RemotingCommand 中的 RequestCode 来区分具体的业务操作类型,进而执行不同的业务处理流程),比如前置的检查和校验步骤、构造 MessageExtBrokerInner 对象、decode 反序列化、构造 Response 返回对象等;

(2)RocketMQ 数据存储组件层;该层主要是 RocketMQ 的存储核心类—DefaultMessageStore,其为 RocketMQ 消息数据文件的访问入口,通过该类的“putMessage()”和“getMessage()”方法完成对 CommitLog 消息存储的日志数据文件进行读写操作(具体的读写访问操作还是依赖下一层中 CommitLog 对象模型提供的方法);另外,在该组件初始化时候,还会启动很多存储相关的后台服务线程,包括 AllocateMappedFileService(MappedFile 预分配服务线程)、ReputMessageService(回放存储消息服务线程)、HAService(Broker 主从同步高可用服务线程)、StoreStatsService(消息存储统计服务线程)、IndexService(索引文件服务线程)等;

(3)RocketMQ 存储逻辑对象层:该层主要包含了 RocketMQ 数据文件存储直接相关的三个模型类 IndexFile、ConsumerQueue 和 CommitLog。IndexFile 为索引数据文件提供访问服务,ConsumerQueue 为逻辑消息队列提供访问服务,CommitLog 则为消息存储的日志数据文件提供访问服务。这三个模型类也是构成了 RocketMQ 存储层的整体结构(对于这三个模型类的深入分析将放在后续篇幅中);

(4)封装的文件内存映射层:RocketMQ 主要采用 JDK NIO 中的 MappedByteBuffer 和 FileChannel 两种方式完成数据文件的读写。其中,采用 MappedByteBuffer 这种内存映射磁盘文件的方式完成对大文件的读写,在 RocketMQ 中将该类封装成 MappedFile 类。这里限制的问题在上面已经讲过;对于每类大文件(IndexFile/ConsumerQueue/CommitLog),在存储时分隔成多个固定大小的文件(单个 IndexFile 文件大小约为 400M、单个 ConsumerQueue 文件大小约 5.72M、单个 CommitLog 文件大小为 1G),其中每个分隔文件的文件名为前面所有文件的字节大小数 +1,即为文件的起始偏移量,从而实现了整个大文件的串联。这里,每一种类的单个文件均由 MappedFile 类提供读写操作服务(其中,MappedFile 类提供了顺序写 / 随机读、内存数据刷盘、内存清理等和文件相关的服务);

(5)磁盘存储层:主要指的是部署 RocketMQ 服务器所用的磁盘。这里,需要考虑不同磁盘类型(如 SSD 或者普通的 HDD)特性以及磁盘的性能参数(如 IOPS、吞吐量和访问时延等指标)对顺序写 / 随机读操作带来的影响(ps:小编建议在正式业务上线之前做好多轮的性能压测,具体用压测的结果来评测);

四、总结

RocketMQ 的 RocketMQ 消息存储(一)篇幅就先分析到这儿了。RocketMQ 消息存储部分的内容与其他所有篇幅(RocketMQ 的 Remoting 通信、普通消息发送和消息消费部分)相比是最为复杂的,需要读者反复多看源码并多次对消息读和写进行 Debug(可以通过在 Broker 端的 SendMessageProcessor/PullMessageProcesssor/QueryMessaageProcessor 几个业务处理器入口,在其重要方法中打印相关重要属性值的方式或者一步步地 Debug 代码,来仔细研究下其中的存储过程),反复几次后才可以对消息存储这部分有一个较为深刻的理解,同时也有助于提高对 RocketMQ 的整体理解。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。


本文地址:http://www.6aiq.com/article/1563130479801
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出