乐趣区

关于消息队列:RocketMQ-Streams拓扑构建与数据处理过程

本文作者:倪泽,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer

01 背景

RocketMQ Streams 1.1.0 版本已于近期公布,绝对之前版本有以下改良和优化:

1、API 层面反对泛型,可自定义输入输出数据;

2、去掉冗余逻辑,简化代码,重写拓扑图构建和数据处理过程;

本文章承接上篇:RocketMQ Streams 1.1.0: 轻量级流解决再登程,从实现原理上介绍 RocketMQ Streams 是如何实现流计算拓扑图构建的以及探讨了数据流转过程和流转过程中的状态变动。

02 流解决拓扑构建过程

public class example {public static void main(String[] args) {StreamBuilder builder = new StreamBuilder("wordCount");


        builder.source("sourceTopic", total -> {String value = new String(total, StandardCharsets.UTF_8);
                    return new Pair<>(null, value);
                })
                .flatMap((ValueMapperAction<String, List<String>>) value -> {String[] splits = value.toLowerCase().split("\W+");
                    return Arrays.asList(splits);
                })
                .keyBy(value -> value)
                .count()
                .toRStream()
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);




        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
            @Override
            public void run() {rocketMQStream.stop();
            }
        });
        
        rocketMQStream.start();}
}

在使用者书写上述及连表达式时,产生第一次构建,即逻辑节点的增加,前后算子具备父子关系,构建后造成逻辑节点,多个逻辑节点造成链表。

逻辑构建完结后,调用 StreamBuilder#build() 办法进行第二次构建,将逻辑节点中可能蕴含的多个实在节点增加拓扑,造成解决拓扑图。

通过两次两次构建后,解决拓扑曾经残缺。然而为了辨别不同 topic 应用不同拓扑解决,须要在数据降临前的重均衡阶段,创立实在数据处理节点,这是第三次构建。

逻辑构建(第一次构建)

逻辑节点自身不包含实际操作,然而可由逻辑节点持续构建出理论节点,一个逻辑节点可能蕴含一理论节点,也可能蕴含多个理论节点,例如 count 逻辑算子不仅仅蕴含累加这个实际操作,累加前须要对雷同 key 的数据路由到同一计算实例上,因而还须要蕴含 sink、source 两个理论节点,然而这些只会在构建理论节点时体现进去,不会在增加逻辑节点阶段体现。

每个逻辑节点都是 GraphNode 的子类,构建时,将子节点算子退出父节点 child 汇合中,将父节点退出子节点 parent 汇合中。这个构建过程中应用 Pipeline 均为同一个实例。最终在 Pipeline 中,造成以 root 节点为根节点的链表。

增加逻辑节点逻辑:

@Override
public <OUT> GroupedStream<K, OUT> map(ValueMapperAction<V, OUT> mapperAction) {
    //1、确定节点名称


    //2、实现 Supplier 类,实现数据处理逻辑


    //3、实例化逻辑节点类 GraphNode


    //4、将逻辑节点 GraphNode 增加到 pipeline 中造成链表
}

能够看到逻辑节点的增加十分通用,实现不同性能的算子,只须要实现算子对应的数据理论解决逻辑即可,如将新增算子造成拓扑图等等后续工作齐全不必关怀,升高了新算子开发的门槛。

在逻辑节点的构建过程中,有两类比拟非凡的算子,一个是实现数据分组的 shuffle 算子,一个是实现双流聚合的 Join 算子。

shuffle 逻辑算子的性能是将含有雷同 key 的数据发送到同一个队列中,不便后续算子对雷同 key 的数据进行统计。他通常是 keyBy 前面紧跟的算子,例如 keyBy(“ 年级 ”).count(),那么 count 就是一个 shuffle 算子类型。shuffle 逻辑算子蕴含三个理论处理过程:

  • 将数据依照 Key 的 hash%queueNum 发送到对应队列;
  • 从 RocketMQ 中拉取上述数据到本地;
  • 依照 shuffle 节点中定义的逻辑进行解决,例如累加。

Join 算子的性能是实现双流聚合,将两个数据流聚合成一个。

Join 拓扑图

在左流和右流上增加 KeyBy 算子,对左流和右流进行别离过滤;之后在左流和右流上别离增加标签节点,标记此数据是左流还是右流,之后将两个标签节点,指向一个独特的 Join 节点,数据在此实现汇聚,依照使用者给定的 ValueJoinAction 节点解决。

Join 应用形式:

StreamBuilder builder = new StreamBuilder(jobId);


RStream<T> leftStream = builder.source(...);
RStream<V> rightStream = builder.source(...);


ValueJoinAction<T, V, R> action = new ValueJoinAction<T, V, R>(){...};


leftStream.join(rightStream)
          .where(左流字段)
          .equalTo(右流字段)
          .apply(action)
          .print();

Join 实现伪代码:

// 左右流依照各自字段分组,含有雷同 key 的字段会被回写到同一个队列外面;GroupedStream<K, V1> leftGroupedStream = leftStream.keyBy(左流字段);
// 因为前面左右流数据会在一起解决,为了辨别数据起源,在数据中增加标记是左流还是右流
leftGroupedStream.addGraphNode(addTag);
// 获取 leftGroupedStream 最初的逻辑节点
GraphNode leftLast = leftGroupedStream.getLast();
    
GroupedStream<K, V1> rightGroupedStream = leftStream.keyBy(右流字段);
rightGroupedStream.addGraphNode(addTag);
GraphNode rightLast = rightGroupedStream.getLast();


// 数据汇聚节点
ProcessorNode<OUT> commChild = new ProcessorNode(name, temp,“聚合数据实际操作”);
commChild.addParent(leftLast);
commChild.addParent(rightLast);


// 对立数据流
RStreamImpl commRStream = new RStreamImpl<>(Pipeline, commChild);
// 持续在对立数据流上操作
commRStream...

理论构建(第二次构建)

构建逻辑节点结束后,从 ROOT 节点开始遍历,调用 GraphNode 逻辑节点 addRealNode 办法,构建实在节点工厂类。

在第二次构建理论节点过程中,会对逻辑节点进行拆解,对于大多数逻辑节点,只须要构建一个理论节点,然而对于某些非凡的逻辑节点须要构建多个理论节点能力与之对应,例如 shuffle 类型逻辑节点,他须要蕴含三个理论节点:发送数据节点、生产数据节点、解决数据节点。shuffle 类型逻辑节点父节点必须是 GroupBy,例如上图所示的 count 是 shuffle 节点,Window 节点也能够是逻辑节点。

第二次构建并不会间接生成解决数据的 Processor,而是产生 ProcessorFactory 对象。为什么不生成间接能解决数据的 Processor 对象呢?因为一个 RocketMQ Streams 实例须要同时拉取不同队列进行流计算,为了能将不同队列的流计算过程区别开,针对每个队列会由独立的 Processor 实例进行解决,因而第二次构建仅仅构建出 ProcessorFactory,在重均衡确定流解决实例要拉去哪些队列后,再由 ProcessorFactory 实例化 Processor。

第三次构建

客户程序依赖 RocketMQ Streams 取得流计算能力,因而客户程序实质上是就是一个 RocketMQ Client(见计划架构图)。在 RocketMQ Client 产生重均衡时,会将 RocketMQ Server 所蕴含的队列在客户端中重新分配,第三次构建,也就是由 ProcessorFactory 实例化 Processor,就产生在重均衡产生后,拉取数据前。第三次实在的构建出了解决数据的 Processor,并将子节点 Processor 增加进入父节点 Processor 中。

03 数据处理过程

状态复原

流处理过程中产生的计算状态保留、复原波及到流处理过程的正确性。在流解决实例宕机的状况下,该流解决实例上生产的队列会被重均衡到其余流解决实例上。如果对该队列进行了有状态计算,那么产生的状态也须要在新的流计算实例上复原。如上图中,Instance1 宕机,他生产的 MQ2 和 MQ3 被别离迁徙到 Instance2 和 Instance3 上,MQ2 和 MQ3 对应的状态(紫色和蓝色)也须要在 Instance2 和 Instance3 上复原进去。

  • 存储介质

应用本地 RocksDB,近程 RocketMQ 的组合,作为状态存储介质。流计算在计算状态时,RocksDB 在应用无限内存状况下,作为状态的长期存储,用于算子交互,在计算完结后提交生产位点时将本次计算产生的状态一并写入 RocketMQ 中。生产位点提交、计算结果写出、状态保留须要放弃原子状态,这一内容在前面流计算正确性中探讨。

  • 状态长久化存储

RocketMQ 作为音讯长期存储,存在数据最大过期工夫,一旦过期后,数据会被删除。然而状态存储介质实质上是以 KV 形式存储数据,不心愿 KV 数据随着工夫过期而被删除。因而,应用 Compact topic 作为状态存储,他会对同一队列的数据依照 Key 对数据进行压缩,雷同 Key 的数据只保留 offset 最大的一条。

//key 如果决定数据被发送到某个 Broker 的哪个队列
int queueId = hash(key) % queueNum

然而在 RocketMQ 中队列数会随着 Broker 扩缩容而减少或者缩小,扩缩 Broker 数量前后,雷同的 Key 可能被发送到不同的队列,那么依照上述规定进行 Compact 后失去某个 key 所在的 queueId 就是谬误的,应用 Compact topic 作为 KV 存储就失去了意义。

因而在状态 topic 是 Compact topic 的根底上,再将状态 topic 创立为 Static topic(Logic Queue),即状态 topic 即是 Compact topic 也是 Static topic。这样能力解耦队列数量与 Broker 数量,使队列数量在扩缩 Broker 状况下依然不变,保障含有雷同 Key 的数据能被发送到同一队列中。

  • 状态重放

从被迁徙状态队列拉取数据到本地进行重放,须要从队列头开始生产,雷同 Key 的数据只保留 offset 最大的数据,造成 K - V 状态对,放入本地长期存储 RocksDB 中;

  • 状态 topic 与 source topic 对应关系

因为状态 topic 中的队列会随着 source topic 队列迁徙而迁徙,保障对 source topic 队列中数据进行有状态解决失去正确的后果,因而在队列层面,状态 topic 与 source topic 应该是一一对应的关系。即状态 topic 名称与 source topic 名称一一对应,状态 topic 的队列数量等于 source topic 队列数量。source topic 队列的流计算状态保留在状态 topic 的对应队列中。

数据处理

图中彩色线示意控制流,黄色线示意数据流;rebalance 局部先于 litePull 局部被调用。

重均衡局部:

  • 依据调配到的队列,到相应状态 topic 的相应队列中从头拉取数据,到本地重放,取得 KV 状态对,放入到本地 RocksDB 中。
  • 依据数据源 topic,构建对应的数据处理器 processor(即第三次构建过程),保存起来;

数据处理局部:

  • 应用 litePull 模式拉取数据,能够独立管制生产位点提交;
  • 数据反序列化;
  • 应用 topic 查找 processor;
  • 将 processor 的子节点保存起来(子节点在第三次构建过程中增加);
  • 数据向上下文 StreamContext 中传递,由他将数据路由到上游节点;
  • 数据处理前,现将上游节点的子节点保存起来供后续查找;
  • 数据处理,如果有状态算子则与 RocksDB 交互,如果还有上游节点则持续进入 StreamContext,如果没有上游节点则完结解决。

数据每次到上游节点前,先进入 StreamContext 中,由它对立向上游节点传递数据。StreamContext 中蕴含了解决数据所须要的所有信息,包含数据起源、状态存储、下游子节点等等;

StreamContext 一直递归迭代,将数据向上游传递,最终数据会被拓扑图上所有节点解决,由 sink 节点写出后果。

04 参加奉献

RocketMQ Streams 是 Apache RocketMQ 的子项目,曾经在社区开源,并且提出了一些 Good First Issue 供感兴趣同学加入。参加 RocketMQ Streams 相干工作,请参考以下资源:

1、试用 RocketMQ Streams,并浏览相干文档以理解更多信息;

maven 仓库坐标:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams</artifactId>
    <version>1.1.0</version>
</dependency>

RocketMQ Streams 文档:

​https://rocketmq.apache.org/zh/docs/streams/01RocketMQ%20Streams%20Overview​

2、参加奉献:如果你有任何性能申请或错误报告,请随时提交 Pull Request 来分享你的反馈和想法;

社区仓库:​​https://github.com/apache/rocketmq-streams​

3、分割咱们:能够在 GitHub 上创立 Issue,向 RocketMQ 邮件列表发送电子邮件,或在 RocketMQ Streams SIG 交换群与专家独特探讨,RocketMQ Streams SIG 退出形式:增加“小火箭”微信,回复 RocketMQ Streams。

邮件列表:​​https://lists.apache.org/list.html?dev@rocketmq.apache.org​

退出移动版