共计 12471 个字符,预计需要花费 32 分钟才能阅读完成。
设计(design)
1 音讯存储
音讯存储是 RocketMQ 中最为简单和最为重要的一部分,本节将别离从 RocketMQ 的音讯存储整体架构、PageCache 与 Mmap 内存映射以及 RocketMQ 中两种不同的刷盘形式三方面来别离开展叙述。
1.1 音讯存储整体架构
音讯存储架构图中次要有上面三个跟音讯存储相干的文件形成。
(1) CommitLog:音讯主体以及元数据的存储主体,存储 Producer 端写入的音讯主体内容, 音讯内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,右边补零,残余为起始偏移量,比方 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。音讯次要是程序写入日志文件,当文件满了,写入下一个文件;
(2) ConsumeQueue:音讯生产队列,引入的目标次要是进步音讯生产的性能,因为 RocketMQ 是基于主题 topic 的订阅模式,音讯生产是针对主题进行的,如果要遍历 commitlog 文件中依据 topic 检索音讯是十分低效的。Consumer 即可依据 ConsumeQueue 来查找待生产的音讯。其中,ConsumeQueue(逻辑生产队列)作为生产音讯的索引,保留了指定 Topic 下的队列音讯在 CommitLog 中的起始物理偏移量 offset,音讯大小 size 和音讯 Tag 的 HashCode 值。consumequeue 文件能够看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织形式如下:topic/queue/file 三层组织构造,具体存储门路为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,别离为 8 字节的 commitlog 物理偏移量、4 字节的音讯长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,能够像数组一样随机拜访每一个条目,每个 ConsumeQueue 文件大小约 5.72M;
(3) IndexFile:IndexFile(索引文件)提供了一种能够通过 key 或工夫区间来查问音讯的办法。Index 文件的存储地位是:$HOME \store\index\${fileName},文件名 fileName 是以创立时的工夫戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 能够保留 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 构造,故 rocketmq 的索引文件其底层实现为 hash 索引。
在下面的 RocketMQ 的音讯存储整体架构图中能够看出,RocketMQ 采纳的是混合型的存储构造,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。RocketMQ 的混合型存储构造 (多个 Topic 的音讯实体内容都存储于一个 CommitLog 中) 针对 Producer 和 Consumer 别离采纳了数据和索引局部相拆散的存储构造,Producer 发送音讯至 Broker 端,而后 Broker 端应用同步或者异步的形式对音讯刷盘长久化,保留至 CommitLog 中。只有音讯被刷盘长久化至磁盘文件 CommitLog 中,那么 Producer 发送的音讯就不会失落。正因为如此,Consumer 也就必定有机会去生产这条音讯。当无奈拉取到音讯后,能够等下一次音讯拉取,同时服务端也反对长轮询模式,如果一个音讯拉取申请未拉取到音讯,Broker 容许期待 30s 的工夫,只有这段时间内有新音讯达到,将间接返回给生产端。这里,RocketMQ 的具体做法是,应用 Broker 端的后盾服务线程—ReputMessageService 不停地散发申请并异步构建 ConsumeQueue(逻辑生产队列)和 IndexFile(索引文件)数据。
1.2 页缓存与内存映射
页缓存(PageCache)是 OS 对文件的缓存,用于减速对文件的读写。一般来说,程序对文件进行程序读写的速度简直靠近于内存的读写速度,次要起因就是因为 OS 应用 PageCache 机制对读写访问操作进行了性能优化,将一部分的内存用作 PageCache。对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的形式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时呈现未命中 PageCache 的状况,OS 从物理磁盘上拜访读取文件的同时,会程序对其余相邻块的数据文件进行预读取。
在 RocketMQ 中,ConsumeQueue 逻辑生产队列存储的数据较少,并且是程序读取,在 page cache 机制的预读取作用下,Consume Queue 文件的读性能简直靠近读内存,即便在有音讯沉积状况下也不会影响性能。而对于 CommitLog 音讯存储的日志数据文件来说,读取音讯内容时候会产生较多的随机拜访读取,重大影响性能。如果抉择适合的零碎 IO 调度算法,比方设置调度算法为“Deadline”(此时块存储采纳 SSD 的话),随机读的性能也会有所晋升。
另外,RocketMQ 次要通过 MappedByteBuffer 对文件进行读写操作。其中,利用了 NIO 中的 FileChannel 模型将磁盘上的物理文件间接映射到用户态的内存地址中(这种 Mmap 的形式缩小了传统 IO 将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为间接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为须要应用内存映射机制,故 RocketMQ 的文件存储都应用定长构造来存储,不便一次将整个文件映射至内存)。
1.3 音讯刷盘
(1) 同步刷盘:如上图所示,只有在音讯真正长久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个胜利的 ACK 响应。同步刷盘对 MQ 音讯可靠性来说是一种不错的保障,然而性能上会有较大影响,个别实用于金融业务利用该模式较多。
(2) 异步刷盘:可能充分利用 OS 的 PageCache 的劣势,只有音讯写入 PageCache 即可将胜利的 ACK 返回给 Producer 端。音讯刷盘采纳后盾异步线程提交的形式进行,升高了读写提早,进步了 MQ 的性能和吞吐量。
2 通信机制
RocketMQ 音讯队列集群次要包含 NameServer、Broker(Master/Slave)、Producer、Consumer4 个角色,根本通信流程如下:
(1) Broker 启动后须要实现一次将本人注册至 NameServer 的操作;随后每隔 30s 工夫定时向 NameServer 上报 Topic 路由信息。
(2) 音讯生产者 Producer 作为客户端发送音讯时候,须要依据音讯的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息。如果没有则更新路由信息会从 NameServer 上从新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息。
(3) 音讯生产者 Producer 依据 2)中获取的路由信息抉择一个队列(MessageQueue)进行音讯发送;Broker 作为音讯的接收者接管音讯并落盘存储。
(4) 音讯消费者 Consumer 依据 2)中获取的路由信息,并再实现客户端的负载平衡后,抉择其中的某一个或者某几个音讯队列来拉取音讯并进行生产。
从下面 1)~3)中能够看出在音讯生产者, Broker 和 NameServer 之间都会产生通信(这里只说了 MQ 的局部通信),因而如何设计一个良好的网络通信模块在 MQ 中至关重要,它将决定 RocketMQ 集群整体的音讯传输能力与最终的性能。
rocketmq-remoting 模块是 RocketMQ 音讯队列中负责网络通信的模块,它简直被其余所有须要网络通信的模块(诸如 rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和援用。为了实现客户端与服务器之间高效的数据申请与接管,RocketMQ 音讯队列自定义了通信协议并在 Netty 的根底之上扩大了通信模块。
2.1 Remoting 通信类构造
2.2 协定设计与编解码
在 Client 和 Server 之间实现一次音讯发送时,须要对发送的音讯进行一个协定约定,因而就有必要自定义 RocketMQ 的音讯协定。同时,为了高效地在网络中传输音讯和对收到的音讯读取,就须要对音讯进行编解码。在 RocketMQ 中,RemotingCommand 这个类在音讯传输过程中对所有数据内容的封装,岂但蕴含了所有的数据结构,还蕴含了编码解码操作。
Header 字段 | 类型 | Request 阐明 | Response 阐明 |
---|---|---|---|
code | int | 申请操作码,应答方依据不同的申请码进行不同的业务解决 | 应答响应码。0 示意胜利,非 0 则示意各种谬误 |
language | LanguageCode | 申请方实现的语言 | 应答方实现的语言 |
version | int | 申请方程序的版本 | 应答方程序的版本 |
opaque | int | 相当于 requestId,在同一个连贯上的不同申请标识码,与响应音讯中的绝对应 | 应答不做批改间接返回 |
flag | int | 辨别是一般 RPC 还是 onewayRPC 得标记 | 辨别是一般 RPC 还是 onewayRPC 得标记 |
remark | String | 传输自定义文本信息 | 传输自定义文本信息 |
extFields | HashMap<String, String> | 申请自定义扩大信息 | 响应自定义扩大信息 |
可见传输内容次要能够分为以下 4 局部:
(1) 音讯长度:总长度,四个字节存储,占用一个 int 类型;
(2) 序列化类型 & 音讯头长度:同样占用一个 int 类型,第一个字节示意序列化类型,前面三个字节示意音讯头长度;
(3) 音讯头数据:通过序列化后的音讯头数据;
(4) 音讯主体数据:音讯主体的二进制字节数据内容;
2.3 音讯的通信形式和流程
在 RocketMQ 音讯队列中反对通信的形式次要有同步 (sync)、异步(async)、单向(oneway)
三种。其中“单向”通信模式绝对简略,个别用在发送心跳包场景下,无需关注其 Response。这里,次要介绍 RocketMQ 的异步通信流程。
2.4 Reactor 多线程设计
RocketMQ 的 RPC 通信采纳 Netty 组件作为底层通信库,同样也遵循了 Reactor 多线程模型,同时又在这之上做了一些扩大和优化。
下面的框图中能够大抵理解 RocketMQ 中 NettyRemotingServer 的 Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为下面的 1)负责监听 TCP 网络连接申请,建设好连贯,创立 SocketChannel,并注册到 selector 上。RocketMQ 的源码中会主动依据 OS 的类型抉择 NIO 和 Epoll,也能够通过参数配置), 而后监听真正的网络数据。拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为下面的“N”,源码中默认设置为 3),在真正执行业务逻辑之前须要进行 SSL 验证、编解码、闲暇查看、网络连接治理,这些工作交给 defaultEventExecutorGroup(即为下面的“M1”,源码中默认设置为 8)去做。而解决业务操作放在业务线程池中执行,依据 RomotingCommand 的业务申请码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,而后封装成 task 工作后,提交给对应的业务 processor 解决线程池来执行(sendMessageExecutor,以发送音讯为例,即为下面的“M2”)。从入口到业务逻辑的几个步骤中线程池始终再减少,这跟每一步逻辑复杂性相干,越简单,须要的并发通道越宽。
线程数 | 线程名 | 线程具体阐明 |
---|---|---|
1 | NettyBoss_%d | Reactor 主线程 |
N | NettyServerEPOLLSelector_%d_%d | Reactor 线程池 |
M1 | NettyServerCodecThread_%d | Worker 线程池 |
M2 | RemotingExecutorThread_%d | 业务 processor 解决线程池 |
3 音讯过滤
RocketMQ 分布式音讯队列的音讯过滤形式有别于其它 MQ 中间件,是在 Consumer 端订阅音讯时再做音讯过滤的。RocketMQ 这么做是在于其 Producer 端写入音讯和 Consumer 端订阅音讯采纳拆散存储的机制来实现的,Consumer 端订阅音讯是须要通过 ConsumeQueue 这个音讯生产的逻辑队列拿到一个索引,而后再从 CommitLog 外面读取真正的音讯实体内容,所以说到底也是还绕不开其存储构造。其 ConsumeQueue 的存储构造如下,能够看到其中有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的音讯过滤正式基于这个字段值的。
次要反对如下 2 种的过滤形式
(1) Tag 过滤形式:Consumer 端在订阅音讯时除了指定 Topic 还能够指定 TAG,如果一个音讯有多个 TAG,能够用 || 分隔。其中,Consumer 端会将这个订阅申请构建成一个 SubscriptionData,发送一个 Pull 音讯的申请给 Broker 端。Broker 端从 RocketMQ 的文件存储层—Store 读取数据之前,会用这些数据先构建一个 MessageFilter,而后传给 Store。Store 从 ConsumeQueue 读取到一条记录后,会用它记录的音讯 tag hash 值去做过滤,因为在服务端只是依据 hashcode 进行判断,无奈准确对 tag 原始字符串进行过滤,故在音讯生产端拉取到音讯后,还须要对音讯的原始 tag 字符串进行比对,如果不同,则抛弃该音讯,不进行音讯生产。
(2) SQL92 的过滤形式:这种形式的大抵做法和下面的 Tag 过滤形式一样,只是在 Store 层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由 rocketmq-filter 模块负责的。每次过滤都去执行 SQL 表达式会影响效率,所以 RocketMQ 应用了 BloomFilter 防止了每次都去执行。SQL92 的表达式上下文为音讯的属性。
4 负载平衡
RocketMQ 中的负载平衡都在 Client 端实现,具体来说的话,次要能够分为 Producer 端发送音讯时候的负载平衡和 Consumer 端订阅音讯的负载平衡。
4.1 Producer 的负载平衡
Producer 端在发送音讯的时候,会先依据 Topic 找到指定的 TopicPublishInfo,在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认形式下 selectOneMessageQueue()办法会从 TopicPublishInfo 中的 messageQueueList 中抉择一个队列(MessageQueue)进行发送音讯。具体的容错策略均在 MQFaultStrategy 这个类中定义。这里有一个 sendLatencyFaultEnable 开关变量,如果开启,在随机递增取模的根底上,再过滤掉 not available 的 Broker 代理。所谓的 ”latencyFaultTolerance”,是指对之前失败的,按肯定的工夫做退却。例如,如果上次申请的 latency 超过 550Lms,就退却 3000Lms;超过 1000L,就退却 60000L;如果敞开,采纳随机递增取模的形式抉择一个队列(MessageQueue)来发送音讯,latencyFaultTolerance 机制是实现音讯发送高可用的外围关键所在。
4.2 Consumer 的负载平衡
在 RocketMQ 中,Consumer 端的两种生产模式(Push/Pull)都是基于拉模式来获取音讯的,而在 Push 模式只是对 pull 模式的一种封装,其本质实现为音讯拉取线程在从服务器拉取到一批音讯后,而后提交到音讯生产线程池后,又“快马加鞭”的持续向服务器再次尝试拉取音讯。如果未拉取到音讯,则提早一下又持续拉取。在两种基于拉模式的生产形式(Push/Pull)中,均须要 Consumer 端在晓得从 Broker 端的哪一个音讯队列—队列中去获取音讯。因而,有必要在 Consumer 端来做负载平衡,即 Broker 端中多个 MessageQueue 调配给同一个 ConsumerGroup 中的哪些 Consumer 生产。
1、Consumer 端的心跳包发送
在 Consumer 启动后,它就会通过定时工作一直地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(其中蕴含了,音讯生产分组名称、订阅关系汇合、音讯通信模式和客户端 id 的值等信息)。Broker 端在收到 Consumer 的心跳音讯后,会将它保护在 ConsumerManager 的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保留在本地缓存变量—channelInfoTable 中,为之后做 Consumer 端的负载平衡提供能够根据的元数据信息。
2、Consumer 端实现负载平衡的外围类—RebalanceImpl
在 Consumer 实例的启动流程中的启动 MQClientInstance 实例局部,会实现负载平衡服务线程—RebalanceService 的启动(每隔 20s 执行一次)。通过查看源码能够发现,RebalanceService 线程的 run()办法最终调用的是 RebalanceImpl 类的 rebalanceByTopic()办法,该办法是实现 Consumer 端负载平衡的外围。这里,rebalanceByTopic()办法会依据消费者通信类型为“播送模式”还是“集群模式”做不同的逻辑解决。这里次要来看下集群模式下的次要解决流程:
(1) 从 rebalanceImpl 实例的本地缓存变量—topicSubscribeInfoTable 中,获取该 Topic 主题下的音讯生产队列汇合(mqSet);
(2) 依据 topic 和 consumerGroup 为参数调用 mQClientFactory.findConsumerIdList()办法向 Broker 端发送获取该生产组下消费者 Id 列表的 RPC 通信申请(Broker 端基于后面 Consumer 端上报的心跳包数据而构建的 consumerTable 做出响应返回,业务申请码:GET_CONSUMER_LIST_BY_GROUP);
(3) 先对 Topic 下的音讯生产队列、消费者 Id 排序,而后用音讯队列调配策略算法(默认为:音讯队列的平均分配算法),计算出待拉取的音讯队列。这里的平均分配算法,相似于分页的算法,将所有 MessageQueue 排好序相似于记录,将所有生产端 Consumer 排好序相似页数,并求出每一页须要蕴含的均匀 size 和每个页面记录的范畴 range,最初遍历整个 range 而计算出以后 Consumer 端应该调配到的记录(这里即为:MessageQueue)。
(4) 而后,调用 updateProcessQueueTableInRebalance()办法,具体的做法是,先将调配到的音讯队列汇合(mqSet)与 processQueueTable 做一个过滤比对。
- 上图中 processQueueTable 标注的红色局部,示意与调配到的音讯队列汇合 mqSet 互不蕴含。将这些队列设置 Dropped 属性为 true,而后查看这些队列是否能够移除出 processQueueTable 缓存变量,这里具体执行 removeUnnecessaryMessageQueue()办法,即每隔 1s 查看是否能够获取以后生产解决队列的锁,拿到的话返回 true。如果期待 1s 后,依然拿不到以后生产解决队列的锁则返回 false。如果返回 true,则从 processQueueTable 缓存变量中移除对应的 Entry;
- 上图中 processQueueTable 的绿色局部,示意与调配到的音讯队列汇合 mqSet 的交加。判断该 ProcessQueue 是否曾经过期了,在 Pull 模式的不必管,如果是 Push 模式的,设置 Dropped 属性为 true,并且调用 removeUnnecessaryMessageQueue()办法,像下面一样尝试移除 Entry;
最初,为过滤后的音讯队列汇合(mqSet)中的每个 MessageQueue 创立一个 ProcessQueue 对象并存入 RebalanceImpl 的 processQueueTable 队列中(其中调用 RebalanceImpl 实例的 computePullFromWhere(MessageQueue mq)办法获取该 MessageQueue 对象的下一个进度生产值 offset,随后填充至接下来要创立的 pullRequest 对象属性中),并创立拉取申请对象—pullRequest 增加到拉取列表—pullRequestList 中,最初执行 dispatchPullRequest()办法,将 Pull 音讯的申请对象 PullRequest 顺次放入 PullMessageService 服务线程的阻塞队列 pullRequestQueue 中,待该服务线程取出后向 Broker 端发动 Pull 音讯的申请。其中,能够重点比照下,RebalancePushImpl 和 RebalancePullImpl 两个实现类的 dispatchPullRequest()办法不同,RebalancePullImpl 类外面的该办法为空,这样子也就答复了上一篇中最初的那道思考题了。
音讯生产队列在同一生产组不同消费者之间的负载平衡,其外围设计理念是在一个音讯生产队列在同一时间只容许被同一生产组内的一个消费者生产,一个音讯消费者能同时生产多个音讯队列。
5 事务音讯
Apache RocketMQ 在 4.3.0 版中曾经反对分布式事务音讯,这里 RocketMQ 采纳了 2PC 的思维来实现了提交事务音讯,同时减少一个弥补逻辑来解决二阶段超时或者失败的音讯,如下图所示。
5.1 RocketMQ 事务音讯流程概要
上图阐明了事务音讯的大抵计划,其中分为两个流程:失常事务音讯的发送及提交、事务音讯的弥补流程。
1. 事务音讯发送及提交:
(1) 发送音讯(half 音讯)。
(2) 服务端响应音讯写入后果。
(3) 依据发送后果执行本地事务(如果写入失败,此时 half 音讯对业务不可见,本地逻辑不执行)。
(4) 依据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成音讯索引,音讯对消费者可见)
2. 弥补流程:
(1) 对没有 Commit/Rollback 的事务音讯(pending 状态的音讯),从服务端发动一次“回查”
(2) Producer 收到回查音讯,查看回查音讯对应的本地事务的状态
(3) 依据本地事务状态,从新 Commit 或者 Rollback
其中,弥补阶段用于解决音讯 Commit 或者 Rollback 产生超时或者失败的状况。
5.2 RocketMQ 事务音讯设计
1. 事务音讯在一阶段对用户不可见
在 RocketMQ 事务音讯的次要流程中,一阶段的音讯如何对用户不可见。其中,事务音讯绝对一般音讯最大的特点就是一阶段发送的音讯对用户是不可见的。那么,如何做到写入音讯然而对用户不可见呢?RocketMQ 事务音讯的做法是:如果音讯是 half 音讯,将备份原音讯的主题与音讯生产队列,而后扭转主题为 RMQ_SYS_TRANS_HALF_TOPIC。因为生产组未订阅该主题,故生产端无奈生产 half 类型的音讯,而后 RocketMQ 会开启一个定时工作,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取音讯进行生产,依据生产者组获取一个服务提供者发送回查事务状态申请,依据事务状态来决定是提交或回滚音讯。
在 RocketMQ 中,音讯在服务端的存储构造如下,每条音讯都会有对应的索引信息,Consumer 通过 ConsumeQueue 这个二级索引来读取音讯实体内容,其流程如下:
RocketMQ 的具体实现策略是:写入的如果事务音讯,对音讯的 Topic 和 Queue 等属性进行替换,同时将原来的 Topic 和 Queue 信息存储到音讯的属性中,正因为音讯主题被替换,故音讯并不会转发到该原主题的音讯生产队列,消费者无奈感知音讯的存在,不会生产。其实扭转音讯主题是 RocketMQ 的罕用“套路”,回忆一下延时音讯的实现机制。
2.Commit 和 Rollback 操作以及 Op 音讯的引入
在实现一阶段写入一条对用户不可见的音讯后,二阶段如果是 Commit 操作,则须要让音讯对用户可见;如果是 Rollback 则须要撤销一阶段的音讯。先说 Rollback 的状况。对于 Rollback,自身一阶段的音讯对用户是不可见的,其实不须要真正撤销音讯(实际上 RocketMQ 也无奈去真正的删除一条音讯,因为是程序写文件的)。然而区别于这条音讯没有确定状态(Pending 状态,事务悬而未决),须要一个操作来标识这条音讯的最终状态。RocketMQ 事务音讯计划中引入了 Op 音讯的概念,用 Op 音讯标识事务音讯曾经确定的状态(Commit 或者 Rollback)。如果一条事务音讯没有对应的 Op 音讯,阐明这个事务的状态还无奈确定(可能是二阶段失败了)。引入 Op 音讯后,事务音讯无论是 Commit 或者 Rollback 都会记录一个 Op 操作。Commit 绝对于 Rollback 只是在写入 Op 音讯前创立 Half 音讯的索引。
3.Op 音讯的存储和对应关系
RocketMQ 将 Op 音讯写入到全局一个特定的 Topic 中通过源码中的办法—TransactionalMessageUtil.buildOpTopic();这个 Topic 是一个外部的 Topic(像 Half 音讯的 Topic 一样),不会被用户生产。Op 音讯的内容为对应的 Half 音讯的存储的 Offset,这样通过 Op 音讯能索引到 Half 音讯进行后续的回查操作。
4.Half 音讯的索引构建
在执行二阶段 Commit 操作时,须要构建出 Half 音讯的索引。一阶段的 Half 音讯因为是写到一个非凡的 Topic,所以二阶段构建索引时须要读取出 Half 音讯,并将 Topic 和 Queue 替换成真正的指标的 Topic 和 Queue,之后通过一次一般音讯的写入操作来生成一条对用户可见的音讯。所以 RocketMQ 事务音讯二阶段其实是利用了一阶段存储的音讯的内容,在二阶段时复原出一条残缺的一般音讯,而后走一遍音讯写入流程。
5. 如何解决二阶段失败的音讯?
如果在 RocketMQ 事务音讯的二阶段过程中失败了,例如在做 Commit 操作时,呈现网络问题导致 Commit 失败,那么须要通过肯定的策略使这条音讯最终被 Commit。RocketMQ 采纳了一种弥补机制,称为“回查”。Broker 端对未确定状态的音讯发动回查,将音讯发送到对应的 Producer 端(同一个 Group 的 Producer),由 Producer 依据音讯来查看本地事务的状态,进而执行 Commit 或者 Rollback。Broker 端通过比照 Half 音讯和 Op 音讯进行事务音讯的回查并且推动 CheckPoint(记录那些事务音讯的状态是确定的)。
值得注意的是,rocketmq 并不会无休止的的信息事务状态回查,默认回查 15 次,如果 15 次回查还是无奈得悉事务状态,rocketmq 默认回滚该音讯。
6 音讯查问
RocketMQ 反对依照上面两种维度(“依照 Message Id 查问音讯”、“依照 Message Key 查问音讯”)进行音讯查问。
6.1 依照 MessageId 查问音讯
RocketMQ 中的 MessageId 的长度总共有 16 字节,其中蕴含了音讯存储主机地址(IP 地址和端口),音讯 Commit Log offset。“依照 MessageId 查问音讯”在 RocketMQ 中具体做法是:Client 端从 MessageId 中解析出 Broker 的地址(IP 地址和端口)和 Commit Log 的偏移地址后封装成一个 RPC 申请后通过 Remoting 通信层发送(业务申请码:VIEW_MESSAGE_BY_ID)。Broker 端走的是 QueryMessageProcessor,读取音讯的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个残缺的音讯返回。
6.2 依照 Message Key 查问音讯
“依照 Message Key 查问音讯”,次要是基于 RocketMQ 的 IndexFile 索引文件来实现的。RocketMQ 的索引文件逻辑构造,相似 JDK 中 HashMap 的实现。索引文件的具体构造如下:
IndexFile 索引文件为用户提供通过“依照 Message Key 查问音讯”的音讯索引查问服务,IndexFile 文件的存储地位是:$HOME\store\index\${fileName},文件名 fileName 是以创立时的工夫戳命名的,文件大小是固定的,等于 40+500W*4+2000W*20= 420000040 个字节大小。如果音讯的 properties 中设置了 UNIQ_KEY 这个属性,就用 topic +“#”+ UNIQ_KEY 的 value 作为 key 来做写入操作。如果音讯设置了 KEYS 属性(多个 KEY 以空格分隔),也会用 topic +“#”+ KEY 来做索引。
其中的索引数据蕴含了 Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共 20 Byte。NextIndex offset 即后面读出来的 slotValue,如果有 hash 抵触,就能够用这个字段将所有抵触的索引用链表的形式串起来了。Timestamp 记录的是音讯 storeTimestamp 之间的差,并不是一个相对的工夫。整个 Index File 的构造如图,40 Byte 的 Header 用于保留一些总的统计信息,4*500W 的 Slot Table 并不保留真正的索引数据,而是保留每个槽位对应的单向链表的头。20*2000W 是真正的索引数据,即一个 Index File 能够保留 2000W 个索引。
“依照 Message Key 查问音讯”的形式,RocketMQ 的具体做法是,次要通过 Broker 端的 QueryMessageProcessor 业务处理器来查问,读取音讯的过程就是用 topic 和 key 找到 IndexFile 索引文件中的一条记录,依据其中的 commitLog offset 从 CommitLog 文件中读取音讯的实体内容。
本文由 mdnice 多平台公布