简介:探秘RocketMQ源码——Series1:Producer视角看事务音讯

1. 前言

Apache RocketMQ作为广为人知的开源消息中间件,诞生于阿里巴巴,于2016年捐献给了Apache。从RocketMQ 4.0到现在最新的v4.7.1,不论是在阿里巴巴外部还是内部社区,都博得了宽泛的关注和好评。
出于趣味和工作的须要,近期自己对RocketMQ 4.7.1的局部代码进行了研读,其间产生了很多困惑,也播种了更多的启发。

本文将站在发送方视角,通过浏览RocketMQ Producer源码,来剖析在事务音讯发送中RocketMQ是如何工作的。须要阐明的是,本文所贴代码,均来自4.7.1版本的RocketMQ源码。本文中所探讨的发送,仅指从Producer发送到Broker的过程,并不蕴含Broker将音讯投递到Consumer的过程。

2. 宏观概览

RocketMQ事务音讯发送流程:


图1

联合源码来看,RocketMQ的事务音讯TransactionMQProducer的sendMessageInTransaction办法,理论调用了DefaultMQProducerImpl的sendMessageInTransaction办法。咱们进入sendMessageInTransaction办法,整个事务音讯的发送流程清晰可见:

首先,做发送前查看,并填入必要参数,包含设prepare事务音讯。

源码清单-1

public TransactionSendResult sendMessageInTransaction(final Message msg,    final LocalTransactionExecuter localTransactionExecuter, final Object arg)    throws MQClientException {    TransactionListener transactionListener = getCheckListener();         if (null == localTransactionExecuter && null == transactionListener) {        throw new MQClientException("tranExecutor is null", null);    }    // ignore DelayTimeLevel parameter    if (msg.getDelayTimeLevel() != 0) {        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);    }    Validators.checkMessage(msg, this.defaultMQProducer);    SendResult sendResult = null;    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

进入发送解决流程:

源码清单-2

    try {        sendResult = this.send(msg);    } catch (Exception e) {        throw new MQClientException("send message Exception", e);    }

依据broker返回的处理结果决策本地事务是否执行,半音讯发送胜利则开始本地事务执行:

源码清单-3

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;    Throwable localException = null;    switch (sendResult.getSendStatus()) {        case SEND_OK: {            try {                if (sendResult.getTransactionId() != null) {                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());                }                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                if (null != transactionId && !"".equals(transactionId)) {                    msg.setTransactionId(transactionId);                }                if (null != localTransactionExecuter) {                     localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                } else if (transactionListener != null) {                     log.debug("Used new transaction API");                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                 }                if (null == localTransactionState) {                    localTransactionState = LocalTransactionState.UNKNOW;                }                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {                    log.info("executeLocalTransactionBranch return {}", localTransactionState);                    log.info(msg.toString());                }            } catch (Throwable e) {                log.info("executeLocalTransactionBranch exception", e);                log.info(msg.toString());                localException = e;            }        }        break;        case FLUSH_DISK_TIMEOUT:        case FLUSH_SLAVE_TIMEOUT:        case SLAVE_NOT_AVAILABLE:  // 当备broker状态不可用时,半音讯要回滚,不执行本地事务            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;            break;        default:            break;    }

本地事务执行完结,依据本地事务状态进行二阶段解决:

源码清单-4

    try {        this.endTransaction(sendResult, localTransactionState, localException);    } catch (Exception e) {        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);    }    // 组装发送后果    // ...    return transactionSendResult;}

接下来,咱们深刻每个阶段代码剖析。

3. 深扒底细

3.1 一阶段发送

重点剖析send办法。进入send办法后,咱们发现,RocketMQ的事务音讯的一阶段,应用了SYNC同步模式:

源码清单-5

public SendResult send(Message msg,    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}

这一点很容易了解,毕竟事务音讯是要依据一阶段发送后果来决定要不要执行本地事务的,所以肯定要阻塞期待broker的ack。

咱们进入DefaultMQProducerImpl.java中去看sendDefaultImpl办法的实现,通过读这个办法的代码,来尝试理解在事务音讯的一阶段发送过程中producer的行为。 值得注意的是,这个办法并非为事务音讯定制,甚至不是为SYNC同步模式定制的,因而读懂了这段代码,根本能够对RocketMQ的音讯发送机制有了一个较为全面的意识。
这段代码逻辑十分通顺,不忍切片。为了节俭篇幅,将代码中较为繁冗但信息量不大的局部以正文代替,尽可能保留流程的完整性。集体认为较为重要或是容易被疏忽的局部,以正文标出,后文还有局部细节的具体解读。

源码清单-6

private SendResult sendDefaultImpl(    Message msg,    final CommunicationMode communicationMode,    final SendCallback sendCallback,    final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    this.makeSureStateOK();    // 一、音讯有效性校验。见后文    Validators.checkMessage(msg, this.defaultMQProducer);    final long invokeID = random.nextLong();    long beginTimestampFirst = System.currentTimeMillis();    long beginTimestampPrev = beginTimestampFirst;    long endTimestamp = beginTimestampFirst;    // 获取以后topic的发送路由信息,次要是要broker,如果没找到则从namesrv获取    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());    if (topicPublishInfo != null && topicPublishInfo.ok()) {        boolean callTimeout = false;        MessageQueue mq = null;        Exception exception = null;        SendResult sendResult = null;        // 二、发送重试机制。见后文        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;        int times = 0;        String[] brokersSent = new String[timesTotal];        for (; times < timesTotal; times++) {            // 第一次发送是mq == null, 之后都是有broker信息的            String lastBrokerName = null == mq ? null : mq.getBrokerName();            // 三、rocketmq发送音讯时如何抉择队列?——broker异样躲避机制             MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);            if (mqSelected != null) {                mq = mqSelected;                brokersSent[times] = mq.getBrokerName();                try {                    beginTimestampPrev = System.currentTimeMillis();                    if (times > 0) {                        //Reset topic with namespace during resend.                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                    }                    long costTime = beginTimestampPrev - beginTimestampFirst;                    if (timeout < costTime) {                        callTimeout = true;                        break;                    }                    // 发送外围代码                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                    endTimestamp = System.currentTimeMillis();                    // rocketmq 抉择 broker 时的躲避机制,开启 sendLatencyFaultEnable == true 才失效                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                    switch (communicationMode) {                    // 四、RocketMQ的三种CommunicationMode。见后文                        case ASYNC: // 异步模式                            return null;                        case ONEWAY: // 单向模式                            return null;                        case SYNC: // 同步模式                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                    continue;                                }                            }                            return sendResult;                        default:                            break;                    }                } catch (RemotingException e) {                    // ...                    // 主动重试                } catch (MQClientException e) {                    // ...                    // 主动重试                } catch (MQBrokerException e) {                   // ...                    // 仅返回码==NOT_IN_CURRENT_UNIT==205 时主动重试                    // 其余状况不重试,抛异样                } catch (InterruptedException e) {                   // ...                    // 不重试,抛异样                }            } else {                break;            }        }        if (sendResult != null) {            return sendResult;        }        // 组装返回的info信息,最初以MQClientException抛出        // ... ...        // 超时场景抛RemotingTooMuchRequestException        if (callTimeout) {            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");        }        // 填充MQClientException异样信息        // ...    }    validateNameServerSetting();    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}

3.1.1 音讯有效性校验

源码清单-7

 Validators.checkMessage(msg, this.defaultMQProducer);

在此办法中校验音讯的有效性,包含对topic和音讯体的校验。topic的命名必须符合规范,且防止应用内置的零碎音讯TOPIC。音讯体长度 > 0 && 音讯体长度 <= 1024_1024_4 = 4M 。

源码清单-8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)    throws MQClientException {    if (null == msg) {        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");    }    // topic    Validators.checkTopic(msg.getTopic());    Validators.isNotAllowedSendTopic(msg.getTopic());    // body    if (null == msg.getBody()) {        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");    }    if (0 == msg.getBody().length) {        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");    }    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());    }}

3.1.2 发送重试机制

Producer在音讯发送不胜利时,会主动重试,最多发送次数 = retryTimesWhenSendFailed + 1 = 3次 。

值得注意的是,并非所有异常情况都会重试,从以上源码中能够提取到的信息通知咱们,在以下三种状况下,会主动重试:
1)产生RemotingException,MQClientException两种异样之一时。
2)产生MQBrokerException异样,且ResponseCode是NOT\_IN\_CURRENT\_UNIT = 205时。
3)SYNC模式下,未产生异样且发送后果状态非 SEND\_OK。

在每次发送音讯之前,会先查看是否在后面这两步就曾经耗时超长(超时时长默认3000ms),若是,则不再持续发送并且间接返回超时,不再重试。这里阐明了2个问题:
1)producer外部主动重试对业务利用而言是无感知的,利用看到的发送耗时是蕴含所有重试的耗时在内的;
2)一旦超时意味着本次音讯发送曾经以失败告终,起因是超时。这个信息最初会以RemotingTooMuchRequestException的模式抛出。

这里须要指出的是,在RocketMQ官网文档中指出,发送超时时长是10s,即10000ms,网上许多人对rocketMQ的超时工夫解读也认为是10s。然而代码中却明明白白写着3000ms,最终我debug之后确认,默认超时工夫的确是3000ms。这里也倡议RocketMQ团队对文档进行确认,如确有误,还是早日更正为好。


图2

3.1.3 broker的异样躲避机制

源码清单-8

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);  

这行代码是发送前抉择queue的过程。

这里波及RocketMQ音讯发送高可用的的一个外围机制,latencyFaultTolerance。这个机制是Producer负载平衡的一部分,通过sendLatencyFaultEnable的值来管制,默认是false敞开状态,不启动broker故障提早机制,值为true时启用broker故障提早机制,可由Producer被动关上。

抉择队列时,开启异样躲避机制,则依据broker的工作状态防止抉择以后状态不佳的broker代理,不衰弱的broker会在一段时间内被躲避,不开启异样躲避机制时,则按程序选取下一个队列,但在重试场景下会尽量抉择不同于上次发送broker的queue。每次音讯发送都会通过updateFaultItem办法来保护broker的状态信息。

源码清单-9

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {    if (this.sendLatencyFaultEnable) {        // 计算提早多久,isolation示意是否须要隔离该broker,若是,则从30s往前找第一个比30s小的提早值,再按下标判断躲避的周期,若30s,则是10min躲避;        // 否则,按上一次发送耗时来决定躲避时长;        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);    }}  

深刻到selectOneMessageQueue办法外部一探到底:

源码清单-10

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    if (this.sendLatencyFaultEnable) {        // 开启异样躲避        try {            int index = tpInfo.getSendWhichQueue().getAndIncrement();            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                if (pos < 0)                    pos = 0;                // 按程序取下一个message queue作为发送的queue                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                // 以后queue所在的broker可用,且与上一个queue的broker雷同,                // 或者第一次发送,则应用这个queue                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                        return mq;                }            }            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);            if (writeQueueNums > 0) {                final MessageQueue mq = tpInfo.selectOneMessageQueue();                if (notBestBroker != null) {                    mq.setBrokerName(notBestBroker);                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                }                return mq;            } else {                latencyFaultTolerance.remove(notBestBroker);            }        } catch (Exception e) {            log.error("Error occurred when selecting message queue", e);        }        return tpInfo.selectOneMessageQueue();    }    // 不开启异样躲避,则随机自增抉择Queue    return tpInfo.selectOneMessageQueue(lastBrokerName);}

3.1.4 RocketMQ的三种CommunicationMode

源码清单-11

 public enum CommunicationMode {    SYNC,    ASYNC,    ONEWAY,}

以上三种模式指的都是音讯从发送方达到broker的阶段,不蕴含broker将音讯投递给订阅方的过程。
三种模式的发送形式的差别:

  • 单向模式:ONEWAY。音讯发送方只管发送,并不关怀broker解决的后果如何。这种模式下,因为解决流程少,发送耗时十分小,吞吐量大,但不能保障音讯牢靠不丢,罕用于流量微小但不重要的音讯场景,例如心跳发送等。
  • 异步模式:ASYNC。音讯发送方发送音讯到broker后,无需期待broker解决,拿到的是null的返回值,而由一个异步的线程来做音讯解决,解决实现后以回调的模式通知发送方发送后果。异步解决时如有异样,返回发送方失败后果之前,会通过外部重试(默认3次,发送方不感知)。这种模式下,发送方期待时长较小,吞吐量较大,音讯牢靠,用于流量大但重要的音讯场景。
  • 同步模式:SYNC。音讯发送方需期待broker解决实现并明确返回胜利或失败,在音讯发送方拿到音讯发送失败的后果之前,也会经验过外部重试(默认3次,发送方不感知)。这种模式下,发送方会阻塞期待音讯处理结果,期待时长较长,音讯牢靠,用于流量不大但重要的音讯场景。须要强调的是,事务音讯的一阶段半事务音讯的解决是同步模式。

在sendKernelImpl办法中也能够看到具体的实现差别。ONEWAY模式最为简略,不做任何解决。负责发送的sendMessage办法参数中,相比同步模式,异步模式多了回调办法、蕴含topic发送路由元信息的topicPublishInfo、蕴含发送broker信息的instance、蕴含发送队列信息的producer、重试次数。另外,异步模式下,会对有压缩的音讯先做copy。

源码清单-12

    switch (communicationMode) {                case ASYNC:                    Message tmpMessage = msg;                    boolean messageCloned = false;                    if (msgBodyCompressed) {                        //If msg body was compressed, msgbody should be reset using prevBody.                        //Clone new message using commpressed message body and recover origin massage.                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66                        tmpMessage = MessageAccessor.cloneMessage(msg);                        messageCloned = true;                        msg.setBody(prevBody);                    }                    if (topicWithNamespace) {                        if (!messageCloned) {                            tmpMessage = MessageAccessor.cloneMessage(msg);                            messageCloned = true;                        }                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));                    }                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;                    if (timeout < costTimeAsync) {                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                    }                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                        brokerAddr,                        mq.getBrokerName(),                        tmpMessage,                        requestHeader,                        timeout - costTimeAsync,                        communicationMode,                        sendCallback,                        topicPublishInfo,                        this.mQClientFactory,                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),                        context,                        this);                    break;                case ONEWAY:                case SYNC:                    long costTimeSync = System.currentTimeMillis() - beginStartTime;                    if (timeout < costTimeSync) {                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                    }                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                        brokerAddr,                        mq.getBrokerName(),                        msg,                        requestHeader,                        timeout - costTimeSync,                        communicationMode,                        context,                        this);                    break;                default:                    assert false;                    break;            } 

官网文档中有这样一张图,非常清晰的形容了异步通信的具体过程:


图3

3.2 二阶段发送

源码清单-3体现了本地事务的执行,localTransactionState将本地事务执行后果与事务音讯二阶段的发送关联起来。
值得注意的是,如果一阶段的发送后果是SLAVE\_NOT\_AVAILABLE,即备broker不可用时,也会将localTransactionState置为Rollback,此时将不会执行本地事务。之后由endTransaction办法负责二阶段提交,见源码清单-4。具体到endTransaction的实现:

源码清单-13

public void endTransaction(    final SendResult sendResult,    final LocalTransactionState localTransactionState,    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {    final MessageId id;    if (sendResult.getOffsetMsgId() != null) {        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());    } else {        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());    }    String transactionId = sendResult.getTransactionId();    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();    requestHeader.setTransactionId(transactionId);    requestHeader.setCommitLogOffset(id.getOffset());    switch (localTransactionState) {        case COMMIT_MESSAGE:            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);            break;        case ROLLBACK_MESSAGE:            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);            break;        case UNKNOW:            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);            break;        default:            break;    }    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());    requestHeader.setMsgId(sendResult.getMsgId());    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;    // 采纳oneway的形式发送二阶段音讯    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,        this.defaultMQProducer.getSendMsgTimeout());}

在二阶段发送时,之所以用oneway的形式发送,集体了解这正是因为事务音讯有一个非凡的牢靠机制——回查。

3.3 音讯回复

当Broker通过了一个特定的工夫,发现仍然没有失去事务音讯的二阶段是否要提交或者回滚的确切信息,Broker不晓得Producer产生了什么状况(可能producer挂了,也可能producer发了commit但网络抖动丢了,也可能...),于是被动发动回查。
事务音讯的回查机制,更多的是在broker端的体现。RocketMQ的broker以Half音讯、Op音讯、实在音讯三个不同的topic来将不同发送阶段的事务音讯进行了隔离,使得Consumer只能看到最终确认commit须要投递进来的音讯。其中具体的实现逻辑在本文中暂不多赘述,后续可另开一篇专门来从Broker视角来解读。

回到Producer的视角,当收到了Broker的回查申请,Producer将依据音讯查看本地事务状态,依据后果决定提交或回滚,这就要求Producer必须指定回查实现,以备不时之需。
当然,失常状况下,并不举荐被动发送UNKNOW状态,这个状态毫无疑问会给broker带来额定回查开销,只在呈现不可预知的异常情况时才启动回查机制,是一种比拟正当的抉择。

另外,4.7.1版本的事务回查并非有限回查,而是最多回查15次:

源码清单-14

/** * The maximum number of times the message was checked, if exceed this value, this message will be discarded. */@ImportantFieldprivate int transactionCheckMax = 15;

附录

官网给出Producer的默认参数如下(其中超时时长的参数,在前文中也曾经提到,debug的后果是默认3000ms,并非10000ms):


图4

RocketMQ作为一款优良的开源消息中间件,有很多开发者基于它做了二次开发,例如蚂蚁团体商业化产品SOFAStack MQ音讯队列,就是基于RocketMQ内核进行的再次开发的金融级消息中间件,在音讯管控、通明运维等方面做了大量优良的工作。
愿RocketMQ在社区宽广开发者的共创共建之下,可能一直发展壮大,爆发更强的生命力。

咱们是阿里云智能寰球技术服务-SRE团队,咱们致力成为一个以技术为根底、面向服务、保障业务零碎高可用的工程师团队;提供业余、体系化的SRE服务,帮忙广大客户更好地应用云、基于云构建更加稳固牢靠的业务零碎,晋升业务稳定性。咱们冀望可能分享更多帮忙企业客户上云、用好云,让客户云上业务运行更加稳固牢靠的技术,您可用钉钉扫描下方二维码,退出阿里云SRE技术学院钉钉圈子,和更多云上人交换对于云平台的那些事。

版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。