消息中间件—RocketMQ 消息存储(二)


本文地址:http://www.6aiq.com/article/1563130737288
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出

消息中间件—RocketMQ 的 RPC 通信(一)
消息中间件—RocketMQ 的 RPC 通信(二)
消息中间件—RocketMQ 消息发送
消息中间件—RocketMQ 消息消费(一)
消息中间件—RocketMQ 消息消费(二)(push 模式实现)
消息中间件—RocketMQ 消息消费(三)(消息消费重试)
消息中间件—RocketMQ 消息存储(一)

9478eb5ce2f04fccb3b7d46dccb5f5ab.png

RokcetMQ 文件存储设计架构 _v2.jpg
上面图中假设 Consumer 端默认设置的是同一个 ConsumerGroup,因此 Consumer 端线程采用的是负载订阅的方式进行消费。从架构图中可以总结出如下几个关键点:

(1)消息生产与消息消费相互分离,Producer 端发送消息最终写入的是 CommitLog(消息存储的日志数据文件),Consumer 端先从 ConsumeQueue(消息逻辑队列)读取持久化消息的起始物理位置偏移量 offset、大小 size 和消息 Tag 的 HashCode 值,随后再从 CommitLog 中进行读取待拉取消费消息的真正实体内容部分;

(2)RocketMQ 的 CommitLog 文件采用混合型存储(所有的 Topic 下的消息队列共用同一个 CommitLog 的日志数据文件),并通过建立类似索引文件—ConsumeQueue 的方式来区分不同 Topic 下面的不同 MessageQueue 的消息,同时为消费消息起到一定的缓冲作用(只有 ReputMessageService 异步服务线程通过 doDispatch 异步生成了 ConsumeQueue 队列的元素后,Consumer 端才能进行消费)。这样,只要消息写入并刷盘至 CommitLog 文件后,消息就不会丢失,即使 ConsumeQueue 中的数据丢失,也可以通过 CommitLog 来恢复。

(3)RocketMQ 每次读写文件的时候真的是完全顺序读写么?这里,发送消息时,生产者端的消息确实是顺序写入 CommitLog;订阅消息时,消费者端也是顺序读取 ConsumeQueue,然而根据其中的起始物理位置偏移量 offset 读取消息真实内容却是随机读取 CommitLog。 在 RocketMQ 集群整体的吞吐量、并发量非常高的情况下,随机读取文件带来的性能开销影响还是比较大的,那么这里如何去优化和避免这个问题呢?后面的章节将会逐步来解答这个问题。

这里,同样也可以总结下 RocketMQ 存储架构的优缺点:

(1)优点:

a、ConsumeQueue 消息逻辑队列较为轻量级;

b、对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致 IOWAIT 增高;

(2)缺点:

a、对于 CommitLog 来说写入消息虽然是顺序写,但是读却变成了完全的随机读;

b、Consumer 端订阅消费一条消息,需要先读 ConsumeQueue,再读 Commit Log,一定程度上增加了开销;

二、RocketMQ 存储关键技术—再谈 Mmap 与 PageCache

上篇中已经对 Mmap 内存映射技术(具体为 JDK NIO 的 MappedByteBuffer)和 PageCache 概念进行了一定的深入分析。本节在回顾这两种技术的同时,从其他的维度来阐述上篇未涉及的细节点。

1.1、Mmap 内存映射技术—MappedByteBuffer

(1)Mmap 内存映射技术的特点

Mmap 内存映射和普通标准 IO 操作的本质区别在于它并不需要将文件中的数据先拷贝至 OS 的内核 IO 缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读 / 写操作一样。只有当缺页中断发生时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小一般需要限制在 1.5~2G 以下),采用 Mmap 的方式其读 / 写的效率和性能都非常高。

95198bb541804e5cae1edbdd5dd6e959.png
(2)JDK NIO 的 MappedByteBuffer 简要分析

从 JDK 的源码来看,MappedByteBuffer 继承自 ByteBuffer,其内部维护了一个逻辑地址变量—address。在建立映射关系时,MappedByteBuffer 利用了 JDK NIO 的 FileChannel 类提供的 map()方法把文件对象映射到虚拟内存。仔细看源码中 map() 方法的实现,可以发现最终其通过调用 native 方法 map0()完成文件对象的映射工作,同时使用 Util.newMappedByteBuffer() 方法初始化 MappedByteBuffer 实例,但最终返回的是 DirectByteBuffer 的实例。在 Java 程序中使用 MappedByteBuffer 的 get()方法来获取内存数据是最终通过 DirectByteBuffer.get() 方法实现(底层通过 unsafe.getByte() 方法,以“地址 + 偏移量”的方式获取指定映射至内存中的数据)。

(3)使用 Mmap 的限制

a.Mmap 映射的内存空间释放的问题;由于映射的内存空间本身就不属于 JVM 的堆内存区(Java Heap),因此其不受 JVM GC 的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。然而 unmap() 方法是 FileChannelImpl 类里实现的私有方法,无法直接显示调用。RocketMQ 中的做法是,通过 Java 反射的方式调用“sun.misc”包下的 Cleaner 类的 clean() 方法来释放映射占用的内存空间;

b.MappedByteBuffer 内存映射大小限制;因为其占用的是虚拟内存(非 JVM 的堆内存),大小不受 JVM 的 -Xmx 参数限制,但其大小也受到 OS 虚拟内存大小的限制。一般来说,一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了;

c. 使用 MappedByteBuffe 的其他问题;会存在内存占用率较高和文件关闭不确定性的问题;

2.2、OS 的 PageCache 机制

PageCache 是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写访问,这里的主要原因就是在于 OS 使用 PageCache 机制对读写访问操作进行了性能优化,将一部分的内存用作 PageCache。



(1)对于数据文件的读取,如果一次读取文件时出现未命中 PageCache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取(ps:顺序读入紧随其后的少数几个页面)。这样,只要下次访问的文件已经被加载至 PageCache 时,读取操作的速度基本等于访问内存。

(2)对于数据文件的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。

对于文件的顺序读写操作来说,读和写的区域都在 OS 的 PageCache 内,此时读写性能接近于内存。RocketMQ 的大致做法是,将数据文件映射到 OS 的虚拟内存中(通过 JDK NIO 的 MappedByteBuffer),写消息的时候首先写入 PageCache,并通过异步刷盘的方式将消息批量的做持久化(同时也支持同步刷盘);订阅消费消息时(对 CommitLog 操作是随机读取),由于 PageCache 的局部性热点原理且整体情况下还是从旧到新的有序读,因此大部分情况下消息还是可以直接从 Page Cache 中读取,不会产生太多的缺页(Page Fault)中断而从磁盘读取。

90ec12b3f8104452acebd4f58667cf73.png
RokcetMQ 文件存储 PageCache 机制.jpg

PageCache 机制也不是完全无缺点的,当遇到 OS 进行脏页回写,内存回收,内存 swap 等情况时,就会引起较大的消息读写延迟。

对于这些情况,RocketMQ 采用了多种优化技术,比如内存预分配,文件预热,mlock 系统调用等,来保证在最大可能地发挥 PageCache 机制优点的同时,尽可能地减少其缺点带来的消息读写延迟。

三、RocketMQ 存储优化技术

这一节将主要介绍 RocketMQ 存储层采用的几项优化技术方案在一定程度上可以减少 PageCache 的缺点带来的影响,主要包括内存预分配,文件预热和 mlock 系统调用。

3.1 预先分配 MappedFile

在消息写入过程中(调用 CommitLog 的 putMessage() 方法),CommitLog 会先从 MappedFileQueue 队列中获取一个 MappedFile,如果没有就新建一个。

这里,MappedFile 的创建过程是将构建好的一个 AllocateRequest 请求(具体做法是,将下一个文件的路径、下下个文件的路径、文件大小为参数封装为 AllocateRequest 对象)添加至队列中,后台运行的 AllocateMappedFileService 服务线程(在 Broker 启动时,该线程就会创建并运行),会不停地 run,只要请求队列里存在请求,就会去执行 MappedFile 映射文件的创建和预分配工作,分配的时候有两种策略,一种是使用 Mmap 的方式来构建 MappedFile 实例,另外一种是从 TransientStorePool 堆外内存池中获取相应的 DirectByteBuffer 来构建 MappedFile(ps:具体采用哪种策略,也与刷盘的方式有关)。并且,在创建分配完下个 MappedFile 后,还会将下下个 MappedFile 预先创建并保存至请求队列中等待下次获取时直接返回。RocketMQ 中预分配 MappedFile 的设计非常巧妙,下次获取时候直接返回就可以不用等待 MappedFile 创建分配所产生的时间延迟。

4619bbcc24b7451da3fa04662a2dc5a8.png

预分配 MappedFile 的主要过程.jpg

3.2 文件预热 &&mlock 系统调用

(1)mlock 系统调用:其可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到 swap 空间。对于 RocketMQ 这种的高吞吐量的分布式消息队列来说,追求的是消息读写低延迟,那么肯定希望尽可能地多使用物理内存,提高数据读写访问的操作效率。

(2)文件预热:预热的目的主要有两点;第一点,由于仅分配内存并进行 mlock 系统调用后并不会为程序完全锁定这些内存,因为其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。其中,RocketMQ 是在创建并分配 MappedFile 的过程中,预先写入一些随机值至 Mmap 映射出的内存空间里。第二,调用 Mmap 进行内存映射后,OS 只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。程序要访问数据时 OS 会检查该部分的分页是否已经在内存中,如果不在,则发出一次缺页中断。这里,可以想象下 1G 的 CommitLog 需要发生多少次缺页中断,才能使得对应的数据才能完全加载至物理内存中(ps:X86 的 Linux 中一个标准页面大小是 4KB)?RocketMQ 的做法是,在做 Mmap 内存映射的同时进行 madvise 系统调用,目的是使 OS 做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。

四、RocketMQ 存储相关的模型与封装类简析

(1)CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

(2) ConsumeQueue:消息消费的逻辑队列,其中包含了这个 MessageQueue 在 CommitLog 中的起始物理位置偏移量 offset,消息实体内容的大小和 Message Tag 的哈希值。从实际物理存储来说,ConsumeQueue 对应每个 Topic 和 QueuId 下面的文件。单个文件大小约 5.72M,每个文件由 30W 条数据组成,每个文件默认大小为 600 万个字节,当一个 ConsumeQueue 类型的文件写满了,则写入下一个文件;

(3)IndexFile:用于为生成的索引文件提供访问服务,通过消息 Key 值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引;

(4)MapedFileQueue:对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该 offset 所在 MappedFile(具体物理存储位置的抽象)、创建、删除 MappedFile 等操作;

(5)MappedFile:文件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写入 PageCache 缓存区(commit),或者原子性地将消息持久化的刷盘(flush);

五、RocketMQ 消息刷盘的主要过程

在 RocketMQ 中消息刷盘主要可以分为同步刷盘和异步刷盘两种。
e29a44ed868e4978a8bb3db3a7be0eca.png

RocketMQ 同步 && 异步刷盘两种方式.jpg
(1)同步刷盘:如上图所示,只有在消息真正持久化至磁盘后,RocketMQ 的 Broker 端才会真正地返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。RocketMQ 同步刷盘的大致做法是,基于生产者消费者模型,主线程创建刷盘请求实例—GroupCommitRequest 并在放入刷盘写队列后唤醒同步刷盘线程—GroupCommitService,来执行刷盘动作(其中用了 CAS 变量和 CountDownLatch 来保证线程间的同步)。这里,RocketMQ 源码中用读写双缓存队列(requestsWrite/requestsRead)来实现读写分离,其带来的好处在于内部消费生成的同步刷盘请求可以不用加锁,提高并发度。

(2)异步刷盘:能够充分利用 OS 的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程 wakeup 后,就会继续执行。

六、结语

在参考了 @艾瑞克的那篇 RocketMQ 存储相关技术博文后,让我理解了公众号的文章与其他技术细节文章应该是有所区别的,公众号文章还是力求精简(ps:贴大量代码尤其需要慎重),篇幅太长会影响阅读体验,更多的内容应该以各种设计图和少量的文字为说明。同时,由于 RocketMQ 本身较为复杂,光看技术文章只能理解和领会一个大概,更多地还是需要自己多撸源码、Debug 以及多实践才能对其有一个较为深入的理解。

由于目前微信对本公众号依然没有放开评论功能,需要讨论的同学可以直接在公号内给我留言,我会依次回复内容。如果喜欢本文,请收藏后点个赞并转发朋友圈哦。


本文地址:http://www.6aiq.com/article/1563130737288
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出