*作者:倪泽,RocketMQ 资深贡献者, RocketMQ-Streams 维护者之一,阿里云技术专家。
*
RocketMQ-Streams 是一款轻量级流解决引擎,利用以SDK 的模式嵌入并启动,即可进行流解决计算,不依赖于其余组件,最低1核1G可部署,在资源敏感场景具备很大劣势。同时它反对 UTF/UTAF/UTDF 多种计算类型。目前曾经宽泛使用于平安,风控,边缘计算等场景。
本期将率领大家从源码的角度,解析RocketMQ-Streams的构建,数据流转过程。也会探讨RocketMQ-Streams是如何实现故障复原和扩缩容的。
应用示例
代码示例:
public class RocketMQWindowExample { public static void main(String[] args) { DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); source.fromRocketmq( "topicName", "groupName", false, "namesrvAddr") .map(message -> JSONObject.parseObject((String) message)) .window(TumblingWindow.of(Time.seconds(10))) .groupBy("groupByKey") .sum("字段名", "输入别名") .count("total") .waterMark(5) .setLocalStorageOnly(true) .toDataSteam() .toPrint(1) .start(); }}
pom文件依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams-clients</artifactId> <version>1.0.1-preview</version></dependency>
上述代码是一个简略的应用例子,它次要的性能是从RocketMQ中指定topic读取数据,通过转化成JSON格局,以groupByKey字段值分组、10秒一个窗口,对OutFlow字段值进行累加,后果输入到total字段,并打印到管制台上。上述计算中还容许输出乱序5秒,即窗口工夫达到后不会马上触发,而是会期待5s,如果这个段时间内,有窗口数据达到仍然无效。上述setLocalStorageOnly为true示意不对状态进行近程存储,仅应用RocksDB做本地存储。目前1.0.1的RocketMQ-Streams版本仍然应用Mysql作为近程状态存储,下一版本将应用RocketMQ作为近程状态存储。
RocketMQ总体架构图
RocketMQ-Streams 作为轻量流解决引擎,实质上是作为RocketMQ 的客户端生产数据,一个流解决实例能够解决多个队列,而一个队列只能被一个实例生产。若干RocketMQ-Streams 实例组成消费者组独特生产数据,通过扩容实例达到减少解决能力的生产,缩小实例则会产生rebalance,生产的队列主动重均衡到其余生产实例上。从上述图中,咱们还能够看出计算实例间不须要间接替换任何数据,可各自独立实现所有计算解决。这种架构简化了RocketMQ-Streams 自身的设计,同时也可十分不便的进行实例扩缩容。
解决拓扑
处理器拓扑为利用定义了流处理过程的计算逻辑,它由一系列的处理器节点和数据流向组成。例如,在结尾的代码示例中,整个解决拓扑由source、map、groupBy、sum、count、print等解决节点组成。有两种非凡的解决节点:
• source节点
他没有任何上游节点,从内部读入数据到RocketMQ-Streams,并交由上游解决。
• sink节点
他没有任何上游节点,他将解决后的数据写出到内部。
解决拓扑仅仅是流解决代码的逻辑形象,在流计算启动时将会被实例化。为了设计简略,目前一个流解决实例中仅有一张计算拓扑。
在所有流解决算子之中,有两种特地的算子,一种是波及数据分组的算子groupBy,另一种是有状态计算例如count等。这两种算子会影响整个计算拓扑的构建,上面将具体分析RocketMQ-Streams是如何解决他们的。
groupBy
分组算子groupBy非凡是因为通过groupBy操作,后续算子冀望对雷同key的数据进行操作,例如通过groupBy("年级")之后再进行sum就是对依照年级分组求和,这就要求须要将具备雷同“年级”的数据从新路由到一个流计算实例上解决,如果不这样做,每个实例上得出的后果都将是不残缺的,整体输入后果也将是谬误的。
RocketMQ-Streams 采纳 shuffle topic 这种形式来解决。具体说来,计算实例将groupBy数据从新发回RocketMQ的一个topic,并且在发回过程中依照key的hash值来抉择指标队列,再从这个topic读取数据进行后续流解决。依照key hash后雷同的key肯定在一个队列外面,而一个队列只会被一个流解决实例生产,这样就达到雷同key被路由到一个实例上解决的成果。
有状态算子
有状态算子与无状态算子绝对。如果计算结果只与以后输出无关,和上一次输出无关就是无状态算子,例如filter、map、foreach后果只与以后输出有关系。还有一种算子的输入后果不仅与以后算子有关系还与上一次输出无关,例如sum,须要对一段时间内输出进行求和,他就是有状态算子。
RocketMQ-Streams利用RocksDB作为本地存储,Mysql作为近程存储来保留状态数据。他具体做法是:
- 当发现音讯来自新的队列时,查看是否须要加载状态,如果须要异步加载状态到RocksDB。
- 数据达到有状态算子时,如果加载实现应用RocksDB中状态进行计算,如果没有,应用Mysql中状态计算。
- 计算实现后,将状态数据保留到RocksDB和Mysql中。
- 窗口触发后,从RocksDB中查问出状态数据,并将后果向上游算子传递。
整体数据流向图如下:
扩缩容与故障复原
扩缩容和故障复原是一个硬币的两面,即同一个事物的两种表白,计算集群如果能正确扩缩容就等于具备故障复原的能力,反之亦然。通过后面介绍咱们晓得,RocketMQ-Streams具备十分良好的扩缩容性能,扩容时只须要新部署一个流计算实例即可,缩容时进行计算实例即可。对于无状态的计算来说比较简单,扩容后,数据计算不须要之前的状态。有状态计算的扩缩容波及到状态的迁徙。有状态的扩缩容可由下图示意:
当计算实例从3个缩容到2个,借助于RocketMQ的rebalance,MQ会在计算实例之间重新分配。
Instance1上生产的MQ2和MQ3被调配到Instance2和Instance3上,这两个MQ的状态数据也须要迁徙到Instance2和Instance3上,这也暗示,状态数据是依据源数据分片保留的;扩容则是刚好相同的过程。
具体实现上,RocketMQ-Streams采纳零碎音讯来触发状态的加载和长久化。
零碎音讯类别:
//新增生产队列NewSplitMessage//不在生产某个队列RemoveSplitMessage//客户端长久化生产位点到MQCheckPointMessage
当发现音讯来自一个新的RocketMQ队列(MessageQueue),RocketMQ-Streams之前没有解决过来自该队列的音讯,会先于数据前发送NewSplitMessage音讯,通过解决拓扑上游算子传递,当有状态算子收到该音讯时会将新增队列对应的状态加载到本地内存RocksDB中,当数据真正达到时,就依据这个状态持续计算。
当因为计算实例减少或者RocketMQ集群变动,rebalance后,计算实例不再生产某个队列(MessageQueue)时,会收回RemoveSplitMessage音讯,有状态算子删除本地RocksDB中的状态。
CheckPointMessage是一种特地的零碎音讯,他的作用与实现exactly-once无关。咱们在扩缩容过程中须要做到exactly-once,能力保障扩缩容或故障复原对计算结果没有影响。RocketMQ-streams向broker提交生产offset前会产生CheckPointMessage音讯,向上游拓扑传递,他将保障行将提交生产位点的所有音讯都曾经被sink解决掉。
*开源地址:
RocketMQ-Streams 仓库地址:
https://github.com/apache/roc...
RocketMQ 仓库地址:
https://github.com/apache/roc...*
退出 Apache RocketMQ 社区
十年铸剑,Apache RocketMQ 的成长离不开寰球靠近 500 位开发者的积极参与奉献,置信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅能够结识社区大牛,晋升技术水平,也能够晋升集体影响力,促成本身成长。
社区 5.0 版本正在进行着热火朝天的开发,另外还有靠近 30 个 SIG(兴趣小组)等你退出,欢送立志打造世界级分布式系统的同学退出社区,增加社区开发者微信:rocketmq666 即可进群,参加奉献,打造下一代音讯、事件、流交融解决平台。