本文作者:倪泽,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer

01 背景

RocketMQ Streams 1.1.0版本已于近期公布,绝对之前版本有以下改良和优化:

1、API层面反对泛型,可自定义输入输出数据;

2、去掉冗余逻辑,简化代码,重写拓扑图构建和数据处理过程;

本文章承接上篇:RocketMQ Streams 1.1.0: 轻量级流解决再登程,从实现原理上介绍RocketMQ Streams是如何实现流计算拓扑图构建的以及探讨了数据流转过程和流转过程中的状态变动。

02 流解决拓扑构建过程

public class example {    public static void main(String[] args) {        StreamBuilder builder = new StreamBuilder("wordCount");        builder.source("sourceTopic", total -> {                    String value = new String(total, StandardCharsets.UTF_8);                    return new Pair<>(null, value);                })                .flatMap((ValueMapperAction<String, List<String>>) value -> {                    String[] splits = value.toLowerCase().split("\W+");                    return Arrays.asList(splits);                })                .keyBy(value -> value)                .count()                .toRStream()                .print();        TopologyBuilder topologyBuilder = builder.build();        Properties properties = new Properties();        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {            @Override            public void run() {                rocketMQStream.stop();            }        });                rocketMQStream.start();    }}

在使用者书写上述及连表达式时,产生第一次构建,即逻辑节点的增加,前后算子具备父子关系,构建后造成逻辑节点,多个逻辑节点造成链表。

逻辑构建完结后,调用StreamBuilder#build()办法进行第二次构建,将逻辑节点中可能蕴含的多个实在节点增加拓扑,造成解决拓扑图。

通过两次两次构建后,解决拓扑曾经残缺。然而为了辨别不同topic应用不同拓扑解决,须要在数据降临前的重均衡阶段,创立实在数据处理节点,这是第三次构建。

逻辑构建(第一次构建)

逻辑节点自身不包含实际操作,然而可由逻辑节点持续构建出理论节点,一个逻辑节点可能蕴含一理论节点,也可能蕴含多个理论节点,例如count逻辑算子不仅仅蕴含累加这个实际操作,累加前须要对雷同key的数据路由到同一计算实例上,因而还须要蕴含sink、source两个理论节点,然而这些只会在构建理论节点时体现进去,不会在增加逻辑节点阶段体现。

每个逻辑节点都是GraphNode的子类,构建时,将子节点算子退出父节点child汇合中,将父节点退出子节点parent汇合中。这个构建过程中应用Pipeline均为同一个实例。最终在Pipeline中,造成以root节点为根节点的链表。

增加逻辑节点逻辑:

@Overridepublic <OUT> GroupedStream<K, OUT> map(ValueMapperAction<V, OUT> mapperAction) {    //1、确定节点名称    //2、实现Supplier类,实现数据处理逻辑    //3、实例化逻辑节点类GraphNode    //4、将逻辑节点GraphNode增加到pipeline中造成链表}

能够看到逻辑节点的增加十分通用,实现不同性能的算子,只须要实现算子对应的数据理论解决逻辑即可,如将新增算子造成拓扑图等等后续工作齐全不必关怀,升高了新算子开发的门槛。

在逻辑节点的构建过程中,有两类比拟非凡的算子,一个是实现数据分组的shuffle算子,一个是实现双流聚合的Join算子。

shuffle逻辑算子的性能是将含有雷同key的数据发送到同一个队列中,不便后续算子对雷同key的数据进行统计。他通常是keyBy前面紧跟的算子,例如keyBy("年级").count(),那么count就是一个shuffle算子类型。shuffle逻辑算子蕴含三个理论处理过程:

  • 将数据依照Key的hash%queueNum发送到对应队列;
  • 从RocketMQ中拉取上述数据到本地;
  • 依照shuffle节点中定义的逻辑进行解决,例如累加。

Join算子的性能是实现双流聚合,将两个数据流聚合成一个。

Join拓扑图

在左流和右流上增加KeyBy算子,对左流和右流进行别离过滤;之后在左流和右流上别离增加标签节点,标记此数据是左流还是右流,之后将两个标签节点,指向一个独特的Join节点,数据在此实现汇聚,依照使用者给定的ValueJoinAction节点解决。

Join应用形式:

StreamBuilder builder = new StreamBuilder(jobId);RStream<T> leftStream = builder.source(...);RStream<V> rightStream = builder.source(...);ValueJoinAction<T, V, R> action = new ValueJoinAction<T, V, R>(){...};leftStream.join(rightStream)          .where(左流字段)          .equalTo(右流字段)          .apply(action)          .print();

Join实现伪代码:

//左右流依照各自字段分组,含有雷同key的字段会被回写到同一个队列外面;GroupedStream<K, V1> leftGroupedStream = leftStream.keyBy(左流字段);//因为前面左右流数据会在一起解决,为了辨别数据起源,在数据中增加标记是左流还是右流leftGroupedStream.addGraphNode(addTag);//获取leftGroupedStream最初的逻辑节点GraphNode leftLast = leftGroupedStream.getLast();    GroupedStream<K, V1> rightGroupedStream = leftStream.keyBy(右流字段);rightGroupedStream.addGraphNode(addTag);GraphNode rightLast = rightGroupedStream.getLast();//数据汇聚节点ProcessorNode<OUT> commChild = new ProcessorNode(name, temp, “聚合数据实际操作”);commChild.addParent(leftLast);commChild.addParent(rightLast);//对立数据流RStreamImpl commRStream = new RStreamImpl<>(Pipeline, commChild);//持续在对立数据流上操作commRStream...

理论构建(第二次构建)

构建逻辑节点结束后,从ROOT节点开始遍历,调用GraphNode逻辑节点addRealNode办法,构建实在节点工厂类。

在第二次构建理论节点过程中,会对逻辑节点进行拆解,对于大多数逻辑节点,只须要构建一个理论节点,然而对于某些非凡的逻辑节点须要构建多个理论节点能力与之对应,例如shuffle类型逻辑节点,他须要蕴含三个理论节点:发送数据节点、生产数据节点、解决数据节点。shuffle类型逻辑节点父节点必须是GroupBy,例如上图所示的count是shuffle节点,Window节点也能够是逻辑节点。

第二次构建并不会间接生成解决数据的Processor,而是产生ProcessorFactory对象。为什么不生成间接能解决数据的Processor对象呢?因为一个RocketMQ Streams实例须要同时拉取不同队列进行流计算,为了能将不同队列的流计算过程区别开,针对每个队列会由独立的Processor实例进行解决,因而第二次构建仅仅构建出ProcessorFactory,在重均衡确定流解决实例要拉去哪些队列后,再由ProcessorFactory实例化Processor。

第三次构建

客户程序依赖RocketMQ Streams取得流计算能力,因而客户程序实质上是就是一个RocketMQ Client(见计划架构图)。在RocketMQ Client产生重均衡时,会将RocketMQ Server所蕴含的队列在客户端中重新分配,第三次构建,也就是由ProcessorFactory实例化Processor,就产生在重均衡产生后,拉取数据前。第三次实在的构建出了解决数据的Processor,并将子节点Processor增加进入父节点Processor中。

03数据处理过程

状态复原

流处理过程中产生的计算状态保留、复原波及到流处理过程的正确性。在流解决实例宕机的状况下,该流解决实例上生产的队列会被重均衡到其余流解决实例上。如果对该队列进行了有状态计算,那么产生的状态也须要在新的流计算实例上复原。如上图中,Instance1宕机,他生产的MQ2和MQ3被别离迁徙到Instance2和Instance3上,MQ2和MQ3对应的状态(紫色和蓝色)也须要在Instance2和Instance3上复原进去。

  • 存储介质

应用本地RocksDB,近程RocketMQ的组合,作为状态存储介质。流计算在计算状态时,RocksDB在应用无限内存状况下,作为状态的长期存储,用于算子交互,在计算完结后提交生产位点时将本次计算产生的状态一并写入RocketMQ中。生产位点提交、计算结果写出、状态保留须要放弃原子状态,这一内容在前面流计算正确性中探讨。

  • 状态长久化存储

RocketMQ作为音讯长期存储,存在数据最大过期工夫,一旦过期后,数据会被删除。然而状态存储介质实质上是以KV形式存储数据,不心愿KV数据随着工夫过期而被删除。因而,应用Compact topic作为状态存储,他会对同一队列的数据依照Key对数据进行压缩,雷同Key的数据只保留offset最大的一条。

//key如果决定数据被发送到某个Broker的哪个队列int queueId = hash(key) % queueNum

然而在RocketMQ中队列数会随着Broker扩缩容而减少或者缩小,扩缩Broker数量前后,雷同的Key可能被发送到不同的队列,那么依照上述规定进行Compact后失去某个key所在的queueId就是谬误的,应用Compact topic作为KV存储就失去了意义。

因而在状态topic是Compact topic的根底上,再将状态topic创立为Static topic(Logic Queue),即状态topic即是Compact topic也是Static topic。这样能力解耦队列数量与Broker数量,使队列数量在扩缩Broker状况下依然不变,保障含有雷同Key的数据能被发送到同一队列中。

  • 状态重放

从被迁徙状态队列拉取数据到本地进行重放,须要从队列头开始生产,雷同Key的数据只保留offset最大的数据,造成K-V状态对,放入本地长期存储RocksDB中;

  • 状态topic与source topic对应关系

因为状态topic中的队列会随着source topic队列迁徙而迁徙,保障对source topic队列中数据进行有状态解决失去正确的后果,因而在队列层面,状态topic与source topic应该是一一对应的关系。即状态topic名称与source topic名称一一对应,状态topic的队列数量等于source topic队列数量。source topic队列的流计算状态保留在状态topic的对应队列中。

数据处理

图中彩色线示意控制流,黄色线示意数据流;rebalance局部先于litePull局部被调用。

重均衡局部:

  • 依据调配到的队列,到相应状态topic的相应队列中从头拉取数据,到本地重放,取得KV状态对,放入到本地RocksDB中。
  • 依据数据源topic,构建对应的数据处理器processor(即第三次构建过程),保存起来;

数据处理局部:

  • 应用litePull模式拉取数据,能够独立管制生产位点提交;
  • 数据反序列化;
  • 应用topic查找processor;
  • 将processor的子节点保存起来(子节点在第三次构建过程中增加);
  • 数据向上下文StreamContext中传递,由他将数据路由到上游节点;
  • 数据处理前,现将上游节点的子节点保存起来供后续查找;
  • 数据处理,如果有状态算子则与RocksDB交互,如果还有上游节点则持续进入StreamContext,如果没有上游节点则完结解决。

数据每次到上游节点前,先进入StreamContext中,由它对立向上游节点传递数据。StreamContext中蕴含了解决数据所须要的所有信息,包含数据起源、状态存储、下游子节点等等;

StreamContext一直递归迭代,将数据向上游传递,最终数据会被拓扑图上所有节点解决,由sink节点写出后果。

04 参加奉献

RocketMQ Streams是Apache RocketMQ的子项目,曾经在社区开源,并且提出了一些Good First Issue供感兴趣同学加入。参加RocketMQ Streams相干工作,请参考以下资源:

1、试用RocketMQ Streams,并浏览相干文档以理解更多信息;

maven仓库坐标:

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-streams</artifactId>    <version>1.1.0</version></dependency>

RocketMQ Streams文档:

https://rocketmq.apache.org/zh/docs/streams/01RocketMQ%20Streams%20Overview

2、参加奉献:如果你有任何性能申请或错误报告,请随时提交 Pull Request 来分享你的反馈和想法;

社区仓库:https://github.com/apache/rocketmq-streams

3、分割咱们:能够在 GitHub上创立 Issue,向 RocketMQ 邮件列表发送电子邮件,或在 RocketMQ Streams SIG 交换群与专家独特探讨,RocketMQ Streams SIG退出形式:增加“小火箭”微信,回复RocketMQ Streams。

邮件列表:https://lists.apache.org/list.html?dev@rocketmq.apache.org