共计 7386 个字符,预计需要花费 19 分钟才能阅读完成。
前 言
大家好,我是三此君,一个在自我救赎之路上的非典型程序员。
“一张图”系列旨在通过“一张图”系统性的解析一个板块的知识点:
- 三此君向来不喜爱零零散散的知识点,通过一张图将零散的知识点连接起来,可能让咱们对一个板块有更深刻、更零碎的了解。
- 同时本系列尽可能的精炼,心愿可能让大家花 20% 的工夫,疾速了解这个板块下 80% 的内容。
本文是“一张图”系列的第一个板块:一张图解析 RocketMQ。
- 为了叙述的不便,绘图的时候将整个系列分为许多小的模块,解说的时候也是依照模块循序渐进的。一张图解析 RocketMQ 原图,画图不易,记得关注公众号:三此君
- 一张图解析 RocketMQ 是会深刻到源码层面,然而文中不会粘贴源码。三此君在看源码的时候写了很多备注,能够升高大家看源码的难度,须要的同学自行到三此君的仓库中 Fork:rocketmq release-4.3.0
本文是《一张图解析 RocketMQ》系列的第 1 篇,明天的内容次要分为三个局部:
- 整体架构:会从大家相熟的“生产者 - 消费者模式”逐渐推出 RocketMQ 残缺架构,只须要记住一张残缺的架构图即可。
- 元数据管理:我把 RocketMQ 集群的元数据整顿成一张图,不便大家直观的理解都有哪些元数据,各有什么用。
- 音讯收发示例:通过 Docker 部署 RocketMQ,并用简略的示例串起 RocketMQ 音讯收发流程。
整体架构
什么是音讯队列?顾名思义,首先得有一个队列,这个队列用来存储音讯。那有了音讯队列就得有人往里面放,有人往里面取。有没有似曾相识燕归来的感 jio,这莫非就是连小学生都晓得的,经典的“生产者 - 消费者模式”?接下来咱们就来看看它外面穿了什么?
别急,先来回顾一下“生产者 - 消费者模式”这个老朋友。简略来说,这个模型是由两类线程和一个队列形成:
- 生产者线程:生产产品,并把产品放到队列里。
- 消费者线程:从队列外面获取产品,并生产。
有了这个队列,生产者就只须要关注生产,而不必管消费者的消费行为,更不必期待消费者线程执行完;消费者也只管生产,不必管生产者是怎么生产的,更不必等着生产者生产。
这意味着什么呢,生产者和消费者之间实现 解藕 和异步。这就厉害了,因为咱们生存中很多都是异步的。比方最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列外面,而我只能去外卖队列外面取外卖,而后一顿饥不择食。
具体“生产者 - 消费者模式”怎么实现,想必各位小学都学过了,咱们来看看这个模式还有什么问题吧。最大的问题就是咱们小学学的“生产者 - 消费者模式”是个单机版的,只能自嗨。这就相当于,我就是外卖骑手,我点了个外卖放到外卖队列,而后我再从外卖队列外面去取,一顿操作猛如虎呀!于是就有了进化版,咱们把消费者,队列,生产者放到不同的服务器上,这就是传说中的分布式音讯队列了。
生产者生产的音讯通过网络传递给队列存储,消费者通过网络从队列获取音讯。然而还有问题,音讯可能有很多种,全都放在一起岂不是乱套了?我点的外卖和快递全都放在一起,太难找了吧。于是咱们就须要辨别不同类型音讯,雷同类型的音讯称为一个 Topic。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了 生产者组 和消费者组。
但还是有问题呀,小区那么大,一个队列放不下。我住在小区南门,点个外卖还要跑去北门拿,那真的是 eggs hurt
。于是物业在东南西北门各设了一个外卖快递搁置点。也就是咱们有多个队列,组成 队列集群。
可是,问题又双叒叕来了(还有完没完),一个小区那么多个外卖快递队列,骑手怎么晓得送到哪里去,我又怎么晓得去哪里取?很简略,导航呀。咱们把导航的信息称为 路由信息,这些信息须要有一个治理的中央,它通知生产者,某这个 Topic 的音讯能够发给哪些队列,同时通知消费者你须要的音讯能够从哪些队列外面取。RocketMQ 为这些路由信息的设置了管理员 NameServer,当然 NameServer 也能够有很多个,组成 NameServer 集群。
到这里,你就应该晓得 RocketMQ 外面都穿了什么啦。包含了 生产者(Producer),消费者(Consumer),NameServer 以及队列自身(Broker)。Broker 是代理的意思,负责队列的存取等操作,咱们能够把 Broker 了解为队列自身。
- NameServer:咱们能够同时部署很多台 NameServer 服务器,并且这些服务器是无状态的,节点之间无任何信息同步。
NameServer 起来后监听 端口,期待 Broker、Producer、Consumer 连上来,NameServer 是集群元数据管理核心。 -
Broker:Broker 启动,跟所有的 NameServer 放弃长连贯,每 30s 发送一次发送心跳包(像心跳一样继续稳固的发送申请)。心跳包中蕴含以后 Broker 信息 (IP+ 端口等)以及存储所有 Topic 信息。注册胜利后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
咱们能够同时部署多个 Master 和多个 Slave,一个 Master 能够对应多个 Slave,然而一个 Slave 只能对应一个 Master。Master 与 Slave 的须要有雷同的 BrokerName,不同的 BrokerId。BrokerId 为 0 示意 Master,非 0 示意 Slave,但只有 BrokerId=1 的从服务器才会参加音讯的读负载。(能够临时疏忽 Broker 的主从角色)
- Topic:收发音讯前,先创立 Topic,创立 Topic 时须要指定该 Topic 要存储在哪些 Broker 上,也能够在发送音讯时主动创立 Topic。
- Producer:Producer 发送音讯,启动时先跟 NameServer 集群中的其中一台建设长连贯,并从 NameServer 中获取以后发送的 Topic 存在哪些 Broker 上,采纳轮询策略从抉择一个队列,而后与队列所在的 Broker 建设长连贯,并向 Broker 发消息。
-
Consumer:Consumer 跟 Producer 相似,跟其中一台 NameServer 建设长连贯,获取以后订阅 Topic 存在哪些 Broker 上,而后间接跟 Broker 建设连贯通道,开始生产音讯。
咱们刚刚提到骑手不止一个,取外卖快递的也不止我一个,所以会有生产者组合消费者组的概念。这里须要补充阐明一下,音讯分为集群音讯和播送音讯:
- 集群音讯:一个 Topic 的一条音讯,一个消费者组 只能有一个消费者实例生产。例如,同样是外卖 Topic,一份外卖,咱们整个小区也只有一个人生产,就是集群生产。
- 播送音讯:一个 Topic 的一条音讯,一个消费者组 所有消费者实例都会生产。例如,如果是因为疫情,政府发放食品,那咱们小区每个人都会生产,就是播送生产。
元数据管理
因为 Producer、Consumer 和 Broker 都须要和 NameServer 交互,负责的三此君不得不先和大家唠唠 NameServer 是何方神圣。下面有说道 NameServer 是集群的元数据管理核心,那它到底治理了哪些元数据?咱们来看看 NameServer 外面又穿了什么,看完了记得关注、转发、点赞、珍藏哦。
简略来说,NameServer 是咱们的整个 RocketMQ 集群的元数据管理核心,负责集群元数据的增删改查。先不论这个增删改查是怎么实现的,咱们甚至能够了解就是数据库的增删改查,然而咱们肯定要晓得这些元数据都长什么样子。能力晓得 Producer、Consumer 及 Broker 是如何依据这些数据进行音讯收发的。
如图所示,二主二从的 Broker 集群相干的元数据信息,包含 topicQueueTable、BrokerAddrTable、ClusterAddrTable、brokerLiveInfo、FilterServer (临时不必关注,图中未画出)。
HashMap<String topic, List<QueueData>> topicQueueTable
:Key 是 Topic 的名称,它存储了所有 Topic 的属性信息。Value 是个 QueueData 列表,长度等于这个 Topic 数据存储的 Master Broker 的个数,QueueData 里存储着 Broker 的名称、读写 queue 的数量、同步标识等。HashMap<String BrokerName, BrokerData> brokerAddrTable
:这个构造存储着一个 BrokerName 对应的属性信息,包含所属的 Cluster 名称,一个 Master Broker 和多个 Slave Broker 的地址信息HashMap<String ClusterName, Set<String BrokerName>> clusterAddrTable
:存储的是集群中 Cluster 的信息,就是一个 Cluster 名称对应一个由 BrokerName 组成的汇合。HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable
:Key 是 BrokerAddr 对应着一台机器,BrokerLiveTable 存储的内容是这台 Broker 机器的实时状态,包含上次更新状态的工夫戳,NameServer 会定期检查这个工夫戳,超时没有更新就认为这个 Broker 有效了,将其从 Broker 列表里革除。HashMap<String BrokerAddr, List<String> FilterServer> filterServerTable
:Key 是 Broker 的地址,Value 是和这个 Broker 关联的多个 FilterServer 的地址。Filter Server 是过滤服务器,是 RocketMQ 的一种服务端过滤形式,一个 Broker 能够有一个或多个 Filter Server。
其余角色会被动向 NameServer 上报状态,依据上报音讯里的申请码做相应的解决,更新存储的对应信息。
- Broker 接到创立 Topic 的申请后向 NameServer 发送注册信息,NameServer 收到注册信息后首先更新 Broker 信息,而后对每个 Master 角色的 Broker,创立一个 QueueData 对象。如果是新建 Topic,就是增加 QueueData 对象;如果是批改 Topic,就是把旧的 QueueData 删除,退出新的 QueueData。
- Broker 向 NameServer 发送的心跳会更新工夫戳,NameServer 每 10 秒查看一次查看工夫戳,查看到工夫戳超过 2 分钟则认为 Broker 已生效,便会触发清理逻辑。
- 连贯断开的事件也会触发状态更新,当 NameServer 和 Broker 的长连贯断掉当前,onChannelDestroy 函数会被调用,把这个 Broker 的信息清理进来。
- Producer/Consumer 启动之后会和 NameServer 建设长连贯,定时从 NameServer 获取路由信息保留到本地。音讯的发送与拉取都会用到下面的数据。
那么多数据,置信大家看的有点晕,三此君简略总结下:NameServer 通过 brokerLiveInfo 来保护存活的 Broker。Producer 会获取下面的路由信息,发送音讯的时候指定发送到哪个 Topic,依据 Topic 能够从 topicQueueTable 抉择一个 Broker,依据 BrokerName 能够从 BrokerAddrTable 获取到 Broker IP 地址。有了地址 Producer 就能够将音讯通过网络传递给 Broker。
音讯收发示例
RocketMQ 部署
刚刚咱们理解 RocketMQ 整体架构,那怎么样通过 RocketMQ 收发音讯呢?须要先通过 Docker 部署一套 RocketMQ:
如果你没有装置 Docker,能够依据菜鸟教程 MacOS Docker 装置 /Windows Docker 装置 进行装置。而后,通过 docker-compose 部署 RocketMQ:
- 克隆 docker-middleware 仓库,关上 RocketMQ 目录;
- 批改
broker.conf
中的brokerIP1
参数为本机 IP; - 进入
docker-compose.yml
文件所在门路,执行docker-compose up
命令即可;
留神:如果你当初不理解 Docker 不重要,只须要依照步骤部署好 RocketMQ 即可,并不会妨碍咱们了解 RocketMQ 相干内容。
部署实现后咱们就能够在 Docker Dashboard 中看到 RocketMQ 相干容器,包含 Broker、NameServer 及 Console(RocketMQ 控制台),到这里咱们就能够应用部署的 RocketMQ 收发音讯了。
RocketMQ 曾经部署好了,接下来先来看一个简略的音讯收发示例,能够说是 RocketMQ 的 “Hello World”。
音讯发送
public class SyncProducer {public static void main(String[] args) throws Exception {
// 实例化音讯生产者 Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置 NameServer 的地址
producer.setNamesrvAddr("localhost:9876");
// 启动 Producer 实例
producer.start();
// 创立音讯,并指定 Topic,Tag 和音讯体
Message msg = new Message("Topic1","Tag", "Key",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送音讯到一个 Broker
SendResult sendResult = producer.send(msg);
// 通过 sendResult 返回音讯是否胜利送达
System.out.printf("%s%n", sendResult);
// 如果不再发送音讯,敞开 Producer 实例。producer.shutdown();}
}
- 首先,实例化一个生产者
producer
,并通知它 NameServer 的地址,这样生产者能力从 NameServer 获取路由信息。 - 而后
producer
得做一些初始化(这是很要害的步骤),它要和 NameServer 通信,要先建设通信连贯等。 producer
曾经筹备好了,那得筹备好要发的内容,把 “Hello World” 发送到 Topic1。- 内容筹备好,那
producer
就能够把音讯发送进来了。producer
怎么晓得 Broker 地址呢?他就会去 NameServer 获取路由信息,失去 Broker 的地址是 localhost:10909,而后通过网络通信将音讯发送给 Broker。 - 生产者发送的音讯通过网络传输给 Broker,Broker 须要对音讯依照肯定的构造进行存储。存储实现之后,把存储后果告知生产者。
音讯接管
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置 NameServer 的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个 Topic,以及 Tag 来过滤须要生产的音讯
erbconsumerijun.subscribe("sancijun", "*");
// 注册回调实现类来解决从 broker 拉取回来的音讯
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该音讯曾经被胜利生产
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();}
}
- 首先,实例化一个消费者
consumer
,通知它 NameServer 的地址,这样消费者能力从 NameServer 获取路由信息。 - 而后这个消费者须要晓得本人能够生产哪些 Topic 的音讯,也就是每个消费者须要订阅一个或多个 Topic。
- 消费者也须要做一些初始化,业务自身并没有理睬怎么从 Broker 拉取音讯,这些都是消费者石破天惊的贡献。所以,咱们须要启动消费者,消费者会从 NameServer 拉取路由信息,并一直从 Broker 拉取音讯。拉取回来的音讯提供给业务定义的 MessageListener。
- 音讯拉取回来后,生产这须要怎么解决呢?每个消费者都不一样(业务自身决定),由咱们业务定义的 MessageListener 解决。解决完之后,消费者也须要确认收货,就是通知 Broker 生产胜利了。
以上就是本文的全部内容,本文没有堆砌太多无意义的概念,没有讲什么削峰解耦,异步通信。这些内容网上也很多,看了和没看没什么两样。最初的最初,看懂的点赞,没看懂的珍藏,顺便在分享给你的小伙伴。还没有关注的敌人记得关注,入股不亏。
参考文献
- RocketMQ 官网文档
- RocketMQ 源码
- 丁威, 周继锋. RocketMQ 技术底细:RocketMQ 架构设计与实现原理. 机械工业出版社, 2019-01.
- 李伟. RocketMQ 分布式消息中间件:外围原理与最佳实际. 电子工业出版社, 2020-08.
- 杨开元. RocketMQ 实战与原理解析. 机械工业出版社, 2018-06.
转载请注明出处