并发消费

相关配置

配置 说明 默认
consumeMessageBatchMaxSize 批量消费消息的最大数量 1
consumeThreadMin 消费线程的最小数量 20
consumeThreadMax 消费线程最大数量 20
consumeTimeout 消费超时时间,单位分钟 15

消息消费线程池

ConsumeMessageConcurrentlyService构造方法里面构造了 消费现场处理器

1
2
3
4
5
6
7
8
9
10
11
// 无边界阻塞队列
this.consumeRequestQueue = new LinkedBlockingQueue<>();
String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));

  1. 消费线程池使用的长度为Integer.MAX_VALUE的阻塞队列存放待待消费任务
  2. 线程前缀名 ConsumeMessageThread_+ consumerGroup
  3. 消费线程池核心线程数和最大线程数默认都是20; 相应配置可修改consumeThreadMin、consumeThreadMax

拉取消息PULL_MESSAGE

客户端会不停的去拉取消息PULL_MESSAGE到客户端来进行消费。该部分请看PULL_MESSAGE 请求流程

提交消费请求

PULL_MESSAGE 拉取到的消息需要提交给消费线程consumeExecutor去消费。

ConsumeMessageConcurrentlyService#submitConsumeRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {// 当前提交的消息量小于等于批量处理的数量; 就直接一次提交
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// 拆分成多个ConsumeRequest
}
}
  1. 将待消费的消息创建对应的ConsumeRequest并提交到线程池中处理, 如果待消费的消息小于批量处理大小(consumeMessageBatchMaxSize 默认1),则只创建一个ConsumeRequest任务; 如果大于,则拆分成多个ConsumeRequest任务,一个ConsumeRequest任务最多处理consumeMessageBatchMaxSize条消息

  2. 如果当前待处理的任务太多了, 阻塞队列满了则延迟5秒后再次投递

消费请求 ConsumeRequest

消费请求任务 提交到线程池之后执行,执行流程如下

ConsumeRequest#run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88

public void run() {
// 处理队列被 丢弃,则直接忽略
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
// 去掉RETRY 前缀
defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}

if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

  1. 如果处理队列是被Dropped状态,则直接return

  2. 如果有设置ConsumeMessageHook,则执行ConsumeMessageHook的前置方法executeHookBefore

  3. 执行MessageListenerConcurrently监听器

  4. 根据监听器的执行结果,设置ConsumeReturnType
    ①.ConsumeReturnType.EXCEPTION : 监听器抛出了异常
    ②.ConsumeReturnType.RETURNNULL: 监听器返回了NULL
    ③.ConsumeReturnType.TIME_OUT : 监听器消费超时,默认15分钟
    ④.ConsumeReturnType.FAILED : 监听器返回了 ConsumeConcurrentlyStatus.RECONSUME_LATER
    ⑤. ConsumeReturnType.SUCCESS: 监听器返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS (ROLLBACK 、COMMIT 已经废弃,这个也是ConsumeReturnType.SUCCESS)
    如果有设置ConsumeMessageHook,则会把ConsumeReturnType放到consumeMessageContext的Props属性ConsumeContextType

  5. 如果有设置ConsumeMessageHook,则执行ConsumeMessageHook的后置方法executeHookAfter

  6. 处理消费结果:
    ①. 将消费成功的消息从 ProcessQueue中的TreeMap中移除
    ②. 将已消费成功的偏移量更新到内存offsetTable中; 用于后续的提交
    ③. 如果是消费失败状态, 则会将消息发回队列中等待重试。

定期清理过期消息

如果消费者在消费的时候, 一直阻塞着, 那么这个Offset就成为一个Offset的阻塞点, 它后面的已消费的Offset都不能够提交. 那么整个消费流程就会停滞。那么有什么兜底方案避免这种情况吗?

答案是有的,有一个定期清理过期消息的定时任务;

ConsumeMessageConcurrentlyService#start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
// 每隔15分钟 执行一次清理过期的消息(兜底政策);
// 查找每个ProcessQueue的的最小Offset,是否有超时还未消费完成的Msg;如果有的话,则主动发回重试,并将该Msg从TreeMap中移除
@Override
public void run() {
try {
cleanExpireMsg();
} catch (Throwable e) {
log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
}
}
// 这里的间隔时间改成1或者 5 分钟比较合适
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}

默认情况下,超时时间是15分钟, 上面的定时任务会判断每个ProcessQueue中的TreeMap最小偏移量的那个Message是否已经过期了。
如果过期了

  1. 将该消息发回重试,间隔时间延长等级为3 ,对应的默认时间是10s。
  2. 将该Message从TreeMap中移除,移除了之后 它后面已消费的offset就可以提交了。
  3. 计算是否超时的时间, consumeStartTimeStamp是在执行ConsumeRequest请求,执行完了ConsumeMessageHook前置处理器开始计算的。ConsumeMessageConcurrentlyService#run

PS:这里的间隔时间为this.defaultMQPushConsumer.getConsumeTimeout(); 个人觉得设置的并不合理; 因为最差的情况, 过期的消息可能需要this.defaultMQPushConsumer.getConsumeTimeout() * 2 ; 默认情况下也就是15分钟*2 ;

顺序消费

相关配置

配置 说明 默认
consumeThreadMin 消费线程的最小数量 20
consumeThreadMax 消费线程最大数量 20
consumeTimeout 消费超时时间,单位分钟 15

消息消费线程池

跟并发消费ConsumeMessageConcurrentlyService情况一样 ;

提交消费请求

1
2
3
4
5
6
7
8
9
10
11
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}

因为顺序消费需要严格保证顺序性,不跟并发消费一样把消息创建多个ConsumeRequest来消费,而是只有一个ConsumeRequest :

执行消费流程

ConsumeMessageOrderlyService#run

1
2
3
public void run() {
// 代码忽略
}

25062

消费完成处理流程

Q&A

1. 如果是批量消费,一部分成功一部分失败会如何重试?

并发消费的情况

目前的情况, 如果用户返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER; 则该批次消费的消息都会全部重试。
如果你想让成功的部分不重试,仅仅重试失败的部分,那么你可以通过设置ConsumeMessageContext中的ackIndex的值来实现, 并且需要返回ConsumeConcurrentlyStatus.SUCCESS;
具体操作请看:Rocketmq并发消费失败重试机制

顺序消费的情况

顺序消费的情况,根据用户返回的ConsumeOrderlyStatus状态, 如果是ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 状态; 则表示需要重试;
重试的逻辑是

  1. 如果重试次数超出了最大重试次数maxReconsumeTimes(默认INTEGER.MAXVALUE), 则尝试将该批次的消息重新发到重试队列中。如果该批次都发送成功,则标记该批次消费成功。
  2. 如果没有超出最大重试次数,则会延迟suspendCurrentQueueTimeMillis(默认1000ms)之后重新消费。

2. 并发消费,如何保证消费 offset 提交顺序?

每次消息消费的时候,只是会把已经消费的消息 从 待消费消息TreeMap中移除掉.
然后更新一下当前TreeMap最小值为消费Offset。 提交的时候每次提交的也是最小的消息Offset。
比如 并发批量消费的时候用异步的方式去消费5条消息,对应的偏移量为 1、2、3、4、5,它们也存在TreeMap中 ,
消费成功之后就会从TreeMap中移除, 提交当前TreeMap中最小的偏移量; 就是2、3、4、5都消费成功了,但是1没有消费成功,那么它们都不会被提交未消费成功。

3. 批量消息是怎么消费的呢?

可以通过设置consumer.consumeMessageBatchMaxSize(批量消费数量) 来进行批量消费。