消息中间件—RocketMQ 的 RPC 通信(二)



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

文章摘要:如何设计 RPC 通信层模型是任何一款性能强劲的 MQ 所要重点考虑的问题
在(一)篇中主要介绍了 RocketMQ 的协议格式,消息编解码,通信方式 (同步 / 异步 / 单向)、消息发送 / 接收以及异步回调的主要通信流程。而本篇将主要对 RocketMQ 消息队列 RPC 通信部分的 Netty 多线程模型进行重点介绍。

一、为何要使用 Netty 作为高性能的通信库?

在看 RocketMQ 的 RPC 通信部分时候,可能有不少同学有这样子的疑问,RocketMQ 为何要选择 Netty 而不直接使用 JDK 的 NIO 进行网络编程呢?这里有必要先来简要介绍下 Netty。
Netty 是一个封装了 JDK 的 NIO 库的高性能网络通信开源框架。它提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
下面主要列举了下一般系统的 RPC 通信模块会选择 Netty 作为底层通信库的理由(作者认为 RocketMQ 的 RPC 同样也是基于此选择了 Netty):
(1)Netty 的编程 API 使用简单,开发门槛低,无需编程者去关注和了解太多的 NIO 编程模型和概念;
(2)对于编程者来说,可根据业务的要求进行定制化地开发,通过 Netty 的 ChannelHandler 对通信框架进行灵活的定制化扩展;
(3)Netty 框架本身支持拆包 / 解包,异常检测等机制,让编程者可以从 JAVA NIO 的繁琐细节中解脱,而只需要关注业务处理逻辑;
(4)Netty 解决了(准确地说应该是采用了另一种方式完美规避了)JDK NIO 的 Bug(Epoll bug,会导致 Selector 空轮询,最终导致 CPU 100%);
(5)Netty 框架内部对线程,selector 做了一些细节的优化,精心设计的 reactor 多线程模型,可以实现非常高效地并发处理;
(6)Netty 已经在多个开源项目(Hadoop 的 RPC 框架 avro 使用 Netty 作为通信框架)中都得到了充分验证,健壮性 / 可靠性比较好。

二、RocketMQ 中 RPC 通信的 Netty 多线程模型

RocketMQ 的 RPC 通信部分采用了 **“1+N+M1+M2”** 的 Reactor 多线程模式,对网络通信部分进行了一定的扩展与优化,这一节主要让我们来看下这一部分的具体设计与实现内容。

2.1、Netty 的 Reactor 多线程模型设计概念与简述

这里有必要先来简要介绍下 Netty 的 Reactor 多线程模型。Reactor 多线程模型的设计思想是分而治之 + 事件驱动。
(1)分而治之
一般来说,一个网络请求连接的完整处理过程可以分为接受(accept)、数据读取(read)、解码 / 编码(decode/encode)、业务处理(process)、发送响应(send)这几步骤。Reactor 模型将每个步骤都映射成为一个任务,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是这个任务,且采用以非阻塞方式执行。
(2)事件驱动
每个任务对应特定网络事件。当任务准备就绪时,Reactor 收到对应的网络事件通知,并将任务分发给绑定了对应网络事件的 Handler 执行。

2.2、RocketMQ 中 RPC 通信的 1+N+M1+M2 的 Reactor 多线程设计与实现

(1)RocketMQ 中 RPC 通信的 Reactor 多线程设计与流程
RocketMQ 的 RPC 通信采用 Netty 组件作为底层通信库,同样也遵循了 Reactor 多线程模型,同时又在这之上做了一些扩展和优化。下面先给出一张 RocketMQ 的 RPC 通信层的 Netty 多线程模型框架图,让大家对 RocketMQ 的 RPC 通信中的多线程分离设计有一个大致的了解。
a2ee9dc73f054dd6ad2bb478e9b3784a.png

RocketMQ 的 RPC 通信层—1+N+M1+M2 模型.png

从上面的框图中可以大致了解 RocketMQ 中 NettyRemotingServer 的 Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的 1)负责监听 TCP 网络连接请求,建立好连接后丢给Reactor 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为 3),它负责将建立好连接的 socket 注册到 selector 上去(RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker 线程池(defaultEventExecutorGroup,即为上面的“M1”,源码中默认设置为 8)
为了更为高效的处理 RPC 的网络请求,这里的 Worker 线程池是专门用于处理 Netty 网络通信相关的(包括编码 / 解码、空闲链接管理、网络连接管理以及网络请求处理)。而处理业务操作放在业务线程池中执行(这个内容在“RocketMQ 的 RPC 通信(一)篇”中也有提到),根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)
下面以表格的方式列举了下上面所述的“1+N+M1+M2”Reactor 多线程模型

线程数 线程名 线程具体说明
1 NettyBoss_%d Reactor 主线程
N NettyServerEPOLLSelector_%d_%d Reactor 线程池
M1 NettyServerCodecThread_%d Worker 线程池
M2 RemotingExecutorThread_%d 业务 processor 处理线程池

(2)RocketMQ 中 RPC 通信的 Reactor 多线程的代码具体实现
说完了 Reactor 多线程整体的设计与流程,大家应该就对 RocketMQ 的 RPC 通信的 Netty 部分有了一个比较全面的理解了,那接下来就从源码上来看下一些细节部分(在看该部分代码时候需要读者对 JAVA NIO 和 Netty 的相关概念与技术点有所了解)。
在 NettyRemotingServer 的实例初始化时,会初始化各个相关的变量包括 serverBootstrap、nettyServerConfig 参数、channelEventListener 监听器并同时初始化 eventLoopGroupBoss 和 eventLoopGroupSelector 两个 Netty 的 EventLoopGroup 线程池(这里需要注意的是,如果是 Linux 平台,并且开启了 native epoll,就用 EpollEventLoopGroup,这个也就是用 JNI,调的 c 写的 epoll;否则,就用 Java NIO 的 NioEventLoopGroup。),具体代码如下:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;
      //省略部分代码
      //初始化时候nThreads设置为1,说明RemotingServer端的Disptacher链接管理和分发请求的线程为1,用于接收客户端的TCP连接
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        /**
         * 根据配置设置NIO还是Epoll来作为Selector线程池
         * 如果是Linux平台,并且开启了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll;否则,就用Java NIO的NioEventLoopGroup。
         * 
         */
        if (useEpoll()) {
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        //省略部分代码 

在 NettyRemotingServer 实例初始化完成后,就会将其启动。Server 端在启动阶段会将之前实例化好的 1 个 acceptor 线程(eventLoopGroupBoss),N 个 IO 线程(eventLoopGroupSelector),M1 个 worker 线程(defaultEventExecutorGroup)绑定上去。前面部分也已经介绍过各个线程池的作用了。
这里需要说明的是,Worker 线程拿到网络数据后,就交给 Netty 的 ChannelPipeline(其采用责任链设计模式),从 Head 到 Tail 的一个个 Handler 执行下去,这些 Handler 是在创建 NettyRemotingServer 实例时候指定的。NettyEncoder 和 NettyDecoder 负责网络传输数据和 RemotingCommand 之间的编解码。NettyServerHandler 拿到解码得到的 RemotingCommand 后,根据 RemotingCommand.type 来判断是 request 还是 response 来进行相应处理,根据业务请求码封装成不同的 task 任务后,提交给对应的业务 processor 处理线程池处理。

 @Override
    public void start() {
        //默认的处理线程池组,使用默认的处理线程池组用于处理后面的多个Netty Handler的逻辑操作

        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {

                    private AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
        /**
         * 首先来看下 RocketMQ NettyServer 的 Reactor 线程模型,
         * 一个 Reactor 主线程负责监听 TCP 连接请求;
         * 建立好连接后丢给 Reactor 线程池,它负责将建立好连接的 socket 注册到 selector
         * 上去(这里有两种方式,NIO和Epoll,可配置),然后监听真正的网络数据;
         * 拿到网络数据后,再丢给 Worker 线程池;
         *
         */
        //RocketMQ-> Java NIO的1+N+M模型:1个acceptor线程,N个IO线程,M1个worker 线程。
        ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        //服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
                        .option(ChannelOption.SO_REUSEADDR, true)//这个参数表示允许重复使用本地地址和端口
                        .option(ChannelOption.SO_KEEPALIVE, false)//当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
                        .childOption(ChannelOption.TCP_NODELAY, true)//该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//这两个参数用于操作接收缓冲区和发送缓冲区
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {

                                ch.pipeline()
                                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                                        .addLast(defaultEventExecutorGroup,
                                                new NettyEncoder(),//rocketmq解码器,他们分别覆盖了父类的encode和decode方法
                                                new NettyDecoder(),//rocketmq编码器
                                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty自带的心跳管理器
                                                new NettyConnectManageHandler(),//连接管理器,他负责捕获新连接、连接断开、异常等事件,然后统一调度到NettyEventExecuter处理器处理。
                                                new NettyServerHandler()//当一个消息经过前面的解码等步骤后,然后调度到channelRead0方法,然后根据消息类型进行分发 
                                        );
                            }
                        });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        //定时扫描responseTable,获取返回结果,并且处理超时
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

从上面的描述中可以概括得出 RocketMQ 的 RPC 通信部分的 Reactor 线程池模型框图。

fe35c65463f84ab38bba354213ed8411.png

RocketMQ 的 RPC 通信层—Reactor 线程池.png

整体可以看出 RocketMQ 的 RPC 通信借助 Netty 的多线程模型,其服务端监听线程和 IO 线程分离,同时将 RPC 通信层的业务逻辑与处理具体业务的线程进一步相分离。时间可控的简单业务都直接放在 RPC 通信部分来完成,复杂和时间不可控的业务提交至后端业务线程池中处理,这样提高了通信效率和 MQ 整体的性能。(ps:其中抽象出 NioEventLoop 来表示一个不断循环执行处理任务的线程,每个 NioEventLoop 有一个 selector,用于监听绑定在其上的 socket 链路。)

三、总结

仔细阅读 RocketMQ 的过程中收获了很多关于网络通信设计技术和知识点。对于刚接触开源版的 RocketMQ 的童鞋来说,想要自己掌握 RPC 通信部分的各个技术知识点,还需要不断地使用本地环境进行 debug 调试和阅读源码反复思考。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。后续还会陆续发布 RocketMQ 其他模块(Client、Broker 和 NameServer 等)的相关技术文章,敬请关注。
在此顺便为自己打个 Call,有兴趣的朋友可以关注下我的个人公众号:“匠心独运的博客”,对于 Java 并发、Spring、数据库和消息队列的一些细节、问题的文章将会在这个公众号上发布,欢迎交流与讨论。


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com