本文主要研究一下rocketmq的retryTimesWhenSendFailed

sendDefaultImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {    private final InternalLogger log = ClientLogger.getLog();    private final Random random = new Random();    private final DefaultMQProducer defaultMQProducer;    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =        new ConcurrentHashMap<String, TopicPublishInfo>();    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();    private final RPCHook rpcHook;    protected BlockingQueue<Runnable> checkRequestQueue;    protected ExecutorService checkExecutor;    private ServiceState serviceState = ServiceState.CREATE_JUST;    private MQClientInstance mQClientFactory;    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;    private final ExecutorService defaultAsyncSenderExecutor;    private ExecutorService asyncSenderExecutor;    //......    private SendResult sendDefaultImpl(        Message msg,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        this.makeSureStateOK();        Validators.checkMessage(msg, this.defaultMQProducer);        final long invokeID = random.nextLong();        long beginTimestampFirst = System.currentTimeMillis();        long beginTimestampPrev = beginTimestampFirst;        long endTimestamp = beginTimestampFirst;        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());        if (topicPublishInfo != null && topicPublishInfo.ok()) {            boolean callTimeout = false;            MessageQueue mq = null;            Exception exception = null;            SendResult sendResult = null;            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;            int times = 0;            String[] brokersSent = new String[timesTotal];            for (; times < timesTotal; times++) {                String lastBrokerName = null == mq ? null : mq.getBrokerName();                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);                if (mqSelected != null) {                    mq = mqSelected;                    brokersSent[times] = mq.getBrokerName();                    try {                        beginTimestampPrev = System.currentTimeMillis();                        if (times > 0) {                            //Reset topic with namespace during resend.                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                        }                        long costTime = beginTimestampPrev - beginTimestampFirst;                        if (timeout < costTime) {                            callTimeout = true;                            break;                        }                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        switch (communicationMode) {                            case ASYNC:                                return null;                            case ONEWAY:                                return null;                            case SYNC:                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                        continue;                                    }                                }                                return sendResult;                            default:                                break;                        }                    } catch (RemotingException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        continue;                    } catch (MQClientException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        continue;                    } catch (MQBrokerException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        switch (e.getResponseCode()) {                            case ResponseCode.TOPIC_NOT_EXIST:                            case ResponseCode.SERVICE_NOT_AVAILABLE:                            case ResponseCode.SYSTEM_ERROR:                            case ResponseCode.NO_PERMISSION:                            case ResponseCode.NO_BUYER_ID:                            case ResponseCode.NOT_IN_CURRENT_UNIT:                                continue;                            default:                                if (sendResult != null) {                                    return sendResult;                                }                                throw e;                        }                    } catch (InterruptedException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        log.warn("sendKernelImpl exception", e);                        log.warn(msg.toString());                        throw e;                    }                } else {                    break;                }            }            if (sendResult != null) {                return sendResult;            }            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",                times,                System.currentTimeMillis() - beginTimestampFirst,                msg.getTopic(),                Arrays.toString(brokersSent));            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);            MQClientException mqClientException = new MQClientException(info, exception);            if (callTimeout) {                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");            }            if (exception instanceof MQBrokerException) {                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());            } else if (exception instanceof RemotingConnectException) {                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);            } else if (exception instanceof RemotingTimeoutException) {                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);            } else if (exception instanceof MQClientException) {                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);            }            throw mqClientException;        }        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();        if (null == nsList || nsList.isEmpty()) {            throw new MQClientException(                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);        }        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);    }    //......}
  • sendDefaultImpl方法通过tryToFindTopicPublishInfo找到topicPublishInfo,如果不为null且是ok的,则根据communicationMode计算timesTotal,其中是CommunicationMode.SYNC的话,timesTotal为1 + defaultMQProducer.getRetryTimesWhenSendFailed(),否则为1;之后就是最多循环timesTotal次执行sendKernelImpl,RemotingException异常及MQBrokerException异常中responseCode为TOPIC_NOT_EXIST、SERVICE_NOT_AVAILABLE、SYSTEM_ERROR、NO_PERMISSION、NO_BUYER_ID、NOT_IN_CURRENT_UNIT或者是sendResult.getSendStatus()不等于SendStatus.SEND_OK的会重试

sendKernelImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {    private final InternalLogger log = ClientLogger.getLog();    private final Random random = new Random();    private final DefaultMQProducer defaultMQProducer;    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =        new ConcurrentHashMap<String, TopicPublishInfo>();    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();    private final RPCHook rpcHook;    protected BlockingQueue<Runnable> checkRequestQueue;    protected ExecutorService checkExecutor;    private ServiceState serviceState = ServiceState.CREATE_JUST;    private MQClientInstance mQClientFactory;    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;    private final ExecutorService defaultAsyncSenderExecutor;    private ExecutorService asyncSenderExecutor;    //......    private SendResult sendKernelImpl(final Message msg,                                      final MessageQueue mq,                                      final CommunicationMode communicationMode,                                      final SendCallback sendCallback,                                      final TopicPublishInfo topicPublishInfo,                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        long beginStartTime = System.currentTimeMillis();        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        if (null == brokerAddr) {            tryToFindTopicPublishInfo(mq.getTopic());            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        }        SendMessageContext context = null;        if (brokerAddr != null) {            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);            byte[] prevBody = msg.getBody();            try {                //for MessageBatch,ID has been set in the generating process                if (!(msg instanceof MessageBatch)) {                    MessageClientIDSetter.setUniqID(msg);                }                boolean topicWithNamespace = false;                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());                    topicWithNamespace = true;                }                int sysFlag = 0;                boolean msgBodyCompressed = false;                if (this.tryToCompressMessage(msg)) {                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;                    msgBodyCompressed = true;                }                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;                }                if (hasCheckForbiddenHook()) {                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());                    checkForbiddenContext.setCommunicationMode(communicationMode);                    checkForbiddenContext.setBrokerAddr(brokerAddr);                    checkForbiddenContext.setMessage(msg);                    checkForbiddenContext.setMq(mq);                    checkForbiddenContext.setUnitMode(this.isUnitMode());                    this.executeCheckForbiddenHook(checkForbiddenContext);                }                if (this.hasSendMessageHook()) {                    context = new SendMessageContext();                    context.setProducer(this);                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());                    context.setCommunicationMode(communicationMode);                    context.setBornHost(this.defaultMQProducer.getClientIP());                    context.setBrokerAddr(brokerAddr);                    context.setMessage(msg);                    context.setMq(mq);                    context.setNamespace(this.defaultMQProducer.getNamespace());                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                    if (isTrans != null && isTrans.equals("true")) {                        context.setMsgType(MessageType.Trans_Msg_Half);                    }                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {                        context.setMsgType(MessageType.Delay_Msg);                    }                    this.executeSendMessageHookBefore(context);                }                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());                requestHeader.setTopic(msg.getTopic());                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());                requestHeader.setQueueId(mq.getQueueId());                requestHeader.setSysFlag(sysFlag);                requestHeader.setBornTimestamp(System.currentTimeMillis());                requestHeader.setFlag(msg.getFlag());                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));                requestHeader.setReconsumeTimes(0);                requestHeader.setUnitMode(this.isUnitMode());                requestHeader.setBatch(msg instanceof MessageBatch);                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);                    if (reconsumeTimes != null) {                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);                    }                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);                    if (maxReconsumeTimes != null) {                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);                    }                }                SendResult sendResult = null;                switch (communicationMode) {                    case ASYNC:                        Message tmpMessage = msg;                        boolean messageCloned = false;                        if (msgBodyCompressed) {                            //If msg body was compressed, msgbody should be reset using prevBody.                            //Clone new message using commpressed message body and recover origin massage.                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66                            tmpMessage = MessageAccessor.cloneMessage(msg);                            messageCloned = true;                            msg.setBody(prevBody);                        }                        if (topicWithNamespace) {                            if (!messageCloned) {                                tmpMessage = MessageAccessor.cloneMessage(msg);                                messageCloned = true;                            }                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));                        }                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeAsync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            tmpMessage,                            requestHeader,                            timeout - costTimeAsync,                            communicationMode,                            sendCallback,                            topicPublishInfo,                            this.mQClientFactory,                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),                            context,                            this);                        break;                    case ONEWAY:                    case SYNC:                        long costTimeSync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeSync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            msg,                            requestHeader,                            timeout - costTimeSync,                            communicationMode,                            context,                            this);                        break;                    default:                        assert false;                        break;                }                if (this.hasSendMessageHook()) {                    context.setSendResult(sendResult);                    this.executeSendMessageHookAfter(context);                }                return sendResult;            } catch (RemotingException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } catch (MQBrokerException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } catch (InterruptedException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } finally {                msg.setBody(prevBody);                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));            }        }        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);    }    //......}
  • sendKernelImpl对于communicationMode为SYNC的,执行的是this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this)

MQClientAPIImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {    private final static InternalLogger log = ClientLogger.getLog();    private static boolean sendSmartMsg =        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));    static {        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));    }    private final RemotingClient remotingClient;    private final TopAddressing topAddressing;    private final ClientRemotingProcessor clientRemotingProcessor;    private String nameSrvAddr = null;    private ClientConfig clientConfig;    //......    public SendResult sendMessage(        final String addr,        final String brokerName,        final Message msg,        final SendMessageRequestHeader requestHeader,        final long timeoutMillis,        final CommunicationMode communicationMode,        final SendMessageContext context,        final DefaultMQProducerImpl producer    ) throws RemotingException, MQBrokerException, InterruptedException {        return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);    }    //......}
  • MQClientAPIImpl的sendMessage内部调用的是同名的sendMessage方法,只不过其传递的retryTimesWhenSendFailed参数为0,即出现内部方法出现异常不会重试

小结

DefaultMQProducerImpl的sendDefaultImpl方法通过tryToFindTopicPublishInfo找到topicPublishInfo,如果不为null且是ok的,则根据communicationMode计算timesTotal,其中是CommunicationMode.SYNC的话,timesTotal为1 + defaultMQProducer.getRetryTimesWhenSendFailed(),否则为1;之后就是最多循环timesTotal次执行sendKernelImpl,RemotingException异常及MQBrokerException异常中responseCode为TOPIC_NOT_EXIST、SERVICE_NOT_AVAILABLE、SYSTEM_ERROR、NO_PERMISSION、NO_BUYER_ID、NOT_IN_CURRENT_UNIT或者是sendResult.getSendStatus()不等于SendStatus.SEND_OK的会重试

doc

  • DefaultMQProducerImpl