关于kubernetes:在Kubernetes上运行Flink

7次阅读

共计 5165 个字符,预计需要花费 13 分钟才能阅读完成。

Flink是目前最热门的分布式流 / 批处理框架,而 Kubernetes 是目前最热门的资源管理和调度平台。Flink反对在 Kubernetes 上采纳 Session 模式或 Application 模式部署作业。基于实践经验,本文次要探讨在 Kubernetes 上部署 Flink 作业须要留神的中央。

环境:

  • k8s: 1.15
  • flink-client:flink-1.11.2

测试虽基于 flink-1.11.2,参考 1.12 的文档也不妨:

native_kubernetes

k8s 上运行 flink 工作有两种模式,session 模式和 application 模式(晚期还有一种 pre-job 模式,但已废除)

session 模式下,在 k8s 上会部署一个运行 jobmanager 的 pod(以及包含 deployment/rs/service/configmap 等资源)。后续通过提交命令提交的工作,都由这个 jobmanager 的 pod 负责向 k8s 申请 taskmanager 的 pod。这种模式与 standalone 有些相似,惟一的不同是,standalone 模式须要实现部署好 master 和 worker。

application 模式下,每个作业会在 k8s 上部署一套 jm 和 tm,这跟 yarn 模式是相似的。

筹备

RBCA

基于上述原理,不论是哪种模式都须要 pod 有权限创立其余资源,因而须要思考 RBAC。

正如文档所说,须要当时为 default 这个 servicename 设置对应的 role,实际中咱们部署如下配置,参考 kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services:

apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: fabric8-rbac
subjects:
  - kind: ServiceAccount
    # Reference to upper's `metadata.name`
    name: default
    # Reference to upper's `metadata.namespace`
    namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io

最开始遗留上述步骤节约了大量的调试工夫。

在 k8s 节点上提交

另外,提交工作最好在 k8s 的节点上进行,因为如下起因

KubeConfig, which has access to list, create, delete pods and services, configurable via ~/.kube/config. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods.

CoreDNS

k8s 上须要实现装置好 CoreDNS

解决日志配置

flink1.11 的客户端对 log 配置解决并不好,这造成调试和排错艰难,所以倡议上来先解决一下客户端的配置:

flink-conf.yaml 减少如下配置:

kubernetes.container-start-command-template: %java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%
kubernetes.container.image.pull-policy: Always
  • kubernetes.container-start-command-template的作用是生成 jobmanager pod 时的启动命令。这里去掉提交命令中最初的的 %redirect%。默认%redirect% 会将规范输入和规范谬误重定向到文件,如果 pod 出错挂掉的话,无奈通过 kubectl logs 命令查看日志
  • 镜像始终拉取。在内网环境下,流量不是问题,始终拉取镜像,不便前面批改根底镜像后,能及时拉取

logback-console.xmllog4j-console.properties重命名为 logback.xmllog4j.properties。这将使得日志打印到 stdout 和 stderr,否则日志将打印到文件

Session mode

首先通过如下命令提交 jobmanager:

$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

如果不胜利的的话,倡议通过 kubectl logs 命令看下问题。

接下来提交作业:

$ ./bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=my-first-flink-cluster ./examples/streaming/TopSpeedWindowing.jar

可能会报错:

Caused by: java.util.concurrent.CompletionException: [org.apache.flink.shaded.netty4.io](http://org.apache.flink.shaded.netty4.io/).netty.channel.AbstractChannel$AnnotatedConnectException: 回绝连贯: /192.168.21.73:8081
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 19 more

显然是客户端尝试将 jobgraph 提交给容器中的 jobmanager 时无奈连贯到 jobmanager。这里想到首先要检查一下 jobmanager 是否失常,进入 jm 容器,日志没有什么报错,应该没问题。容器启动的是 8081 的端口,在里面是无法访问的。这是 k8s 环境的“通病”:简单网络环境。

有几种计划:

  1. 通过 kubectl port-forward 进行本地端口转发,例如:

    kubectl port-forward my-first-flink-cluster-6446bbb6f6-4nnm5 8081:8081 --address 0.0.0.0
  2. jobmanager 会启动一个 rest service 资源,默认采纳 LoadBalancer 类型,咱们能够在集群中装置一个 LoadBalancer,上面介绍了如何装置 metallb

    metallb 自身也是 pod 运行的,依照官网装置没什么问题:

    kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.9.5/manifests/namespace.yaml
    kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.9.5/manifests/metallb.yaml

    首次装置需运行

    kubectl create secret generic -n metallb-system memberlist --from-literal=secretkey="$(openssl rand -base64 128)"

    采纳 layer2 的形式配置,而且地址池的地址段与 node 是同一个地址段,这样不须要配置其余货色:

    config.yaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      namespace: metallb-system
      name: config
    data:
      config: |
        address-pools:
        - name: default
          protocol: layer2
          addresses:
          - 192.168.21.210-192.168.21.215

配置 loadbalancer 当前:

能够看到 xxx-rest 的 svc,其中的 EXTERNAL-IP 原先始终是 <pending>,当初从地址段中调配了一个地址了。这个地址无奈 ping,然而能够拜访:

解决了服务内部拜访的问题,就能失常运行测试作业了。

session 模式总结:

  1. session 模式下,在 k8s 上提交一个 jobmanager 的 pod,要保障服务账号有治理 pod 的权限,否则无奈运行
  2. 客户端提交的时候要留神买通网络,能够用 port-forward 来做
  3. 客户端提交终端会阻塞,但 ctrl- C 也没关系
  4. jobmanager 会为作业向 k8s 申请生成 taskmanager 容器运行作业

Application mode

Session mode 尽管看似简略,然而对于扫清环境阻碍起到至关重要的作用。下面提到 Application mode 与 yarn 其实是比拟相似的,是一种更靠近生产的部署模式。

首先须要将打包好的应用程序 jar 包打入镜像:

FROM flink:1.11.2-scala_2.11
 
RUN mkdir -p $FLINK_HOME/usrlib
COPY jax-flink-entry-2.0-SNAPSHOT.jar $FLINK_HOME/usrlib/jax-flink-entry-2.0-SNAPSHOT.jar
COPY kafka-clients-2.2.0.jar $FLINK_HOME/lib/kafka-clients-2.2.0.jar

以下面的 Dockerfile 为例,把咱们的利用程序包放到 $FLINK_HOME/usrlib(这是个非凡的目录,默认 Flink 在运行的时候会从这个目录加载用户的 jar 包)。同时,咱们把依赖包放到$FLINK_HOME/lib 下。

构建镜像并推送到外部的镜像仓库:

docker build -t xxxxx:5000/jax-flink:lastest .
docker push xxxx:5000/jax-flink:lastest

以 Application mode 提交作业

./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=jax-application-cluster \
-Dkubernetes.container.image=xxxx:5000/jax-flink:lastest \
-c com.eoi.jax.flink_entry.FlinkMainEntry  \
local:///opt/flink/usrlib/jax-flink-entry-2.0-SNAPSHOT.jar ...

作业会启动独立的 jobmanager 和 taskmanager。Applicatoin mode 的特点是作业的构建(生成 jobgraph 的过程)不在客户端实现,而是在 jobmanager 上实现,这一点与 spark 的 driver 是相似的。

一些提交命令参数的作用:

  • 利用本身的参数:会在 flink-conf.yaml 中生成:$internal.application.program-args。这将最终最为用户 main 函数的参数[]String
  • -class:会在 flink-conf.yaml 中生成$internal.application.main
  • -C: 会在 flink-conf.yarml 中生成 pipeline.classpaths(必须是非法的 URL)。 然而,pipeline.classpaths 中的 URL 不会被加到运行用户 main 函数的类加载器中,这意味着 - C 指定的依赖包无奈被用户代码应用。笔者曾经向 Flink 提交了相干的 issue 和 PR,曾经被确定为 BUG。FLINK-21289
  • containerized.taskmanager.env 和 containerized.master.env 测试下来是失效的,能够生成容器的 env
正文完
 0