消息中间件—RocketMQ 消息消费(一)

文章摘要:在发送消息给 RocketMQ 后,消费者需要消费。消息的消费比发送要复杂一些,那么 RocketMQ 是如何来做的呢?
在 RocketMQ 系列文章的前面几篇幅中已经对其“RPC 通信部分”和“普通消息发送”两部分进行了详细的阐述,本文将主要从消息消费为切入点简要地介绍下“RocketMQ 中 Pull 和 Push 的两种消费方式”、“RocketMQ 中消费者(Push 模式)的启动流程”和“RocketMQ 中 Pull 和 Push 两种消费方式的简要流程”。在阅读本篇之前希望读者能够先仔细阅读下关于 RocketMQ 分布式消息队列的前几篇文章:

http://www.6aiq.com/article/1563128272857
http://www.6aiq.com/article/1563128435731
http://www.6aiq.com/article/1563129642050

一、如何选择消息消费的方式—Pull or Push?

1.1 MQ 中 Pull 和 Push 的两种消费方式

对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:
(1)Push 方式:由消息中间件(MQ 消息服务器代理)主动地将消息推送给消费者;采用 Push 方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是**“慢消费问题”),而 MQ 不断地向消费者 Push 消息,消费者端的缓冲区可能会溢出,导致异常;
(2)Pull 方式:由消费者客户端主动向消息中间件(MQ 消息服务器代理)拉取消息;采用 Pull 方式,如何设置 Pull 消息的频率需要重点去考虑,举个例子来说,可能 1 分钟内连续来了 1000 条消息,然后 2 小时内没有新消息产生(概括起来说就是
“消息延迟与忙等待”**)。如果每次 Pull 的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ 中消息的堆积量变大;若每次 Pull 的时间间隔较短,但是在一段时间内 MQ 中并没有任何消息可以消费,那么会产生很多无效的 Pull 请求的 RPC 开销,影响 MQ 整体的网络性能;

1.2 RocketMQ 消息消费的长轮询机制

思考题
上面简要说明了 Push 和 Pull 两种消息消费方式的概念和各自特点。如果长时间没有消息,而消费者端又不停的发送 Pull 请求不就会导致 RocketMQ 中 Broker 端负载很高吗?那么在 RocketMQ 中如何解决以做到高效的消息消费呢?

通过研究源码可知,RocketMQ 的消费方式都是基于拉模式拉取消息的,而在这其中有一种长轮询机制(对普通轮询的一种优化),来平衡上面 Push/Pull 模型的各自缺点。基本设计思路是:消费者如果第一次尝试 Pull 消息失败(比如:Broker 端没有可以消费的消息),并不立即给消费者客户端返回 Response 的响应,而是先 hold 住并且挂起请求(将请求保存至 pullRequestTable 本地缓存变量中),然后 Broker 端的后台独立线程—PullRequestHoldService 会从 pullRequestTable 本地缓存变量中不断地去取,具体的做法是查询待拉取消息的偏移量是否小于消费队列最大偏移量,如果条件成立则说明有新消息达到 Broker 端(这里,在 RocketMQ 的 Broker 端会有一个后台独立线程—ReputMessageService 不停地构建 ConsumeQueue/IndexFile 数据,同时取出 hold 住的请求并进行二次处理),则通过重新调用一次业务处理器—PullMessageProcessor 的处理请求方法—processRequest()来重新尝试拉取消息(此处,每隔 5S 重试一次,默认长轮询整体的时间设置为 30s)。
RocketMQ 消息 Pull 的长轮询机制的关键在于 Broker 端的 PullRequestHoldService 和 ReputMessageService 两个后台线程。对于 RocketMQ 的长轮询(LongPolling)消费模式后面会专门详细介绍。

二、RocketMQ 中两种消费方式的 demo 代码

(1)Pull 模式的 Consumer 端代码如下:

        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("consumer");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            System.out.println(pullResult.getMsgFoundList().get(0).toString());
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    //TODO
                }
            }
        }
        consumer.shutdown();

在示例代码中,可以看到业务工程在 Consumer 启动后,Consumer 主动获取 MessageQueue 的 Set 集合,遍历该集合中的每一个队列,发送 Pull 的请求(参数中带有队列中的消息偏移量),同时需要 Consumer 端自己保存消息消费的 offset 偏移量至本地变量中。在 Pull 模式下,需要业务应用代码自身去完成比较多的事情,因此在实际应用中用的较少。
(2)Push 模式的 Consumer 端代码如下:



DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest111", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setInstanceName("consumer1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

在示例代码中,业务工程的应用程序使用 Push 方式进行消费时,Consumer 端注册了一个监听器,Consumer 在收到消息后主动调用这个监听器完成消费并进行对应的业务逻辑处理。由此可见,业务应用代码只需要完成消息消费即可,无需参与 MQ 本身的一些任务处理(ps:业务代码显得更为简洁一些)。

三、RocketMQ 中消费者 Push 方式的启动流程

这一节主要先讲下 RocketMQ 消费者的启动流程,看下在启动的时候究竟完成了什么样的操作。由于 RocketMQ 的 DefaultMQPushConsumer 和 DefaultMQPullConsumer 启动流程大部分类似,而 DefaultMQPushConsumer 更为复杂一些,因此这一节内容主要讲的是 DefaultMQPushConsumer 启动流程。Push 方式的 Consumer 启动流程的时序图如下图所示:

a0662763d0444563957a9596d0a82f55.png

RocketMQ 的 PushConsumer 启动时序图.jpg

从上面的时序图上可以看出,Push 方式的 Consumer 启动流程完成的任务比较多,主要任务如下:
(1)设置 consumerGroup、NameServer 服务地址、消费起始偏移地址并根据参数 Topic 构建 Consumer 端的 SubscriptionData(订阅关系值);
(2)在 Consumer 端注册消费者监听器,当消息到来时完成消费消息;
(3)启动 defaultMQPushConsumerImpl 实例,主要完成前置校验、复制订阅关系(将 defaultMQPushConsumer 的订阅关系复制至 rebalanceImpl 中,包括 retryTopic(重试主题)对应的订阅关系)、创建 MQClientInstance 实例、设置 rebalanceImpl 的各个属性值、pullAPIWrapper 包装类对象的初始化、初始化 offsetStore 实例并加载消费进度、启动消息消费服务线程以及在 MQClientInstance 中注册 consumer 等任务;
(4)启动 MQClientInstance 实例,其中包括完成客户端网络通信线程、拉取消息服务线程、负载均衡服务线程和若干个定时任务的启动;
(5)向所有的 Broker 端发送心跳(采用加锁方式);
(6)最后,唤醒负载均衡服务线程在 Consumer 端开始负载均衡;

四、RocketMQ 中 Pull 和 Push 两种消费模式流程简析

RocketMQ 提供了两种消费模式,Push 和 Pull,大多数场景使用的是 Push 模式,在源码中这两种模式分别对应的是 DefaultMQPushConsumer 类和 DefaultMQPullConsumer 类。Push 模式实际上在内部还是使用的 Pull 方式实现的,通过 Pull 不断地轮询 Broker 获取消息,当不存在新消息时,Broker 端会挂起 Pull 请求,直到有新消息产生才取消挂起,返回新消息。
(1)RocketMQ 的 Pull 消费模式流程简析
RocketMQ 的 Pull 模式相对来得简单,从上面的 demo 代码中可以看出,业务应用代码通过由 Topic 获取到的 MessageQueue 直接拉取消息(最后真正执行的是 PullAPIWrapper 的 pullKernelImpl()方法,通过发送拉取消息的 RPC 请求给 Broker 端)。其中,消息消费的偏移量需要 Consumer 端自己去维护。
(2)RocketMQ 的 Push 消费模式流程简析
在本文前面已经提到过了,从严格意义上说,RocketMQ 并没有实现真正的消息消费的 Push 模式,而是对 Pull 模式进行了一定的优化,一方面在 Consumer 端开启后台独立的线程—PullMessageService 不断地从阻塞队列—pullRequestQueue 中获取 PullRequest 请求并通过网络通信模块发送 Pull 消息的 RPC 请求给 Broker 端。另外一方面,后台独立线程—rebalanceService 根据 Topic 中消息队列个数和当前消费组内消费者个数进行负载均衡,将产生的对应 PullRequest 实例放入阻塞队列—pullRequestQueue 中。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。然后,再根据业务反馈是否成功消费来推动消费进度。
在 Broker 端,PullMessageProcessor 业务处理器收到 Pull 消息的 RPC 请求后,通过 MessageStore 实例从 commitLog 获取消息。如 1.2 节内容所述,如果第一次尝试 Pull 消息失败(比如 Broker 端没有可以消费的消息),则通过长轮询机制先 hold 住并且挂起该请求,然后通过 Broker 端的后台线程 PullRequestHoldService 重新尝试和后台线程 ReputMessageService 的二次处理。

思考题
使用 RocketMQ 的 Pull 模式进行消息消费时,由上面可知该模式下无需自动拉取消息,这样在 DefaultMQPullConsumerImpl 启动时,消息拉取线程—PullMessageService 和消息队列负载线程—RebalanceService 其实也就没必要启动,但实际上却启动了,这里会有问题么?

五、总结

RocketMQ 的消息消费(一)(入门篇幅)就先分析到这里了。建议读者可以将作者之前写的三篇文章—“RocketMQ 的 RPC 通信(一)/(二)”以及“RocketMQ 消息发送”结合起来读,这样会整体会更加连贯,收获更大。关于 RocketMQ 消息消费的内容比较多也比较复杂,涉及“Consumer 端的负载均衡机制”、“RocketMQ 的长轮询机制”和“RocketMQ 中 Pull 和 Push 消费模式的细节内容”将在后续篇幅进行介绍和分析。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。


本文地址:https://www.6aiq.com/article/1563129820252
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出