乐趣区

关于flink:Flink-流批一体在-Shopee-的大规模实践

摘要:本文整顿自 Shopee 研发专家李明昆,在 Flink Forward Asia 2022 流批一体专场的分享。本篇内容次要分为四个局部:

  1. 流批一体在 Shopee 的利用场景
  2. 批处理能力的生产优化
  3. 与离线生态的齐全集成
  4. 平台在流批一体上的建设和演进

点击查看原文视频 & 演讲 PPT

一、流批一体在 Shopee 的利用场景

首先,先来理解一下 Flink 在 Shopee 的应用状况。

除了流工作,仅从反对的批工作来看,Flink 平台上的作业曾经达到了一个比拟大的规模。

目前 Flink 批工作曾经在 Shopee 外部超过 60 个 Project 上应用,作业数量也超过了 1000,这些作业在调度零碎的反对下,每天会生成超过 5000 个实例来反对各个业务线。

从利用场景划分,这些作业在 Shopee 次要分为以下四个局部:

  • 第一个利用场景是数据集成畛域。
  • 第二个利用场景是数仓畛域。
  • 第三个利用场景是特色工程,次要用于实时和离线特色的生成。
  • 第四个利用场景是风控反作弊畛域,用做实时反作弊和离线反作弊。

从 Shopee 外部的业务场景来看,数仓是一个流批一体施展重要作用的畛域。

目前,业内还没有这样一个端到端流式数仓的成熟解决方案,大部分都是通过一些纯流的计划 + 离线数仓计划 + 交互式查问计划叠加起来达到近似成果。

在这类 Lambda 架构中,Flink 流批一体次要带来的劣势是实现计算对立。通过计算对立去升高用户的开发及保护老本,解决两套零碎中计算逻辑和数据口径不统一的问题。

但这样的 Lambda 架构复杂性又太高了。所以针对时延要求不高的业务,Shopee 实时团队主推通过 Flink+ Hudi 的代替计划,构建近实时数仓,这种计划能够解决很多场景的问题。

这种计划的益处很显著,它实现了局部的流批一体:Flink 对立的引擎,Hudi 提供对立的存储。它的限度也很显著,Hudi 数据可见性与 Commit 的距离相干,进而与 Flink 做 Checkpoint 的工夫距离相干,这缩短了整个数据链路的时延。

目前这种 Flink+Hudi 的计划曾经在 Shopee 外部很多业务线上进行应用。比方广告业务的 Deep ads. 和 offline-platform ads. 用于给广告主和产品经营产出广告数据。又比方 Shopee 外围业务 supply chain 的 WMS。WMS 的数据服务整体应用的是 lambda 架构。但对于外围业务应用该计划生成的近实时数据,用于与离线数据做 diff,监控实时链路提供的数据品质。

上图 PPT 展现了 Datamart 的一个例子。Datamart 应用 Hudi partital update 实现 DIM 表的 Join 更新,升高资源使用量。

之前,他们每小时对最近 3 小时的数据进行计算和刷新,在保证数据及时更新的状况下,解决数据提早、Join 工夫不对齐等问题。但随着数据量的迅速增长,小时级数据 SLA 的保障难度和计算资源的耗费都在一直减少。

当初,用户一方面通过 Flink 减速计算,另一方面通过与批处理联合来确保数据的最终一致性。并通过提供分级的后果表来满足不同场景的及时性要求,实时计算产出的 Partial Update Hudi 表提供局部外围实时数据,批处理产出的 Multi-version Hudi 表提供残缺且更精确的数据。

最终,在确保数据一致性的根底上,达到了分钟级提早,并无效升高了计算资源的耗费。

除了业务线应用之外,目前 Shopee 外部提供的一些平台服务也在应用 Flink。

第一个例子是 Data Infra 团队提供的 Data Hub, 它提供了一些离线集成和实时集成的罕用模块。之前他们必须引入不同的引擎来反对不同的集成模块,导致我的项目依赖简单,用户也须要理解多套引擎。

应用 Flink 后,在之后新的需要中,Data Hub 不再须要引入不同的引擎来解决批和流两套数据的集成。

第二个例子是 Feature Station,Feature Station 是 Shopee 外部提供的一个 特色生成的平台。它提供了一些升高用户运维老本的性能,比方 Feature 生成 SQL 化,反对多业务线并行开发等等。

之前这个平台的工作依赖 Spark,起初从 Spark 全副迁徙到了 Flink。Flink 带来的一个很大的劣势是便于扩大。如果之后用户有实时特色需要,用户能够将离线特色的生成逻辑十分疾速的复用到实时特色上。

下面介绍的都是 Shopee 外部流批一体利用场景的一些例子,咱们外部还有很多团队也正在尝试 Flink 的流批一体,将来会应用的更宽泛。

二、批处理能力的生产优化

Flink 在流解决方面始终有着人造的劣势,相对而言,批的能力较弱一些。咱们都能看到,社区最近几个版本中,始终在鼎力推动 Flink 批处理的能力。而对批反对的好坏也始终是用户抉择 Flink 流批一体的一个重要影响因素。上面将基于外部的实际,我将介绍一下 Shopee 对 Flink 批在生产上的一些优化,次要分为稳定性和易用性两个方向。

2.1 稳定性

批作业个别都是通过调度零碎周期性调度的。用户个别会治理大量的批作业,所以在生产实践中,他们十分关注作业的稳定性。

Flink Batch 在应用过程中,咱们次要遇到了以下的问题:

  • 当大作业执行工夫长时,工作越容易遇到各种问题,失败次数会显著减少。
  • Task 失败后 failover 的老本过高,作业的整体耗时会被重大拉长。

这样就造成了恶性循环,执行工夫越长越容易失败,而失败又反向拉长了执行工夫。这里的基本的问题是 Shuffle。

Flink 目前提供了两种 Shuffle,Hash Shuffle 和 Sort Shuffle,但这两种 Shuffle 的不同次要是体现在 Shuffle 数据的构造上,从 Shuffle 的整体架构上看,两者都是 Internal Shuffle。Internal Shuffle 就是 Shuffle 服务与 Task 共享过程,TaskManager 在 Task 执行实现之后还要持续保留去做 Shuffle 服务。

Internal Shuffle 的问题次要有两个:

第一个是:Shuffle 服务的稳定性会被有问题的 task 所影响

  • 这个有问题 task 可能来自 job 自身,也可能是同机器的其余 job。在 Shopee 外部,Spark 与 Flink Batch 跑在雷同的离线集群,所以也会受到其余类型离线工作的影响。同样,yarn 的稳定性也会影响 Flink batch 工作。
  • 依照当初的 failover 逻辑,TaskManager 无论是因为外部起因还是内部起因导致解体,Task 都会重跑,Shuffle 数据也都会失落。只管可能只是局部 Task 重跑,但因为咱们目前应用的 1.15 没有揣测执行,所以也会导致 Job 整体执行工夫重大拉长。

第二个是:当 Task 实现之后,因为 TaskManager 不能立即被开释,还要提供 Shuffle 服务,这就导致 Yarn 必须保护 Task 执行完的 container,造成集群资源利用率不高。

针对 Internal Shuffle 的问题,其实业界也曾经有了成熟的计划,那就是 Remote Shuffle。

上图中展现了两张架构图,一个是 Internal Shuffle,另一个是 Remote Shuffle,其实还有一个 external Shuffle,External Shuffle 就是把 Shuffle 服务拆分到另一个过程中。Spark 应用的 yarn auxilary external Shuffle,就是把 Shuffle 服务挪到了 Node Manager 外面,然而这还是存储跟计算混合在一起的架构。

所以咱们抉择一步到位,应用 Remote Shuffle。就是转门搭建一套 Shuffle 集群来提供 Shuffle 服务。

这种存储与计算拆散的架构有以下几个益处:

  • 计算和存储再也不会相互影响。Shuffle 服务与用户的代码齐全隔离。
  • 将 Shuffle 的工作转移到 Remote Shuffle 集群后,Task 执行结束时,Task Manager 的资源能够立即被开释。
  • 在这种架构下,计算跟资源解耦 了,咱们能够自在的扩大或者膨胀各自的资源量。

业界有不少 Remote Shuffle 的计划,比方阿里云的 Celeborn,字节的 Cloud ShuffleService,另外还有 Uber Remote Shuffle Service,Splash 等等。然而这些 Remote Shuffle 大部分次要是为了反对 Spark,反对 Flink 的并不多,另外有一些只在外部版本中反对 Flink。

最初在选型的规范外面,咱们次要思考了我的项目自身的成熟度,社区对 Flink 的反对度,与 Flink 的匹配水平,最终还是采纳了 Flink Remote Shuffle。

这个计划有几个益处:

  • Flink Remote Shuffle 是 Flink 的一个扩大我的项目,原生就是为了反对 Flink, 社区的反对力度大,之后有了问题能够跟社区多交换。
  • 目前 Flink 的 Batch 正在疾速倒退,每个版本都有很大的变动和进步,比方 1.16 的 hybrid Shuffle 和揣测执行,其余的 Remote Shuffle 不可能这么疾速跟进。

尽管 Flink Remote Shuffle 也有毛病,但临时能够忍耐。另外 Remote Shuffle 其实是跟计算引擎拆散的,等之后 Flink Batch 的个性稳固了,咱们最终心愿是离线能共用一套 Remote Shuffle service。

在集群部署计划上,咱们采取了跟 Presto 混部的计划。次要的思考是为了充沛的利用资源,Presto 和 Remote Shuffle 在资源应用上刚好互补。Remote Shuffle 自身是一个存储服务,它不怎么应用 CPU 和 memory,但会占用大量的磁盘。相同,Presto 会占用大量的 CPU 和 memory,磁盘使用量绝对较少。

另外,从工夫上看,Batch 工作更多的集中在早晨,交互式查问更多集中在白天,这也有利于资源复用。再就是为了防止相互影响,咱们应用 Ggroup 来为两个服务提供资源限度和隔离。

最初,咱们搭建了一个有 145 个节点的 Shuffle 集群,为线上的 Batch 工作提供 Shuffle 服务。其中每个节点应用一个 3TB 的 SSD 来保留数据,无效保障 Shuffle 数据的存取性能。

在集群搭建好之后,咱们也在 Remote Shuffle Service 上做了一些测试和生产验证。从上图就能够看到成果。

从性能上看,相比 Hash Shuffle , Remote Shuffle 的性能晋升了 19.3%,相比 Sort Shuffle, 性能晋升了 6.1%。

从稳定性上看,咱们取了一个之前十分不稳固的 project。后果是 Task 失败率升高了超过 70%。

所以无论从性能还是稳定性,Remote Shuffle 都能带来很好的收益。

当然,Remote Shuffle 这个我的项目也还有一些问题。

  • 网络环境的异样稳定会导致 Shuffle 服务不稳固,体现进去次要是 ShuffleClient 与 ShuffleWorker 之间连贯中断。这反映了一个问题,就是数据重传机制的缺失。
  • 另外就是没有多租户资源隔离机制,无论是带宽还是磁盘资源,目前都没有隔离机制,这会导致不同 Job 之间相互影响。

当然,这些问题也都在不断改进中,总的来看,Flink Remote Shuffle 对 Flink Batch 有很大的帮忙。

2.2 易用性

除了下面针对 Shuffle 的优化之外,Shopee 也在易用性方面做了很多工作。大家都晓得,对于流批一体,Flink SQL 为外围载体。在应用过程中,SQL 也存在一系列应用上的艰难。

第一个问题是 SQL 工作有问题后,对于用户而言定位艰难。之前咱们的流工作次要依赖 web UI, 没有 History Server。有了 History Server 之后,定位 Task 的问题失去了缓解。然而还有一个比拟麻烦的事件,就是 SQL 工作通过 Planner 优化之后,执行打算与 SQL 构造上有了较大差别,用户应用过程中,常常很难依据 Task 信息定位到相干的 SQL 语句。

第二个问题是 SQL 配置艰难。SQL 工作各 Task 之间资源应用常常不平衡,有的是 CPU 密集型,有的是内存密集型,很难通过对立的 TM 配置来解决。社区 SQL API 也并没有提供细粒度资源配置的接口。导致一些高级用户心愿优化资源使用量时,SQL 工作的资源配置十分困难。

针对 SQL 问题剖析定位的难点,咱们做了两点优化:

  • 在用户提交 SQL 工作之前,展现作业的 streamgraph,让用户执行之前就能看到 SQL 的执行逻辑,以判断是否合乎本人的预期。
  • 第二个优化就像上图中展现的一样,将执行节点转换成对应 SQL,让用户晓得每个 Task 的对应的 SQL 段,帮忙定位问题地位。

另外,一些算子为了在不同的数据下有更好的性能,同一个算子会有多种实现计划,比方 join。一些用户在排查问题时,会关怀优化器对 SQL operator 的具体实现逻辑。所以,除了展现每个 Task 的对应的 SQL 之外,咱们还提供展现 SQL 算子对应生成的 Java code,以确定算子底层实现逻辑,辅助排查 SQL 故障。

针对第二个 SQL 工作资源优化的问题,咱们在展现 streamgraph 的根底上,容许为不同的 operator 配置不同的并发度,链接策略还有 slot group 等等。

在资源配置上,咱们并没有应用社区提供的 operator 级别的细粒度资源配置。次要有两个起因:

  • Slot 资源使用量用户很难监控,目前最多监控到 TM 粒度。这导致用户没有监控根据,无奈精确预估每个 slot 的资源使用量。
  • 动静资源切割机制导致机器上呈现大量碎片。

咱们最初应用了本人开发的 SlotGroup 级别的资源配置,整体思路是不同的 SlotGroup 申请不同规格的 TM,Slot 仍然是均分 TaskManager 的资源,但能够通过为不同的 Operator 设置不同的 SlotGroup,进而设置不同的资源量。

这种计划让用户能够很不便的根据 TaskManager 应用监控,定位到配置不合理的 SlotGroup 和 Operator, 进而调整 TM 资源配置,优化作业的整体资源利用率。

上图中的性能依赖于咱们外部开发的“SlotGroup 粒度的资源调度”。

当然除了以上对 Batch 的优化之外,咱们还进行很多其余的优化。比方复用 stream 模式下 compact 小文件的逻辑;调整容错机制, 反对 Batch SQL 的小文件 compact

还有就是 parquet 的 nested projection/filter pushdown;优化超过 64 位 GroupId 生成策略;优化 FileSourceCoodinator 创立逻辑等等。

这些优化都无效解决了生产过程中 Shopee 各个业务线遇的问题。

三、与离线生态的齐全集成

在流批一体落地的过程中,用户最关怀的就是技术架构的改变老本和潜在危险。作为 Flink 平台,面临的一个很重要的挑战就是如何兼容好用户曾经广泛应用的离线批处理能力。所以第三局部次要介绍与离线生态的集成,次要波及开发和执行两个层面的问题。

3.1 开发层面

开发层面次要是复用的问题,复用的目标是为了升高用户的应用老本。因为很多用户曾经在其余引擎上积攒的大量的业务 UDF,所以咱们提供的对立 UDF 来解决 UDF 复用的问题。

对立 UDF 的指标是为了用户能在 Flink 平台上无缝拜访各种 UDF。目前咱们曾经反对了很多类型的 UDF。

  • Flink 自身的 UDF,咱们将很多 Flink build-in function 下放反对低版本。
  • 减少了一些 Shopee 外部罕用的 UDF,用户也能够上传共享自定义的 UDF。
  • 针对其余引擎的 UDF,咱们依赖 load module 反对了的 Hive UDF。对于 Spark build in 的 UDF,为了升高用户应用老本,咱们也把大量罕用的 Spark UDF 迁徙到了 Flink。
  • 值得一提的是,咱们团队目前曾经反对了 SQL 语句中退出 Java 代码并解析成 UDF。上图中有个例子,之后咱们还会反对 lambda 表达式等等,这将大大不便用户对 UDF 的应用。

除了复用 UDF 以外,咱们还通过对立元数据来复用曾经存在的离线数据模型。与其余各自已有的元数据管理一起,加上依赖 HDP scheme registry 构建的实时元数据,一起构建造成 Unity Catalog。

用户能够只通过 Unity Catalog 来拜访底层不同的数据,在平台提供的 SQL IDE 中,能够非常不便的拜访已有的 Catalog 和数据表。目前曾经反对了 Kafka,Hive,Hudi, Redis, Hbase 这几个不同的数据类型。

3.2 执行层面

在执行层面,随着 Flink 能力的加强,用户心愿 Flink SQL 批工作嵌入到以后的数据加工过程中,作为两头的一个环节。所以咱们将 Flink Batch 接入了 Shopee 外部的对立调度平台 Data Scheduler。并且通过对立的数据 marker 服务来进行数据依赖。最终将 FlinkBatch 与已有的其余数据处理引擎买通,更好的服务用户。

另外,在离线畛域,清晰的血统是对数据进行追溯和影响剖析的根底。当数据有了清晰的血统和归属,零碎中的数据就有了清晰的构造。

咱们团队目前除了通过上一张 PPT 提到的数据 marker 来提供数据依赖关系之外,

还从 gragh 中抽取 Source,Sink,Lookup 的元数据信息,报告给 Datamap,以生成更残缺的数据血统。

当然除了离线数据的元数据之外,咱们也正在设计将实时数据的元数据整合到现有的数据血统中,彻底将所有数据的归属买通。

四、平台在流批一体上的建设和演进

最初我想介绍一下咱们 Flink 平台在流批一体上的建设和演进。其实在下面介绍中,曾经展现了不少平台的性能。所以这一部分,我只会重点介绍一下平台对运维工具 History Server 的优化。

其实 Flink 流工作对 History Server 的需要并不大,因为流工作实践上始终在运行,咱们能够用 web UI。然而对于批工作,History Server 却是一个十分无效的运维追溯工具。

4.1 HistoryServer 接入 Yarn 日志

首先我要宣传一下 1.16 的新个性:跳转内部 log。

尽管咱们平台曾经将用户的日志接入的 kibana,然而因为日志是混合的,所以查问的时候用户要先定位到 subTask,而后须要输出各种筛选条件查问,查问流程比拟长,速度也比较慢。所以咱们始终想优化这个流程,在最近公布的 1.16 中,反对了接入内部 log 的性能,咱们针对日志较少的 Batch 工作,间接应用该个性跳转到 yarn 的 history log,非常不便查看问题 Task 的全量日志。

4.2 HistoryServer 小文件问题

另外,History Server 还有一个小文件的问题。从上图左侧能够看到,History Server 将历史工作存储为大量 Json 小文件用于服务 Web UI。当只反对流工作的时候这个问题并不显著,然而随着咱们平台反对批工作后,历史工作的数量剧增。

数量的上涨带来的几个问题:

  • 大拓扑,大并发的工作的解压对 History Server 服务产生压力。
  • 历史工作产生的大量文件对部署节点的文件系统产生大量存储开销。大量小文件导致单个 History Server 只能保留很短时间的历史工作。不然就会将单机的 inode 耗光。
  • History Server 目前重启后须要从新拉取历史 Job 信息并解压。

这些都给生产带来了问题。

4.3 HistoryServer 优化计划

所以咱们对 History Server 整体架构做了优化,整体的思路是,只对须要的历史 Job 进行解压。

第一,是拆分拉取和解压两个性能,将原有的 Fetcher Executor 拆分成了 Fetcher Executor 和 Unzip Executor。Unzip Executor 专门解决 archivedJobFile 解压。

第二,减少 archivedJobs 目录存储压缩后的历史工作文件,从远端拉取的历史工作不立即进行解压。而是当用户拜访时减少一个解压工作进行解压。

这样就缩小了 History Server 的工作量,升高了 History Server 的负载,也升高了部署节点的存储开销。这个计划在咱们线上应用后,将存储开销升高了 90% 以上,成果非常显著。

4.4 Flink 平台的演进

最初,简略介绍一下 Shopee Flink 平台反对批工作的倒退过程。

咱们外部反对 Flink 的批是从去年三季度开始的,到当初为止一年多。从革新平台反对 Batch,到并入离线生态,买通依赖和血统,再到搭建 Remote Shuffle。无效的撑持起了 Shopee 各个业务线对 Flink 流批一体的需要。

整个落地过程中,最次要教训的是要站在用户的视角对待问题,正当地评估用户的改变老本以及收益,帮忙用户找出业务迁徙的潜在危险,升高用户应用的门槛。

将来布局次要还是在业务拓展方面。咱们会加大 Flink 批工作的推广,摸索更多流批一体的业务场景。同时跟社区一起,在适合的场景下,减速用户向 SQL 和流批一体的转型。

点击查看原文视频 & 演讲 PPT


更多内容

流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://click.aliyun.com/m/1000372333/

退出移动版