关于flink:Demo-示例如何原生的在-K8s-上运行-Flink

5次阅读

共计 9165 个字符,预计需要花费 23 分钟才能阅读完成。

Kubernetes 简介

什么是 Kubernetes?

Kubernetes 置信大家都比拟相熟,近两年大家都在探讨云原生的话题,探讨 Kubernetes。那么什么是 Kubernetes 呢?

  • K8s 是一个资源管理零碎。

如果大家对 Yarn、Mesos 相熟,假如给定一批裸的物理机,将资源管理零碎部署下来之后,能够在此之上基于它的 API 或者 SDK 开发一些分布式软件或者应用程序。例如能够在 Yarn 上开发传统的 MapReduce,在 K8s 上能够开发一些分布式的 Web Server,或者是大数据计算工作等等。

  • K8s 是一个容器编排零碎。

不同于传统的 Yarn,K8s 在所有的过程运行过程中,是全副基于容器化的,但这里的容器并不只是单纯的 Docker 容器,它也包含 Rocket 等其余相干的隔离措施。如果在生产环境中要求比拟高的话,可能会有一些平安容器,比方 Kata Containers 等等。K8s 在 Slave 上部署的应用程序,都是用容器化的形式去做散发和治理,同时用容器化的技术做隔离。

  • K8s 是一个自动化运维零碎。

它是一个申明式的 API,咱们只须要通知 K8s 集群须要创立一个 Deployment,设置的正本数量,须要达到一个什么样的状态,调度零碎也就是 K8s 就会帮忙咱们维持状态,直到达到设置的状态为止。如果两头产生了一些 failover 或者产生了一些失败,它会主动地将工作迁徙到其余的机器上,来满足以后的调度。

  • 云原生。

目前简直所有的云厂商都曾经提供了 K8s 服务反对,包含国内的阿里、国内上的 Amazon、Google 等等,包含传统的微软都曾经提供了对于 K8s 的 Managed 服务或者是 Unmanaged 服务。随着目前 Lambda 表达式或者 Function 计算的利用,Serverless 形式也变得更加风行。除了传统的部署小集群以外,通过云产生一个 manager,构建一个大的 Serverless 集群,而后用户按需进行计算资源付费,这也是一种新的模式。

Kubernetes 的架构

上图是 K8s 根本的架构,这是一个十分典型的 Master-Slave 的架构。

  1. 在 Master 上,是由 Controller,API Server,Scheduler 以及包含做存储的 Etcd 等形成。Etcd 能够算成 Master,也能够作为独立于 Master 之外的存储来看待。Master 的 Controller、API Server、Scheduler 都是独自的过程模式。这和 Yarn 有一些不同,Yarn 的整个 Master 是一个单过程的模式。K8s 的 Master 还能够在多个 Master 之间实现自发的选举,而后由 active 状态的 Master 对外提供服务。
  2. 在 Slave 上,它次要是包含 Kube proxy、Kubelet,以及 Docker 等相干的组件,每个 Node 上部署的相干组件都是相似的,通过它来治理下面运行的多个 Pod。
  3. 依据不同用户的习惯,能够通过 UI 或者 CLI 的形式向 K8s 提交工作。用户能够通过 K8s 提供的 Dashboard Web UI 的形式将工作进行提交,也能够通过 Kubectl 命令行的形式进行提交。

Kubernetes 的一些概念

  • ConfigMap

ConfigMap 是一个 K-V 数据结构。通常的用法是将 ConfigMap 挂载到 Pod,作为配置文件提供 Pod 里新的过程应用。在 Flink 中能够将 Log4j 文件或者是 flink-conf 文件写到 ConfigMap 外面,在 JobManager 或者 TaskManger 起来之前将它挂载到 Pod 里,而后 JobManager 去读取相应的 conf 文件,加载其配置,进而再正确地拉起 JobManager 一些相应的组件。

  • Service(简称 SVC)

一种对外裸露服务的形式。如果当初外部有一个服务,须要在 K8s 内部进行拜访,此时能够通过 Service,而后用 LoadBalancer 或者 NodePort 的形式将其裸露进来。

如果有一个 Service,不心愿或不须要将其对外裸露,能够把它设置为 Cluster IP 或者是 None 这种 Headless 的模式。这个时候,它能够用于服务之间相互连接,例如传统的前端去联后端服务,或者是在 Flink 中非 HA 的状况下,TaskManager 去连 JobManager 等等。

  • Pod

Pod 是 K8s 里最小的调度单元。K8s 都是以 Pod 进行调度的。每个 Pod 能够蕴含一个或者多个 Container。每个 Container 都会有本人的资源,相互之间资源也是曾经隔离的,然而所有 Container 共享同一个网络,这就意味着所有的 Container 能够通过 localhost 间接进行通信。

同时,Container 之间能够通过 Volume 共享一些文件。比方 JobManager 或 TaskManager 的 Pod 里产生了一些日志,在同一个 Pod 里再去起另外一个过程收集不合乎 K8s 的原生语义。能够通过 SideCar 的形式去起另外一个 Container,把 JobManager 产生的日志收走。这就是一个 Pod 多个 Container 的具体用处。

  • Deployment

因为 Pod 是能够随时被终止的,所以当 Pod 终止之后,就无奈再拉起来去做 failover 等其余相干操作。Deployment 是在 Pod 之上提供了更高一层的形象。Deployment 能够设置 Pod 的状态,比方须要起 5 个 TaskManager,Deployment 会维持以后状态。当有 TaskManager 挂了当前,它会起新的 TaskManager,来补上。这样能够防止本人汇报 Pod 的状态,能够去做一些更简单的治理 failover 等等。这也是最根底的概念——运维自动化。

目前都有什么样的工作在 K8s 上运行?

除了传统的 Web 以及挪动端一些无状态的如 MySQL、Kafka 等存储相干的工作外,有状态的服务也一直地在 K8s 上做适配和运行。除此之外,深度学习框架 Tensorflow 原生即可在 K8s 上运行,包含 Spark、Flink 等等,一些大数据相干的框架也在一直地去兼容,一直地去适配,以便让更多的大数据服务能够更好地在 K8s 上运行。

从这一点咱们能够看出,K8s 相比于 Yarn 或传统的 Hadoop 具备更好的包容性,它能够把存储、深度学习、大数据包含 OLAP 剖析等多种计算框架、引擎都运行在 K8s 之上。这样就会带来一个很大的益处,整个公司只须要去治理一个调度架构,就能够把所有的存储,实时计算,批量计算,包含深度学习,OLAP 剖析等等,都在一个集群外面运行。除了治理更不便以外,也能够达到更好的集群利用率。

Flink On Kubernetes 的部署演进

Flink 在 K8s 上最简略的形式是以 Standalone 形式进行部署。这种形式部署的益处在于不须要对 Flink 做任何改变,同时 Flink 对 K8s 集群是无感知的,通过内部伎俩即可让 Flink 运行起来。

Standalone Session On K8s

Standalone 形式在 k8s 运行步骤:

如图所示:

  • 步骤 1,应用 Kubectl 或者 K8s 的 Dashboard 提交申请到 K8s Master。
  • 步骤 2,K8s Master 将创立 Flink Master Deployment、TaskManager Deployment、ConfigMap、SVC 的申请分发给 Slave 去创立这四个角色,创立实现后,这时 Flink Master、TaskManager 启动了。
  • 步骤 3,TaskManager 注册到 JobManager。在非 HA 的状况下,是通过外部 Service 注册到 JobManager。
  • 至此,Flink 的 Sesion Cluster 曾经创立起来。此时就能够提交工作了。
  • 步骤 4,在 Flink Cluster 上提交 Flink run 的命令,通过指定 Flink Master 的地址,将相应工作提交上来,用户的 Jar 和 JobGrapth 会在 Flink Client 生成,通过 SVC 传给 Dispatcher。
  • 步骤 5,Dispatcher 会发现有一个新的 Job 提交上来,这时会起一个新的 JobMaster,去运行这个 Job。
  • 步骤 6,JobMaster 会向 ResourceManager 申请资源,因为 Standalone 形式并不具备被动申请资源的能力,所以这个时候会间接返回,而且咱们曾经提前把 TaskManager 起好,并且曾经注册回来了。
  • 步骤 7 -8,这时 JobMaster 会把 Task 部署到相应的 TaskManager 上,整个工作运行的过程就实现了。

Standalone perjob on K8s

当初咱们看一下 Perjob 的部署,因为 Session Cluster 和 Perjob 别离都有不同的实用场景,一个 Session 外面能够跑多个工作,然而每个工作之间没有方法达到更好的隔离性。而 Perjob 的形式,每个 job 都会有一个本人独立的 Flink Cluster 去运行,它们之间互相独立。

■ Perjob 的特点:

  1. 用户的 Jar 和依赖都是在镜像里提前编译好,或者通过 Init Container 形式,在真正 Container 启动之前进行初始化。
  2. 每个 Job 都会启动一个新的 Cluster。
  3. 一步提交,不须要像 Session Cluster 一样先启动集群再提交工作。
  4. 用户的 main 办法是在 Cluster 里运行。在非凡网络环境状况下,main 办法须要在 Cluster 里运行的话,Session 形式是无奈做到的,而 Perjob 形式是能够执行的。

■ 执行步骤:

由 Standalone JobCluster EntryPoint 执行,从 classpath 找到用户 Jar,执行它的 main 办法失去 JobGrapth。再提交到 Dispathcher,这时候走 Recover Job 的逻辑,提交到 JobMaster。JobMaster 向 ResourceManager 申请资源,申请 slot,执行 Job。

Helm Chart 形式

Helm 相似于 Linux 上的 Yum。

K8s 里的 Helm 是一个包管理工具,能够很不便的装置一个包。部署一个 Flink 集群等操作,只须要 helm install 就能够将之前很多步的安装操作,一步去实现。实质上没有什么差异,只是它用 Helm 从新组织,包含一些模板等等,用起来会更加不便。

Flink Kubernetes Operator

  • 工作生命周期治理

应用 Operator 的形式来治理 Flink,次要是来治理多个 Cluster 的状况,可起到工作生命周期治理的作用。它和 Standalone、Native 的形式,实质上不是在一个档次上,它相似于一个更下层的做工作治理的工具。

  • 基于 K8s Operator,不便创立 Flink Cluster。

之前去创立一个 Perjob Cluster,可能须要部署屡次,如果工作要做降级,甚至可能须要把之前的删掉,而后批改配置,再重新部署。

引入 K8s Operator 就只须要做一些简略操作。比方 Operator 中有本人的一套 yaml 形容形式,批改其中某一个字段,如批改 image 的 version 字段,此时后盾会主动触发一些重启,包含对目前正在执行的工作做 savepoint,而后把 Cluster 销毁掉,再进行新的定向就能够将集群拉起,等一系列自动化的操作。对 Flink 的配置做批改等也都能够在后盾自动化实现。

目前 Operater 有 Lyft 和 Google 两个开源的 operator,他们在性能上相似,而且都是曾经通过生产测验,与目前的 Standalone Cluster 联合的比拟好的,曾经达到生产可用的规范。

参考:
1.lyft/flinkk8soperator
https://github.com/lyft/flinkk8soperator
2.GoogleCloudPlatform/flink-on-k8s-operator
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator

总结

当然,Flink on K8s 以后也存在一些有余:

  • 无论 Operator、Helm Chart 或者是间接应用 Kubectl Yaml 的形式,Flink 都感知不到 K8s 的存在。
  • 目前次要应用动态的资源分配。须要提前确认好须要多少个 TaskManager,如果 Job 的并发须要做一些调整,TaskManager 的资源状况必须相应的跟上,否则工作无奈失常执行。
  • 用户须要对一些 Container、Operator 或者 K8s 有一些最根本的意识,这样能力保障顺利将 Flink 运行到 K8s 之上。
  • 对于批处理工作,或者想在一个 Session 里提交多个工作不太敌对。无奈实时申请资源和开释资源。因为 TaskManager 的资源是固定的,批处理工作可能会分多个阶段去运行,须要去实时地申请资源、开释资源,以后也无奈实现。如果须要在一个 Session 里跑多个 Job 并且陆续运行完结以后也无奈实现。这时如果维持一个比拟大的 Session Cluster,可能会资源节约。但如果维持的 Session Cluster 比拟小,可能会导致 Job 跑得慢或者是跑不起来。

基于这几点,咱们在社区推动了一个 Native 的集成计划,这个 Native 相似于 Yarn 这种原生的集成,就是让 Flink 原生的感知到上层 Cluster 的存在。

Navtive Integration 的技术细节

为什么叫 Native 形式?包含如下几个含意。

  • 资源申请形式:Flink 的 Client 内置了一个 K8s Client,能够借助 K8s Client 去创立 JobManager,当 Job 提交之后,如果对资源有需要,JobManager 会向 Flink 本人的 ResourceManager 去申请资源。这个时候 Flink 的 ResourceManager 会间接跟 K8s 的 API Server 通信,将这些申请资源间接下发给 K8s Cluster,通知它须要多少个 TaskManger,每个 TaskManager 多大。当工作运行完之后,它也会通知 K8s Cluster 开释没有应用的资源。相当于 Flink 用很原生的形式理解到 K8s Cluster 的存在,并通晓何时申请资源,何时开释资源。
  • Native 是绝对于 Flink 而言的,借助 Flink 的命令就能够达到自治的一个状态,不须要 引入内部工具 就能够通过 Flink 实现工作在 K8s 上的运行。

具体如何工作?次要分 Session 和 Perjob 两个方面来给大家介绍。

Native Kubernetes Session 形式

首先 Session 的形式。

  • 第一个阶段:启动 Session Cluster。Flink Client 内置了 K8s Client,通知 K8s Master 创立 Flink Master Deployment,ConfigMap,SVC。创立实现后,Master 就拉起来了。这时,Session 就部署实现了,并没有保护任何 TaskManager。
  • 第二个阶段:当用户提交 Job 时,能够通过 Flink Client 或者 Dashboard 的形式,而后通过 Service 到 Dispatcher,Dispatcher 会产生一个 JobMaster。JobMaster 会向 K8sResourceManager 申请资源。ResourceManager 会发现当初没有任何可用的资源,它就会持续向 K8s 的 Master 去申请资源,申请资源之后将其发送回去,起新的 Taskmanager。Taskmanager 起来之后,再注册回来,此时的 ResourceManager 再向它去申请 slot 提供给 JobMaster,最初由 JobMaster 将相应的 Task 部署到 TaskManager 上。这样整个从 Session 的拉起到用户提交都实现了。
  • 需注意的是,图中 SVC 是一个 External Service。必须要保障 Client 通过 Service 能够拜访到 Master。在很多 K8s 集群里,K8s 和 Flink Client 是不在同一个网络环境的,这时候能够通过 LoadBalancer 的形式或者 NodePort 的形式,使 Flink Client 能够拜访到 Jobmanager Dispatcher,否则 Jar 包是无奈提交的。

Native Kubernetes Perjob 形式

咱们再来看一下 Perjob 的形式,如图所示,Perjob 形式其实和之前是有一些相似,差异在于不须要先去起一个 Session Cluster,再提交工作,而是一步的。

  • 首先创立出了 Service、Master 和 ConfigMap 这几个资源当前,Flink Master Deployment 外面曾经带了一个用户 Jar,这个时候 entrypoint 就会从用户 Jar 外面去提取出或者运行用户的 main,而后产生 JobGraph。之后再提交到 Dispatcher,由 Dispatcher 去产生 Master,而后再向 ResourceManager 申请资源,前面的逻辑的就和 Session 的形式是一样的。
  • 它和 Session 最大的差别就在于它是一步提交的。因为没有了两步提交的需要,如果不须要在工作起来当前拜访内部 UI,就能够不必内部的 Service。可间接通过一步提交使工作运行。通过本地的 port-forward 或者是用 K8s ApiServer 的一些 proxy 能够拜访 Flink 的 Web UI。此时,External Service 就不须要了,意味着不须要再占用一个 LoadBalancer 或者占用 NodePort。这就是 perjob 形式。

Session 与 Perjob 形式的不同

咱们来看一下 Session 和 Perjob 形式有哪些不同?

Demo 演示

Session

  1. 启动 Session

留神:image 须要替换为本人的镜像或者应用 docker hub 上的 Flink 官网镜像库。

./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
  1. 提交 job 到 Session
./bin/flink run -d -p 10 -e kubernetes-session -Dkubernetes.cluster-id=k8s-session-1 examples/streaming/WindowJoin.jar
  1. 进行 Session
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=k8s-session-1 -Dexecution.attached=true

Application

Application 模式是 FLIP-85 引入的一种新的模式,久远来看是用于替换社区目前的 perjob 模式的。目前 application 模式和 perjob 模式最大的区别是用户代码在 client 端还是 jobmanager 端运行。在 K8s 部署上,因为用户的 jar 和依赖都能够提前打在镜像外面,所以反对 application 模式就变得非常容易。

留神:Application 模式是在 Flink 1.11 中才反对的性能,须要应用对应的 Flink client 和镜像。能够参考社区文档来构建本人的镜像。

./bin/flink run-application -p 10 -t kubernetes-application \
-Dkubernetes.cluster-id=k8s-app1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
local:///opt/flink/examples/streaming/WindowJoin.jar

目前性能的状态

  • Native Kubernetes Session 模式

FLINK-9953:曾经在 Flink 1.10 公布:

https://issues.apache.org/jira/browse/FLINK-9953

  • Native Kubernetes Application 模式

FLINK-10934:打算在 Flink 1.11 公布

https://issues.apache.org/jira/browse/FLINK-10934

  • Native 模式下高可用

FLINK-12884:目前高可用形式是基于 zk 实现的,将来是心愿不依赖于内部组件,基于 K8s 的 ConfigMap 去做 Meta 存储和 Leader 选举。目前曾经有外部实现,将来将会奉献给社区。

https://issues.apache.org/jira/browse/FLINK-12884

  • 其余性能反对:

FLINK-14460:打算陆续在 Flink 1.11/1.12 两个版本中公布和欠缺

https://issues.apache.org/jira/browse/FLINK-14460

包含内容如下:

  • Label,annotation,node-selector:心愿将任务调度到某个集群特定的机器上的利用场景可能须要 node-selector, 心愿给集群打特定的 Label 并且内部能拜访到,能够应用 Label 的性能等等。
  • Sidecar container:帮忙实现日志收集等
  • Init container:能够帮忙在 JobManager,TaskManager 启动之前,把 Jar 下载好,这样咱们能够应用对立的现象,不须要把用户 Jar 打到镜像里
  • 存储优化
  • Pod 模板实现一些不罕用的性能等等

目前社区的 Flink on K8s 的 native 计划还在疾速倒退和欠缺,心愿大家多多试用并且提出反馈意见,如果有趣味也十分欢送一起参加进来开发。

正文完
 0