关于java:RocketMQ-消息发送设计和原理详解-源码剖析

36次阅读

共计 25229 个字符,预计需要花费 64 分钟才能阅读完成。

1. 背景

发送音讯是 MQ 最根底的操作之一。RocketMQ 官网提供了多语言客户端反对音讯的发送和生产操作。
当然,音讯发送并不仅仅牵扯到客户端操作。客户端做的是向 Broker 发送申请,申请中蕴含了音讯的全副信息。而 Broker 须要解决客户端发送来的生产申请,将音讯存储起来。
在这篇文章中我将将解析音讯发送流程中生产者和 Broker 的解决流程,揭秘 RocketMQ 音讯发送高性能、高牢靠的原理。

2. 概述

RocketMQ 的 Java 客户端提供了丰盛的音讯发送 API,反对多种音讯发送的形式和非凡音讯的发送。
包含 3 种发送形式(同步、异步、单向)和多种非凡音讯(程序音讯、延时音讯、批量音讯、过滤音讯、事务音讯)。
对于客户端音讯发送 API 的具体应用形式,能够参考官网文档:https://github.com/apache/rocketmq/blob/develop/docs/cn/RocketMQ_Example.md。

2.1 音讯发送形式和非凡音讯

2.1.1 三种音讯发送形式

RocketMQ 反对 3 种音讯发送形式:同步、异步和单向。

  1. 同步(Sync)

    • 发送音讯时,同步期待,直到 Broker 返回发送后果。
    • 这种形式最为牢靠,然而发送性能最差。
    • 在一些可靠性要求十分高的场景下,举荐应用同步形式。比方:重要的音讯告诉,短信告诉。
  2. 异步(Async)

    • 发送音讯时,无需期待 Broker 返回发送后果,发送线程不阻塞。执行发送 API 时指定音讯发送胜利后的回调函数。
    • 这种形式相比于同步发送,性能能够晋升多个数量级,但可靠性不如同步发送。
    • 在对响应工夫敏感、流量较大的场景下,举荐应用异步形式。异步发送是应用最宽泛的发送形式。
  3. 单向(One-way)

    • 发送音讯时,间接返回,不期待 Broker 返回发送后果,也不注册回调函数。
    • 这种发送形式性能最高,可靠性最差。它不关怀发送后果,不在乎音讯是否胜利存储在 Broker 中。
    • 实用于音讯失落也没有太大影响的场景,例如发送日志。

这三种发送形式中,异步发送是最为宽泛应用的发送形式。配合一些重试和弥补机制,能够达成较高的可靠性和很高的性能。

2.1.2 非凡音讯类型

上面简略介绍一下几种非凡音讯类型。

  1. 一般音讯:发送效率最高、应用场景最宽泛的音讯类型。一般音讯能够由客户端并发发送。不保障一般音讯生产的程序。单 Broker 性能能够达到十万级别。(视 Broker 配置而变)
  2. 队列级别程序音讯:RocketMQ 将一个 Topic 分为多个队列,以进步生产速度。每隔分区内的音讯遵循先生产先生产的程序。
  3. Topic 级别程序音讯:如果把一个 Topic 的队列数量设为 1,那么该 Topic 中的音讯也遵循先生产先生产。
  4. 提早音讯:音讯发送后,消费者并不马上收到音讯,而是期待一段指定的工夫之后能力生产到该音讯。
  5. 事务音讯:提供分布式事务性能,能够保障发送音讯和另外的操作同时胜利或者同时失败。
  6. 批量音讯:将多个音讯包装成一个批量音讯,一起发送。升高网络传输次数,晋升传输效率。

2.2 路由机制

RocketMQ 的 Topic 能够分为多个队列,每个队列可能散布在不同 Broker 上。

音讯的路由指的是发送音讯时须要先获取 Topic 的路由信息(其中蕴含每个 Topic 的队列和它们所在的 Broker 地址),而后抉择一个队列进行发送。

音讯发送的 API 提供了参数,能够传入要发送的队列信息,或者传入队列抉择办法,以供用户抉择发往某个 Broker 的具体队列。

2.3 音讯发送流程

音讯发送的流程波及到 RocketMQ 的三个组件:生产者、Broker、NameServer。
其中生产者负责发送音讯,Broker 负责解决音讯发送申请,NameServer 负责更新和提供路由信息。

如图所示

  • 生产者每 30 秒向 NameServer 拉取路由信息,Broker 每 30 秒向 NameServer 发送路由信息。
  • 生产者发送音讯时,会先在本地查问 Topic 路由信息。
  • 如果查问不到,会申请 NameServer 查问。
  • 随后将音讯发送给 Broker。
  • Broker 也会在本地查问 Topic 路由信息来查看音讯的 Topic 是否存在。
  • 随后保留音讯,如果是异步发送则间接返回,如果同步发送则期待音讯保留胜利后返回。

2.4 高可用设计

2.4.1 生产者高可用

  • 音讯发送重试机制:生产者在音讯发送时如果呈现失败,默认会重试 2 次。
  • 故障躲避机制:如果重试的音讯仍发往同一个 Broker,发送大概率还是会失败,所以在重试时会尽量避开刚刚发送失败的 Broker。

    • 能够通过配置故障提早机制来指定是在本次音讯发送时临时避开发送失败的 Broker,还是在之后一段时间都避开该 Broker

2.4.2 Broker 端高可用

Broker 端的高可用是用数据同步的形式,将音讯同步到备 Broker 上,当主 Broker 产生故障时,能够从备 Broker 上复原数据。

3. 具体设计

3.1 音讯

RocketMQ 中的音讯类次要有 3 个

  • Message 为客户端须要应用的音讯类。
  • MessageExt 为音讯扩大属性类,它扩大了 Message,在 Broker 上产生此对象。
  • MessageExtBrokerInner 是存储外部应用的 Message 对象,在 rocketmq-store 模块应用。

在发送音讯时,用到的是 Message,能够指定音讯的属性、音讯体和 flag。

3.2 生产者类图

  • DefaultMQProducer 是 RocketMQ 中默认的生产者实现,它实现了 MQAdmin 接口。
  • DefaultMQProducer 外部包装了一个 DefaultMQProducerImpl 字段,它是生产者的具体实现类,DefaultMQProducer 调用它外部的 DefaultMQProducerImpl 来发送命令。
  • DefaultMQProducerImpl 外部注册了一个 MQClientInstance 字段。MQClientInstance 是与 NameServer 和 Broker 通信的中介。MQClientInstanceClientId 一一对应,ClientIdclientIpinstanceNameunitName 组成。如果不手动批改,一般来说一个启动的客户端过程只有一个 MQClientInstance 实例,这样能够节俭客户端资源。
  • MQClientInstnace 外部的 producerTable 注册了 ClientId 和 DefaultMQProducerImpl 的对应关系
  • MQClientAPIImpl 提供了发送音讯的 API,它调用 RemotingClient 执行发送。

3.3 生产者启动

  1. 结构 DefaultMQProducer 实例,start() 启动
  2. 初始化 DefaultMQProducerImpl,设置状态为 CREATE_JUST
  3. 启动 DefaultMQProducerImpl
  4. 查看配置
  5. 依据 ClientId 获取或创立 MQClientInstance
  6. DefaultMQProducerImpl 注册到 MQClientInstance
  7. 启动 MQClientInstanceMQClientInstance 启动定时工作,包含从 NameServer 拉取 Topic 路由信息、向 Broker 发送心跳
  8. MQClientInstance 启动 PullMessageServiceRebalanceService
  9. 设置服务状态为 RUNNING

3.4 音讯发送

RocketMQ 的音讯发送流程图如下图所示:

其中 MQProducerImpl 负责执行外围的音讯发送办法 sendDefaultImpl

这个办法中蕴含了音讯发送的外围逻辑

  1. 查找 Topic 路由信息
  2. 抉择音讯队列
  3. 发送音讯

3.4.1 查找 Topic 路由信息

指定 Topic 发送音讯后,生产者须要晓得音讯要发往哪个 Broker 地址。于是须要获取 Topic 路由信息,查问 Topic 所在的 Broker,随后抉择一个 Broker 进行发送。该逻辑在 DefaultMQProducerImpl#tryToFindTopicPublishInfo() 中执行。

在第一次发送音讯时,本地没有缓存 Topic 路由信息,所以须要被动从 NameServer 拉取,而后更新到本地路由表缓存。随后生产者会启动定时工作,每隔 30s 从新从 NameServer 拉取路由信息。

留神,第一次查问 NameServer 时,如果没有拉取到 Topic 路由信息,则会应用默认 Topic(AUTO_CREATE_TOPIC_KEY_TOPIC)再次查问。

默认 Topic 在 Broker 启动时创立,是为主动创立主题使用的。
它的目标是在主题没有被创立时,让生产者发送音讯时也可能查问到 Broker 地址。
而后等音讯真正发送到 Broker 时,会依据音讯的 Topic 创立主题。

如果最终都没有拉取到 Topic 路由信息,则会抛出异样。

3.4.2 重试机制

同步发送和异步发送的重试次数别离由 retryTimesWhenSendFailedretryTimesWhenSendAsyncFailed 指定,默认都为 2 次(发送 1 次,重试 1 次)。

  • 同步发送的重试逻辑即在 sendDefaultImpl() 办法中循环发送执行发送逻辑。
  • 异步发送的重试逻辑在 MQClientAPIIpml()sendMessageAsync() 结构的回调办法中指定。它调用 onExceptionImpl() 办法,如果以后发送次数小于异步发送重试次数,则再次执行 sendMessageAsync() 重试发送。

3.4.3 抉择音讯队列:故障提早机制

获取到 Topic 路由信息后,须要从中抉择一个队列进行发送。抉择队列的逻辑由 MQFaultStrategy#selectOneMessageQueue()解决,它用来解决 Broker 的 队列抉择 故障提早机制

默认机制

默认机制下,依照轮询的形式抉择队列。如果上一次发送胜利,抉择下一个队列。如果上一次发送失败,会躲避上一次发送的 MessageQueue 所在的 Broker。

故障提早机制

故障提早机制是为了可能在音讯发送的时候尽量避开上次发送失败的 Broker,它由 sendLatencyFaultEnable 参数来设置开启,默认为敞开状态。

  • 敞开:发送一次音讯失败后,会在本次音讯发送过程中避开该 Broker,但下次发送音讯时还会持续尝试。
  • 开启:发送一次音讯失败后,会乐观地认为 Broker 不可用,在接下来一段时间内都不再向其发送音讯。随着发送失败次数的增多,躲避工夫将越来越长。

3.4.4 发送音讯

音讯发送逻辑由 DefaultMQProducerImpl#sendKernelImpl() 解决。

  1. 先依据上一步抉择的队列,查问 MQClientInstance 获取对应的 Broker 地址(先查本地缓存,如果没有则从 NameServer 拉取)。
  2. 设置音讯的全局惟一 ID,而后对于超过 4KB(默认)的音讯执行 zip 压缩。
  3. 执行发送之前的钩子函数 executeSendMessageHookBefore(),如音讯轨迹的解决逻辑就在这里进行解决。
  4. 构建音讯申请(头)
  5. 依据音讯发送形式调用 MQClientAPIImpl 进行网络传输
  6. 执行音讯发送后的钩子函数

MQClientAPIImpl 负责调用 NettyRemotingClient 将生产音讯的申请发往 Broker。

3.5 Broker 解决发送申请

Broker 通过 SendMessageProcessor 解决生产者发来的音讯生产申请。以后应用 asyncSendMessage() 异步解决生产者发送过去的申请。

RocketMQ 的 Netty 申请解决机制会依照业务逻辑进行处理器的拆分。具体地说,RocketMQ 为不同的申请类型(申请码)注册不同的业务处理器和线程池去解决。比方音讯生产的申请由 sendMessageExecutor 线程池解决,生产申请由 pullMessageExecutor 解决……

Broker 解决音讯生产申请逻辑如下:

  1. 查看音讯合理性
  2. 如果音讯重试次数超过容许的最大重试次数,将进入死信队列。
  3. 调用存储模块将音讯存储

3.6 Batch 音讯(批量音讯)

为了缩小网络申请次数,RocketMQ 反对将对立主题的一批音讯打包发送。对于每条音讯较小且音讯较多的场景,应用批量发送能够晋升发送效率。

批量音讯 MessageBatch 类继承一般音讯类 Message,外部仅仅多了音讯列表 List<Message> messages。这样就能够像发送一般音讯一样发送批量音讯。发送前须要做的就是将多条一般音讯体编码,放到 MessageBatch 的音讯体中。

服务端接管到后,依照雷同规定对批量音讯进行解码,即可解码出多条音讯。

4. 源码解析

4.1 生产者启动

4.1.1 DefaultMQProducerImpl#start

// DefaultMQProducerImpl.java
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {
        // 如果状态为 CREATE_JUST,执行启动逻辑。该对象创立时默认状态为 CREATE_JUST
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            // 查看 producerGroup 是否非法
            this.checkConfig();

            // 扭转生产者的 instanceName 为过程 ID,防止同一个服务器上的多个生产者实例名雷同
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();
            }

            // 创立 MQClientInstance 实例
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // 向 MQClientInstance 注册服务,将以后生产者退出 MQClientInstance 治理(退出 MQClientInstance.producerTable)// 不便后续调用网络申请、进行心跳检测等
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // 启动 MQClientInstance,如果曾经启动,则不会执行
            if (startFactory) {mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                     this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 设置服务状态为 RUNNING
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once,"
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }

    // 启动后马上向 NameServer 发送心跳
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    RequestFutureHolder.getInstance().startScheduledTask(this);

}

4.1.2 MQClientException

/**
 * 启动客户端代理
 *
 * @throws MQClientException
 */
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                // 当生产失败的时候,须要把音讯发回去
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

/**
 * 启动定时工作
 */
private void startScheduledTask() {if (null == this.clientConfig.getNamesrvAddr()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    /**
     * 默认每 30s 从 nameserver 获取 Topic 路由信息
     * 包含 生产者和消费者
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    /**
     * 每 30s 向 Broker 端发送心跳
     * 1. 革除离线的 Broker
     * 2. 汇报心跳给 broker
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    /**
     * 每 5s 把消费者的 offset 长久化
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    /**
     * 每 60s 调整线程池
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

4.2 音讯发送

4.2.1 音讯发送实现

// DefaultMQProducerImpl.java
/**
 * 发送音讯实现
 * - 验证合法性 checkMessage
 * - 查找主题路由信息 tryToFindTopicPublishInfo
 * - 抉择音讯队列 selectOneMessageQueue
 * - 发送音讯 sendKernelImpl
 *
 * @param msg
 * @param communicationMode
 * @param sendCallback
 * @param timeout
 * @return
 * @throws MQClientException
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 查看生产者处于运行状态
    this.makeSureStateOK();
    // 验证音讯是否符合规范
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 获取主题的路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        // 循环执行发送,解决同步发送重试。同步发送共重试 timesTotal 次,默认为 2 次,异步发送只执行一次
        for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 抉择音讯队列
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    // 发送音讯
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // 解决发送异样,更新失败条目
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}
                            }

                            return sendResult;
                        default:
                            break;
                    }
                }
                // catch ...
            } else {break;}
        }

        // 发送胜利,返回发送后果
        if (sendResult != null) {return sendResult;}

        // 发送失败,抛出异样
        // ...
        // mqClientException.setResponseCode(...)
        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic:" + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

4.2.2 查找路由信息

// DefaultMQProducerImpl.java
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 尝试获取缓存的路由信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 如果路由信息没有找到,则从 NameServer 上获取
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {
        // 如果第一次没有查问到,第二次应用默认主题查问
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

4.2.3 抉择音讯队列

默认机制,故障提早机制敞开

// TopicPublishInfo.java
/**
 * 抉择队列
 * 上一次发送胜利则抉择下一个队列,上一次发送失败会躲避上次发送的 MessageQueue 所在的 Broker
 *
 * @param lastBrokerName 上次发送的 Broker 名称,如果为空示意上次发送胜利
 * @return
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {
        // 轮询队列,抉择下一个队列
        return selectOneMessageQueue();} else {
        // 上次发送失败,躲避上次发送的 MessageQueue 所在的 Broker
        for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}
        }
        return selectOneMessageQueue();}
}

故障提早机制

// MQFaultStrategy.java
/**
 * 抉择发送的队列,依据是否启用 Broker 故障提早机制走不同逻辑
 *
 * sendLatencyFaultEnable=false,默认不启用 Broker 故障提早机制
 * sendLatencyFaultEnable=true,启用 Broker 故障提早机制
 *
 * @param tpInfo
 * @param lastBrokerName
 * @return
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // 启用 Broker 故障提早机制
    if (this.sendLatencyFaultEnable) {
        try {
            // 轮询获取一个音讯队列
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 验证该音讯队列是否可用,躲避注册过不可用的 Broker。if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }

            // 如果没有可用的 Broker,尝试从躲避的 Broker 中抉择一个可用的 Broker,如果没有找到,返回 null
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();}
    // 不启用 Broker 故障提早机制
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

4.2.4 发送音讯 API 外围入口

// DefaultMQProducerImpl.java
/**
 * 音讯发送 API 外围入口
 * 1. 依据 MessageQueue 获取 Broker 地址
 * 2. 为音讯调配全局惟一 ID,执行消息压缩和事务
 * 3. 如果注册了发送钩子函数,则执行发送之前的钩子函数
 * 4. 构建音讯发送申请包
 * 5. 依据音讯发送形式(同步、异步、单项)进行网络传输
 * 6. 如果注册了发送钩子函数,执行发送之后的钩子函数
 *
 * @param msg 待发送音讯
 * @param mq 发送的音讯队列
 * @param communicationMode 音讯发送模式:SYNC、ASYNC、ONEWAY
 * @param sendCallback 异步发送回调函数
 * @param topicPublishInfo 主题路由信息
 * @param timeout 音讯发送超时工夫
 * @return 音讯发送后果
 */
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();
    // 依据 MessageQueue 获取 Broker 的网络地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        // 如果 MQClientInstance 的 brokerAddrTable 未缓存该 Broker 信息,则从 NameServer 被动拉取 topic 路由信息
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    // 找到 topic 的路由信息
    if (brokerAddr != null) {brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            // 设置音讯的全局惟一 ID(UNIQUE_ID),对于批量音讯,在生成过程中曾经设置了 ID
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);
            }

            // 解决命名空间逻辑
            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            // 解决压缩,默认音讯体超过 4KB 的音讯进行 zip 压缩,并设置压缩标识
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            // 处理事务 Prepared 音讯,并设置事务标识
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}

            // CheckForbiddenHook ...

            // 执行音讯发送前的钩子函数
            if (this.hasSendMessageHook()) {context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);
                }
                // 执行所有 before 钩子函数
                this.executeSendMessageHookBefore(context);
            }

            // 构建音讯发送申请
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            // 依据音讯发送形式进行网络传输
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // 执行客户端同步发送办法
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } 
        // catch ...
        } finally {msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    // 被动更新后还是找不到路由信息,则抛出异样
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

4.3 Broker 解决发送申请

4.3.1 Broker 注册发送音讯处理器

/**
 * 创立和注册 Broker 申请解决类
 * RocketMQ 依照业务逻辑辨别申请处理器,每个类型的申请码对应一个业务处理器(NettyRequestProcessor)* 这样就实现了为不同申请码设置对应线程池,实现不同申请线程池的隔离
 */
public void registerProcessor() {
    /**
    * SendMessageProcessor
    */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    // ...
}

其中 sendMessageExecutor 是发送音讯解决线程池,默认有 4 个线程。每个线程执行 SendMessageProcessor#processRequest() 办法

4.3.2 发送音讯处理器解决

SendMessageProcessor#processRequest() 最终调用 asyncSendMessage() 办法解决发送申请

// SendMessageProcessor.java
/**
 * 解决客户端的发送音讯申请
 * 1. 查看音讯合法性检查
 * 2. 如果音讯重试次数超过最大重试次数,音讯将进入 DLQ 死信队列。* 3. 将音讯保留到存储
 *
 * @param ctx
 * @param request
 * @param mqtraceContext
 * @param requestHeader
 * @return
 */
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                            SendMessageContext mqtraceContext,
                                                            SendMessageRequestHeader requestHeader) {
    // 结构 Response,蕴含音讯合法性检查
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);
    }

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }

    // 结构存储用的 Message 对象
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    // 判断发过来的音讯是否曾经达到从新生产的重试最大次数,进入死信队列
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    MessageAccessor.setProperties(msgInner, origProps);
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
        // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
        // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
        String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
        origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
    } else {msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    }

    // 保留到音讯存储
    CompletableFuture<PutMessageResult> putMessageResult = null;
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

参考资料

  • 生产音讯样例——官网文档
  • RocketMQ 音讯发送流程
  • 《RocketMQ 技术底细 第二版》
  • 《RocketMQ 分布式消息中间件 外围原理与最佳实际》

欢送关注公众号【消息中间件】(middleware-mq),更新消息中间件的源码解析和最新动静!

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0