共计 3841 个字符,预计需要花费 10 分钟才能阅读完成。
* 作者:倪泽,RocketMQ 资深贡献者,RocketMQ-Streams 维护者之一,阿里云技术专家。
*
RocketMQ-Streams 是一款轻量级流解决引擎,利用以 SDK 的模式嵌入并启动,即可进行流解决计算,不依赖于其余组件,最低 1 核 1G 可部署,在资源敏感场景具备很大劣势。同时它反对 UTF/UTAF/UTDF 多种计算类型。目前曾经宽泛使用于平安,风控,边缘计算等场景。
本期将率领大家从源码的角度,解析 RocketMQ-Streams 的构建,数据流转过程。也会探讨 RocketMQ-Streams 是如何实现故障复原和扩缩容的。
应用示例
代码示例:
public class RocketMQWindowExample {public static void main(String[] args) {DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
source.fromRocketmq(
"topicName",
"groupName",
false,
"namesrvAddr")
.map(message -> JSONObject.parseObject((String) message))
.window(TumblingWindow.of(Time.seconds(10)))
.groupBy("groupByKey")
.sum("字段名", "输入别名")
.count("total")
.waterMark(5)
.setLocalStorageOnly(true)
.toDataSteam()
.toPrint(1)
.start();}
}
pom 文件依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
<version>1.0.1-preview</version>
</dependency>
上述代码是一个简略的应用例子,它次要的性能是从 RocketMQ 中指定 topic 读取数据,通过转化成 JSON 格局,以 groupByKey 字段值分组、10 秒一个窗口,对 OutFlow 字段值进行累加,后果输入到 total 字段,并打印到管制台上。上述计算中还容许输出乱序 5 秒,即窗口工夫达到后不会马上触发,而是会期待 5s,如果这个段时间内,有窗口数据达到仍然无效。上述 setLocalStorageOnly 为 true 示意不对状态进行近程存储,仅应用 RocksDB 做本地存储。目前 1.0.1 的 RocketMQ-Streams 版本仍然应用 Mysql 作为近程状态存储,下一版本将应用 RocketMQ 作为近程状态存储。
RocketMQ 总体架构图
RocketMQ-Streams 作为轻量流解决引擎,实质上是作为 RocketMQ 的客户端生产数据,一个流解决实例能够解决多个队列,而一个队列只能被一个实例生产。若干 RocketMQ-Streams 实例组成消费者组独特生产数据,通过扩容实例达到减少解决能力的生产,缩小实例则会产生 rebalance,生产的队列主动重均衡到其余生产实例上。从上述图中,咱们还能够看出计算实例间不须要间接替换任何数据,可各自独立实现所有计算解决。这种架构简化了 RocketMQ-Streams 自身的设计,同时也可十分不便的进行实例扩缩容。
解决拓扑
处理器拓扑为利用定义了流处理过程的计算逻辑,它由一系列的处理器节点和数据流向组成。例如,在结尾的代码示例中,整个解决拓扑由 source、map、groupBy、sum、count、print 等解决节点组成。有两种非凡的解决节点:
• source 节点
他没有任何上游节点,从内部读入数据到 RocketMQ-Streams,并交由上游解决。
• sink 节点
他没有任何上游节点,他将解决后的数据写出到内部。
解决拓扑仅仅是流解决代码的逻辑形象,在流计算启动时将会被实例化。为了设计简略,目前一个流解决实例中仅有一张计算拓扑。
在所有流解决算子之中,有两种特地的算子,一种是波及数据分组的算子 groupBy,另一种是有状态计算例如 count 等。这两种算子会影响整个计算拓扑的构建,上面将具体分析 RocketMQ-Streams 是如何解决他们的。
groupBy
分组算子 groupBy 非凡是因为通过 groupBy 操作,后续算子冀望对雷同 key 的数据进行操作,例如通过 groupBy(“ 年级 ”)之后再进行 sum 就是对依照年级分组求和,这就要求须要将具备雷同“年级”的数据从新路由到一个流计算实例上解决,如果不这样做,每个实例上得出的后果都将是不残缺的,整体输入后果也将是谬误的。
RocketMQ-Streams 采纳 shuffle topic 这种形式来解决。具体说来,计算实例将 groupBy 数据从新发回 RocketMQ 的一个 topic,并且在发回过程中依照 key 的 hash 值来抉择指标队列,再从这个 topic 读取数据进行后续流解决。依照 key hash 后雷同的 key 肯定在一个队列外面,而一个队列只会被一个流解决实例生产,这样就达到雷同 key 被路由到一个实例上解决的成果。
有状态算子
有状态算子与无状态算子绝对。如果计算结果只与以后输出无关,和上一次输出无关就是无状态算子,例如 filter、map、foreach 后果只与以后输出有关系。还有一种算子的输入后果不仅与以后算子有关系还与上一次输出无关,例如 sum,须要对一段时间内输出进行求和,他就是有状态算子。
RocketMQ-Streams 利用 RocksDB 作为本地存储,Mysql 作为近程存储来保留状态数据。他具体做法是:
- 当发现音讯来自新的队列时,查看是否须要加载状态,如果须要异步加载状态到 RocksDB。
- 数据达到有状态算子时,如果加载实现应用 RocksDB 中状态进行计算,如果没有,应用 Mysql 中状态计算。
- 计算实现后,将状态数据保留到 RocksDB 和 Mysql 中。
- 窗口触发后,从 RocksDB 中查问出状态数据,并将后果向上游算子传递。
整体数据流向图如下:
扩缩容与故障复原
扩缩容和故障复原是一个硬币的两面,即同一个事物的两种表白,计算集群如果能正确扩缩容就等于具备故障复原的能力,反之亦然。通过后面介绍咱们晓得,RocketMQ-Streams 具备十分良好的扩缩容性能,扩容时只须要新部署一个流计算实例即可,缩容时进行计算实例即可。对于无状态的计算来说比较简单,扩容后,数据计算不须要之前的状态。有状态计算的扩缩容波及到状态的迁徙。有状态的扩缩容可由下图示意:
当计算实例从 3 个缩容到 2 个,借助于 RocketMQ 的 rebalance,MQ 会在计算实例之间重新分配。
Instance1 上生产的 MQ2 和 MQ3 被调配到 Instance2 和 Instance3 上,这两个 MQ 的状态数据也须要迁徙到 Instance2 和 Instance3 上,这也暗示,状态数据是依据源数据分片保留的;扩容则是刚好相同的过程。
具体实现上,RocketMQ-Streams 采纳零碎音讯来触发状态的加载和长久化。
零碎音讯类别:
// 新增生产队列
NewSplitMessage
// 不在生产某个队列
RemoveSplitMessage
// 客户端长久化生产位点到 MQ
CheckPointMessage
当发现音讯来自一个新的 RocketMQ 队列(MessageQueue),RocketMQ-Streams 之前没有解决过来自该队列的音讯,会先于数据前发送 NewSplitMessage 音讯,通过解决拓扑上游算子传递,当有状态算子收到该音讯时会将新增队列对应的状态加载到本地内存 RocksDB 中,当数据真正达到时,就依据这个状态持续计算。
当因为计算实例减少或者 RocketMQ 集群变动,rebalance 后,计算实例不再生产某个队列(MessageQueue)时,会收回 RemoveSplitMessage 音讯,有状态算子删除本地 RocksDB 中的状态。
CheckPointMessage 是一种特地的零碎音讯,他的作用与实现 exactly-once 无关。咱们在扩缩容过程中须要做到 exactly-once,能力保障扩缩容或故障复原对计算结果没有影响。RocketMQ-streams 向 broker 提交生产 offset 前会产生 CheckPointMessage 音讯,向上游拓扑传递,他将保障行将提交生产位点的所有音讯都曾经被 sink 解决掉。
* 开源地址:
RocketMQ-Streams 仓库地址:
https://github.com/apache/roc…
RocketMQ 仓库地址:
https://github.com/apache/roc…*
退出 Apache RocketMQ 社区
十年铸剑,Apache RocketMQ 的成长离不开寰球靠近 500 位开发者的积极参与奉献,置信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅能够结识社区大牛,晋升技术水平,也能够晋升集体影响力,促成本身成长。
社区 5.0 版本正在进行着热火朝天的开发,另外还有靠近 30 个 SIG(兴趣小组)等你退出,欢送立志打造世界级分布式系统的同学退出社区,增加社区开发者微信:rocketmq666 即可进群,参加奉献,打造下一代音讯、事件、流交融解决平台。