关于flink:Flink-ML-API为实时机器学习设计的算法接口与迭代引擎

37次阅读

共计 8894 个字符,预计需要花费 23 分钟才能阅读完成。

摘要:本文整顿自阿里巴巴高级技术专家林东、阿里巴巴技术专家高赟(云骞)在 Flink Forward Asia 2021 核心技术专场的演讲。次要内容包含:

  1. 面向实时机器学习的 API
  2. 流批一体的迭代引擎
  3. Flink ML 生态建设

点击查看直播回放 & 演讲 PDF

一、面向实时机器学习的 API

Flink ML API 指的是提供给用户应用算法的接口。通过把所有算法打包为对立的 API 提供给用户,让所有使用者的体验保持一致,也能升高学习和了解算法的老本,此外算法之间也能够更好地交互和兼容。

举个例子,在 Flink ML API 中提供一些根底的性能类,通过应用这些性能类能够把不同算子连贯组合成一个高级的算子,能够大大提高了算法的开发效率。同时,通过应用对立的 Table API,让所有的数据都以 Table 格局进行传输,能够使得不同公司开发的算法可能相互兼容,升高不同公司反复开发的算子的老本,晋升算法单干的效率。

之前版本的 Flink ML API 还是存在不少痛点。

首先是表达能力方面。之前的 API 的输出只反对单个 Table 的模式,无奈表白一些常见的算法逻辑。比方有些训练算法的输出表白是一张图,把数据通过不同的 Table 传进来,这种状况下单个 Table 输出的接口就不实用了。再比方有些数据预处理的逻辑须要将多个输出失去的数据进行交融,用单个 Table 输出的 API 也不适宜。因而咱们打算把算法接口扩大为反对多输出多输入。

其次是实时训练方面。之前的 API 无奈原生反对实时机器学习场景。在实时机器学习中,咱们心愿训练算法能够实时产生模型数据,并将模型数据以流的形式实时传输到多个前端服务器中。然而现有的接口只有一次性的训练和一次性的推理 API,无奈表白这种逻辑。

最初是易用性方面。之前采纳 toJson() 和 fromJson() 来导出和加载模型数据,并且容许用户贮存这些数据。然而有些模型的数据量高达几个 G,在这种状况下将模型数据以 string 的形式进行贮存,效率会非常低,甚至可能无奈实现。当然,存在一些 hacky 办法,能够把模型数据贮存到一个近程终端,再把相干的 url 通过 toJson() 办法传导进去。然而这种状况下会存在易用性的问题,算法使用者须要本人去解析 URL,并从近程获取这些数据。

受限于以上几个方面的因素,咱们决定对 Flink ML API 进行扩大。

在通过大量探讨以及思考之后,咱们对新的 API 赋予了以下个性,解决了下面所有问题。

  • 第一,在 API 上减少了取得模型数据的接口,比方在 model 上减少了 getModelData() 和 setModelData() API,可能帮忙实现实时机器学习场景;
  • 第二,对算子的接口做了扩大,让算子能够反对多输出多输入,还能够将不同算子以有向无环图的形式进行整合,打包成更高级的算子;
  • 第三,新的 API 是基于 datastream 实现的,能够反对流批一体的个性,可能同时实现基于无限流和有限流的在线训练;
  • 第四,对算法的参数存取 API 做了改良,新的算法参数的存取 API 更容易应用;
  • 第五,对模型的存取 API 做了改良,新的模型存取 API 采纳了 save() 和 load() API,模型数据十分大的状况下用户也毋庸思考这方面的复杂度,只须要调用 save() 和 load() 就能够实现相干的性能;
  • 第六,提供了无模型语义的抽象类。

上图是最新的 API 构架图。最上层有一个 WithParams interface,它提供了存取参数的 API。咱们对这个接口做了改良,用户不再须要表白 isOptional 之类的 field。这个接口之下是一个 stage 接口,它蕴含了所有算法模块,并提供了存取模块的 API,即 save() 和 load()。save() 负责把模型数据和参数贮存下来,load() 负责把模型数据和参数读取进去,还原原先的 stage 实例。用户不必思考参数存取的复杂度。

Stage 下分为两块,一块是表白训练逻辑的 Estimator,另一块是表白推理逻辑的 AlgoOperator 和 Model 类。Estimator 的外围 API 是 fit()。与之前不同的是当初反对多个 Table 的输出,能够用来表白须要多个 Table 输出逻辑,比方特色的拼接。Estimator::fit() 输入的是一个 Model。Model 属于 AlgoOperator。AlgoOperator 表白的是计算逻辑,反对多个 Table 作为输出和输入,每个 Table 都是一个数据源,能够用来表白通用的计算逻辑。

AlgoOperator 之下是 Transformer,能够表白对数据做转换的逻辑。它与 AlgoOperator 具备雷同的 API 格局,然而它们的抽象概念却有所不同。Transformer 是一个具备模型语义的数据转换逻辑。在计算中,比方数据预处理,存在一些更通用的将不同数据进行拼接转换的操作,例如对数据进行过滤,在通用的概念下可能并不适用于 Transformer。因而咱们特意减少了 AlgoOperator 类,不便用户的了解和应用。

Transformer 之下是 model 类。咱们减少了 setModelData() 和 getModelData() API。这两个 API 是为实时机器学习专门设计的,能够让用户把模型数据实时导出到多个近程终端做在线的推理。

上图是一个比拟简化但经典的实时机器学习场景。

这里的数据起源次要有两个,静态数据来自于 HDFS,动态数据来自于 Kafka。由 AlgoOperator 读取来自以上两个数据源的数据,将它们拼接之后造成一个 Table 输出到 Estimator 逻辑。Estimator 读取方才拼接失去的数据并产生一个 Model,而后能够通过 getModelData() 拿到代表模型数据的 Table。再通过 sink() API 将这些数据传输到 Kafka topic。最初在多个前端服务器下面运行程序,这些程序能够间接创立一个 Model 实例,从 Kafka 中读出模型数据造成一个 Table,再通过 setModelData() 把这些数据传递给 Model,应用失去的 Model 做在线推理。

在反对在线训练和在线推理之后,咱们进一步提供了一些根底组件,不便用户通过简略的算子构建更简单算子,这个组件便是 FLIP-175 提供的 GraphBuilder。

假如用户的输出也是与上文统一的两个数据源,最终输入到一个数据源。用户的外围计算逻辑能够分为两块,第一块是数据预处理,比方特色拼接,把两个数据源的数据读进来之后做整合,以 Table 的模式输入到 Estimator,执行第二块的训练逻辑。咱们心愿先执行训练算子,失去一个 Model。而后将预处理算子和 Model 连贯,表白在线推理逻辑。

用户须要做的只是通过 GraphBuilder API 将上述步骤连贯进行,不须要专门为在线推理逻辑再写一遍连贯逻辑。GraphBuilder 会主动从后面一个图生成,并与前面图中的算子造成一一对应的关系。AlgoOperator 在训练图中的模式是间接转换为推理图中的算子,而 Estimator 在训练图中失去的 Model 会成为推理图中对应的节点,通过将这些节点相连,便失去了最初的 ModelA,最终用作在线推理。

二、流批一体的迭代引擎

Flink 是一个基于 Dag 形容执行逻辑的流批一体的解决引擎,然而在许多场景下,尤其是机器学习 \ 图计算类型的利用中,用户还须要数据迭代解决的能力。例如,一些算法的离线训练、在线训练以及模型部署后依据后果动静调整模型参数的场景,都须要数据迭代解决。

因为理论的场景同时会涵盖离线和在线解决的案例,因而须要在迭代这一层可能同时反对离线和在线解决。

前文提到的三个场景的解决逻辑既存在区别,也存在共性。

对于离线训练,以逻辑回归为例,在迭代中能够应用一个节点来缓存整个模型,这个节点会将最新的模型发送给训练节点,而训练节点会在迭代开始前事后读取整个数据集,随后在每次收到最新的模型后,从数据集中抉择一个 mini-batch 数据对模型进行更新,并把后果发送回模型缓存节点。

对于在线计算,因为训练数据是从内部源源不断达到的,无奈事后读取所有训练数据,个别的做法是动静读取一个 mini-batch 的数据,计算完模型的更新后将其发送给模型缓存的节点,等模型的缓存节点进一步发送更新后的模型当前,再读取下一个 mini-batch 数据,这也就要求训练节点必须采纳优先级读的形式对数据进行读取,从而最终实现一一解决 mini-batch 的能力。

在这两种场景下的训练都存在同步和异步形式,具体取决于模型缓存节点是否要收集到所有更新后再开始下一轮训练。此外还存在一些模型,在预测的时候会对参数进行动静的更新,解决完每一条数据之后都要立即评估是否会进行参数更新,如果须要就再发动更新。

这几种场景下的计算逻辑存在肯定的共性,首先都须要在作业图中引入迭代的构造来反对数据的循环解决,并且在数据循环解决之后须要进行是否终止迭代的判断。另一方面,计算过程中还须要在每一轮数据接管实现后,接管到相应的告诉,触发特定的计算,比方离线训练中,接触残缺个模型后就要开始下一轮的计算。

这里其实存在两个抉择,一个是在迭代这一层,间接提供将数据集划分为多个 mini-batch,并且对每个 mini-batch 赋予迭代的能力。它在逻辑上能够承受所有类型的迭代,然而如果想要同时反对一一 mini-batch 解决与多个 mini-batch 并行处理的逻辑,就必须引入一套新的基于 mini-batch 的流解决接口,并且在实际层反对这两种解决逻辑。

另外在线训练和离线训练的 mini-batch 产生的形式也不一样,离线 mini-batch 是通过本地事后读取所有数据,而后在每一轮解决中进行选取来实现。而在线 mini-batch 通过从内部数据中读取特定数据量的数据来实现的。因而如果想要兼容这两种状况,会进一步减少接口和实现的复杂度。

最初如果要兼容单个 per-record 的解决,还必须引入无限大的 mini-batch,或将每条记录看作一个独自的 mini-batch,前者会进一步减少接口的复杂度,而后者须要每一记录对算子进行一次告诉,会减少运行时的开销。

思考到上述情况,咱们在迭代中只提供了整个数据集级别的告诉,而将划分 mini-batch 的性能放在了迭代之上。在离线训练中,用户能够通过从整个数据集中选取一部分数据来高效实现 mini-batch 抉择的性能。而在在线计算中,用户能够通过 Flink 自带的运行集输出算子,实现一一 mini-batch 解决的性能。

基于上述思路,整个迭代的设计如上图所示,次要由 4 局部组成。初始模型这类有回边的输出、数据集这类无回边的只读输出、迭代回边的地位以及最终输入。其中回边对应的数据集与有回边的输出是一一对应的,从回边返回的数据与初始数据进行 union 之后,实现数据的迭代解决。

迭代外部为用户提供了数据集解决实现的告诉性能,即进度追踪的能力。用户基于这一能力能够实现解决实现数据集的某一轮之后执行特定操作。比方在离线训练的时候,用户能够在某个算子收到模型的更新数据之后,计算模型的下一轮更新。

此外对于没有回边的输出数据,容许用户指定每一轮是否进行重放。对算子也提供了每轮新建一个算子以及通过一个算子的实例解决所有轮次数据的能力。通过这种形式,用户毋庸重建算子实例就能实现在轮次之间缓存数据的能力。用户也能够通过回放输出数据并重建算子来复用迭代外的算子,比方 Reduce、Join 这种罕用算子,输出数据进行重放并且算子会在某一轮进行重建,这种状况下用户能够间接复用这些迭代外的算子。

同时,咱们提供了两种终止判断逻辑,一种是当整个迭代中曾经没有数据在解决的时候,会天然终止迭代。另外一种是在无限流的状况下,也容许用户指定特定数据集,如果这一数据集在某轮没有数据产生,用户能够提前终止整个迭代。

DataStream<double[]> initParameters = …
DataStream<Tuple2<double[], Double>> dataset = …
​
DataStreamList resultStreams =
    Iterations.iterate(DataStreamList.of(initParameters),
          ReplayableDataStreamList.notReplay(dataset),
          IterationConfig.newBuilder().setOperatorRoundMode(ALL_ROUND).build();
          (variableStreams, dataStreams) -> {DataStream<double[]> modelUpdate = variableStreams.get(0);
                    DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);
 
                  
                    DataStream<double[]> newModelUpdate = …
                    DataStream<double[]> modelOutput = …
                    return new IterationBodyResult(DataStreamList.of(newModelUpdate), 
       DataStreamList.of(modelOutput)
                });
         
DataStream<double[]> finalModel = resultStreams.get("final_model");

上图是应用迭代 API 来构建迭代的例子。用户须要指定有回边和无回边的输出列表、算子是否须要每轮重建以及迭代体的计算逻辑的等。对于迭代体,用户须要返回回边对应的数据集以及迭代的最终输入。

public static class ModelCacheFunction extends ProcessFunction<double[], double[]>
        implements IterationListener<double[]> {private final double[] parameters = new double[N_DIM];
 
        public void processElement(double[] update, Context ctx, Collector<O> output) {
            // Suppose we have a util to add the second array to the first.
            ArrayUtils.addWith(parameters, update);
        }
 
        void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector) {if (epochWatermark < N_EPOCH * N_BATCH_PER_EPOCH) {collector.collect(parameters);
            }
        }
 
        public void onIterationEnd(int[] round, Context context) {context.output(FINAL_MODEL_OUTPUT_TAG, parameters);
        }
    }

对于迭代内的算子,如果它实现了 IterationListener 接口,就会在所有数据集解决完某一轮之后,告诉迭代的算子。如果整个迭代都解决终止则会通过 onIterationTerminated 对算子进行进一步告诉,用户能够在这两个回调中实现须要的计算逻辑。

在迭代的实现中,对于用户通过代码来创立的迭代解决构造,会减少一部分迭代外部的节点,并对用户所有的解决节点进行 wrap 操作,从而达到治理算子生命周期并对数据类型进行转换的目标。最初, 迭代基于 Colocation 与本地内存实现了回边,这样在调度器看来整个作业逻辑依然是一个 DAG,从而能够间接复用当初的调度逻辑。

在迭代中插入的专用算子次要包含 input、output、head 与 tail 算子,其中 input 和 output 负责数据类型的转换,内部数据进入迭代内时会为每一条记录减少一个迭代头,外面记录了该 record 解决的轮次,每个算子的 wrap 会将迭代头去掉后交给用户原始的算子解决。

head 和 tail 算子负责实现回边及计算某一轮迭代是否曾经全副解决实现。head 算子从 input 读取残缺个输出,并在最初插入一条非凡的 EpochWatermark 事件,来标记第零轮迭代的终止。因为 head 算子可能会存在多个并发,所以必须等 head 算子的所有并发都读取完输出后,能力发送第 0 轮终止的事件。

head 算子通过 Operator Coordinator 来同步所有并发。Operator Coordinator 是一个位于 JobManager 中的全局组件,它能够与所有 head task 进行双向通信,所有 head 算子并发都收到每一轮解决实现的告诉后,就会发送全局播送给所有 head task,通知他们这一轮的解决曾经全副完结。head 发送 EpochWaterMark 之后,所有迭代中的算子都会与这一音讯进行对齐。算子从所有输出中都读取到特定轮次的 EpochWatermark 之后,就会认为这一轮解决实现,并调用这一轮完结的回调。当 tail 节点收到某一轮数据或 EpochWatermark 之后,会将轮次加 1,而后通过回边再次发送给 head,从而实现数据循环解决。

最初咱们也反对了有迭代状况下的 checkpoint 性能。因为 Flink 默认的 checkpoint 机制无奈反对带环的计算图,因而咱们对 Flink 的 checkpoint 机制进行了扩大,实现了带环的 Chandy-Lamport 算法,会同时缓存来自回边的音讯。另外 head 算子在对齐的时候,除了要读取失常输出的 barrier 之外,也会期待来自 Operator Coordinator 的非凡的 barrier。每一轮全局完结的音讯也是来自 Operator Coordinator 播送,能够保障每个 checkpoint 中所有迭代内的算子都处在同一轮,从而简化算子后续进行并发批改的操作。

另外还有一个开发中的优化,Operator Coordinator 会将收到的 barrier 提早到下一个全局对齐音讯之后,再发送告诉,从而使得整个迭代内的算子的 state 都是恰好处于读取完某一轮数据之后。许多同步算法都是先将缓存收到的数据存储在算子中,直到读取完一轮所有数据之后再进行对立解决。这一优化能够保障在进行 snapshot 操作的时候,正好清空所有缓存,从而使整个 checkpoint 中缓存的数据量最小。

以上是对于 Flink 流批一体迭代引擎的介绍,这些引擎能够同时在在线和离线场景中应用,并且反对 exactly-once 的容错。将来咱们将进一步反对 batch 的执行模式,并提供更多的下层开发工具。

三、Flink ML 生态建设

最近咱们曾经将 Flink ML 相干代码从 Flink 外围代码库中移入一个独自的 Flink ML 代码库。这样做的首先是为了不便 Flink ML 的疾速迭代,其次也心愿通过这个伎俩缩小 Flink 外围代码库的复杂度,防止 Flink 外围代码库过于臃肿。

另外,咱们在 Github 上建设了一个中立的组织 Flink-extended,可能为所有 Flink 社区的开发者提供平台来奉献一些他们心愿开源的我的项目。不便大家分享不带有特定公司的名字的代码,使不同公司的开发人员能够把代码奉献进去,不便 Flink 社区来应用和共建。咱们心愿借此促成 Flink 生态的凋敝倒退。

目前中立我的项目中曾经有一些比拟重要的我的项目,比方 Deep Learning on Flink,它是由阿里大数据团队次要开发的一个开源我的项目,核心作用是能够把 Tensorflow 打包成 Java 算子在 Flink 中运行,不便将 Flink 的预处理程序与 Tensorflow 深度学习的训练算法相结合,造成端到端的训练以及推理。

最近咱们曾经在 Flink ML 中新增了若干常见算法,之后还会持续提供更多开箱可用的算法。

上图是咱们目前正在进行中的重要工作,其中最外围的工作是将现有的阿里巴巴开源的 Alink 代码库进行革新,使其中的算法可能适配新设计的 Flink ML API,并将革新后的算法奉献到 Apache 我的项目,不便 Flink 用户失去更多开箱可用的算法。

此外,咱们还与 360 一起单干共建 Clink 我的项目,外围指标是在离线计算中用 Java 去运行某些算子,失去训练后果。另一方面,这些算子须要可能以非常低的提早做在线推理。然而低提早在线推理很难用 Java 实现,通常须要用 C++ 来实现。为了使开发者只写一遍算法就能同时利用于 Java 和 C++ 环境,Clink 提供了一些打包的根底类的性能,不便算法开发者写好 C++ 算子之后,可能应用 JNI 打包成 Java 算子,并在 Flink 中应用这些算子。

最初,咱们打算在 Flink ML 中开发对于 Python 的反对,其中包含容许算法使用者通过写 Python 程序将 Flink ML 中的 Java 算子进行连贯和组合应用,心愿能进步机器学习开发者的效率和应用体验。

以上工作根本都曾经进入开源我的项目,其中算法 API 的设计在 FLIP-173 中,迭代引擎的设计次要在 FLIP-176 中,FLIP-174 和 FLIP-175 别离提供了算法参数的 API 以及 GraphBuilder 的 API。Clink 和 Deep Learning on Flink 等我的项目也曾经在 Flink-extended 的组织上,欢送大家应用。

点击查看直播回放 & 演讲 PDF


更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…

正文完
 0