前 言
三此君看了好几本书,看了很多遍源码整顿的 一张图进阶 RocketMQ 图片,对于 RocketMQ 你只须要记住这张图!感觉不错的话,记得点赞关注哦。
【重要】视频在 B 站同步更新,欢送围观,轻轻松松涨姿态。一张图进阶 RocketMQ- 音讯发送(视频版)
https://www.bilibili.com/vide…
本文是“一张图进阶 RocketMQ”系列第 3 篇,对 RocketMQ 不理解的同学能够先看看三此君的
一张图进阶 RocketMQ- 整体架构,一张图进阶 RocketMQ – NameServer。
在理解了 RocketMQ 的整体架构之后,咱们来深刻的剖析下生产者音讯发送的设计与实现。本文从一个生产者示例开始,以两行代码为切入点,逐渐分析生产者启动流程以及同步音讯发送流程。
生产者示例
音讯发送分为同步音讯、异步音讯和单向音讯,简略来说:
- 同步音讯:音讯发送之后会期待 Broker 响应,并把响应后果传递给业务线程,整个过程业务线程在期待。
- 异步音讯:调用异步发送 API,Producer 把音讯发送申请放进线程池就返回。逻辑解决,网络申请都在线程池中进行,等后果解决完之后回调业务定义好的回调函数。
- 单向音讯:只负责发送音讯,不论发送后果。
咱们先来回顾下同步音讯发送的例子:
public class SyncProducer {public static void main(String[] args) throws Exception {
// 实例化音讯生产者 Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置 NameServer 的地址
producer.setNamesrvAddr("localhost:9876");
// 启动 Producer 实例
producer.start();
// 创立音讯,并指定 Topic,Tag 和音讯体
Message msg = new Message("sancijun","order", "orderId", "我肯定会关注三此君".getBytes("UTF-8"));
// 发送音讯到一个 Broker
SendResult sendResult = producer.send(msg);
// 通过 sendResult 返回音讯是否胜利送达
System.out.printf("%s%n", sendResult);
// 如果不再发送音讯,敞开 Producer 实例。producer.shutdown();}
}
- 首先,实例化一个生产者
producer
,并通知它 NameServer 的地址,这样生产者能力从 NameServer 获取路由信息。 - 而后
producer
得做一些初始化(这是很要害的步骤),它要和 NameServer 通信,要先初始化通信模块等。 producer
曾经筹备好了,那得筹备好要发的内容,把 “ 我肯定会关注三此君 ” 发送到 Topic=”sanicjun“。- 内容筹备好,那
producer
就能够把音讯发送进来了。producer
怎么晓得 Broker 地址呢?他会去 NameServer 获取路由信息,失去 Broker 的地址是 localhost:10909,而后通过网络通信将音讯发送给 Broker。 - 生产者发送的音讯通过网络传输给 Broker,Broker 须要对音讯依照肯定的构造进行存储。存储实现之后,把存储后果告知生产者。
其中有两个要害的中央:producer.start()
及 producer.send()
,也就是生产者初始化及音讯发送。咱们以这两行代码为切入点,看看 RocketMQ Producer 的设计与实现。
Tips:因为本文是 RocketMQ 设计与实现剖析,尽管不会粘贴任何源码,然而图文中会有大量的类名和办法名,看的时候不用执着于这些生疏的类名和办法名,三此君会解释这些类和办法的用处。
指标:将音讯发送给 Broker 进行存储
关键点 1: 怎么依据 topic+ 路由信息 建设网络通道,进行音讯的发送
关键点 2: 音讯在发送过程中又通过了哪些解决?
生产者启动
咱们实例化一个生产者 DefaultMQProducer,并调用 DefaultMQProducer.start() 办法进行初始化:
启动流程比拟长,其实最重要的就是初始化了通信模块,并启动了多个定时工作,这些在前面的音讯发送过程中都会用到:
- 查看配置是否非法:生产者组名是否为空、是否满足命名规定、长度是否满足等。
- 启动通信模块服务 Netty RemotingClient:RemotingClient 是一个接口,底层应用的通信框架是 Netty,提供了实现类 NettyRemotingClient,RemotingClient 在初始化的时候实例化 Bootstrap,不便后续用来创立 SocketChannel;后文会介绍 RocketMQ 的通信机制,大家稍安勿躁。
-
启动 5 个后盾定时工作 :定时更新 NameServerAddr 信息, 定时更新 topic 的路由信息,定时向 Broker 发送心跳及清理下线的 Broker,定时长久化 Consumer 的 Offset 信息,定时调整线程池;
生产者每 30s 会从某台 NameServer 获取 Topic 和 Broker 的映射关系(路由信息)存在本地内存中,如果发现新的 Broker 就会和其建设长连贯,每 30s 会发送心跳至 Broker 保护连贯。
Tips:生产者为什么要启动音讯拉取服务?重均衡服务是什么?简略来说,这两个服务都是用于消费者的,这里咱们暂且不理睬。音讯拉取服务 pullMessageService 是从 Broker 拉取音讯的服务,重均衡服务 rebalanceService 用于消费者的负载平衡,负责调配消费者可生产的音讯队列。
同步发送
总体上讲,音讯发送能够划分为三个层级:
- 业务层:筹备须要发送的音讯。
- 音讯解决层:获取业务发送的 Message,通过一系列的参数查看、音讯发送筹备、参数包装等操作。
- 通信层:基于 Netty 封装的一个网络通信服务,将音讯发送给 Broker。
咱们通过后面的示例来看整个同步音讯发送的解决流程,整个过程咱们的次要指标就是把音讯发送到 Broker:
- 第一步:业务层构建待发送音讯
Message msg = new Message("sancijun","order", "orderId", "我肯定会关注三此君".getBytes("UTF-8"));
-
第二步:而后咱们调用
producer.send(msg)
发送音讯,可是 producer 怎么晓得发给谁呢?音讯自身又须要通过哪些解决呢?咱们进入调用链直到 sendDefaultImpl- 查看音讯是否为空,音讯的 Topic 的名字是否为空或者是否符合规范,音讯体大小是否符合要求,最大值为 4MB,能够通过 maxMessageSize 进行设置。
-
执行 tryToFindTopicPublishInfo() 办法:获取 Topic 路由信息,如果不存在则抛出异样。如果本地缓存没有路由信息,就通过 Namesrv 获取路由信息,更新到本地。音讯构建的时候咱们指定了音讯所属 Topic,依据 Topic 路由信息咱们能够找到对应的 Broker。
Tips:从 NameServer 获取的路由信息 TopicRouteData 会蕴含指定 Topic 的 topicQueueTable、brokerAddrTable。在 NameServer 集群元数据管理局部咱们讲过,通过 topicName 从 topicQueueTable 获取对应的 brokerName,再依据 brokerName 从 brokerAddrTable 中获取 Broker IP 地址。
- 计算音讯发送的重试次数,同步重试和异步重试的执行形式是不同的。在同步发送状况下如果发送失败会默认重投两次(默认 retryTimesWhenSendFailed = 2),并且不会抉择上次失败的 Broker,会向其余 Broker 投递。
-
执行队列抉择办法 selectOneMessageQueue()。依据 lastBrokerName(上次发送音讯失败的 Broker 的名字)和 Topic 路由信息选一个 MessageQueue。
首次发送时 lastBrokerName 为 null,采纳轮询策略抉择一个 MessageQueue。如果上次发送失败,也是采纳轮询策略抉择一个 MessageQueue,然而会跳过上次发送失败 Broker 的 MessageQueue,也就是换一个 Broker 发送。Tips:抉择一个 MessageQueue,什么是 MessageQueue 呢?这和 Broker 的存储构造相干,咱们会在存储局部具体介绍,这里先说论断,咱们创立 Topic 时指定了这个 Topic 的读写队列数,每个 MessageQueue 有不同的 queueId(0-3)。
咱们也能够通过 sendLatencyFaultEnable 来设置是否总是发送到提早级别较低的 Broker,默认值为 False,我么这里就不展开讨论了。
- 执行 sendKernelImpl() 办法。
-
第三步:sendDefaultImpl 做了一系列逻辑解决,咱们曾经失去了待发送的 BrokerName,而咱们的指标是把音讯发送到 Broker。sendKernelImpl 办法是发送音讯的外围办法,次要用于筹备通信层的入参(比方 Broker 地址、申请体等),将申请传递给通信层。
-
依据 MessageQueue.brokerName 获取 Broker IP 地址,给 message 增加全局惟一 ID。
Tips:sendKernelImpl 也有很多的逻辑解决,咱们临时先略过这里的压缩、事务音讯、钩子函数、重试音讯:
对大于 4k 的一般音讯进行压缩,并设置音讯的零碎标记为 MessageSysFlag.COMPRESSED_FLAG。
如果是事务 Prepared 音讯,则设置音讯的零碎标记为 MessageSysFlag.TRANSACTION_PREPARED_TYPE
如果注册了音讯发送钩子函数,则执行音讯发送之前的加强逻辑,通过 DefaultMQProducerImpl#registerSendMessageHook 注册钩子解决类,并且能够注册多个。
构建发送音讯申请头:生产者组、主题名称、默认创立主题 Key、该主题在单个 Broker 默认队列数、队列 ID(队列序号)、音讯零碎标记(MessageSysFlag)、音讯发送工夫、音讯标记、音讯扩大属性、音讯重试次数、是否是批量音讯等
解决重试音讯。
- 调用 MQClientAPIImpl.sendMessage(),首先构建一个近程申请 RemotingCommand,依据发送类型(同步或异步)调用不同的通信层实现办法。咱们这里是同步音讯,则调用
RemotingClient.invokeSync()。
- 解决返回后果,将通信层返回的后果封装成 SendResult 对象返回给业务层。
-
-
第四步:RemotingClient 是基于 Netty 实现的,相熟 Netty 的同学曾经大略晓得前面的流程,不相熟的同学也没有关系,这里先混个眼生,上面咱们会对 Netty 做简略的介绍。
- RemotingClient.invokeSync() 先是通过 Broker Addr 获取或者创立 Netty Channel。先从 channelTables Map 本地缓存中,以 Broker Addr 为 key 获取 Channel,没有获取到则通过 Netty Bootstrap.connect( Broker Addr) 创立 Channel,并放入缓存。
- 而后生成 <opaque, ResponseFuture> 的键值对放入 responseTable 缓存中,后果返回的时候依据 opaque 从缓存中获取后果。
- 调用 channel.writeAndFlush() 将音讯通过网络传输给指定 Broker。这里是 Netty 框架的 API,曾经不在 RocketMQ 领域。
- 调用 ResponseFuture.waitResponse() 办法,直到 Netty 接管 Broker 的返回后果。其实就是执行 countDownLatch.await()。
-
第五步:后果解决及返回。
- Broker 处理结果返回,Netty 产生可读事件,由 Channelhandler 解决可读事件,这里是 NettyClientHandler.channelRead0()接管写入数据,解决可读事件。
- 而后解决返回后果,从 responseTable 取出 ResponseFuture,并执行 responseFuture.putResponse()。实际上就只执行 countDownLatch.countDown() 唤醒第四步中期待的调用线程,返回 Broker 的处理结果 RemotingCommand。
- 后果层层返回,直到 MQClientAPIImpl.sendMessageSync() 出手了,这里调用 MQClientAPIImpl.processSendResponse() 解决返回后果,封装成 SendResult 对象返回给业务层。
到这里,生产者曾经将音讯发送到指定的 Broker 了,其中包含了音讯的层层校验及封装;还有很重要的是如何抉择一个 MessageQueue 进行发送(重试),重试是保障音讯发送牢靠的关键步骤;最初通过 Netty 将申请发送给 Broker。咱们先不论 Broker 收到申请如何解决,然而要明确音讯如何送到 Broker 进行存储,须要对 Netty 有简略的了解。
总结
以上就是 RocketMQ 音讯发送的次要内容,咱们简略的总结下:
- 生产者启动:次要是调用 NettyRemotingClient.start() 初始化 Netty 客户端,并启动 5 个后盾线程;
- 音讯发送:业务层封装发送的音讯,逻辑层进行层层校验及封装,轮询策略抉择一个 MessageQueue 发送(重试),通信层基于 Netty 将音讯发送给 Broker。
参考文献
- RocketMQ 官网文档
- RocketMQ 源码
- 丁威, 周继锋. RocketMQ 技术底细:RocketMQ 架构设计与实现原理. 机械工业出版社, 2019-01.
- 李伟. RocketMQ 分布式消息中间件:外围原理与最佳实际. 电子工业出版社, 2020-08.
- 杨开元. RocketMQ 实战与原理解析. 机械工业出版社, 2018-06.