并发消费 相关配置
配置
说明
默认
consumeMessageBatchMaxSize
批量消费消息的最大数量
1
consumeThreadMin
消费线程的最小数量
20
consumeThreadMax
消费线程最大数量
20
consumeTimeout
消费超时时间,单位分钟
15
消息消费线程池 ConsumeMessageConcurrentlyService构造方法里面构造了 消费现场处理器
1 2 3 4 5 6 7 8 9 10 11 this .consumeRequestQueue = new LinkedBlockingQueue <>(); String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0 , 100 ) : consumerGroup) + "_" ; this .consumeExecutor = new ThreadPoolExecutor ( this .defaultMQPushConsumer.getConsumeThreadMin(), this .defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60 , TimeUnit.MILLISECONDS, this .consumeRequestQueue, new ThreadFactoryImpl ("ConsumeMessageThread_" + consumerGroupTag));
消费线程池使用的长度为Integer.MAX_VALUE的阻塞队列存放待待消费任务
线程前缀名 ConsumeMessageThread_
+ consumerGroup
消费线程池核心线程数和最大线程数默认都是20; 相应配置可修改consumeThreadMin、consumeThreadMax
拉取消息PULL_MESSAGE 客户端会不停的去拉取消息PULL_MESSAGE 到客户端来进行消费。该部分请看PULL_MESSAGE 请求流程
提交消费请求 PULL_MESSAGE 拉取到的消息需要提交给消费线程consumeExecutor
去消费。
ConsumeMessageConcurrentlyService#submitConsumeRequest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public void submitConsumeRequest ( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this .defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest (msgs, processQueue, messageQueue); try { this .consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this .submitConsumeRequestLater(consumeRequest); } } else { } }
将待消费的消息创建对应的ConsumeRequest 并提交到线程池中处理, 如果待消费的消息小于批量处理大小(consumeMessageBatchMaxSize 默认1),则只创建一个ConsumeRequest任务; 如果大于,则拆分成多个ConsumeRequest任务,一个ConsumeRequest任务最多处理consumeMessageBatchMaxSize
条消息
如果当前待处理的任务太多了, 阻塞队列满了则延迟5秒后再次投递
消费请求 ConsumeRequest 消费请求任务 提交到线程池之后执行,执行流程如下
ConsumeRequest#run
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 public void run () { if (this .processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}" , ConsumeMessageConcurrentlyService.this .consumerGroup, this .messageQueue); return ; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this .messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext (messageQueue); ConsumeConcurrentlyStatus status = null ; defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); ConsumeMessageContext consumeMessageContext = null ; if (ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext (); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap <>()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false ); ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); boolean hasException = false ; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s" , UtilAll.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this .consumerGroup, msgs, messageQueue), e); hasException = true ; } long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000 ) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}" , ConsumeMessageConcurrentlyService.this .consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } if (ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageConcurrentlyService.this .getConsumerStatsManager() .incConsumeRT(ConsumeMessageConcurrentlyService.this .consumerGroup, messageQueue.getTopic(), consumeRT); if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this .processConsumeResult(status, context, this ); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}" , messageQueue, msgs); } }
如果处理队列是被Dropped状态,则直接return
如果有设置ConsumeMessageHook ,则执行ConsumeMessageHook的前置方法executeHookBefore
执行MessageListenerConcurrently监听器
根据监听器的执行结果,设置ConsumeReturnType ①.ConsumeReturnType.EXCEPTION : 监听器抛出了异常 ②.ConsumeReturnType.RETURNNULL: 监听器返回了NULL ③.ConsumeReturnType.TIME_OUT : 监听器消费超时,默认15分钟 ④.ConsumeReturnType.FAILED : 监听器返回了 ConsumeConcurrentlyStatus.RECONSUME_LATER ⑤. ConsumeReturnType.SUCCESS: 监听器返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS (ROLLBACK 、COMMIT 已经废弃,这个也是ConsumeReturnType.SUCCESS) 如果有设置ConsumeMessageHook ,则会把ConsumeReturnType放到consumeMessageContext的Props属性ConsumeContextType
中
如果有设置ConsumeMessageHook ,则执行ConsumeMessageHook的后置方法executeHookAfter
处理消费结果: ①. 将消费成功的消息从 ProcessQueue中的TreeMap中移除 ②. 将已消费成功的偏移量更新到内存offsetTable中; 用于后续的提交 ③. 如果是消费失败状态, 则会将消息发回队列中等待重试。
定期清理过期消息 如果消费者在消费的时候, 一直阻塞着, 那么这个Offset就成为一个Offset的阻塞点, 它后面的已消费的Offset都不能够提交. 那么整个消费流程就会停滞。那么有什么兜底方案避免这种情况吗?
答案是有的,有一个定期清理过期消息的定时任务;
ConsumeMessageConcurrentlyService#start
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void start () { this .cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable () { @Override public void run () { try { cleanExpireMsg(); } catch (Throwable e) { log.error("scheduleAtFixedRate cleanExpireMsg exception" , e); } } }, this .defaultMQPushConsumer.getConsumeTimeout(), this .defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); }
默认情况下,超时时间是15分钟, 上面的定时任务会判断每个ProcessQueue中的TreeMap最小偏移量的那个Message是否已经过期了。 如果过期了
将该消息发回重试,间隔时间延长等级为3 ,对应的默认时间是10s。
将该Message从TreeMap中移除,移除了之后 它后面已消费的offset就可以提交了。
计算是否超时的时间, consumeStartTimeStamp
是在执行ConsumeRequest请求,执行完了ConsumeMessageHook前置处理器开始计算的。ConsumeMessageConcurrentlyService#run
PS:这里的间隔时间为this.defaultMQPushConsumer.getConsumeTimeout()
; 个人觉得设置的并不合理; 因为最差的情况, 过期的消息可能需要this.defaultMQPushConsumer.getConsumeTimeout() * 2 ; 默认情况下也就是15分钟*2 ;
顺序消费 相关配置
配置
说明
默认
consumeThreadMin
消费线程的最小数量
20
consumeThreadMax
消费线程最大数量
20
consumeTimeout
消费超时时间,单位分钟
15
消息消费线程池 跟并发消费ConsumeMessageConcurrentlyService情况一样 ;
提交消费请求 1 2 3 4 5 6 7 8 9 10 11 @Override public void submitConsumeRequest ( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest (processQueue, messageQueue); this .consumeExecutor.submit(consumeRequest); } }
因为顺序消费需要严格保证顺序性,不跟并发消费一样把消息创建多个ConsumeRequest来消费,而是只有一个ConsumeRequest :
执行消费流程 ConsumeMessageOrderlyService#run
25062
消费完成处理流程
Q&A 1. 如果是批量消费,一部分成功一部分失败会如何重试? 并发消费的情况 :
目前的情况, 如果用户返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER
; 则该批次消费的消息都会全部重试。 如果你想让成功的部分不重试,仅仅重试失败的部分,那么你可以通过设置ConsumeMessageContext中的ackIndex
的值来实现, 并且需要返回ConsumeConcurrentlyStatus.SUCCESS
; 具体操作请看:Rocketmq并发消费失败重试机制
顺序消费的情况
顺序消费的情况,根据用户返回的ConsumeOrderlyStatus状态, 如果是ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
状态; 则表示需要重试; 重试的逻辑是
如果重试次数超出了最大重试次数maxReconsumeTimes(默认INTEGER.MAXVALUE)
, 则尝试将该批次的消息重新发到重试队列中。如果该批次都发送成功,则标记该批次消费成功。
如果没有超出最大重试次数,则会延迟suspendCurrentQueueTimeMillis(默认1000ms)
之后重新消费。
2. 并发消费,如何保证消费 offset 提交顺序?
每次消息消费的时候,只是会把已经消费的消息 从 待消费消息TreeMap中移除掉. 然后更新一下当前TreeMap最小值为消费Offset。 提交的时候每次提交的也是最小的消息Offset。比如 并发批量消费的时候用异步的方式去消费5条消息,对应的偏移量为 1、2、3、4、5,它们也存在TreeMap中 , 消费成功之后就会从TreeMap中移除, 提交当前TreeMap中最小的偏移量; 就是2、3、4、5都消费成功了,但是1没有消费成功,那么它们都不会被提交未消费成功。
3. 批量消息是怎么消费的呢?
可以通过设置consumer.consumeMessageBatchMaxSize(批量消费数量)
来进行批量消费。