消息中间件—RocketMQ 消息消费(三)(消息消费重试)



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

摘要:如果 Consumer 端消费消息失败,那么 RocketMQ 是如何对失败的异常情况进行处理?
前面两篇 RocketMQ 消息消费(一)/(二)篇,主要从 Push/Pull 两种消费模式的简要流程、长轮询机制和 Consumer 端负载均衡这几点内容出发,介绍了 RocketMQ 消息消费的正常流程和细节内容,本篇内容将主要介绍 Consumer 端消费失败的异常流程。
这里先回顾往期 RocketMQ 技术分享的篇幅:

消息中间件—RocketMQ 的 RPC 通信(一)
消息中间件—RocketMQ 的 RPC 通信(二)
消息中间件—RocketMQ 消息发送
消息中间件—RocketMQ 消息消费(一)
消息中间件—RocketMQ 消息消费(二)(push 模式实现)

一、其他 MQ 中间件消费端可靠性的保障

在业务开发中,大家一定都遇到过业务工程因为各类异常(可能是业务工程本身的异常、JVM 内存异常或者系统所在的虚拟机宕机等),而导致 MQ 中间件发送过来的业务消息消费失败而无法再次消费该消息的情况。目前,很多 MQ 消息中间件都有相应的机制和方法来保证 Consumer 端消费消息的可靠性。下面先来看看 RabbitMQ 和 Kafka 这两款 MQ 消息中间件是如何来保证消费者端消息处理的可靠性的呢?

1.1 简谈 RabbitMQ 的手动消息确认 ACK 机制

RabbitMQ 提供了消息确认机制。消费者在订阅队列时,可以在代码中手动设置 autoAck 参数为 false,这时 RabbitMQ 会等待消费者显式地回复确认信号(即为显式地调用channel.basicAck(envelope.getDeliveryTag(), false) 方法)后才从集群中的内存(或磁盘)节点上移除消息,从而保证了这条消息不会因为消费失败而导致丢失。

1.2 简析 Kafka 消息消费的手动提交

在 Kafka 中,也可以采用上面那种的消费后的确认机制,通过在 Consumer 端设置 **“enable.auto.commit”** 属性为 false 后,待业务工程正常处理完消费后,在代码中手动调用 KafkaConsumer 实例的 commitSync() 方法提交(ps:这里指的是同步阻塞 commit 消费的偏移量,等待 Broker 端的返回响应,需要注意 Broker 端在对 commit 请求做出响应之前,消费端会处于阻塞状态,从而限制消息的处理性能和整体吞吐量),以确保消息能够正常被消费。如果在消费过程中,消费端突然 Crash,这时候消费偏移量没有 commit,等正常恢复后依然还会处理刚刚未 commit 的消息。

二、RocketMQ 消费失败后的消费重试机制

对比了另外两款 MQ 中间件后,接下来进入正题,主要来说说 RocketMQ 在消费失败后的是如何来保证消息消费的可靠性?

2.1 重试队列与死信队列的概念

在介绍 RocketMQ 的消费重试机制之前,需要先来说下“重试队列”和“死信队列”两个概念。
(1)重试队列:如果 Consumer 端因为各种类型异常导致本次消费失败,为防止该消息丢失而需要将其重新回发给 Broker 端保存,保存这种因为异常无法正常消费而回发给 MQ 的消息队列称之为重试队列。RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为 **“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”** 的重试队列中(具体细节后面会详细阐述)。
(2)死信队列:由于有些原因导致 Consumer 端长时间的无法正常消费从 Broker 端 Pull 过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中。在 RocketMQ 中,SubscriptionGroupConfig 配置常量默认地设置了两个参数,一个是 retryQueueNums 为 1(重试队列数量为 1 个),另外一个是 retryMaxTimes 为 16(最大重试消费的次数为 16 次)。Broker 端通过校验判断,如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ 会为每个消费组都设置一个 Topic 命名为“%DLQ%+consumerGroup" 的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理;

2.1 Consumer 端回发消息至 Broker 端

在业务工程中的 Consumer 端(Push 消费模式下),如果消息能够正常消费需要在注册的消息监听回调方法中返回CONSUME_SUCCESS的消费状态,否则因为各类异常消费失败则返回RECONSUME_LATER的消费状态。消费状态的枚举类型如下所示:

public enum ConsumeConcurrentlyStatus {
    //业务方消费成功
    CONSUME_SUCCESS,
    //业务方消费失败,之后进行重新尝试消费
    RECONSUME_LATER;
}

如果业务工程对消息消费失败了,那么则会抛出异常并且返回这里的RECONSUME_LATER状态。这里,在消费消息的服务线程—consumeMessageService 中,将封装好的消息消费任务 ConsumeRequest 提交至线程池—consumeExecutor 异步执行。从消息消费任务 ConsumeRequest 的 run() 方法中会执行业务工程中注册的消息监听回调方法,并在 processConsumeResult 方法中根据业务工程返回的状态(CONSUME_SUCCESS 或者 RECONSUME_LATER)进行判断和做对应的处理(下面讲的都是在消费通信模式为集群模型下的,广播模型下的比较简单就不再分析了)。
(1)业务方正常消费(CONSUME_SUCCESS):正常情况下,设置 ackIndex 的值为 consumeRequest.getMsgs().size() - 1,因此后面的遍历 consumeRequest.getMsgs()消息集合条件不成立,不会调用回发消费失败消息至 Broker 端的方法—sendMessageBack(msg, context)。最后,更新消费的偏移量;
(2)业务方消费失败(RECONSUME_LATER):异常情况下,设置 ackIndex 的值为 -1,这时就会进入到遍历 consumeRequest.getMsgs()消息集合的 for 循环中,执行回发消息的方法—sendMessageBack(msg, context)。这里,首先会根据 brokerName 得到 Broker 端的地址信息,然后通过网络通信的 Remoting 模块发送 RPC 请求到指定的 Broker 上,如果上述过程失败,则创建一条新的消息重新发送给 Broker,此时新消息的 Topic 为 **“%RETRY%+ConsumeGroupName”**—重试队列的主题。其中,在 MQClientAPIImpl 实例的 consumerSendMessageBack() 方法中封装了 ConsumerSendMsgBackRequestHeader 的请求体,随后完成回发消费失败消息的 RPC 通信请求(业务请求码为:CONSUMER_SEND_MSG_BACK)。倘若上面的回发消息流程失败,则会延迟 5S 后重新在 Consumer 端进行重新消费。与正常消费的情况一样,在最后更新消费的偏移量;

2.3 Broker 端对于回发消息处理的主要流程

Broker 端收到这条 Consumer 端回发过来的消息后,通过业务请求码(CONSUMER_SEND_MSG_BACK)匹配业务处理器—SendMessageProcessor 来处理。在完成一系列的前置校验(这里主要是“消费分组是否存在”、“检查 Broker 是否有写入权限”、“检查重试队列数是否大于 0”等)后,尝试获取重试队列的 TopicConfig 对象(如果是第一次无法获取到,则调用 createTopicInSendMessageBackMethod() 方法进行创建)。根据回发过来的消息偏移量尝试从 commitlog 日志文件中查询消息内容,若不存在则返回异常错误。
然后,设置重试队列的 Topic—“%RETRY%+consumerGroup”至 MessageExt 的扩展属性“RETRY_TOPIC”中,并对根据延迟级别 delayLevel 和最大重试消费次数 maxReconsumeTimes 进行判断,如果超过最大重试消费次数(默认 16 次),则会创建死信队列的 TopicConfig 对象(用于后面将回发过来的消息移入死信队列)。在构建完成需要落盘的 MessageExtBrokerInner 对象后,调用“commitLog.putMessage(msg)”方法做消息持久化。这里,需要注意的是,在 putMessage(msg) 的方法里会使用“SCHEDULE_TOPIC_XXXX”和对应的延迟级别队列 Id 分别替换 MessageExtBrokerInner 对象的 Topic 和 QueueId 属性值,并将原来设置的重试队列主题(“%RETRY%+consumerGroup”)的 Topic 和 QueueId 属性值做一个备份分别存入扩展属性 properties 的“REAL_TOPIC”和“REAL_QID”属性中。看到这里也就大致明白了,回发给 Broker 端的消费失败的消息并非直接保存至重试队列中,而是会先存至 Topic 为 **“SCHEDULE_TOPIC_XXXX”** 的定时延迟队列中。

疑问:上面说了 RocketMQ 的重试队列的 Topic 是“%RETRY%+consumerGroup”,为啥这里要保存至 Topic 是“SCHEDULE_TOPIC_XXXX”的这个延迟队列中呢?

在源码中搜索下关键字—“SCHEDULE_TOPIC_XXXX”,会发现 Broker 端还存在着一个后台服务线程—ScheduleMessageService(通过消息存储服务—DefaultMessageStore 启动),通过查看源码可以知道其中有一个 DeliverDelayedMessageTimerTask 定时任务线程会根据 Topic(“SCHEDULE_TOPIC_XXXX”)与 QueueId,先查到逻辑消费队列 ConsumeQueue,然后根据偏移量,找到 ConsumeQueue 中的内存映射对象,从 commitlog 日志中找到消息对象 MessageExt,并做一个消息体的转换(messageTimeup() 方法,由定时延迟队列消息转化为重试队列的消息),再次做持久化落盘,这时候才会真正的保存至重试队列中。看到这里就可以解释上面的疑问了,定时延迟队列只是为了用于暂存的,然后延迟一段时间再将消息移入至重试队列中。RocketMQ 设定不同的延时级别 delayLevel,并且与定时延迟队列相对应,具体源码如下:

    //省略
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    /**
     * 定时延时消息主题的队列与延迟等级对应关系
     * @param delayLevel
     * @return
     */
    public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
    }

2.4 Consumer 端消费重试机制

每个 Consumer 实例在启动的时候就默认订阅了该消费组的重试队列主题,DefaultMQPushConsumerImpl 的 copySubscription() 方法中的相关代码如下:

private void copySubscription() throws MQClientException {
            //省略其他代码...
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING://如果消息消费模式为集群模式,还需要为该消费组对应一个重试主题
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
            //省略其他代码...
      }

因此,这里也就清楚了,Consumer 端会一直订阅该重试队列主题的消息,向 Broker 端发送如下的拉取消息的 PullRequest 请求,以尝试重新再次消费重试队列中积压的消息。

PullRequest [consumerGroup=CID_JODIE_1, messageQueue=MessageQueue [topic=%RETRY%CID_JODIE_1, brokerName=HQSKCJJIDRRD6KC, queueId=0], nextOffset=51]

最后,给出一张 RocketMQ 消息重试机制的框图(ps:这里只是描述了消息消费失败后重试拉取的部分重要过程):
b5cdd298b62344bb8449c8004cf9cf06.png

RocketMQ 消息重试机制.jpg

三、总结

RocketMQ 的消息消费(三)(消息消费重试)篇幅就先分析到这里了。关于 RocketMQ 消息消费的内容比较多也比较复杂,需要读者结合源码并多次 debug(可以通过分别在 Consumer 端和 Broker 端的部分重要方法中打印重要对象中的各个属性值的方式,来仔细研究下其中的过程),才可以对其有一个较为深刻的理解。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com