这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

网友投稿 329 2022-08-24

这些年遇到的RocketMQ消息消费超时/消费异常重试机制导致的重复消费问题(并发消费和顺序消费)源码分析

一、前言

前一段时间在业务上遇到了一个MQ重复消费的问题,排查发现一个老哥在代码里写了个线程睡眠n分钟(n为客户控制)的逻辑(设计方案真是一言难尽…),导致必现消息重复消费的问题;于是接盘在业务上修改了设计方案、做了消息幂等处理。本着​​知其然知其所以然​​的原则,今天深入分析一下消息消费超时/失败是怎么重试的?

PS:RocketMQ版本4.8.0,本文中相关源码注释见GitHub中:​​RocketMQ:release-4.8.0​​。

二、源码分析

在​​RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?​​这篇文章我们介绍了Consumer如何从Broker拉取消息的、Consumer如何处理拉取到的消息;

其中在从Broker拉取消息成功之后,会进入到PullCallback#onSuccess()方法,当拉取到消息时,首先将消息全部放入到处理队列​​ProcessQueue​​​中,然后通知​​消费消息服务consumeMessageService​​开始干活。

1、入口

接着上面的来看,​​ConsumeMessageService#submitConsumeRequest()​​为Consumer开始真正开始消费消息的入口;

而​​ConsumeMessageService​​​是一个接口,它有两个实现:​​ConsumeMessageConcurrentlyService​​​、​​ConsumeMessageOrderlyService​​,分别表示并发消费模式、顺序消费模式;因此下面我们从并发消费和顺序消费两部分分别研究消息消费超时/失败的重试机制;

2、并发消费模式核心逻辑

我们接着​​ConsumeMessageConcurrentlyService#submitConsumeRequest()​​来看,并发消费模式下是如何处理消息消费请求的?

​​ConsumeMessageConcurrentlyService​​采用线程池的机制对消息进行分批并发消费,默认一个消息是一批;

1> Consumer端线程执行异常导致的消费异常重试

从上图我们可以看出,在线程池中执行线程任务时,如果失败,Consumer端会自己延时5s之后重试当前消息消费任务,见​​ConsumeMessageConcurrentlyService#submitConsumeRequestLater()​​方法;

ConsumeRequest

​​ConsumeRequest​​是ConsumeMessageConcurrentlyService的内部类,它作为一个线程任务,内部封装了消息消息请求的具体执行逻辑;

​​ConsumeRequest#run()​​方法主要做四个操作:

注册业务系统自定义的消费监听器,负责具体的消息消费;并设置消息的重试Topic;执行消费监听器的​​consumeMessage()​​方法,进行真正的消费消息操作;统计消息消费数据;判断消息消息是否超时、出现异常,处理消息消费结果;

@Overridepublic void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } // todo 1、这里是执行消费前的钩子函数,也就是我们业务系统定义的消费监听器,负责具体消息的消费 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; // 设置消息的重试topic defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); ConsumeMessageContext consumeMessageContext = null; // 如果消费者注册了消息消费者hook if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); // consumer消费前的钩子函数,类似于Spring中的BeanPostProcessor#postProcessBeforeInitialization()方法 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { // 设置每条消息被消费的时间 MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } // 2、 开始消费消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } // 消息消费总耗时 long consumeRT = System.currentTimeMillis() - beginTimestamp; // 根据是否出现异常等,判断处理结果 if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } // 消费超时,默认15分钟 } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } // 在钩子函数中放入消费结果 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } // 执行后置的钩子函数 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); // 类似于Spring中的BeanPostProcessor#postProcessAfterInitialization()方法 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } // 3、统计消息消费数据 ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); if (!processQueue.isDropped()) { // 4、处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); }}

下面我开始本文的正题,在调用消息消费监听器后是如何处理消息消费超时 / 异常的?

2> 执行业务上自定义消费监听器导致的消费异常重试

如果在自定义消费监听器中执行业务逻辑出现异常,会将hasException属性设置为true,供后置的钩子函数​​ConsumeMessageHook​​使用;并且此时status字段为null,在后面的逻辑中如果发现status字段为null,则会将其设置为RECONSUME_LATER(稍后重新消费);最终在processConsumeResult()中再根据这个status处理消费结果;

在​​processConsumeResult()​​方法中,会维护一个变量ackIndex表示当前消费请求中 第一个未ACK的消息 在msgs集合中的下标;如果消息全部消费成功,则ackIndex为msgsSize + 1;如果消息消费失败,则ackIndex为-1,表示该批消息需要全部重新消费,即将消息发送回Broker;如果发送消息回Broker失败,Consumer则延时5s后,重新执行当前消费请求。

发送回Broker的消息延时级别默认0;

消息延时级别从哪里获取的?

我们往上追,最终确定ConsumeConcurrentlyContext的来源为ConsumeRequest#run()方法的开头处,并且后续未对其​​delayLevelWhenNextConsume​​属性做任何修改;

这个我们从ConsumeConcurrentlyContext类中也可以看到,其​​setDelayLevelWhenNextConsume()​​方法未被使用;

Broker端SendMessageProcessor#asyncConsumerSendMsgBack()方法中会处理backMsg。虽然Consumer没有传延时级别,但Broker会默认将其延时级别设置为3,然后将消息先以延时消息的机制发送到延时队列(SCHEDULE_TOPIC_XXXX)中,最多只会重试16次(可配置);重试超过16次会将消息添加到死信队列(%DLQ%+消费组)中。待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。

Consumer端此时再接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是原始的topic。

Broker端如何处理的?见文章:​​深度剖析RocketMQ延时消息机制原理/源码​​;

3> 消费超时重试

从ConsumeRequest#run()方法中,我们看不到任何关于消费超时重试的处理,这里只会统计一个消费超时的状态;

可以发现消费超时阈值的获取方式:defaultMQPushConsumer.getConsumeTimeout(),我们看看这个方法还在哪里被用到了;点进去发现在ProcessQueue的cleanExpiredMsg()方法中有调用它,作为判断消息是否过期的阈值(也是并发消费模式下消息过期的阈值)。

​​ProcessQueue#cleanExpiredMsg()​​方法中如果判断消息已经过期,会将消息在本地缓存msgTreeMap中清除、并以延时消息(延时级别为3)的方式发送回Broker。 Consumer端此时再接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是开始的那个topic。

Broker端如何处理的?见文章:​​深度剖析RocketMQ延时消息机制原理/源码​​;

对于消费超时的消息,首先会以延时消息的机制将其发送到延时队列(SCHEDULE_TOPIC_XXXX)中,待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。这里我们可以注意到延时消息的方式会将消息又被存到CommitLog中 2 * n(重试次数)遍。

从这里我们可以看到,延时级别是3的情况下,理应10s后消息才会从延时队列投递到重试队列中,然而现象确实经过了10ms就投递了;由于消息到期的时间是从ConsumeQueue中每条记录的后64位取的,下一篇我们就研究一下CommitLog中的数据如何同步到ConsumeQueue中的?

3、顺序消费模式核心逻辑

顺序消费模式和并发消费模式一样都存在消费异常重试的场景,但是由于顺序消费模式不会清理过期消息,所以不存在消费超时重试的场景。

1> 消费异常重试

与并发消费不同的是顺序消费的ConsumeRequest只针对ProcessQueue和MessageQueue,而不是针对消息。其获取消息的逻辑是直接从ProcessQueue中取,一次取consumeMessageBatchMaxSize个(默认一个)。

另外:run()方法中消息消费的逻辑与非顺序消费差不多,但其关键点在于消息的消费/获取的顺序性,所以就不可避免的引入锁机制(加锁范围是针对ProcessQueue,或者说是MessageQueue,所以说RocketMQ无法做到多MessageQueue的全局顺序消费)。

出现异常的具体逻辑也基本和并发消费模式一样:

如果调用自定义消费监听器消费消息异常,则status状态为null,在后面的逻辑中如果status == null,则将SUSPEND_CURRENT_QUEUE_A_MOMENT赋值给status,进而在processConsumeResult()中处理消费结果。

在processConsumeResult()方法中,我们主要看非自动提交ACK的情况;

首先会检查是否达到最大重试消费次数(默认是Integer.MAX_VALUE);如果没有达到最大重试次数,默认延时1s后再次开始尝试消费消息。

1)checkReconsumeTimes()

看一下checkReconsumeTimes()方法是如何判断重试次数的?

1)submitConsumeRequestLater()

延时1s后,再次开启当前消费消息任务;

2> 顺序消费不存在超时消费机制

但是其不存在超时消费重试的概念,因为没有清理过期消息这个操作:

在Consumer启动的时候会启动消息消费服务​​consumeMessageService​​,对于并发消费模式而言是定期清理过期消息;而对于顺序消费而言则是定时向Broker申请加锁,以确保消息的顺序消费。

三、总结

1、并发消费模式

消费异常重试机制:

出现异常的两种场景:执行消费请求异常出错、执行指定以消费监听器出错;出现异常之后会发送延时级别为0的消息到Broker,Broker端的SendMessageProcessor#asyncConsumerSendMsgBack()方法中遇到延时级别为0的消息会将其延时级别设置为(3 + 消费重试次数);然后将消息先以延时消息的机制发送到延时队列(SCHEDULE_TOPIC_XXXX)中,最多只会重试16次(可配置);重试超过16次会将消息添加到死信队列(%DLQ%+消费组)中。待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。**Consumer端此时再次接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是原始的topic。

消费超时重试机制:

主要体现在并发消费模式会周期性清理过期的消息,然后将其发送回Broker,后面的步骤和消费异常重试机制一样;最终当前消费者能再次消费到重试队列(%RETRY%+consumerGroup)中的消息。

2、顺序消费模式

顺序消费模式不存在消费超时重试的机制,对于消费异常重试的逻辑基本和并发消费一样,区别在于,顺序消费模式遇到异常,延时1s后重试(再次消费),重试次数默认为Integer.MAX_VALUE;

这里我们可以发现一个问题,如果一个消息处理的特别慢 或者说消费出现异常在一直重试,那么它后面的消息就会被阻塞;进而导致消息堆积的现象。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Linux下的xml文件的创建
下一篇:餐饮店如何做好口碑营销?(怎样做好口碑营销)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~