关于后端:流批一体在-AI-核心电商领域的探索与实践

40次阅读

共计 6698 个字符,预计需要花费 17 分钟才能阅读完成。

摘要:本文整顿自阿里巴巴高级技术专家祝海峰,在 Flink Forward Asia 2022 流批一体专场的分享。本篇内容次要分为四个局部:

  1. 背景及平台历程
  2. 平台架构及实际
  3. 平台优化
  4. 将来布局

点击查看直播回放和演讲 PPT

一、背景及平台历程

搜寻、举荐、广告是电商畛域十分重要的导购场景,而这些引擎的原始数据,比方商品、卖家的事务数据、算法数据,用户点击日志等都散布在不同的存储系统里,这就须要有一个离线零碎将这些数据聚合到一张大宽表上,再提供给引擎应用。

这样的离线零碎,它有两个典型的特色:

  • 所有的历史数据都须要通过批处理导入到引擎里。
  • 实时数据须要更快的速度流入引擎里。

咱们以前开发这样的离线零碎,会面临以下若干痛点。总的来说,流批两次开发和数据口径问题较难解决。须要理解计算引擎和存储系统,运维简单、性能调优的门槛较高,须要有业余的大数据团队进行业务开发。

为了升高离线开发和运维的门槛,缩小业务接入的老本和进步业务迭代的效率,咱们从 2016 年开始研发和建设搜寻、广告、举荐离线平台,简称 SARO 平台。

这个平台是从开发到运维的一站式平台,用户能够通过利落拽 UI 形式开发,没有大数据背景的人也能应用。平台屏蔽背地的大数据技术,进一步升高用户运维的门槛。在开发上,从数据源到引擎,一个 ETL 流程,一次开发流批一体,平台治理背地的作业依赖和存储的对接。

上图列举了平台上反对的局部业务。到目前为止,平台领有千级利用规模,日治理万级作业量,PB 级日解决数据量,百万级增量 TPS,秒级增量延时,间断六年胜利反对双十一。

这里是平台的一个倒退历程,从 2016 年开始到当初一共经验了五个倒退阶段。

简略来说,从最后的 MR、自研的 iStream 流式框架多套引擎,之后逐渐通过 Blink 替换外面的局部作业,到当初曾经全面对立在一套 Flink 引擎上。在流批开发的 API 上,也紧跟 Blink 的倒退步调,从最后批流作业都跑在 DataStream 上,到起初的 Table API,再到当初全副对立到了 SQL 上。对立的计算引擎和开发 API 能够升高平台本身的开发和运维老本。

而存储的倒退绝对简略,最后是应用 HBase,当初咱们应用的是阿里云的 Hologres 实时数仓。

二、平台架构及实际

上图两头框里的是平台的技术架构,之上是平台承载的业务,上面是平台的技术底座。平台通过 ETL 开发 Console 前端,提供用户数据源开发、公布治理、部署治理、运维治理等开发运维治理能力。而这些能力则通过 Console 上面的 ETL Manager Service 来提供。

ETL Manager Service 次要负责 ETL 的生命周期治理。在它的上面一共有两个局部:

  • ETL Executor,它次要负责 ETL 的具体执行。在咱们的实际中,咱们通过 Airflow 调度和 Flink 来共同完成整个 ETL 的过程。
  • Catalog,它次要负责维表存储、其余存储的创立、Meta 治理。

此外咱们还有两个模块,别离是用户自定义插件代码的编译治理模块和数据血统模块。

上面以最简略的一个用户开发案例,来给大家讲一讲平台在流批一体上的具体实际。上图是用户开发的 ETL 解决流程图,它由两局部组成,别离是商品维度的数据和卖家维度的数据。

商品维度的数据,次要有商品表、商品图片表、算法模型数据,这三张表通过字段改名等业务解决,最终 Merge 到商品维表上。须要额定阐明的是,一个商品有多张图片,所以咱们须要有个 Udaf 把它聚合到商品维度上。

卖家维度的数据,次要有卖家表和对应的算法模型数据表,同样会 Merge 到卖家维表上。之后商品维表和卖家维表之间会进行 DimJoin,再通过 Udtf 插件解决,最终输入给引擎。

咱们把用户画的这个图叫做 Business Graph,图上的所有输出和输入都是存储表,两头是业务解决算子。咱们把业务解决算子分成两局部,一部分是数据聚合的解决算子,这里重点列举了 Merge 维表聚合、DimJoin 维表 Join,一对多的 Udaf。另外一部分是业务逻辑解决算子,这里重点列举了 Udtf 插件。

后面讲过所有的输入输出都是存储表,咱们进一步对存储表形象成一个动静表。动静表由一个全量的批表和一个增量的流表组成,它们共用一份 Schema 语义。后面用户画的图就能够既表白批处理过程,也能够表白流处理过程,一次开发流批一体。

用户画的那个图,在平台背地是如何变成数据处理过程的呢?

首先有一组批流工作,将商品维度的表同步到商品维表上。同样有一组批流工作,把卖家维度的表同步到卖家维表上。在两张维表都同步实现后,咱们会有一组批流工作,将这两个维表进行 DimJoin,并产出给引擎。

此外,咱们还有个流工作,它会去查问卖家到商品的索引表,而后将卖家的变更触发到商品上,这样卖家的变动就能实时反映到引擎里。

总的来说能够分为三个阶段,别离是同步、Join、索引表查问触发,后面两个阶段里都有批流工作,到底是先跑批工作再起流工作,还是批流始终起,这取决于维表存储的设计和选型。

后面提到了数据处理过程,其实维表存储的设计也至关重要。咱们以商品维表为例,商品表和商品图片表有数据库的 Binlog 能够放弃实时同步,但算法模型数据是 T+1 的,每天都须要变更。所以咱们依据这个个性就把这两类表分成了两张表,此外咱们还须要一张索引表,将卖家的变更转换到商品上。如此一来,一个维表上其实有三类表。

对于维表的读写个性,批次写入的时候咱们心愿不要对实时链路产生影响,因此心愿它具备 Bulkload 的能力。批次读的时候,咱们提到外面有多张表,那么就须要在 scan 多张表的同时有更高效的 Join 能力。

对于流的写入,增量在维表上大部分状况是局部 Schema 字段写入,因此须要 Upsert 的能力。对于维表的 Join,咱们须要更高效的 KV 点查能力以及索引查问能力。

后面提过增量总是更新维表下面的局部字段,其余字段的补全目前咱们是在计算阶段去实现。如果维表存储有 CDC 的能力,咱们能够间接生产维表的 Binlog 而不必做额定补全计算。此外咱们还能够基于 CDC 做更多的优化,前面会有介绍咱们基于 CDC 上做的优化。

个别的数据处理过程,首先会起一个批工作,在批工作实现之后,再起流工作去回追。如果批工作的工夫较长,会对上游增量回追造成较大的压力。如果维表存储有 MVCC 个性,批流工作能够同时启动,没有任何回追,对上游引擎来说也更敌对。

此外,咱们会常常加减字段,所以维表存储也须要有 Alter Schema 的能力。

上图是用户图变成具体的数据处理过程的步骤,最右边是用户画的图,通过校验、解析、优化等步骤失去优化后的图。其中一步优化是剖析维表到底有几张表,有没有索引表,索引字段是谁。

优化后的图通过 Flow Generate,这个阶段次要依据后面说的数据处理模型的几个阶段里的批流工作的依赖关系,绘制出的一个依赖关系图 Flow Graph。Flow Graph 再通过 Flow Code Translate 翻译成具体调度的代码。在咱们实际中,咱们应用的是 Airflow 调度,所以咱们会翻译成 Airflow Python Code,之后由 Flow Runner 把调度给启动起来。在 Airflow 的节点里可能是一个作业节点,也可能是一些其余的预处理节点。

上图是 Airflow 上一个作业节点 Python 代码的案例,咱们次要看黄色高亮的局部。

首先这是一个增量同步的 Executor,这个 Executor 次要负责拉起增量同步作业。上面 snapshot 是指用户画的图的具体协定,再上面是这个增量同步作业无关的子图。这个 Executor 拿到用户画的图的协定之后,再依据子图就能够启动对应的作业。

上图是 Executor 拉起一个作业的具体步骤。最右边的依然是用户化的图,和后面一样,它通过一系列的优化,失去了优化后的图。

而后再通过 Job Generate,这个过程次要是依据子图的信息,还原出子图,最初转换出 Job Graph。这个 Job Graph 通过 Job Code Translate 会翻译成具体作业的代码。在咱们的实际中,咱们次要翻译成 Flink SQL。之后咱们会将这个 SQL 提交给 Job Runner,让它运行起来。值得一提的是,Job Code Translate 和 Job Runner 都是插件化定制的,这么做是为了不便咱们这么多年来计算引擎的继续降级和咱们外围模型的解耦。

上图是一组批流 SQL 的案例,咱们只看黄色高亮的局部。右边是 MySQL,左边是 drc,它是阿里数据库 Binlog 的中间件。两头批流解决的过程根本是统一的,字段也是统一的,到了最初写出的时候,他们会写到同一张维表上,有所不同的是流式工作还会写到音讯队列上。

三、平台优化

还是以商品维表为例,这里的 V1、V2 代表的是全量版本,以 V1 为明天的全量,V2 为今天的全量为例。后面提到商品有多张表,一张是放弃数据库实时同步的表,还有一张是 T+1 表。那么咱们基于这个个性把作业也分成两局部,第一局部是左上角的商品表、商品图像表的批流工作,第二局部是下面两头框里的算法模型数据的批工作,左下角还有一组批流工作,实现这两张表的聚合。

当开始今天 V2 版本全量的时候,因为商品表和商品图像表通过 Binlog 始终放弃同步,所以这部分的全量同步咱们能够优化掉。只须要对今天的算法模型数据,再进行一个 V2 版本的批工作同步,写到今天 V2 版本的 T+1 表里。同样,咱们在今天起一组批流工作去生产 V1 版本的数据库实时同步表和 V2 版本的 T+1 表,提供给引擎应用。

这样的优化对用户是无感知的,咱们会去比拟 V1 和 V2 两个版本用户图的变动。如果数据库局部没有任何变动,咱们会主动进行优化执行。此外,V2 版本的数据在引擎服务上线当前,咱们才会将 V1 版本的 T+1 数据和 V1 版的局部增量作业进行下线。对用户来说,版本的切换他是感知不到的,他感觉到的就是 24 小时不间断的增量。

上图次要是针对大型业务的优化。在这个例子中,业务的全量流程到了 T2 时刻实现,引擎才开始索引构建和服务上线。但留给引擎的工夫只有 T2 到 24 点之间,数据量十分大的时候,引擎也有可能实现不了索引构建。而且在晚顶峰时,在线工作可能会对离线工作进行压抑,更容易呈现长尾。一旦呈现失败的状况,当天可能就无奈实现在线引擎的新索引上线。

咱们对数据进行了剖析之后,发现大量的数据在搜索引擎里占据着 80% 以上的曝光,大量的数据只占据 20% 不到的曝光,合乎一个二八准则的法则。所以咱们据此将所有的数据分成冷热两局部。咱们先起一个批工作,将热数据进行解决。在这个例子中,到 T1 时刻热数据处理实现,那么引擎就能够从原来的 T2 提前到 T1 时刻将热数据和昨天的冷数据合并在一起,开始做索引的构建和后续流程。在 T1 时刻之后,咱们还会起一个无限流工作,持续解决残余的冷数据。同时咱们还会起一个增量工作,将所有的热数据和冷数据的实时变更同步到引擎里。

这样一个优化上线当前,咱们有大型业务的全量工夫从七小时缩短到一小时,大大提前了索引切换上线的工夫。以前会呈现的 24 小时内无奈实现索引构建的状况不再呈现,而且在失败之后,离线和引擎都有足够的工夫去再次重跑。

一条增量音讯来的时候,并不是所有字段都有变动,也就意味着局部计算是能够被裁剪的。咱们基于 CDC 做了一个计算剪枝的优化,上面我以一个最简略的作业为例,来给大家讲讲这个优化。

首先左边框里的这个作业有一张 a、b 两个字段的源表,通过一个 UDTF1 解决,新增了一个 d 字段,再通过一个 DimJoin 读进一个 c 字段,再通过一个 UDTF2,新增一个 e 字段,这样就有 a、b、c、d、e 五个字段写出到后果表里。

用户 UDTF 的开发是基于平台提供的插件框架来进行,用户能够在插件框架之上开发多字段进和多字段出的逻辑处理单元,咱们叫做 Processer。一个 UDTF 里有多个 Processer,它们之间通过字段依赖造成字段和逻辑解决的血缘关系图。

为了简化案例,在我这个例子中,UDTF1 和 UDTF2 只有一个 Processer。那么每一个 Processer 如何判断本人是否能够被裁剪呢?这就须要有一个运行时的 Plan。右边就是这个 Plan 生成的过程,最右边是一个 JobGraph,就是我刚刚说的这个作业。

这个 JobGraph 首先通过 Transform,这个阶段次要是去解析用户插件代码,提取出字段和逻辑解决的血统图,之后再通过一个 Weld,它次要是将这些小图焊接成大图之后再通过一个编译,失去一个裸的 Plan。这个 Plan 通过编码表的组装和编码过程,最终失去一份编码后的 Plan。编码的 Plan 里次要以 bitset 的形式组织,这么做是为了在运行时有更好的查问性能。每一个 Processor 都去加载本人所须要的编码后的 Plan,当 CDC 音讯来的时候,这个 Processor 就能够查问这个 Plan,来决定本人是否要被裁剪掉。

来看一个例子,如果有一条 CDC 音讯变更字段是 a,那么 P1 和 P2 会被执行,J1 会被裁剪掉。如果这个时候是 b 字段的变更,P1、P2 和 J1 都会被执行,没有任何人被裁剪。如果是 c 的变更,P2 则会被裁剪。如果还有一条音讯 f 字段变更,它并不是咱们抉择的字段。当咱们收到 CDC 音讯时,咱们在源头上就会去查 Plan,发现 f 并不是咱们 Plan 里的字段,那么这条音讯就会被抛弃掉,从而裁剪此类的音讯。

这个优化上线当前,咱们双十一大促淘宝局部表 DimJoin 有 60% 的裁剪率,日常裁剪率在 30%,节俭资源 40%。

后面次要跟大家介绍的是零碎层面的优化,上面我抉择一部分作业层面的优化给大家介绍一下。

第一个是预测执行。在离线混布环境中,因为不同机型的差别以及在线对离线工作的压抑,离线工作常常会呈现长尾或者跑不完的状况。所以咱们在去年 7 月份的时候,在 Blink 上开发了预测执行的性能,在 75% 的 Task 实现之后,咱们会去找出长尾的 Task,再开启一个 Task 进行双跑,最初抉择先跑完的数据作为最终的数据。

这个优化上线当前,咱们有业务全量作业时长缩短了 50%,以前不太能跑的进去的作业,当初也能稳固产出。并且在去年 12 月份,咱们团队的同学也独特参加了把预测执行性能回馈社区的开发。

第二个是维表内多张表的 Join。后面提到一个维表是由多张表组成的,这些表的数据其实是依据 Key 通过排序的。咱们基于这个个性,在 Scan Connector 里做了一个优化,边读取这两张表的数据并进行 Local Join。

第三个是异步化。咱们在维表读取和写入的 Connector 外部做了一些异步读写的优化。

第四个是引擎音讯去重。咱们对发往引擎的音讯做了一个 Udaf 解决,比拟之前的音讯,如果和之前的音讯没有任何变动,咱们就会不发往引擎,如果有变动,咱们就计算出变动的字段,以晋升引擎索引的效率。这个优化上线当前,咱们有的业务音讯去重率达到 50% 以上。

在资源类型上,咱们的 JobManager 和 Stream 作业都跑在在线资源上,批作业跑在离线资源上。

在离线混部环境外面,不同机型有较大的性能差别,且整体集群规模较大,调度压力也较大,此外还有一些单机的问题,因此大型工作 Pod 比拟多的状况下,它的启动工夫会很久,所以咱们在下面做了一些优化,调整到适合的 numberOfSlot、应用 Chain 等,应用大规格的 Pod,能够缩小 Pod 的申请量,来实现启动提速。此外,咱们对重要业务进行的资源预留,以保障它更高效的启动速度。

在 FO 代价上,咱们对资源开释保留一段时间,这样能够在 FO 时应用原有的资源,实现更高效的 FO。对于不同优先级的作业,咱们会分不同的 fo-cost,这样在集群的碎片整顿和作业驱赶的时候,就能够更有针对性。

在机器热点上,解决长尾问题的预测执行后面曾经讲过,这里就不再反复了。

四、将来布局

目前咱们平台下面有大量增量极低的长尾业务,咱们心愿以多租户的形式反对这部分业务,所以前面会在多租户方面做一些摸索,Flink Session Mode 可能是技术候选之一。另一方面,平台治理着万级作业,为了进一步晋升整体的资源应用效率,咱们还会在弹性上做一些摸索。此外在顽劣的在离线混部环境稳固的运行咱们的 Flink 作业也是未来的方向之一。

点击查看直播回放和演讲 PPT<


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/product/bigdata/sc

正文完
 0