关于腾讯云:大数据云原生系列-微信-Flink-on-Kubernetes-实战总结

37次阅读

共计 6566 个字符,预计需要花费 17 分钟才能阅读完成。

前言

架构转型,拥抱云原生服务生态

以后微信外部的大数据计算平台是基于自研的 Yard 资源调度零碎来建设,Yard 的设计初衷除了提供在线服务资源隔离外,另一方面是为了进步在线服务机器的整体资源利用率,其外围策略是在机器闲暇时能在下面跑一些大数据离线工作。然而对接业界各种大数据计算框架(例如 Hadoop MapReduce、Spark、Flink 等)都须要专门定制化开发,迭代保护十分不灵便,难以跟上开源社区倒退的步调。为此,咱们开始转向应用 Kubernetes,并基于腾讯云的 TKE 容器平台逐渐搭建咱们的大数据计算平台。

思考到咱们 Yard 平台上 Flink 作业还不是特地多,历史包袱绝对较少,所以咱们首先开始 Flink on Kubernetes 实战之路。

微信 Flink 实时计算平台整体详情

微信 Flink 作业数据流转图

下图是咱们大多数业务的 Flink 作业实时计算数据流转图,数据经采集上报到音讯队列 Pulsar,用户的 Flink 作业生产 Pulsar 计算(必要时也会拜访其余内部存储,如 Redis、FeatureKV 等),计算结果能够落地到多种存储系统,例如对于报表类业务,计算结果写入 mysql/pg;对于实时样本特色拼接作业,计算结果写入 hdfs,为上游模型训练一直提供样本;对于一些两头后果,则写入 Pulsar,以便对接上游 Flink 作业。

上面具体论述上图中 Flink 作业是如何提交部署的。

集群及 Flink 作业部署

Flink on TKE 半托管服务,极致的 Flink 云原生应用体验

Flink on TKE 半托管服务提供了 Flink 集群部署、日志、监控、存储等一站式的服务,用户能够将其余在线业务与 Flink 运行在同一个集群中,从而最大水平进步资源资源使用率,达到对立资源、对立技术栈、对立运维等能力。

咱们基于腾讯云的 TKE 容器平台构建 Flink Kubernetes 计算集群。依据已有的 Flink 作业经营行状况,咱们发现绝大多数 Flink 作业次要是消耗内存,而 CPU 利用率广泛较低,在机型抉择上咱们举荐抉择内存型机器。

对于 Flink 作业的提交部署,Flink on Kubernetes 有多种部署模式(具体介绍请参考 TKE 团队出品的文章:Flink on kubernetes 部署模式分析),Flink 开源社区先后推出了基于 Standalone 的 Kubernetes 申明式部署以及 Kubernetes Native 部署形式,基于 Standalone 的 Kubernetes 申明式部署步骤繁琐且不易治理,所以不思考,另外社区的 Flink on Kubernetes Native 部署形式是从 1.12 起正式推出,性能还不够欠缺,并且尚未被大规模生产验证,咱们在这之前其实曾经开始调研部署,通过一番比拟后,咱们应用的是 TKE 容器团队提供的 Flink on TKE 半托管服务(基于 Kubernetes Operator),其提交部署流程大抵如下图所示。

通过 Flink Operator,客户端就能够通过一个简略的申明式 API 提交部署 Flink 作业,各组件的生命周期对立由 Operator 管制,例如:

apiVersion: flinkoperator.Kubernetes.io/v1beta1
kind: FlinkCluster
metadata:
  name: flink-hello-world
spec:
  image:
    name: flink:1.11.3
  jobManager:
    resources:
      limits:
        memory: "1024Mi"
        cpu: "200m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2024Mi"
        cpu: "200m"
  job:
    jarFile: /opt/flink/examples/streaming/helloword.jar
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    args: ["--input", "/opt/flink/README.txt"]
    parallelism: 2
  flinkProperties:
    taskmanager.numberOfTaskSlots: "2"

Flink Operator 提交流程大抵如下图所示,首先会启动一个 Flink Standalone Session Cluster,而后拉起一个 Job Pod 运行用户代码,向 Standalone Session Cluster 提交 Job,提交实现后会一直去跟踪 Job 的运行状态。所以运行过程中会有三类 Pod,即 JobManager、TaskManager、Job Pod。

起源: https://github.com/lyft/flink…

应用 Flink Operator 部署 Flink 作业的益处显而易见,客户端不须要像 Flink on Kubernetes Native 部署形式那样须要 kubeconfig,能够间接通过 http 接口拜访 API Server。尽管 Flink on Kubernetes Native 部署能够做到按需主动申请 TM,然而实际上咱们的利用场景根本都是单 Job 的流计算,用户当时布局好资源也可承受,而且基于 Flink Operator,咱们能够做批调度,即 Gang Schedule,能够防止资源无限的状况下作业之间相互期待资源 hold 住的状况(例如大作业先提交,局部 TaskManager 长时间处于资源期待状态,小作业后提交,小作业申请不到资源也 hold 在那里傻等)。

主动下载用户上传资源

作业与 Flink 内核动静拆散,进步灵活性

通过上述的申明式 API 形式提交部署,咱们能够看到用户 jar 包须要当时打到 image 里,作为平台提供方,当然不可能让每个用户本人去打 docker image,有些用户甚至都不晓得怎么用 docker,所以咱们应该对用户屏蔽 docker image,用户只须要上传 jar 包等资源即可。Flink Operator 提供了 initContainer 选项,借助它咱们能够实现主动下载用户上传资源,然而为了简略,咱们间接批改 docker entrypoint 启动脚本,先下载用户上传的资源,再启动 Flink 相干过程,用户上传的资源通过环境变量申明。例如:

apiVersion: flinkoperator.Kubernetes.io/v1beta1
kind: FlinkCluster
metadata:
  name: flink-hello-world
spec:
  image:
    name: flink:1.11.3
  envVars:
    - name: FLINK_USER_JAR
      value: hdfs://xxx/path/to/helloword.jar
    - name: FLINK_USER_DEPENDENCIES
      value: hdfs://xxx/path/to/config.json,hdfs://xxx/path/to/vocab.txt
  ...

用户上传的依赖能够是任意文件,跟 Flink on Yarn 的形式不同,咱们不必通过 submit 来散发依赖,而是在容器 docker entrypoint 启动脚本中间接下载到工作目录,以便用户能够在代码里以相对路径的形式(例如 ./config.json)拜访到,如果依赖文件是 jar,则须要将其附加到 classpath 中,为了不批改 flink 的脚本,咱们将 jar 附加到环境变量 HADOOP_CLASSPATH 上,最初 Flink 相干过程启动的时候会被加到 Java 的 classpath 中。

对于用户主类所在的 jar(即环境变量 FLINK_USER_JAR),只须要在 Job Pod 的 Container 中下载,如果同样下载到当前目录,那么它也会被附加到 classpath 中,在提交的时候可能会呈现如下类加载链接谬误,这是因为 Java 启动的时候加载了一遍,在执行用户 main 函数的时候 Flink 又会去加载一遍,所以咱们将主 jar 包下载到一个专门固定目录,例如 /opt/workspace/main/,那么提交时通过 spec.job.jarFile

参数指定到 /opt/workspace/main/xxx.jar 即可。

java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/pulsar/client/api/Authentication"
    at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_152]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_152]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_152]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[?:1.8.0_152]
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[?:1.8.0_152]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[?:1.8.0_152]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[?:1.8.0_152]

总的来说,每类 pod 的启动流程如下图所示:

与微信后盾服务买通

云原生架构下的资源类型 Demonsets,简化架构转型复杂度

用户的 Flink 作业常常须要在运行过程中与微信的后盾服务进行交互,在传统的裸机上拜访微信的后盾服务须要机器部署 Agent 及路由配置,对于 Kubernetes 集群,在咱们基础架构核心的共事反对下,微信后盾根底 Agent 以 DeamonSet 形式打包到部署到每个节点上,咱们在起 Flink 相干 Container 的时候,带上 HostIPC 选项并挂载路由配置门路,就能够像应用裸机一样拜访微信的后盾服务。

此外,因为局部 Agent 的 unix sock 文件在母机 /tmp 下,咱们须要在容器里挂载目录 /tmp,然而 Flink 运行过程中 shuffle、web 以及一些临时文件(例如解压进去的 so 等)默认都是放到 /tmp 目录下,这就会导致作业即便失败也会残留一些垃圾到母机上,长此以往,/tmp 目录势必会被撑爆,所以咱们在启动 Java 过程时设置参数 -Djava.io.tmpdir=/opt/workspace/tmp,将 Java 的默认长期目录改到容器内的门路,这样作业失败,容器销毁不至于残留垃圾。

属性配置、日志及监控

日志与监控,晋升可观测性

从下面的申明式 yaml 配置能够看到,提交 Flink 作业时是通过 flinkProperties 选项来指定 Flink 属性参数,事实上 Flink Operator 会将 flinkProperties 指定的属性参数以 ConfigMap 模式部署,会笼罩 image 中的 flink/conf 目录,所以咱们不能将零碎默认属性配置放到 flink image 中,为此,咱们在客户端保护一份 Flink 零碎默认配置,在提交的时候会合并用户填的属性配置,填充到 flinkProperties 选项中,能够不便咱们灵便调整 Flink 零碎默认配置。

默认状况下,Flink on Kubernetes 部署的作业,其在 Docker Container 中运行的过程都是前台运行的,应用 log4j-console.properties 配置,日志会间接打到控制台,这样就会导致 Flink UI 无奈展现 log,只能去查看 Pod 日志,此外用户通过 System.out.println 打的日志也会混在 log4j 的日志中,不易辨别查看。所以咱们从新定义了 log4j-console.properties,将 log4j 日志打到 FLINK_LOG_DIR 目录下的文件中,并按大小滚动,为了能在 Flink UI 上也能看到用户 stdout 的输入,在过程启动命令 flink-console.sh 最初加上 2>\&1 | tee ${FLINK_LOG_PREFIX}.out,能够把控制台输入的日志旁路一份到日志目录的文件中。最初 Flink UI 展现的日志如下图所示:

对于历史失败作业,咱们在 Kubernetes 上也部署了一个 Flink History Server,能够灵便地扩缩容,从此再也不必放心中午作业挂了主动重启无奈追溯起因了。

对于资源及作业的监控,TKE 提供了收费的云原生 Prometheus 服务 TPS,能够一键部署并关联咱们的 TKE 集群,然而咱们在晚期曾经采纳支流的 Prometheus + Grafana 组合部署了监控平台,这里就没有应用 TPS。以后咱们有集群资源、利用组(Namespace)资源、作业资源利用状况的监控,大抵如下图所示。前面咱们会再将每个作业 Flink Metric 推到 Prometheus,便于监控作业级别的反压、gc、operator 流量等信息。

数据利用平台对接

基于上述根底的 Flink-on-Kubernetes 能力,就能够将 Flink 对接到咱们的各种数据利用平台上。如下图所示,咱们曾经反对用户应用多样化的形式应用 Flink,用户能够在机器学习平台拖拽节点或者注册定制化节点以 Jar 包或 PyFlink 的形式应用,另外也能够在 SQL 剖析平台上写 Flink SQL。

对于 Jar、PyFlink 的形式应用就不具体开展,对于 Flink SQL 的反对,咱们目前是联合咱们本身的元数据体系,利用 Flink 已有的 SQL 性能。以后实时数仓被业界宽泛提起,咱们晓得传统的离线数仓,如 Hive,无外乎是在 HDFS 上套了一层 Schema,那么实时数仓也相似,数据源通常是 Kafka、Pulsar 这类音讯队列零碎,在这之上套一层 Schema 将实时数据管理起来,就能够称之为实时数仓了。咱们基于 SQL 剖析平台的元数据管理体系,构建 Flink SQL 能力,用户能够在 SQL 剖析平台上注册 / 治理库表元数据,为了架构简略,咱们并没有去实现本人的 Flink Catalog(元数据操作间接在 SQL 剖析平台上实现,无需实现 create、drop 等 API),而是采纳如下图所示的流程来提交 SQL。

用户在 SQL 剖析平台上注册库表元数据(能够精密受权管控),而后编辑 SQL 提交,首先 SQL 剖析平台会做语法校验、权限及合法性校验,没问题后,将 SQL 波及到的元数据加密打包,连同申明式配置 Yaml 提交给对立调度平台,在对立调度平台上咱们开发了一个 FlinkSQL 类型的作业,实质上就是一个惯例的 Flink Jar 作业,即 FlinkSQLDriver,用于承受 SQL 及其从属的参数,FlinkSQLDriver 被提交后,解析传过来的配置,组装残缺的 SQL 语句(包含 DDL、DML),而后调用 tableEnvironment.executeSql 逐条执行,所以实质上是将库表长期注册到 default catalog 中。

小结

本文从整体上介绍了微信 Flink-on-Kubernetes 实战经验以及 Flink 数据利用平台的详情,一方面咱们提供最根底的 Flink 计算平台能力,借助 Kubernetes 无效管控集群,另一方面咱们在已有的数据通道及元数据平台上构建实时数仓,提供 Flink SQL 能力,进一步升高用户应用门槛,对于 Flink SQL 的反对目前还比拟高级和原始,前面咱们将联合业务应用状况摸索更多深层次的优化。

【腾讯云原生】云说新品、云研新术、云游新活、云赏资讯,扫码关注同名公众号,及时获取更多干货!!

正文完
 0