关于flink:AlinkTensorflow-on-Flink-在京东的应用

10次阅读

共计 7363 个字符,预计需要花费 19 分钟才能阅读完成。

本文整顿自京东搜寻举荐算法工程师张颖、刘露在 Flink Forward Asia 2021 的分享。次要内容包含:

  1. 背景
  2. 京东搜寻举荐机器学习现状
  3. 基于 Alink 实现在线学习
  4. Tensorflow on Flink 利用
  5. 布局

FFA 2021 直播回放 & 演讲 PDF 下载

一、背景

搜寻和举荐是互联网利用的两个外围入口,大多数流量都来自于搜寻和举荐这两个场景。京东批发按站点,分为主站、京喜、海内站以及一些垂直畛域站点。

对于搜寻业务来讲,每个站点下会有关键词搜寻、下拉发现、以及店铺、优惠券、订单等细分页面的搜寻;对举荐业务来讲,按照利用场域不同,划分了大大小小几百种举荐位。

以上每一种业务场景下,都蕴含了十多种策略环节,须要机器学习模型反对。基于海量的商品数据、海量的用户行为,作为机器学习的特色样本。

除了搜推广畛域中典型的用意辨认、召回、排序、相关性模型以外,京东搜寻举荐为了更好的保护用户、商家、平台这三方的生态,在智能经营、智能风控、成果剖析这些环节,也越来越多的引入模型进行决策。

二、京东搜寻举荐机器学习现状

咱们根据服务场景和服务时效的差别,将这些机器学习场景进行分成了三类:

  • 一种模型是在用户拜访搜寻或举荐页面时,即时申请到的商品召回、排序、用意辨认等模型,这类模型在服务层面对响应工夫要求极高,预估服务位于在线零碎中。
  • 另一种模型是对服务响应工夫要求不高,但对模型的训练和预估有肯定的时效性要求,比方实时用户画像、实时反作弊模型,这里咱们把它称作是近线场景。
  • 第三种是纯离线的模型场景,比方商品或用户的长期画像、针对于各种素材标签的常识图谱,这些场景的训练和预测对时效要求绝对较低,全在离线环境下进行。

咱们来看下以后次要的模型服务架构是怎么样的:

京东搜寻和举荐零碎因为业务零碎自身的差别,别离由不同的 kernel 链式模块,组成为搜寻零碎和举荐零碎。

一次用户搜寻,会逐级申请链路上的各级服务,先对关键词过 QP 服务,走用意辨认模型;再由召回服务并行申请各路的召回,会顺次调用召回模型、相关性模型、粗排模型;而后排序服务汇总后果集后,会调用精排模型、重排模型等。

一次用户拜访举荐的业务过程,有一些差别,但整体上流程比拟靠近。

这两个大业务上层,会共享一些离线、近线根底用处的模型,比方用户的画像、素材标签、各种指标剖析。

他们拜访的模型服务架构,都由训练 + 预估两局部组成,两头由模型仓库和参数服务桥接起来;特色方面,在线场景须要特色服务器,离线场景则由数据链路组成。

从模型状态上,咱们能够把现有模型划分成两种状态:

  • 左侧一类模型,单体规模绝对简单,采纳数据并行的模式对同一组参数进行训练,应用自研参数服务器对超大规模稠密参数进行训练,训练和预估的架构互相拆散。
  • 右侧一类模型,单体模型绝对简略,数据量和业务粒度繁多,按不同业务粒度进行数据划分,别离建模,由流式计算框架来驱动数据流转,做到训练和预估的架构一体。

基于在线服务和离线训练的架构差别,少数模型零碎会是这种在线和离线拆散的零碎状态。训练过程是基于 Tensorflow、Pytorch 进行一层封装。

样本生产和预处理,是基于 Flink 构建出的样本链路框架,其中很多在线业务的特色,源于线上服务的 Featurelog 特色日志;模型训练和样本生产形成了离线局部,依赖一些公共的根底组件比方 Hive、Kafka、HDFS;预估过程基于自研的预估引擎,在 CPU 或 GPU 上进行 Inference 计算,大规模稠密向量由独立的参数服务器提供;特色服务为预估过程提供输出数据,也是由自研的特色服务形成,因为预估时特色起源和训练时不同,有一层对立的特色数据获取接口,以及对应的特色抽取库。

特色抽取和模型预估形成了在线局部和离线局部拆散。

在模型迭代的状态上,对时效有较高要求的模型,个别是先离线应用历史累积的批式数据训练失去 Base 模型,部署上线之后,持续用实时数据流样本在其根底上继续的训练和迭代上线。

因为预估和训练在两套架构下,继续迭代的过程就波及两套架构的交互、数据传递,以及一致性方面的要求。

训练以及预估须要联合数据状态,自主实现容错转移、故障复原的能力。如何将数据的分布式解决和模型的分布模式联合为一个整体,便于部署和保护,也是一个不易实现的性能。对不同模型,加载和切换预训练参数的模式也难做到对立。

三、基于 Alink 实现在线学习

首先,咱们来剖析一下在线学习零碎的痛点:

  1. 离线 / 流式训练架构难以对立:典型的在线学习首先由离线的少量数据训练出一个模型作为 basic model,之后在这个 basic model 的根底上继续的进行流式训练,然而这个链路下流式训练和离线训练是两套不同的零碎、代码体系,比如说,个别 offline train 和 online train 是两套不同的架构体系。offline 的训练可能是一个一般的离线工作,在线的训练可能是单机启动的一个继续的训练任务,这两种工作零碎不同、体系不同,甚至如果在线训练是用 Spark/Flink 跑的话,可能代码自身也不同。
  2. 数据模型:上述讲到了整个训练架构难以对立,因而,一个业务引擎外面用户须要保护两套环境、两套代码,许多共性不能复用,数据的品质和一致性很难保障;且流批底层数据模型、解析逻辑可能不统一,导致咱们须要做大量的拼凑逻辑,甚至为了数据一致性须要做大量的同比、环比、二次加工等的数据比照,效率极差,并且非常容易出错。
  3. 预估服务:传统的模型预估都是须要部署一个独自的模型服务,而后由工作以 http/rpc 模式去调用来获取预估后果,然而这种模式须要多余的人力去保护服务端,且实时 / 离线预估场景下 rpc/http server 并不需要始终存在,它们只须要随着工作的开始而开始,随着工作的完结而完结就能够了;且离线训练进去的模型如何服务于在线又是一个令人头疼的问题。
  4. 模型降级:模型任何模式的降级都会对模型带来肯定的影响,在这里,咱们次要探讨模型的降级对模型参数失落带来的影响。

这是一个简略的在线学习的经典流程图,上面我来解释一下这个流程图在 Alink 链路是如何实现的:

  1. 离线训练任务:该 Alink 工作去 hdfs load 训练数据,先将训练数据进行特色工程等的加工之后,将模型进行离线的训练,训练实现之后将 model info 和模型参数数据写入 parameter server,该工作天级运行,每次运行训练比如说 28 天的数据。
  2. 实时训练任务:实时工作方面,该 Alink 工作从 kafka 读取样本数据,将样本数据进行一 定积攒之后比如说小时级、分钟级、条数等进行小批量的训练,先去 parameter server pull 模型参数和超参数据,load 模型之后如果有预估需要的话,可能进行一次 predict,如果没有预估需要,能够间接进行模型训练,并且将训练之后的模型数据 push 给 parameter server。

接下来咱们次要来看看实时学习的模型如何服务于在线预估的场景:

  • 首先,实时的训练必定不会影响模型构造的,即实时训练只会影响模型参数的更新;
  • 第二,预估和训练的 ps 必定是要离开的,因而,这个问题就变成了如何去同步预估和训练的 ps 的数据。

在这里业界大略有两种实现计划:

  • 计划 A:这个是针对一些小模型的训练,能够让 Alink 的工作间接将训练好的参数同时 push 给离线 PS 和在线 PS。
  • 计划 B:引入一个相似 PS controller 的角色,该角色负责计算参数,同时将参数同时 push 给离线 PS 和在线 PS。

不过,咱们也能够让 Alink 的训练任务写训练 PS,同时结构一个相似 ps server 的角色来同步参数,将 server 的更新同时写一个相似 kafka 的队列,启动一个预估 ps server 生产 kafka 队列 外面的参数信息,这样做到训练 PS 和预估 PS 之间的一个数据同步。

计划很多,抉择本人适合的就好了。

上面咱们来先看一下模型版本升级为什么会带来的参数失落:

  • 假如 1 号凌晨的时候训练的前 28 天的数据,训练完了之后将参数写入了参数服务器,1 号到 2 号之间始终在流式的训练,始终在增量写参数服务器,始终到 2 号凌晨。
  • 2 号凌晨的时候开始训练前 28 天的数据,假如训练工夫为 1h,此时如果间接写入 PS 的话,那么该 1h 的数据将被间接笼罩,对于一些工夫不敏感的模型倒也还好,至多不会报错。然而对于该业务外面 prophet 工夫序列模型来说会出问题,因为该模型参数少了 1h 的数据,模型可能会因而降级准确度。

其实总结起来就是模型迭代的时候,因为离线训练实现须要肯定工夫,如果间接笼罩的话,会造成这段时间的参数失落。因而,咱们必须保障 PS 外面的参数在工夫上是间断的。

这个图外面咱们次要介绍了 PS 冷启动和热切换的流程:

  1. 模型训练冷启动之后因为参数失落问题模型临时不可用,期待第一次 warm start 之后模型进入可用状态;
  2. Parameter Server 反对多 scope 多 versiion,模型热切换的时候只更新 ps new scope,warm start 的时候更新所有 scope;
  3. 模型每次 predict 的时候都只 pull old scope 的数据,进行 warm start 的时候 pull new scope。

上面具体承受一下整个链路的流程:

  1. 冷启动的时候因为离线的工作训练模型须要肯定工夫,因而,这时候 PS 外面的参数短少了该时间段的数据,所以只能先进行 warm start 将参数补全,并写入 PS old scope 和 PS new scope;
  2. 之后进行失常的预测和 warm start 过程,其中 predict 的时候只 pull ps old scope,因为 ps new scope 外面的数据会再热切换的时候被笼罩造成参数失落,失落参数的 ps 不能进行预测;
  3. 等到第二天凌晨的时候进行热切换,只更新 ps new scope;
  4. 之后失常 pull ps old scope 进行 predict,pull ps new scope 进行 warm start 的流程。

接下来我来介绍一下流式训练的痛点:

  1. 对于在线的训练不反对 failover。大家应该都晓得,在线训练难免会因为各种各样的起因 (比方网络抖动) 中断,这种状况下,适合的 failover 策略是十分重要的。咱们将 Flink 的 Failover 策略引入咱们自研的模型训练算子,进而反对模型的 Failover。
  2. 适合的 pretrain 策略:任何模型的训练 embedding 层都是不须要每次从 PS 外面 pull 的,个别业界会自研一些相似 local ps 的模式来在本地存储这些稠密向量,当然咱们也能够将这些 local ps 引入到 Flink 外部来解决这个问题,然而对于 flink 来说,咱们在一些场景下齐全能够用状态后端来代替 local ps。利用 Flink 的 state 和 parameter server (参数服务器) 交融,init 或者是 failover 的时候将 parameter server 的局部热数据 load 到 state 外面对模型进行 pretrain。
  3. 很难实现分布式的需要。如果是一些自身是反对分布式的架构倒还好,然而有一些算法自身是不反对分布式的 (比方 facebook 开源的 prophet),在这种状况下如果数据量大而且还不必分布式的话,跑完一大批数据可能会极其消耗工夫;Alink 人造反对分布式,Alink 是基于 Flink 的下层算法库,因而,Alink 具备 Flink 所有的分布式性能,反对 Flink Master 的所有调度策略,甚至能够反对各种精密的数据散发策略。

流式训练的 failover 策略:

在线分布式训练的时候常常会有某台机器因为某些起因 (如网络) 异样的状况,这种状况下如果要复原个别有两种状况:

  1. 容许数据失落

    个别的训练任务都是容许大量数据失落的,因而咱们心愿能够就义一些数据进而换来整体工作的继续训练,引入部分复原的策略能够大大提高工作的持续性,防止了工作因为一些内部起因造成的单点故障而全副复原的状况。

  2. 不容许数据失落

    在这里咱们只探讨 at least once 的状况 (exactly once 要求 PS 反对事务),如果说业务对数据的要求比拟高,咱们能够采取 global failover 的策略,当然了,个别单点重部署异样的状况下也会走 global failover 的策略在该业务中,咱们采纳部分复原的策略来优先保障工作的继续训练。

上面具体介绍一下训练任务的重启的时候策略:

  1. global recovery。这里就是 Flink 外面罕用的 Failover 的概念,不再过多赘述。
  2. singal task recovery。在该状况下某个 taskmanager 因为网络异样呈现了心跳超时,此时为了保证数据一致性,Flink 工作会产生 failover 并且从上次的 checkpoint 复原,然而如果容许大量的数据失落且为了保障工作的继续输入,能够开启部分复原,此时工作只会重启该 taskmanager,能够保障训练的持续性。
  3. 单点重部署异样。如果工作呈现了任何起因的故障,导致工作单点复原的过程中呈现了异样导致单点复原失败,这种时候就产生了单点重部署异样,该异样无奈解决,只能通过将工作 failover 来解决问题,此时可依据工作须要配置从 checkpoint 复原或者是不复原继续训练。
  4. 这里我着重介绍一下工作 failover 的时候从 checkpoint 复原的场景:工作 fail 的时候首先执行 save 办法,将以后 PS 的状态 snapshot 保存起来,将 Flink 状态后端的数据也保存起来,工作复原的时候执行 load 办法,将 PS 复原。认真想能够发现,该操作会造成局部参数的反复训练 (cp 的工夫点和 save 的工夫点不统一),心愿大家留神。

基于 Alink 的流式训练 pretrain 策略大抵可分为冷启动、全局复原和单点复原三个模式:

  1. 冷启动的时候大略是先从 PS 外面 pull 模型参数和超参信息,而后初始化 ListState、MapState、ValueState 等状态后端,同时初始化 PS 的 scope 和 version 信息。
  2. 全局复原也就是 Flink 默认的 Failover 策略,在该模式下工作首先会 save model,行将 PS 外面的模型信息序列化至硬盘上,之后 save flink 工作外面的状态后端的数据,而后初始化的时候就不须要再 pull 超参等信息了,而是间接抉择从状态后端复原超参,并且 reload 模型的 参数进行继续的训练。
  3. singal task recovery,该模式是容许大量的数据失落且为了保障工作的继续输入才会采取的,在该模式下工作只会重启该 tm,能够最大水平的保障训练任务的稳固继续训练。

  1. 以后比拟风行的 3D 并行、5D 并行架构外面,数据并行是最根底也是最重要的一个环节。
  2. Flink 最最根底的数据散发策略有包含 rebalence、rescale、hash、broadcast 等的多种抉择,且用户能够通过实现 streampartitioner 自在的控制数据的散发策略,用户能够通过 load balance 等自在的实现数据并行来解决数据歪斜带来的模型参数间相互期待的问题。
  3. 在该模式下,咱们买通了 Alink 模式下分布式调用 python 办法的通路,能够最大水平的进步数据并行的效率。
  4. 数据并行是疏忽流、批的,咱们集成 Alink 的 mapper 组件,实现了 train 和 update model variable 批流一体化。

四、Tensorflow on Flink 利用

上面我先介绍一下 Tensorflow on Flink 预估服务和传统的在线预估链路的不同:

  1. 区别于在线预估,实时 / 离线预估不须要服务始终存在,且 load 到 tm 外部能够大幅节约人力保护和资源老本。
  2. 整个链路架构不同导致数据模型、数据处理、模型训练、模型推理等须要别离保护不同的零碎和代码构造。

Tensorflow on Flink 预估服务目前有多个计划,比方:

  • 计划 A:部署一个 rpc 或者是 http server,用 flink 通过 rpc 或者是 http 以 client 的形式去调用。
  • 计划 B:将 Tensorflow 模型 load 到 flink tm 外部,间接调用。

其中计划 A 有如下弊病:

  1. rpc 或者是 http server 端须要多余的保护人力。
  2. 实时 / 离线预估和在线预估不同的点是该 rpc 或者是 http server 端并不需要始终存在,它们只须要随着工作的开始而开始,随着工作的完结而完结就能够了,始终存在是对资源的节约,然而如果改成这种架构,那么无疑会更加的消耗人力保护老本。
  3. 还是下面的架构不对立问题,rpc 或者是 http server 端和实时 / 离线数据处理往往不是一套零碎,这就还是波及到了之前始终强调的架构不批准问题,不再赘述。

五、布局

  1. 采纳 Flink sql 实现批流一体的模型训练,争取使模型训练更加不便。
  2. Tensorflow Inference on Flink 实现反对大模型,基于 PS 实现动静 embedding 的存储:搜寻、举荐等业务场景中存在大量的 id 类特色,id 类特色通常采纳 embedding 的形式,这些特色在特定状况下会急剧收缩,进而吞掉 taskmanager 的大部分内存,且原生 tensorflow 的 variable 应用起来会有诸如须要预先指定维度大小,不能反对动静扩容等不便,因而,咱们打算将内嵌的 Parameter Server 替换为咱们自研 PS,反对千亿规模的分布式 serving。
  3. 将 PS 外面的 embedding 动静 load 到 taskmanager 的 state 外面,实现升高对 PS 拜访压力的需要:Flink 外部通常应用 keyby 操作来将某些固定的 key hash 到不同的 subtask 上,因而咱们能够将这些 key 所对应的 embedding 缓存到 state 外面,升高对 PS 的拜访压力。

六、鸣谢

  1. 首先感激京东数据与智能部数据时效 Flink 优化团队的所有共事的帮忙与反对。
  2. 感激 Alink 社区全副共事的帮忙与反对。
  3. 感激阿里云计算平台事业部 Flink 生态技术团队所有共事的帮忙与反对。

上面是 Alink 和 flink-ai-extend 的 github 链接,欢送大家 star。

https://github.com/alibaba/Al…

https://github.com/flink-exte…


FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0