消息中间件—RocketMQ 消息发送


本文地址:http://www.6aiq.com/article/1563129642050
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出

大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适合初中阶童鞋作为 MQ 研究和学习的切入点。因此,本篇主要从一条消息发送为切入点,详细阐述在 RocketMQ 这款分布式消息队列中发送一条普通消息的大致流程和细节。在阅读本篇之前希望读者能够先仔细读下关于 RocketMQ 分布式消息队列 Remoting 通信模块的两篇文章:
消息中间件—RocketMQ 的 RPC 通信(一)
消息中间件—RocketMQ 的 RPC 通信(二)

一、RocketMQ 网络架构图

RocketMQ 分布式消息队列的网络部署架构图如下图所示(其中,包含了生产者 Producer 发送普通消息至集群的两条主线)对于上图中几个角色的说明:

(1)NameServer:RocketMQ 集群的命名服务器(也可以说是注册中心),它本身是无状态的(实际情况下可能存在每个 NameServer 实例上的数据有短暂的不一致现象,但是通过定时更新,在大部分情况下都是一致的),用于管理集群的元数据( 例如,KV 配置、Topic、Broker 的注册信息)。
(2)Broker(Master):RocketMQ 消息代理服务器主节点,起到串联 Producer 的消息发送和 Consumer 的消息消费,和将消息的落盘存储的作用;
(3)Broker(Slave):RocketMQ 消息代理服务器备份节点,主要是通过同步 / 异步的方式将主节点的消息同步过来进行备份,为 RocketMQ 集群的高可用性提供保障;
(4)Producer(消息生产者):在这里为普通消息的生产者,主要基于 RocketMQ-Client 模块将消息发送至 RocketMQ 的主节点。

对于上面图中几条通信链路的关系:

(1)Producer 与 NamerServer:每一个 Producer 会与 NameServer 集群中的一个实例建立 TCP 连接,从这个 NameServer 实例上拉取 Topic 路由信息;
(2)Producer 和 Broker:Producer 会和它要发送的 topic 相关联的 Master 的 Broker 代理服务器建立 TCP 连接,用于发送消息以及定时的心跳信息;
(3)Broker 和 NamerServer:Broker(Master or Slave)均会和每一个 NameServer 实例来建立 TCP 连接。Broker 在启动的时候会注册自己配置的 Topic 信息到 NameServer 集群的每一台机器中。即每一个 NameServer 均有该 broker 的 Topic 路由配置信息。其中,Master 与 Master 之间无连接,Master 与 Slave 之间有连接;

二、客户端发送普通消息的 demo 方法

在 RocketMQ 源码工程的 example 包下就有最为简单的发送普通消息的样例代码(ps:对于刚刚接触 RocketMQ 的童鞋使用这个包下面的样例代码进行系统性的学习和调试)。
我们可以直接 run 下 **“org.apache.rocketmq.example.simple”** 包下 Producer 类的 main 方法即可完成一次普通消息的发送(主要代码如下,在这里需本地将 NameServer 和 Broker 实例均部署起来):

 //step1.启动DefaultMQProducer,启动时的具体流程一会儿会详细说明
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("producer");
        producer.start();        try {
            {                //step2.封装将要发送消息的内容
                Message msg = new Message("TopicTest",                        "TagA",                        "OrderID188",                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));                //step3.发送消息流程,具体流程待会儿说
                SendResult sendResult = producer.send(msg);
            }
        } catch (Exception e) {            //Exception code
        }
        producer.shutdown();

三、RocketMQ 发送普通消息的全流程解读

从上面一节中可以看出,消息生产者发送消息的 demo 代码还是较为简单的,核心就几行代码,但在深入研读 RocketMQ 的 Client 模块后,发现其发送消息的核心流程还是有一些复杂的。下面将主要从 DefaultMQProducer 的启动流程、send 发送方法和 Broker 代理服务器的消息处理三方面分别进行分析和阐述。

3.1 DefaultMQProducer 的启动流程

在客户端发送普通消息的 demo 代码部分,我们先是将 DefaultMQProducer 实例启动起来,里面调用了默认生成消息的实现类—DefaultMQProducerImpl 的 start() 方法。

@Override
    public void start() throws MQClientException {        this.defaultMQProducerImpl.start();
    }

默认生成消息的实现类—DefaultMQProducerImpl 的启动主要流程如下:
(1)初始化得到 MQClientInstance 实例对象,并注册至本地缓存变量—producerTable 中;
(2)将默认 Topic(“TBW102”)保存至本地缓存变量—topicPublishInfoTable 中;
(3)MQClientInstance 实例对象调用自己的 start() 方法,启动一些客户端本地的服务线程,如拉取消息服务、客户端网络通信服务、重新负载均衡服务以及其他若干个定时任务(包括,更新路由 / 清理下线 Broker/ 发送心跳 / 持久化 consumerOffset/ 调整线程池),并重新做一次启动(这次参数为 false);
(4)最后向所有的 Broker 代理服务器节点发送心跳包;
总结起来,DefaultMQProducer 的主要启动流程如下:
15b1dd8e8ab143cf8cbe20d37ba70ddf.png

这里有以下几点需要说明:

(1)在一个客户端中,一个 producerGroup 只能有一个实例;
(2)根据不同的 clientId,MQClientManager 将给出不同的 MQClientInstance;
(3)根据不同的 producerGroup,MQClientInstance 将给出不同的 MQProducer 和 MQConsumer(保存在本地缓存变量——producerTable 和 consumerTable 中);

3.2 send 发送方法的核心流程

通过 Rocketmq 的客户端模块发送消息主要有以下三种方法:
(1)同步方式

(2)异步方式

(3)Oneway 方式

其中,使用(1)、(2)种方式来发送消息比较常见,具体使用哪一种方式需要根据业务情况来判断。本节内容将结合同步发送方式(同步发送模式下,如果有发送失败的最多会有 3 次重试(也可以自己设置),其他模式均 1 次)进行消息发送核心流程的简析。使用同步方式发送消息核心流程的入口如下:

 /**
     * 同步方式发送消息核心流程的入口,默认超时时间为3s
     *
     * @param msg     发送消息的具体Message内容
     * @param timeout 其中发送消息的超时时间可以参数设置
     * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    public SendResult send(Message msg,        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

3.2.1 尝试获取 TopicPublishInfo 的路由信息

我们一步步 debug 进去后会发现在 sendDefaultImpl()方法中先对待发送的消息进行前置的验证。如果消息的 Topic 和 Body 均没有问题的话,那么会调用—tryToFindTopicPublishInfo() 方法,根据待发送消息的中包含的 Topic 尝试从 Client 端的本地缓存变量—topicPublishInfoTable中查找,如果没有则会从 NameServer 上更新 Topic 的路由信息(其中,调用了 MQClientInstance 实例的 updateTopicRouteInfoFromNameServer 方法,最终执行的是 MQClientAPIImpl 实例的 getTopicRouteInfoFromNameServer 方法),这里分别会存在以下两种场景:
(1)生产者第一次发送消息(此时,Topic 在 NameServer 中并不存在):因为第一次获取时候并不能从远端的 NameServer 上拉取下来并更新本地缓存变量—topicPublishInfoTable 成功。因此,第二次需要通过默认 Topic—TBW102的 TopicRouteData 变量来构造 TopicPublishInfo 对象,并更新 DefaultMQProducerImpl 实例的本地缓存变量——topicPublishInfoTable。
另外,在该种类型的场景下,当消息发送至 Broker 代理服务器时,在SendMessageProcessor 业务处理器的 sendBatchMessage/sendMessage 方法里面的 super.msgCheck(ctx, requestHeader, response) 消息前置校验中,会调用 TopicConfigManager 的 createTopicInSendMessageMethod 方法,在 Broker 端完成新 Topic 的创建并持久化至配置文件中(配置文件路径:{rocketmq.home.dir}/store/config/topics.json)。(ps:该部分内容其实属于 Broker 有点超本篇的范围,不过由于涉及新 Topic 的创建因此在略微提了下)
(2)生产者发送 Topic 已存在的消息:由于在 NameServer 中已经存在了该 Topic,因此在第一次获取时候就能够取到并且更新至本地缓存变量中 topicPublishInfoTable,随后 tryToFindTopicPublishInfo 方法直接可以 return。

在 RocketMQ 中该部分的核心方法源码如下(已经加了注释):a.tryToFindTopicPublishInfo 方法源码如下:



/**
     * 根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo
     * 如果没有则更新路由信息,从nameserver端拉取最新路由信息
     *
     * topicPublishInfo
     * 
     * @param topic
     * @return
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {        //step1.先从本地缓存变量topicPublishInfoTable中先get一次
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);        if (null == topicPublishInfo || !topicPublishInfo.ok()) {            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());            //step1.2 然后从nameServer上更新topic路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);            //step2 然后再从本地缓存变量topicPublishInfoTable中再get一次
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {            return topicPublishInfo;
        } else {            /**
             *  第一次的时候isDefault为false,第二次的时候default为true,即为用默认的topic的参数进行更新
             */
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);          
			return topicPublishInfo;
        }
    }

b.getTopicRouteInfoFromNameServer 方法源码如下:

/**
     * 本地缓存中不存在时从远端的NameServer注册中心中拉取Topic路由信息
     *
     * @param topic
     * @param timeoutMillis
     * @param allowTopicNotExist
     * @return
     * @throws MQClientException
     * @throws InterruptedException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     * @throws RemotingConnectException
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);        //设置请求头中的Topic参数后,发送获取Topic路由信息的request请求给NameServer
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);       //这里由于是同步方式发送,所以直接return response的响应
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);        assert response != null;        switch (response.getCode()) {            //如果NameServer中不存在待发送消息的Topic
            case ResponseCode.TOPIC_NOT_EXIST: {                if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }                break;
            }            //如果获取Topic存在,则成功返回,利用TopicRouteData进行解码,且直接返回TopicRouteData
            case ResponseCode.SUCCESS: {                byte[] body = response.getBody();                if (body != null) {                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }            default:                break;
        }        throw new MQClientException(response.getCode(), response.getRemark());
    }

将 TopicRouteData 转换至 TopicPublishInfo 路由信息的映射图如下:
ee5eaf3e22244b6dbd76a946d79f30bf.png

其中,上面的 TopicRouteData 和 TopicPublishInfo 路由信息变量大致如下:

3.2.2 选择消息发送的队列

在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认方式下,selectOneMessageQueuef() 方法会从 TopicPublishInfo 中的 messageQueueList 中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在 MQFaultStrategy 这个类中定义:

public class MQFaultStrategy {    //维护每个Broker发送消息的延迟
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();    //发送消息延迟容错开关
    private boolean sendLatencyFaultEnable = false;    //延迟级别数组
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};    //不可用时长数组
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  ......
}

这里通过一个 sendLatencyFaultEnable 开关来进行选择采用下面哪种方式:
(1)sendLatencyFaultEnable 开关打开:在随机递增取模的基础上,再过滤掉 not available 的 Broker 代理。所谓的 "latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的 latency 超过 550Lms,就退避 3000Lms;超过 1000L,就退避 60000L。
(2)sendLatencyFaultEnable 开关关闭(默认关闭):采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息。

/**
     * 根据sendLatencyFaultEnable开关是否打开来分两种情况选择队列发送消息
     * @param tpInfo
     * @param lastBrokerName
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        if (this.sendLatencyFaultEnable) {            try {                //1.在随机递增取模的基础上,再过滤掉not available的Broker代理;对之前失败的,按一定的时间做退避
                int index = tpInfo.getSendWhichQueue().getAndIncrement();                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                            return mq;
                    }
                }                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);                if (writeQueueNums > 0) {                    final MessageQueue mq = tpInfo.selectOneMessageQueue();                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }            return tpInfo.selectOneMessageQueue();
        }        //2.采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

3.2.3 发送封装后的 RemotingCommand 数据包

在选择完发送消息的队列后,RocketMQ 就会调用 sendKernelImpl() 方法发送消息(该方法为,通过 RocketMQ 的 Remoting 通信模块真正发送消息的核心)。在该方法内总共完成以下几个步流程:
(1)根据前面获取到的 MessageQueue 中的 brokerName,调用 MQClientInstance 实例的 findBrokerAddressInPublish() 方法,得到待发送消息中存放的 Broker 代理服务器地址,如果没有找到则跟新路由信息;
(2)如果没有禁用,则发送消息前后会有钩子函数的执行(executeSendMessageHookBefore()/executeSendMessageHookAfter() 方法);
(3)将与该消息相关信息封装成 RemotingCommand 数据包,其中请求码 RequestCode 为以下几种之一:
a.SENDMESSAGE(普通发送消息)
b.SEND****MESSAGEV2(优化网络数据包发送)c.SEND****BATCH_MESSAGE(消息批量发送)
(4)根据获取到的 Broke 代理服务器地址,将封装好的 RemotingCommand 数据包发送对应的 Broker 上,默认发送超时间为 3s;
(5)这里,真正调用 RocketMQ 的 Remoting 通信模块完成消息发送是在 MQClientAPIImpl 实例 sendMessageSync() 方法中,代码具体如下:

 private SendResult sendMessageSync(        final String addr,        final String brokerName,        final Message msg,        final long timeoutMillis,        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);        assert response != null;        return this.processSendResponse(brokerName, msg, response);
    }

(6)processSendResponse 方法对发送正常和异常情况分别进行不同的处理并返回 sendResult 对象;
(7)发送返回后,调用 updateFaultItem 更新 Broker 代理服务器的可用时间;
(8)对于异常情况,且标志位—retryAnotherBrokerWhenNotStoreOK,设置为 true 时,在发送失败的时候,会选择换一个 Broker;
在生产者发送完成消息后,客户端日志打印如下:

SendResult [sendStatus=SEND_OK, msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A, messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC, queueId=2], queueOffset=1]

3.3 Broker 代理服务器的消息处理简析

Broker 代理服务器中存在很多 Processor 业务处理器,用于处理不同类型的请求,其中一个或者多个 Processor 会共用一个业务处理器线程池。对于接收到消息,Broker 会使用 SendMessageProcessor 这个业务处理器来处理。SendMessageProcessor 会依次做以下处理:
(1)消息前置校验,包括 broker 是否可写、校验 queueId 是否超过指定大小、消息中的 Topic 路由信息是否存在,如果不存在就新建一个。这里与上文中“尝试获取 TopicPublishInfo 的路由信息”一节中介绍的内容对应。如果 Topic 路由信息不存在,则 Broker 端日志输出如下:

2018-06-14 17:17:24 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=252, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]

2018-06-14 17:17:24 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /172.20.21.162:62661

2018-06-14 17:17:24 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[172.20.21.162:62661]

Topic 路由信息新建后,第二次消息发送后,Broker 端日志输出如下:

2018-08-02 16:26:13 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]

2018-08-02 17:04:13 INFO SendMessageThread_1 - now broker will do DefaultMessageStore classs to persist the message data content

(2)构建 MessageExtBrokerInner;
(3)调用brokerController.getMessageStore().putMessage 将 MessageExtBrokerInner 做落盘持久化处理;
(4)根据消息落盘结果(正常 / 异常情况),BrokerStatsManager 做一些统计数据的更新,最后设置 Response 并返回;

四、总结

使用 RocketMQ 的客户端发送普通消息的流程大概到这里就分析完成。建议读者可以将作者之前写的两篇关于 RocketMQ 的 RPC 通信(一)和(二)结合起来读,可能整体会更加连贯,收获更大。关于顺序消息、分布式事务消息等内容将在后续篇幅中陆续介绍,敬请期待。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。


本文地址:http://www.6aiq.com/article/1563129642050
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出