关于源码:探秘RocketMQ源码Series1Producer视角看事务消息

28次阅读

共计 14923 个字符,预计需要花费 38 分钟才能阅读完成。

简介:探秘 RocketMQ 源码——Series1:Producer 视角看事务音讯

1. 前言

Apache RocketMQ 作为广为人知的开源消息中间件,诞生于阿里巴巴,于 2016 年捐献给了 Apache。从 RocketMQ 4.0 到现在最新的 v4.7.1,不论是在阿里巴巴外部还是内部社区,都博得了宽泛的关注和好评。
出于趣味和工作的须要,近期自己对 RocketMQ 4.7.1 的局部代码进行了研读,其间产生了很多困惑,也播种了更多的启发。

本文将站在发送方视角,通过浏览 RocketMQ Producer 源码,来剖析在事务音讯发送中 RocketMQ 是如何工作的。须要阐明的是,本文所贴代码,均来自 4.7.1 版本的 RocketMQ 源码。本文中所探讨的发送,仅指从 Producer 发送到 Broker 的过程,并不蕴含 Broker 将音讯投递到 Consumer 的过程。

2. 宏观概览

RocketMQ 事务音讯发送流程:

图 1

联合源码来看,RocketMQ 的事务音讯 TransactionMQProducer 的 sendMessageInTransaction 办法,理论调用了 DefaultMQProducerImpl 的 sendMessageInTransaction 办法。咱们进入 sendMessageInTransaction 办法,整个事务音讯的发送流程清晰可见:

首先,做发送前查看,并填入必要参数,包含设 prepare 事务音讯。

源码清单 -1

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {TransactionListener transactionListener = getCheckListener(); 
        if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

进入发送解决流程:

源码清单 -2

 try {sendResult = this.send(msg);
    } catch (Exception e) {throw new MQClientException("send message Exception", e);
    }

依据 broker 返回的处理结果决策本地事务是否执行,半音讯发送胜利则开始本地事务执行:

源码清单 -3

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg); 
                }
                if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:  // 当备 broker 状态不可用时,半音讯要回滚,不执行本地事务
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

本地事务执行完结,依据本地事务状态进行二阶段解决:

源码清单 -4

try {this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {log.warn("local transaction execute" + localTransactionState + ", but end broker transaction failed", e);
    }

    // 组装发送后果
    // ...
    return transactionSendResult;
}

接下来,咱们深刻每个阶段代码剖析。

3. 深扒底细

3.1 一阶段发送

重点剖析 send 办法。进入 send 办法后,咱们发现,RocketMQ 的事务音讯的一阶段,应用了 SYNC 同步模式:

源码清单 -5

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

这一点很容易了解,毕竟事务音讯是要依据一阶段发送后果来决定要不要执行本地事务的,所以肯定要阻塞期待 broker 的 ack。

咱们进入 DefaultMQProducerImpl.java 中去看 sendDefaultImpl 办法的实现,通过读这个办法的代码,来尝试理解在事务音讯的一阶段发送过程中 producer 的行为。值得注意的是,这个办法并非为事务音讯定制,甚至不是为 SYNC 同步模式定制的,因而读懂了这段代码,根本能够对 RocketMQ 的音讯发送机制有了一个较为全面的意识。

这段代码逻辑十分通顺,不忍切片。为了节俭篇幅,将代码中较为繁冗但信息量不大的局部以正文代替,尽可能保留流程的完整性。集体认为较为重要或是容易被疏忽的局部,以正文标出,后文还有局部细节的具体解读。

源码清单 -6

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();
    // 一、音讯有效性校验。见后文
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;

    // 获取以后 topic 的发送路由信息,次要是要 broker,如果没找到则从 namesrv 获取
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 二、发送重试机制。见后文
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // 第一次发送是 mq == null,之后都是有 broker 信息的
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 三、rocketmq 发送音讯时如何抉择队列?——broker 异样躲避机制 
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 发送外围代码
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // rocketmq 抉择 broker 时的躲避机制,开启 sendLatencyFaultEnable == true 才失效
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

                    switch (communicationMode) {
                    // 四、RocketMQ 的三种 CommunicationMode。见后文
                        case ASYNC: // 异步模式
                            return null;
                        case ONEWAY: // 单向模式
                            return null;
                        case SYNC: // 同步模式
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                    // 主动重试
                } catch (MQClientException e) {
                    // ...
                    // 主动重试
                } catch (MQBrokerException e) {
                   // ...
                    // 仅返回码 ==NOT_IN_CURRENT_UNIT==205 时主动重试
                    // 其余状况不重试,抛异样
                } catch (InterruptedException e) {
                   // ...
                    // 不重试,抛异样
                }
            } else {break;}
        }

        if (sendResult != null) {return sendResult;}

        // 组装返回的 info 信息,最初以 MQClientException 抛出
        // ... ...

        // 超时场景抛 RemotingTooMuchRequestException
        if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        // 填充 MQClientException 异样信息
        // ...
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic:" + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

3.1.1 音讯有效性校验

源码清单 -7

Validators.checkMessage(msg, this.defaultMQProducer);
在此办法中校验音讯的有效性,包含对 topic 和音讯体的校验。topic 的命名必须符合规范,且防止应用内置的零碎音讯 TOPIC。音讯体长度 > 0 && 音讯体长度 <= 102410244 = 4M。

源码清单 -8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    if (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX:" + defaultMQProducer.getMaxMessageSize());
    }
}

3.1.2 发送重试机制

Producer 在音讯发送不胜利时,会主动重试,最多发送次数 = retryTimesWhenSendFailed + 1 = 3 次。

值得注意的是,并非所有异常情况都会重试,从以上源码中能够提取到的信息通知咱们,在以下三种状况下,会主动重试:
1)产生 RemotingException,MQClientException 两种异样之一时。
2)产生 MQBrokerException 异样,且 ResponseCode 是 NOT_IN_CURRENT_UNIT = 205 时。
3)SYNC 模式下,未产生异样且发送后果状态非 SEND_OK。

在每次发送音讯之前,会先查看是否在后面这两步就曾经耗时超长(超时时长默认 3000ms),若是,则不再持续发送并且间接返回超时,不再重试。这里阐明了 2 个问题:
1)producer 外部主动重试对业务利用而言是无感知的,利用看到的发送耗时是蕴含所有重试的耗时在内的;
2)一旦超时意味着本次音讯发送曾经以失败告终,起因是超时。这个信息最初会以 RemotingTooMuchRequestException 的模式抛出。

这里须要指出的是,在 RocketMQ 官网文档中指出,发送超时时长是 10s,即 10000ms,网上许多人对 rocketMQ 的超时工夫解读也认为是 10s。然而代码中却明明白白写着 3000ms,最终我 debug 之后确认,默认超时工夫的确是 3000ms。这里也倡议 RocketMQ 团队对文档进行确认,如确有误,还是早日更正为好。

图 2

3.1.3 broker 的异样躲避机制

源码清单 -8

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

这行代码是发送前抉择 queue 的过程。

这里波及 RocketMQ 音讯发送高可用的的一个外围机制,latencyFaultTolerance。这个机制是 Producer 负载平衡的一部分,通过 sendLatencyFaultEnable 的值来管制,默认是 false 敞开状态,不启动 broker 故障提早机制,值为 true 时启用 broker 故障提早机制,可由 Producer 被动关上。

抉择队列时,开启异样躲避机制,则依据 broker 的工作状态防止抉择以后状态不佳的 broker 代理,不衰弱的 broker 会在一段时间内被躲避,不开启异样躲避机制时,则按程序选取下一个队列,但在重试场景下会尽量抉择不同于上次发送 broker 的 queue。每次音讯发送都会通过 updateFaultItem 办法来保护 broker 的状态信息。

源码清单 -9

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {
        // 计算提早多久,isolation 示意是否须要隔离该 broker,若是,则从 30s 往前找第一个比 30s 小的提早值,再按下标判断躲避的周期,若 30s,则是 10min 躲避;// 否则,按上一次发送耗时来决定躲避时长;long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}  

深刻到 selectOneMessageQueue 办法外部一探到底:

源码清单 -10

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {
        // 开启异样躲避
        try {int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // 按程序取下一个 message queue 作为发送的 queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 以后 queue 所在的 broker 可用,且与上一个 queue 的 broker 雷同,// 或者第一次发送,则应用这个 queue
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();}
    // 不开启异样躲避,则随机自增抉择 Queue
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

3.1.4 RocketMQ 的三种 CommunicationMode

源码清单 -11

public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

以上三种模式指的都是音讯从发送方达到 broker 的阶段,不蕴含 broker 将音讯投递给订阅方的过程。

三种模式的发送形式的差别:

单向模式:ONEWAY。音讯发送方只管发送,并不关怀 broker 解决的后果如何。这种模式下,因为解决流程少,发送耗时十分小,吞吐量大,但不能保障音讯牢靠不丢,罕用于流量微小但不重要的音讯场景,例如心跳发送等。

异步模式:ASYNC。音讯发送方发送音讯到 broker 后,无需期待 broker 解决,拿到的是 null 的返回值,而由一个异步的线程来做音讯解决,解决实现后以回调的模式通知发送方发送后果。异步解决时如有异样,返回发送方失败后果之前,会通过外部重试(默认 3 次,发送方不感知)。这种模式下,发送方期待时长较小,吞吐量较大,音讯牢靠,用于流量大但重要的音讯场景。

同步模式:SYNC。音讯发送方需期待 broker 解决实现并明确返回胜利或失败,在音讯发送方拿到音讯发送失败的后果之前,也会经验过外部重试(默认 3 次,发送方不感知)。这种模式下,发送方会阻塞期待音讯处理结果,期待时长较长,音讯牢靠,用于流量不大但重要的音讯场景。须要强调的是,事务音讯的一阶段半事务音讯的解决是同步模式。

在 sendKernelImpl 办法中也能够看到具体的实现差别。ONEWAY 模式最为简略,不做任何解决。负责发送的 sendMessage 办法参数中,相比同步模式,异步模式多了回调办法、蕴含 topic 发送路由元信息的 topicPublishInfo、蕴含发送 broker 信息的 instance、蕴含发送队列信息的 producer、重试次数。另外,异步模式下,会对有压缩的音讯先做 copy。

源码清单 -12

switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            } 

官网文档中有这样一张图,非常清晰的形容了异步通信的具体过程:

图 3

3.2 二阶段发送

源码清单 - 3 体现了本地事务的执行,localTransactionState 将本地事务执行后果与事务音讯二阶段的发送关联起来。

值得注意的是,如果一阶段的发送后果是 SLAVE_NOT_AVAILABLE,即备 broker 不可用时,也会将 localTransactionState 置为 Rollback,此时将不会执行本地事务。之后由 endTransaction 办法负责二阶段提交,见源码清单 -4。具体到 endTransaction 的实现:

源码清单 -13

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception:" + localException.toString()) : null;
    // 采纳 oneway 的形式发送二阶段音讯
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

在二阶段发送时,之所以用 oneway 的形式发送,集体了解这正是因为事务音讯有一个非凡的牢靠机制——回查。

3.3 音讯回复

当 Broker 通过了一个特定的工夫,发现仍然没有失去事务音讯的二阶段是否要提交或者回滚的确切信息,Broker 不晓得 Producer 产生了什么状况(可能 producer 挂了,也可能 producer 发了 commit 但网络抖动丢了,也可能 …),于是被动发动回查。

事务音讯的回查机制,更多的是在 broker 端的体现。RocketMQ 的 broker 以 Half 音讯、Op 音讯、实在音讯三个不同的 topic 来将不同发送阶段的事务音讯进行了隔离,使得 Consumer 只能看到最终确认 commit 须要投递进来的音讯。其中具体的实现逻辑在本文中暂不多赘述,后续可另开一篇专门来从 Broker 视角来解读。

回到 Producer 的视角,当收到了 Broker 的回查申请,Producer 将依据音讯查看本地事务状态,依据后果决定提交或回滚,这就要求 Producer 必须指定回查实现,以备不时之需。

当然,失常状况下,并不举荐被动发送 UNKNOW 状态,这个状态毫无疑问会给 broker 带来额定回查开销,只在呈现不可预知的异常情况时才启动回查机制,是一种比拟正当的抉择。

另外,4.7.1 版本的事务回查并非有限回查,而是最多回查 15 次:

源码清单 -14

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;

附录

官网给出 Producer 的默认参数如下(其中超时时长的参数,在前文中也曾经提到,debug 的后果是默认 3000ms,并非 10000ms):

图 4

RocketMQ 作为一款优良的开源消息中间件,有很多开发者基于它做了二次开发,例如蚂蚁团体商业化产品 SOFAStack MQ 音讯队列,就是基于 RocketMQ 内核进行的再次开发的金融级消息中间件,在音讯管控、通明运维等方面做了大量优良的工作。

愿 RocketMQ 在社区宽广开发者的共创共建之下,可能一直发展壮大,爆发更强的生命力。
原文链接

本文为阿里云原创内容,未经容许不得转载。

正文完
 0