关于flink:Native-Flink-on-Kubernetes-在小红书的实践

9次阅读

共计 7370 个字符,预计需要花费 19 分钟才能阅读完成。

摘要:本文整顿自小红书数据流团队资深研发工程师何军在 Flink Forward Asia 2021 平台建设专场的演讲,介绍了小红书基于 K8s 治理 Flink 工作的建设过程,以及往 Native Flink on K8s 计划迁徙过程的一些实践经验。次要内容包含:

  1. 多云部署架构
  2. 业务场景
  3. Helm 集群管理模式
  4. Native Flink on Kubernetes
  5. 流批一体作业管控平台
  6. 将来瞻望

点击查看直播回放 & 演讲 PDF

一、多云部署架构

上图是以后 Flink 集群多云部署模式图。业务数据扩散在各个云厂商之上,为了适配业务数据处理,Flink 集群天然也进行了多云部署。这些云存储产品一方面用于外部的离线数据存储,另外一方面会用于 Flink 做 checkpoint 存储应用。

在这些云基础设施之上,咱们搭建了 Flink 引擎反对 SQL 及 JAR 工作的运行,得益于之前做的一项推动工作 SQL 化的工作,以后外部 SQL 工作和 JAR 工作比例曾经达到了 9:1。

在此之上是流批一体作业管控平台,它次要有以下几个性能:作业开发运维、工作监控报警、工作版本治理、数据血统剖析、元数据管理、资源管理等。

平台数据输出次要有以下三个局部,第一局部是业务数据,存在于业务外部的 DB 零碎里比方 MySQL 或者 MongoDB,还有一部分是前后端打点数据,前端打点次要是用户在小红书 APP 端的行为日志,后端打点次要是 APP 外部应用程序性能指标相干的数据。这些数据通过 Flink 集群解决之后,会输入到三个次要业务场景中,首先是音讯总线,比方 Kafka 集群以及 RocketMQ 集群,其次会输入到 olap 引擎中,比方 StarRocks 或 Clickhouse,最初会输入到在线零碎,比方 Redkv 或者 ES 供一些在线查问应用。

二、业务场景

Flink 在小红书外部的利用场景有很多,比方实时反欺诈监控、实时数仓、实时算法举荐、实时数据传输。本章会着重介绍一下其中两个场景。

第一个是实时举荐算法训练。上图是举荐算法训练的执行流程。

Flink 集群先接管打点服务采集过去的原始数据,对这一部分数据进行归因并将它写入到 Kafka 集群,之后会再有一个 Flink 工作对这部分数据再做一次汇总,而后失去一个 Summary 的标签数据,针对这个标签数据,前面还有三条实时处理门路:

  • 第一,Summary 标签数据会和举荐引擎举荐进去笔记的特色数据进行关联,这个关联也是在 Flink 工作中进行的,外部称其为 FeatureJoiner 工作。接着会产出一个算法训练的样本,这个样本通过算法训练之后产出一个举荐模型,而这个模型最终会反馈到实时举荐引擎中。
  • 第二,Summary 标签数据会通过 Flink 实时写到 OLAP 引擎中,比方写到 Hologres 或 Clickhouse 中。
  • 最初,Summary 标签数据会通过 Flink 写入到离线 Hive 表中,提供给后续离线报表应用。

第二个场景是实时数仓。业务数据包含前后端打点的数据,依照业务分流规定进行解决之后会写入到 Kafka 或者 RocketMQ 中,后续 Flink 会对这部分数据做实时 ETL 业务解决,最终进入实时数据中心。目前实时数据中心次要是基于 StarRocks 实现的,StarRocks 是一个性能非常弱小的 OLAP 引擎,它承载了公司很多实时相干业务。在数据中心之上,咱们还撑持了很多重要实时指标,比方实时 DAU、实时 GMV、实时直播归因、实时广告计费等。

三、Helm 集群管理模式

在正式迁入到 Native Flink on K8s 之前很长一段时间内,都是基于 Helm 来进行集群治理的。Helm 是一个 K8s 上的包管理器,它能够定义、装置和降级 K8s 利用和服务,同时具备以下几个特点:

  • 第一,能够治理比较复杂的 K8s 利用,创立 Flink 集群时会创立很多 K8s 相干的资源,例如 service 或者 config map 以及 Deployment 等,Helm 能够将这些资源对立打包成一个 Helm chart,而后进行对立治理,从而不须要感知每一种资源对应的底层形容文件。
  • 第二,比拟不便降级和回滚,只须要执行一条简略命令就能够进行降级或者回滚。同时因为它的代码是和 Flink Client 的代码做了隔离,因而在降级过程中不须要去批改 Flink Client 的代码,实现了代码解耦。
  • 第三,十分易于共享,将 Helm chart 部署在公司公有服务器上之后,曾经能够同时反对多个云产品的 Flink 集群治理。

上图是基于 Helm 治理的 Flink 工作生命周期,次要分为启动工作和进行工作两个阶段。这里有三个角色,第一个是 Client,它能够是一个 API 申请,也能够是用户在界面上的一次点击行为。启动工作时,百川平台接管到 API 申请后,会通过 Helm Client 命令去执行 install 指令,创立对应的集群资源,同时外部集成的 Flink Client 也会去查看以后集群的 JobManager 是否启动,如果曾经启动就进行 job 提交。job 提交到集群运行起来之后,Flink Client 也会一直地查看以后 job 的运行状态,这也是 Helm 管理模式下作业状态的保护机制。

第二个阶段是工作进行阶段,Client 会向百川平台发动一个 stop 命令,接管到 stop 命令之后百川平台会通过 Flink Client 向 JobManager 发动 cancel 指令,同时查看这个 cancel 指令有没有执行胜利,发现 job 被 cancel 之后,会通过 Helm Client 去执行 delete 指令,实现集群资源的销毁。

上图展现了通过 Helm 创立了哪些 K8s 资源。

  • 首先是最根底的 JobManager 和 TaskManager Deployment;
  • 第二局部是 ConfigMap,次要是针对 log4j 的配置和各大云厂商提供的云存储产品相干的配置;
  • 第三局部是 Ingress,目前次要用于 Flink web UI 应用以及拜访 JobManager 当前任务状态;
  • 第四局部是 Nodeport Service,每启动一个 JobManager,就会在 JM 上启动一个 Nodeport Service,并与 Ingress 做绑定;
  • 第五局部是指磁盘资源,次要有以下两个利用场景:应用 RocksDB Backend 的时候须要去挂载高效云盘、批处理工作须要挂载磁盘做两头数据交换;
  • 最初一部分是 ServiceMesh,TaskManager 外部会通过 sidecar 模式去拜访第三方服务,比如说 Redkv service,这些 service 的配置也是在这外面创立的。

上图能够看到 Helm Client 外面是集成了各大云厂商提供了 K8s 相干的配置,当它接管到创立工作的参数时,会依据这些参数去渲染出不同的 Helm 模板,并提交到不同的云上执行,创立出对应的集群资源。

目前的集群管理模式下,在理论生产过程中还是遇到了不少问题:

  • 第一是 K8s 资源瓶颈问题。因为每启动一个 JobManager 就会创立一个 NodePort Service,而这个 Service 会在整个集群范畴内占用一个端口和一个 ClusterIP。当作业规模达到肯定水平的时候,这些端口资源以及 IP 资源就会遇到性能瓶颈了。
  • 第二个是 ServiceMesh 配置老本过高。上文提到 TaskManager 外部会拜访第三方服务,比如说 redkv service,那么每减少一个 redkv service,就须要去批改对应的配置并实现发版,过程的老本是比拟高的。
  • 第三个是存在肯定的资源泄露问题。所有的资源创立以及销毁都是通过执行 Helm 命令来实现的,在某些异常情况下,job 失败会导致 Helm delete 命令没有被执行,这个时候就有可能会存在资源泄露的问题。
  • 第四个是镜像版本比拟难以收敛。在日常的生产过程中,某些线上工作呈现了问题,会长期出一个 hotfix 版本镜像并上线运行,长此以往线上就会存在很多版本镜像在运行,这对于前面的运维工作以及问题排查产生了十分大的挑战。
  • 最初一个问题是 UDF 治理复杂度比拟高,这是任何分布式计算平台都会遇到的一个问题。

针对上述这些问题,咱们在 Native Flink on K8s 模式下一一进行了优化解决。

四、Native Flink on Kubernetes

首先,为什么会抉择这种部署模式?因为它具备以下三个特色:

  • 更短的 Failover 工夫;
  • 能够实现资源托管,不须要手动创立 TaskManager 的 pod,也能够主动实现销毁;
  • 具备更加便捷的 HA。在 Flink 1.12 之前,实现 JobManager HA 还是依赖于第三方的 zookeeper。但在 Native Flink on K8s 模式下,能够依赖于原生 K8s 的 leader 选举机制来实现 JobManager 的 HA。

上图是 Native Flink on K8s 的体系架构图。Flink Client 外面集成了一个 K8s Client,它能够间接和 K8s API server 进行通信,实现 JobManager Deployment 以及 ConfigMap 的创立。JobManager development 创立实现之后,它外面的 resource manager 模块能够间接和 K8s API server 进行通信,实现 TaskManager pod 的创立和销毁工作,这也是它与传统 session Cluster 模式比拟大的不同之处。

外部将 UDF 分为两类:

  • 第一类是平台内置的,将平时的生产工作中常常应用到的 UDF 进行形象演绎总结,并内置到镜像外面。镜像里有对于 UDF 的配置文件,其中有 UDF 的名称以及类型,同时指定了它对应的实现类。
  • 另外一类是 User-defined UDF,在 Helm 管理模式下,针对用户自定义的 UDF 治理是比拟粗放的,将用户 project 下所有 UDF 相干的 JAR 包对立加载到 classloader 下,这会导致类抵触问题。而在 Native Flink 模式下,实现了一个 create function using JAR 的语法,能够按需加载用户所须要的 UDF 对应的 JAR 包,能够极大地缓解类抵触的问题。

在原有的模式上,镜像治理是通过将所有代码对立打包到一个大的 image 里,但这样会存在一个问题,对任何模块的批改都须要对整个代码库进行一次编译打包,而这个过程是十分耗时的。

在 Native Flink 版本下,针对镜像版本治理做了一些优化,次要是将 Flink 的 image 拆分为了三个局部,分为 Flink engine、connector 以及第三方插件。这三个局部都有各自版本号,并且能够自在进行拼装组合。这项优化升高了引擎打包的频率,也意味着能够晋升发版效率。

拆分之后,Flink 如何将这些镜像组合成一个能够运行的镜像呢?上面以加载一个 Kafka SDK 插件为例来进行论述。job 运行时会从一个动静配置仓库中获取以后这个 job 应该应用的 Kafka SDK 版本,并将其传递给百川的后端,这个 SDK 版本对应了 docker 仓库外面的一个镜像,镜像只蕴含一个 SDK 对应的 JAR 包,百川的后端在渲染 pod 模板的时候,会在 InitContainer 阶段将 image 加载进来,同时将它 Kafka 的 JAR 包挪动到 Flink container 某个指定的目录上来,以此实现加载。

在新的模式下,对 job 状态保护机制做了一次重构,引入了一个 headless 类型的 service 以及一个 status DB。在 JobManager 模块,通过 JobManager status listener 一直监听 job 状态变动,并将这个变动上传到 job ststusDB 中,百川平台能够通过 Query DB 来获取工作的状态。另外在某些场景下,可能因为 job 状态上传失败导致百川无奈获取到工作的状态,百川还是能够走原来的门路,通过 Ingress 去拜访 JobManager 来获取工作的状态。此时的 Ingress 和之前不同之处在于它绑定的是一个 headless service,不须要占用集群的 Cluster IP,这就解决了之前模式下 K8s ClusterIP 以及 nodePort 有余的问题。

实现上述优化工作当前,面临的最大的问题就是如何将老版本的工作平滑地迁徙到新版本 Flink 1.13 上,这其实是一项十分具备挑战性的工作。次要做了以下 4 个方面的工作:

  • 第一,兼容转化工具。这个工具会对 SQL 进行转化,保障 SQL 在 1.13 运行的语法校验不会出错。1.10 到 1.13 经验过几个大版本的变更,SQL 的定义在泛滥方面曾经不兼容,比方在 1.10 和 1.11 的时候,Kafka connector 的取值是 0.11,到 1.13 之后,对应取值曾经变成 universal,如果不做任何转化,原始 SQL 必定在 1.13 上没有方法运行。
  • 第二,兼容检测工具。这个工具的目标是为了查看 SQL 运行在 1.13 的时候能不能从一个低版本的 savepoint 去进行复原。次要从以下几个方面去做了查看:operator ID 降级之后,名称有没有发生变化;新旧两个版本对应的 max parallelism 有没有发生变化,因为 max parallelism 发生变化的时候,在某局部场景下是没有方法从一个老的 savepoint 来复原的。
  • 第三,预编译。在 1.13 上对转换之后的 SQL 进行预编译,看编译的后果是否可能失常通过。在兼容检测工具的过程中,也发现了很多从低版本到高版本不兼容的中央,引入了新的数据类型机制,1.11 没有应用 ExternalSerializer,而 1.12 及当前应用 ExternalSerializer 进行包装;BaseRowSerializer 曾经在 Flink 1.11 时候改名成了 RowDataSerializer;数据类型外面有一个 seriaVersionUID,之前它是一个随机的 long 类型的数字,而在 1.13 对立固定成了 1。上述种种不兼容会导致 1.13 没有方法间接从一个低版本的 savepoint 来复原的。因而针对这些问题,在引擎侧做了一些革新。
  • 第四,迁徙工具。这个工具的指标次要有以下三点:

    • 首先,对用户作业的影响工夫尽可能降到最低,为了达成这个指标,咱们对 Native Flink on K8s 的 application mode 做了比拟大的革新。原生的 application mode 是一边调度一边申请资源,为了在降级过程中升高对用户作业的影响,实现了 application mode 下能够提前申请好资源并实现 SQL 的编译 (即 JobManager 的预启动),这个过程实现之后,将旧的 job 停掉而后启动新的 job,整个过程对用户作业的影响可能管制在 30 秒以内 (中等规模工作)。
    • 其次,在迁徙的过程中要保障状态不失落,因为所有迁徙都是基于 savepoint 来启动的,所以这块的数据是不会有任何失落的。
    • 最初,如果在降级过程中产生了异样,能够反对异常情况下主动实现回滚。

在理论 Application mode 利用过程中,也发现了原生 Flink 的一些问题,并做了对应的解决计划。

例如 JobManager 在 failover 的时候会从新拉起一批新的 TM,会导致 TaskManager 的资源翻倍。如果资源池的资源不足以满足 double 的需要,就有可能导致 failover 失败。此外,即便这一次 failover 胜利了,然而新启动的 job 会基于首次启动时指定的 recover path 来进行复原,这个时候的位点可能曾经是一个十天以前的位点了,这会导致数据反复生产的问题。针对这个问题,在检测到 JobManager 产生 failover 的时候就会在引擎侧间接将 job fail 掉并告警,而后通过人工手动染指来解决。

五、流批一体作业管控平台

流批一体作业管控平台次要提供了以下几个模块的性能:作业开发及运维、版本治理、监控报警、资源管理、数据血统、元数据管理以及 SDK。其中资源管理次要分为资源隔离和资源举荐,数据血统次要用于展现 Flink 工作上下游之间的关系,元数据管理次要是针对用户 catalog 表。

上图上半局部是 SQL 开发界面,页面的主体局部 SQL 编辑器,右侧有工作的根本信息、版本信息、作业参数以及一些资源配置相干的界面元素。

下半局部是工作运维界面,下面提供了很多惯例操作,比方进行工作,或先打 savepoint 再进行工作等。

作业版本治理分为 Flink SQL 工作以及 Flink JAR 工作。在 SQL 工作界面上能够看到 SQL 经验过很多次发版,“更多”按钮提供了回滚操作。针对 Flink JAR 工作,目前有两种提交 JAR 工作的办法,能够间接将用户的 JAR 包上传到一个分布式存储门路,也能够通过指定代码仓库 tag 来指定 JAR 包的版本。

资源管理次要分为资源隔离和资源举荐。这里引入了资源池的概念,并基于以下几个维度做了切分:

  • 第一个因素是它运行所属的云环境;
  • 第二个因素是业务类型;
  • 第三个因素是资源池提供给流还是批工作应用。

另外,针对曾经运行一段时间的工作,会联合它历史运行期间的 CPU、内存、提早 lag 等指标信息,给出当前任务所须要的最佳 K8s 资源配置举荐后果。

Rugal 调度平台是公司外部一个对标 airflow 的产品,它能够通过百川提供的 SDK 定时创立工作提交到百川平台。上图左侧是一个 SQL 编辑模板,其中的很多参数信息都是通过变量的模式来展现。调用 SDK 的时候,能够将这些变量对应的理论值传入进来,并用这些值渲染出具体要执行的 SQL,从而生成具体的执行实例。

六、将来瞻望

最初是对将来工作的布局。

  • 第一,动静资源调整。目前,Flink job 一旦提交运行,就无奈在运行期间批改某个 operator 占用的资源。所以心愿将来可能在 job 不进行 restart 的状况下,调整某个算子所占用的资源。
  • 第二,跨云多活计划。目前公司外围 P0 作业根本都是双链路的,但都仅限于在单朵云上。心愿针对这些外围工作,实现跨云双活计划,其中一个云上工作呈现问题的时候,可能稳固切换到另外一朵云上。
  • 第三,批工作资源调度优化。因为批工作大多是在凌晨当前开始执行,同时会调度很多工作,有的工作可能因为抢占不到资源导致无奈及时运行,在任务调度执行策略上仍有能够优化的空间。

Flink CDC Meetup · Online

工夫:5 月 21 日 9:00-12:25

PC 端直播观看:https://developer.aliyun.com/…

挪动端 倡议关注 ApacheFlink 视频号预约观看

正文完
 0