前言

首先, 想要全面的了解消息轨迹, 我们可以提出以下问题, 然后根据以下问题逐步的去从源码中寻找答案!

  • 什么是消息轨迹?消息轨迹可以做什么?
  • 如何开启消息轨迹以及如何使用?
  • 消息轨迹的实现原理是什么?
  • 使用消息轨迹的注意点和建议?

什么是消息轨迹

消息轨迹是指一条消息从生产者发送到RocketMQ服务端,再到消费者消费处理,整个过程中的各个相关节点的时间、状态等数据汇聚而成的完整链路信息。该轨迹可作为生产环境中排查问题强有力的数据支持。

RocketMQ系统中,一条消息的完整链路包含生产者、服务端、消费者三个角色,每个角色处理消息的过程中都会在轨迹链路中增加相关的信息,将这些信息汇聚即可获取任意消息当前的状态,涉及的数据如下图所示。

使用场景
在生产环境的消息收发不符合预期时可以使用消息轨迹工具排查问题。通过消息的属性(Message ID、Message Key、Topic)搜索相关的消息轨迹,找到消息的实际收发状态,帮助诊断问题。

如何启动和使用消息轨迹

启用消息轨迹

修改Broker配置

1
2
3
4
5
6
7
8
## 是否开启自动创建消息轨迹的Topic,默认false
traceTopicEnable=true

## Broker端收到消息会将该值返回给客户端,客户端根据该值判断是否需要发送轨迹消息,意思是只有发到该Broker中的消息才会有轨迹消息 ,默认false
traceOn=true

## 存储消息轨迹的Topic,默认为RMQ_SYS_TRACE_TOPIC;这个值不用改,直接使用就好
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC

关键代码 TopicConfigManager

1
2
3
4
5
6
7
8
9
// 如果开启了消息轨迹, 则自动创建消息轨迹Topic, 读写队列都是1
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
  1. 开启traceTopicEnable=true,仅仅只是会自动创建消息轨迹Topic,不代表允许该Broker允许写入轨迹消息
  2. 消息轨迹Topic是系统Topic,并且读写队列都是1
  3. 必须在Broker中配置traceOn=true,才允许写入轨迹消息

SendMessageTraceHookImpl#sendMessageAfter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

@Override
public void sendMessageAfter(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded 如果是消息轨迹的Topic 那就不需要再执行了,不然成递归了
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) {
return;
}
if (context.getSendResult() == null) {// 超时或者返回异常等消息发送失败的情况,那肯定消息轨迹也不能发送
return;
}
// 判断一下Broker是否开启了Trace 消息轨迹
if (context.getSendResult().getRegionId() == null
|| !context.getSendResult().isTraceOn()) {
// if switch is false,skip it
return;
}

}
  1. Broker返回Response的时候,会把Broker的配置一起返回,客户端通过判断这个值来决定是不是需要发送轨迹消息。

Producer开启消息轨迹

DefaultMQProducer构造函数有2个入参与此相关

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 指定命名空间、生产者组、RPC 挂钩、启用的 msgTrace 标志和自定义跟踪主题名称的构造函数。
*
* @param namespace 此 MQ Producer 实例的命名空间。
* @param producerGroup 生产者组,请参阅同名字段。
* @param rpcHook RPC 每个远程命令执行时要执行的 RPC 挂钩
* @param enableMsgTrace 是否开启消息轨迹
* @param customizedTraceTopic 消息跟踪主题的名称值。如果不配置,可以使用默认的跟踪主题名称:RMQ_SYS_TRACE_TOPIC。
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {

  1. 构建生产者实例的时候将enableMsgTrace设置为true; customizedTraceTopic设置为null,则会使用默认的消息轨迹Topic:RMQ_SYS_TRACE_TOPIC; 如果Broker端自定义了消息轨迹的Topic,那么这里需要对应修改;
  2. 发送消息的时候, 尽量指定keys, 这样可以更高效的查找到消息的轨迹, 管理后台需要通过Key或者MessageId来查找

使用示例:

1
2
3
4
5
6
7
8
// 设置自定义Hook 和 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer(groupName, new SzzProducerRPCHook(),true,null);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();
Message msg = new Message(topic, tag,"szzkey",
(" I'm 石臻臻, timestamp:" + System.currentTimeMillis()).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);

Consumer开启消息轨迹

跟Producer类似, DefaultMQPushConsumer构造函数也有同样的入参

1
2
3
4

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

  1. 构建消费者实例的时候将enableMsgTrace设置为true; customizedTraceTopic设置为null,则会使用默认的消息轨迹Topic; 如果Broker端自定义了消息轨迹的Topic,那么这里需要对应修改;

使用示例:

1
2
3
4
5
// 开启消息轨迹
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName,true);
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.subscribe(topic, "*");
consumer.start();

使用说明

消息轨迹的使用不会增加额外的接入成本(不过会增加消息量)。所有类型的消息正常发送后,即可根据消息的属性在RocketMQ控制台上查询到消息的发送轨迹,但消费轨迹需要注意以下几点:

消息类型 查询说明
普通消息 消息发送之后有发送轨迹,没消费前显示尚未消费。消费后会展示投递和消费信息。
顺序消息 消息发送之后有发送轨迹,没消费前显示尚未消费。消费后会展示投递和消费信息。
定时消息和延时消息 如果当前系统时间没有到达指定消费的时间,轨迹查询不到,消息也查询不到。
事务消息 事务未提交之前,轨迹查询不到,消息也查询不到。

控制台查询消息轨迹

消息轨迹的实现原理

相关ISSUE: Request new feature for the message track #525

生产者在发送消息的时候,如果判断开启了enableMsgTrace, 则会注册2个Hook
SendMessageHookEndTransactionHook

  1. SendMessageTraceHookImpl 记录消息轨迹的Hook
  2. EndTransactionTraceHookImpl 跟事务消息相关的轨迹Hook,事务提交的时候会写入消息轨迹

PS: 想了解发送消息时候所有HOOK调用顺序请看 发送消息流程中的各个扩展点说明和使用

在消息的发送前后SendMessageTraceHookImpl会分别执行sendMessageBeforesendMessageAfter

在消息发送完成之后,这个sendMessageAfter会组装好轨迹消息,然后提交给AsyncTraceDispatcher中的阻塞队列;
异步任务AsyncRunnable会从队列中获取轨迹消息,然后当做一个正常的消息发送到服务端, 当然这个消息的Topic就是RMQ_SYS_TRACE_TOPIC(默认);

当然整个流程不影响正常消息的发送,轨迹消息是异步化处理

AsyncTraceDispatcher#sendTraceDataByMQ

  1. 轨迹消息除了基本的信息还有发送的状态
  2. 轨迹消息是异步批量发送的, 按照时间维度和一批次数据量大小触发发送的
    ①. 同一批次最多不超过500ms就得发出去了
    ②. 同一批次积攒的消息大小不超过4M就得发出去了
    ③. 同步一次积攒的Key不超过32kb就得发出去了

消息轨迹的使用建议

独立存放消息轨迹

建议专门规划一个broker节点(或集群)来存储这些轨迹消息数据,即rocketmq集群只有一个broker配置启动消息轨迹功能,且这个节点不承担正常业务数据。

比如,目前已经搭建好的集群 cluster_a ,你需要新增一个单独的Broker(或者集群)来专门用于存储消息轨迹

  1. 创建一个新的Broker, 修改几个配置项,创建独立的集群名称、开启轨迹消息、不允许自动创建Topic

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
     
    # 集群名称,同一个集群的Broker,该配置需要相同;默认 :DefaultCluster
    brokerClusterName=集群名称(消息轨迹相关设置为独立的一个集群)
    #是否开启消息跟踪;默认false
    traceTopicEnable=true
    #是否允许 Broker 自动创建Topic;
    autoCreateTopicEnable=false

    ## Broker端收到消息会将该值返回给客户端,客户端根据该值判断是否需要发送轨迹消息,意思是只有发到该Broker中的消息才会有轨迹消息 ,默认false
    traceOn=true

  2. 将正常用于业务的集群的消息轨迹配置traceTopicEnable关闭

    1
    2
    3
    4
    traceTopicEnable=false
    ## 允许写入该Broker的消息 后续能发送轨迹消息
    traceOn=true

  3. 如果正常业务集群之前开启过traceTopicEnable, 那么他们本地配置中会持久化过消息轨迹Topic RMQ_SYS_TRACE_TOPIC, 就是配置了traceTopicEnable=false 那么在初始化的时候也会从本地磁盘中加载该Topic,导致还是能够接受到轨迹消息,所以我们需要主动的去这些Broker里面删除轨迹消息的Topic。
    方式一:直接在对应的Broker删除轨迹消息Topic
    手动方式
    可以查询他们之前都有分配在哪些Broker里,然后去对应的Broker配置topics.json 中把该Topic删除掉。例如

    命令行方式

    1
    sh mqadmin deleteTopic -n=127.0.0.1:9876 -t=RMQ_SYS_TRACE_TOPIC   -c=集群名

    方式二更新RMQ_SYS_TRACE_TOPIC权限为只读
    命令行方式

    1
    2
    3
        
    sh mqadmin updateTopic -n=127.0.0.1:9876 -t=RMQ_SYS_TRACE_TOPIC -c=szz_cluster_a -p 4

    控制台方式也可以直接操作修改

    但是上述操作需要支持修改系统Topic, 对应的Broker配置validateSystemTopicWhenUpdateTopic=false(允许修改系统配置)

PS: 将轨迹消息存放在单独的Broker中,一定要检查上面的第三步骤,不然达不到效果

异步刷盘

消息轨迹这类型的消息不太需要保证消息的可靠性, 但是它需要能够支持比较大的吞吐量;

所以可以根据这个,自己按需调整和优化

比如刷盘策略可以选择更为激进的 异步刷盘+缓冲区

  1. 同步刷盘 flushDiskType=SYNC_FLUSH
  2. 异步刷盘 flushDiskType=ASYNC_FLUSH
  3. 异步刷盘+缓冲区flushDiskType=ASYNC_FLUSH transientStorePoolEnable=true

单副本

没有必要设置多副本、或者主从的形式、

FAQ

如果是延迟消息,还没有到消费的时间,消息轨迹能查询到吗?消息能查询到吗?

都不能,只有等到了消费时间才可以查询到。

如果Broker中的traceOn不一致会怎么样?

有一个点必须要了解清楚,Broker中的traceOn配置不是代表轨迹消息能不能写入到该Broker中,而是代表着当前写入该Broker中的消息能不能写轨迹消息,它是把traceOn的状态返回给客户端,然后由客户端来决定是否继续发起轨迹消息,而具体该估计消息会发往哪个队列中,则是按照轮训策略选择。

所以假设如果你不想让轨迹消息写入到某个Broker中,那么只需要让它不要注册轨迹Topic就行了