关于阿里云开发者:全面升级-Apache-RocketMQ-50-SDK-的新面貌

41次阅读

共计 8311 个字符,预计需要花费 21 分钟才能阅读完成。

简介: 长久以来,RocketMQ 易于部署、高性能、高可用的架构,撑持了数十年来团体内外海量的业务场景。时至今日,为了迎接现在云原生时代的新挑战,咱们重磅推出了 RocketMQ 5.0 新架构。

作者 | 凌楚

引言

长久以来,RocketMQ 易于部署、高性能、高可用的架构,撑持了数十年来团体内外海量的业务场景。时至今日,为了迎接现在云原生时代的新挑战,咱们重磅推出了 RocketMQ 5.0 新架构。

在 5.0 新架构中,咱们更新了整个 RocketMQ 的网络拓扑模型,着眼于将更下层的业务逻辑从 broker 中剥离到无状态的 proxy,这样独立的计算节点能够无损地承当日后的降级公布工作,与此同时将 broker 解放出来承当纯正的存储工作,为将来打造更强的音讯存储引擎做好铺垫。通信层方面,出于标准化,多语言的思考咱们摒弃了 RocketMQ 应用多年的 RemotingCommand 协定,采纳了 gRPC 来实现客户端与服务端之间的通信逻辑。

针对于用户侧,咱们心愿尽可能少的叨扰客户进行降级,维持逻辑轻量,易于保护,可观测性良好,可能能够达到“一次性把事件做对”。

目前,保障了接口齐全兼容的,基于 RocketMQ 5.0 的商业化版本 Java SDK 曾经在私有云 release 实现,开源版本也行将 release。SDK 将同时反对云上 proxy 架构的云上版本和开源版本的 Broker。上面将开展叙述 RocketMQ 5.0 新架构下的 SDK 做了哪些迭代与演进。

全面异步化

1、异步的初衷

因为波及诸多的网络 IO,因而 RocketMQ 对音讯发送凋谢了同步和异步两套 API 提供给用户应用。旧有架构从 API 针对于同步和异步保护了两套相似的业务逻辑,十分不利于迭代。思考到这一点,此次新架构 SDK 心愿在底层就能够将它们对立起来。

以音讯发送为例,一个残缺的音讯发送链路包含获取:

  1. 获取 topic 对应的路由;
  2. 依据路由抉择对应的分区;
  3. 发送音讯到指定的分区,如果发送到该分区失败,则对下一个分区进行发送重试直到达到最大重试次数;如果发送胜利,则返回发送后果。

其中从远端获取 topic 对应的路由是一个重 IO 操作,而发送音讯自身也是一个重 IO 操作。在以往的发送实现中,即便是异步发送,对于路由的获取也是同步的,路由的获取自身并没有计入用户的发送耗时中,用户自身是能够自主设置音讯发送的超时工夫的,而因为自身音讯的发送是同步的,无奈做到超时工夫的精准管制,而在应用异步 Future 之后,能够十分不便地通过管制 Future 的超时工夫来做到。

2、异步对立所有实现

实质上 RocketMQ 里所有的重 IO 操作都能够通过异步来进行对立。得益于 gRPC 自身提供了基于 Future 的 stub,咱们将网络层的 Future 一层层串联到最终的业务层。当用户须要同步 API 时,则进行同步期待;当用户须要异步 API 时,则在最外层的 Future 增加回调进行监听。

实际上基于 Future 设计的思维是贯通整个客户端实现的。譬如,音讯生产也是通过惟一的基于 Future 的实现来实现的:

/** * Deliver message to listener. * * @param messageExtList message list to consume * @param delay message consumption delay time * @param timeUnit delay time unit. * @return future which contains the consume status. */public ListenableFuture<ConsumeStatus> consume(List<MessageExt> messageExtList, long delay, TimeUnit timeUnit) {// Omit the implement}

针对于程序音讯生产失败这种须要本地 suspend 一段时间从新投递的状况,生产接口减少了延时参数。然而无论是一般音讯还是程序音讯,都只会返回含有生产状态的 Future。下层再针对含有生产状态的 Future 来进行音讯的 ACK/NACK。特地地,针对于服务端向客户端投递特定音讯进行生产验证的场景,也是调用以后 Future 接口,再对生产后果进行包装向服务端响应生产后果。

RocketMQ 自身的发送和生产过程中充斥着大量的异步逻辑,应用 Future 使得大量的接口实现失去了精简和对立。尤其在咱们的基于 gRPC 新架构协定的 IDL 中,为了放弃简略全部都是应用 unary rpc(非流式),使得咱们全副能够应用 gRPC 的 Future stub 来实现通信申请。

可观测性加强

下面这张图来自于 Peter Bourgon 2017 年的一篇重要博文,零碎且具体地论述了 metrics、tracing 和 logging 三者之间的特色与定义,以及他们之间的关联。

  • Metrics:具体聚合同类数据的统计信息,用于预警和监控。
  • Tracing:关联和剖析同一个调用链上的元数据,判断具体调用链上的异样和阻塞行为。
  • Logging:记录离散的事件来分析程序的行为。

云原生时代,可观测性是云产品的外围竞争力之一。因此可观测性加强的基调是整个新架构开发之初就曾经确定的。旧有架构客户端逻辑简单的同时,可观测性的缺失也导致咱们在面临客户工单时更加不足足够直观简便的伎俩,因而新架构中咱们围绕 Tracing、Logging 和 Metrics 这三个重要方面进行了全方位的可察看性晋升。

1、全链路 Tracing

Tracing 体现在消息中间件中,最根本的,即对每条音讯自身的发送、拉取、生产、ACK/NACK、事务提交、存储、删除等过程进行全生命周期的监控记录,在 RocketMQ 中最根本的实现就是音讯轨迹。

旧有的音讯轨迹采纳公有协定进行编解码,对于音讯生命周期的观测也仅限于发送、生产和事务相干等阶段。没有和开源标准进行对立,也不具备音讯本身的轨迹和用户链路的 trace 共享上下文的能力。

新的实现中,拥抱了最新的 CNCF OpenTelemetry 社区协定标准,在客户端中嵌入了一个 OTLP exporter 将 tracing 数据批量发送至 proxy,proxy 侧的计划则比拟多样了,既能够自身作为一个 collector 将数据进行整合,也能够转发至其余的 collector,proxy 侧也会有绝对应的 tracing 数据,会和客户端上报来的 tracing 数据合并进行解决。

因为采纳开源规范的 OTLP exporter 和协定,使得用户本人定义对应的 collector 地址成为可能。在商业版本中咱们将用户客户端的 tracing 数据和服务端的 tracing 数据进行收集整合后进行托管存储,开源版本中用户也能够自定义本人的 collector 地址将 tracing 数据上报到本人的平台进行剖析和解决。

针对于整个音讯的生命周期,咱们从新设计了所有的 span 拓扑模型。以最简略的音讯发送、承受、生产和 ACK/NACK 过程为例:

其中:

  • Prod:Produce,示意音讯的发送,即起始工夫为音讯开始发送,完结工夫为收到音讯发送后果(音讯外部重试会独自进行记录一条 span);
  • Recv:Receive,示意音讯的接管,即起始工夫为客户端发动承受音讯的申请,完结工夫为收到对应的响应;
  • Await:示意音讯达到客户端直到音讯开始被生产;
  • Proc:Process,示意音讯的生产过程;
  • Ack/Nack 示意音讯被 Ack/Nack 的过程。

这个过程,各个 Span 之间的关系如下:

商业版的 ONS 在管控侧也对新版本 Trace 进行了反对,针对于用户关怀的音讯生产耗时、具体生产情况、生产耗时、期待耗时,生产次数等给出了更加详尽的展现。

通过 SLS 的 trace 服务察看生产者和消费者 span 的拓扑关系(link 关系没有进行展现,因而图中没有 receive 相干的 span):

OpenTelemetry 对于 messaging span 相干的 specification 也在社区一直迭代,这波及到具体的 tracing 拓扑,span 属性定义(即 attribute semantic conventions)等等。咱们也在第一工夫将 RocketMQ 相干的内容向社区 OpenTelemetry specification 发动了初步的 Pull Request,并失去了社区的收录和必定。也得益于 OpenTelemetry specification 详尽和标准的定义,咱们在 tracing 数据减少了包含且不限于程序运行时、操作系统环境和版本等(即 resource semantic conventions)大量有利于线上问题发现和排查的信息。

对于 tracing context propagation,咱们采纳了 W3C 的规范对 trace context 进行序列化和反序列化在客户端和服务端之间来回传递,在下个版本中也会提供让用户自定义 trace context 的接口,使得用户能够很不便地关联 RocketMQ 和本人的 tracing 数据。

新架构中咱们针对于音讯生命周期的不同节点,裸露了很多 hook point,tracing 的逻辑也基于这些 hook point 进行实现,因而也能放弃绝对独立。在残缺的新架构推向开源之后,整个 tracing 的相干逻辑也会被抽取成专门的 instrumentation library 奉献给 openTelemetry 社区。

2、精确多样的 Metrics

Tracing 更多地是从调用链的角度去察看音讯的走向,更多的时候对于有共性的数据,咱们心愿能够有聚合好的 Metrics  和对应 dashboard 能够从更加宏观的角度来进行观测。如果说 tracing 能够帮忙更好更快地发现问题和定位问题,那么 Metrics 则提供了重要的多维察看和预警伎俩。

在收集到足够多的 tracing 数据之后,服务端会对这些数据进行二次聚合,计算得出用户发送、期待以及生产工夫等数据的百分位数,对很多毛刺问题能很好地做出判断。

3、规范化的 Logging

咱们在开发实际中严格地依照 Trace、Debug、Info、Warn、Error 的级别进行日志内容的定义,譬如 Trace 级别就会对每个 RPC 申请和响应,每条音讯从进入客户端到进行记录,Error 级别的日志一旦被打印,必然是值得咱们和客户关注的。在去除大量冗余信息的同时,要害节点,譬如负载平衡,发送失败重试等要害链路也补全了大量信息,单行日志的信息密度大大增加。

另外,对于日志模块的实现,RocketMQ 本来是自行开发的,相比拟于 logback,log4j2 等内部实现而言,性能绝对繁多,二次开发老本也绝对较高。选型时没有应用 logback 基本上其实只是想要防止与用户日志模块抵触的问题,在调研了诸多计划之后,抉择了 shade logback 的形式进行了替换。这里的 shade 不仅仅只是替换了包名和坐标,同时也批改了 logback 官网的日志配置文件名和诸多外部环境参数。

比方默认配置文件:

<span class=”lake-fontsize-1515″> 库 </span> <span class=”lake-fontsize-1515″> 默认配置文件 </span>
<span class=”lake-fontsize-1515″>standard logback</span> <span class=”lake-fontsize-1515″>logback.groovy/logback-test.xml/logback.xml</span>
<span class=”lake-fontsize-1515″>logback for rocketmq</span> <span class=”lake-fontsize-1515″>rocketmq.logback.groovy/rocketmq.logback-test.xml/rocketmq.logback.xml</span>

如果用户在援用 rocketmq 的同时本人也引入了 logback,残缺的配置文件和环境参数的隔离保障了两者是互相独立的。特地的,因为新架构 SDK 中引入了 gRPC,咱们将 gRPC 基于 JUL 的日志桥接到了 slf4j,并通过 logback 进行输入。

`
// redirect JUL logging to slf4j.// see https://github.com/grpc/grpc-…();SLF4JBridgeHandler.install();
`

# 生产模型的更新

RocketMQ 旧有架构的生产模型是非常复杂的。topic 中的音讯自身依照 MessageQueue 进行存储,生产时客户端按 MessageQueue 对音讯进行拉取、缓存和投递。

ProcessQueue 与 RocketMQ 中的 MessageQueue 一一对应,也基本上是客户端生产端逻辑中最为简单的构造之一。在旧架构的客户端中,拉取到音讯之后会先将音讯缓存到 ProcessQueue 中,当须要生产时,会从 ProcessQueue 中取出对应的音讯进行生产,当生产胜利之后再将音讯从 ProcessQueue 中 remove 走。其中重试音讯的发送,位点的更新在这个过程中交叉。

## 1、设计思路

在新客户端中,pop 生产模式的引入使得独自解决重试音讯和位点更新的逻辑被去除。用户的消费行为变为

1. 拉取音讯
2. 生产音讯
3. ACK/NACK 音讯到远端

因为拉取到的音讯在客户端内存是会先进行缓存,因而还要在生产和拉取的过程中计算音讯缓存的大小来对程序进行爱护,因而新客户端中每个 ProcessQueue 别离保护了两个队列:cached messages 和 pending messages。音讯在达到客户端之后会先放在 cached messages 里,筹备生产时会从 cached messages 挪动到 pending messages 中,当音讯生产完结并被 Ack 之后则会从 pending messages 中移除。

新架构的客户端精简了 ProcessQueue 的实现,封装性也做到了更好。对于消费者而言,最为外围的接口其实只有四个。

`
public interface ProcessQueue {/ Try to take messages from cache except FIFO messages. @param batchMaxSize max batch size to take messages. @return messages which have been taken. / List<MessageExt> tryTakeMessages(int batchMaxSize); / Erase messages which haven been taken except FIFO messages. @param messageExtList messages to erase. @param status consume status. / void eraseMessages(List<MessageExt> messageExtList, ConsumeStatus status); / Try to take FIFO message from cache. @return message which has been taken, or {@link Optional#absent()} if no message. / Optional<MessageExt> tryTakeFifoMessage(); / Erase FIFO message which has been taken. @param messageExt message to erase. @param status consume status. */ void eraseFifoMessage(MessageExt messageExt, ConsumeStatus status);}
`

对于一般消费者(非程序生产)而言,ProcessQueue#tryTakeMessages 将从 Cached messages 中取出音讯(取出之后音讯会主动从 Cached messages 挪动至 Pending messages),当音讯生产完结之后再携带好对应的生产后果去调用 ProcessQueue#eraseMessages,对于程序消费者而言,惟一不同的是对应的办法调用替换成 ProcessQueue#tryTakeFifoMessage 和 ProcessQueue#eraseFifoMessage。
而 ProcessQueue#tryTakeMessages 和 ProcessQueue#tryTakeFifoMessage 自身曾经蕴含了生产限流和程序生产时为了保障程序对队列上锁的逻辑,即做到了:一旦 ProcessQueue#tryTakeMessages/ProcessQueue#tryTakeFifoMessage 能够取到音讯,那么音讯肯定是满足被生产条件的。当消费者获取到生产后果之后,再带上生产后果执行 ProcessQueue#eraseMessage 和 ProcessQueue#eraseFifoMessage,erase 的过程会实现音讯的 ACK/NACK 和程序生产时队列解锁的逻辑。

简化之后,下层的生产逻辑基本上只须要负责往生产线程中提交生产工作即可了,任何说得上是 ‘Process’ 的逻辑都在新的 ProcessQueue 实现了闭环。

# 兼容性与品质保障

整个新架构的 SDK 依赖了 protocol buffers, gRPC-java, openTelemetry-java 等诸多类库。在简化 RocketMQ 自身代码的同时也带来了一些兼容性问题。RocketMQ 自身放弃着对 Java 1.6 的兼容性,然而:

* gRPC-java 在 2018 年的 1.15 版本之后不再反对 Java1.6;
* openTelemetry-java 只反对 Java8 及以上版本。

在此期间,咱们也调研了 AWS、Azure 等友商相干 SDK 的现状,发现放弃对 Java 1.6 的反对曾经是业内规范做法。但囿于老客户猛攻 Java 1.6 的状况,咱们也进行了一些革新:

* 对 protocol buffers 的代码进行了 Java 1.6 的等义替换,并通过了 protocol buffers 所有的单测;
* 对 gRPC 的代码进行了 Java 1.6 的等义替换,并通过了 gRPC 所有的单测;
* 对于 openTelemetry,在进行等义替换的同时进行了大量的功能性测试;

单测方面,目前客户端保障了 75% 以上的行覆盖率,不过相比拟优良的开源我的项目还有比拟长的间隔,这一点咱们也会在后续的迭代中不断完善。

# 最初

RocketMQ 5.0 是自开源以来架构降级最大的一次版本,具体实现过程还有十分多的细节没有披露,碍于篇幅无奈八面玲珑,后续开源过程中也欢送大家在社区中提出更多更贵重的意见。

## 相干链接

* Pull Requesthttps://github.com/open-telemetry/opentelemetry-specification/pull/1904

* Metrics, Tracing, and Logginghttps://peter.bourgon.org/blog/2017/02/21/metrics-tracing-and-logging.html
* Apache rocketmq apishttps://github.com/apache/rocketmq-apis
* OpenTelemetry specificationhttps://github.com/open-telemetry/opentelemetry-specification

阿里巴巴云原生消息中间件团队招聘中,强烈欢送大家自荐和举荐!

有意者请分割:

@凌楚 (yangkun.ayk@alibaba-inc.com) 

@尘央 (xinyuzhou.zxy@alibaba-inc.com)

点击下方链接,查看更多招聘详情!

https://job.alibaba.com/zhaopin/position\_detail.htm?spm=a2obv.11410903.0.0.674944f6oxzDCj&positionId=134677

> 版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

正文完
 0