乐趣区

TalkingData的Spark-On-Kubernetes实践

摘要: 本文整理自 talkingdata 云架构师徐蓓的分享,介绍了 Spark On Kubernetes 在 TalkingData 的实践。

众所周知,Spark 是一个快速、通用的大规模数据处理平台,和 Hadoop 的 MapReduce 计算框架类似。但是相对于 MapReduce,Spark 凭借其可伸缩、基于内存计算等特点,以及可以直接读写 Hadoop 上任何格式数据的优势,使批处理更加高效,并有更低的延迟。实际上,Spark 已经成为轻量级大数据快速处理的统一平台。
Spark 作为一个数据计算平台和框架,更多的是关注 Spark Application 的管理,而底层实际的资源调度和管理更多的是依靠外部平台的支持:

Spark 官方支持四种 Cluster Manager:Spark standalone cluster manager、Mesos、YARN 和 Kubernetes。由于我们 TalkingData 是使用 Kubernetes 作为资源的调度和管理平台,所以 Spark On Kubernetes 对于我们是最好的解决方案。

如何搭建生产可用的 Kubernetes 集群

部署

目前市面上有很多搭建 Kubernetes 的方法,比如 Scratch、Kubeadm、Minikube 或者各种托管方案。因为我们需要简单快速地搭建功能验证集群,所以选择了 Kubeadm 作为集群的部署工具。部署步骤很简单,在 master 上执行:

kubeadm init

在 node 上执行:

kubeadm join --token  : --discovery-token-ca-cert-hash sha256:

具体配置可见官方文档:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。
需要注意的是由于国内网络限制,很多镜像无法从 k8s.gcr.io 获取,我们需要将之替换为第三方提供的镜像,比如:https://hub.docker.com/u/mirrorgooglecontainers/。

网络

Kubernetes 网络默认是通过 CNI 实现,主流的 CNI plugin 有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net 等。Flannel 主要是使用 VXLAN tunnel 来解决 pod 间的网络通信,Calico 和 Kube-router 则是使用 BGP。由于软 VXLAN 对宿主机的性能和网络有不小的损耗,BGP 则对硬件交换机有一定的要求,且我们的基础网络是 VXLAN 实现的大二层,所以我们最终选择了 MACVLAN。
CNI MACVLAN 的配置示例如下:

{
  "name": "mynet",
  "type": "macvlan",
  "master": "eth0",
  "ipam": {
    "type": "host-local",
    "subnet": "10.0.0.0/17",
    "rangeStart": "10.0.64.1",
    "rangeEnd": "10.0.64.126",
    "gateway": "10.0.127.254",
    "routes": [
      {"dst": "0.0.0.0/0"},
      {
        "dst": "10.0.80.0/24",
        "gw": "10.0.0.61"
      }
    ]
  }
}

Pod subnet 是 10.0.0.0/17,实际 pod ip pool 是 10.0.64.0/20。cluster cidr 是 10.0.80.0/24。我们使用的 IPAM 是 host-local,规则是在每个 Kubernetes node 上建立 /25 的子网,可以提供 126 个 IP。我们还配置了一条到 cluster cidr 的静态路由 10.0.80.0/24,网关是宿主机。这是因为容器在 macvlan 配置下 egress 并不会通过宿主机的 iptables,这点和 Linux Bridge 有较大区别。在 Linux Bridge 模式下,只要指定内核参数 net.bridge.bridge-nf-call-iptables = 1,所有进入 bridge 的流量都会通过宿主机的 iptables。经过分析 kube-proxy,我们发现可以使用 KUBE-FORWARD 这个 chain 来进行 pod 到 service 的网络转发:

-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT

最后通过 KUBE-SERVICES 使用 DNAT 到后端的 pod。pod 访问其他网段的话,就通过物理网关 10.0.127.254。
还有一个需要注意的地方是出于 kernel security 的考虑,link 物理接口的 macvlan 是无法直接和物理接口通信的,这就导致容器并不能将宿主机作为网关。我们采用了一个小技巧,避开了这个限制。我们从物理接口又创建了一个 macvlan,将物理 IP 移到了这个接口上,物理接口只作为网络入口:

$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
IPV6INIT=no
BOOTPROTO=none
$ cat /etc/sysconfig/network-scripts/ifcfg-macvlan
DEVICE=macvlan
NAME=macvlan
BOOTPROTO=none
ONBOOT=yes
TYPE=macvlan
DEVICETYPE=macvlan
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPADDR=10.0.0.61
PREFIX=17
GATEWAY=10.0.127.254
MACVLAN_PARENT=eth0
MACVLAN_MODE=bridge

这样两个 macvlan 是可以互相通信的。

Kube-dns

默认配置下,Kubernetes 使用 kube-dns 进行 DNS 解析和服务发现。但在实际使用时,我们发现在 pod 上通过 service domain 访问 service 总是有 5 秒的延迟。使用 tcpdump 抓包,发现延迟出现在 DNS AAAA。进一步排查,发现问题是由于 netfilter 在 conntrack 和 SNAT 时的 Race Condition 导致。简言之,DNS A 和 AAAA 记录请求报文是并行发出的,这会导致 netfilter 在_nf_conntrack_confirm 时认为第二个包是重复的(因为有相同的五元组),从而丢包。具体可看我提的 issue:https://github.com/kubernetes/kubernetes/issues/62628。一个简单的解决方案是在 /etc/resolv.conf 中增加 options single-request-reopen,使 DNS A 和 AAAA 记录请求报文使用不同的源端口。我提的 PR 在:https://github.com/kubernetes/kubernetes/issues/62628,大家可以参考。我们的解决方法是不使用 Kubernetes service,设置 hostNetwork=true 使用宿主机网络提供 DNS 服务。因为我们的基础网络是大二层,所以 pod 和 node 可以直接通信,这就避免了 conntrack 和 SNAT。

Spark 与 Kubernetes 集成

由于 Spark 的抽象设计,我们可以使用第三方资源管理平台调度和管理 Spark 作业,比如 Yarn、Mesos 和 Kubernetes。目前官方有一个 experimental 项目,可以将 Spark 运行在 Kubernetes 之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。

基本原理

当我们通过 spark-submit 将 Spark 作业提交到 Kubernetes 集群时,会执行以下流程:

  • Spark 在 Kubernetes pod 中创建 Spark driver
  • Driver 调用 Kubernetes API 创建 executor pods,executor pods 执行作业代码
  • 计算作业结束,executor pods 回收并清理
  • driver pod 处于 completed 状态,保留日志,直到 Kubernetes GC 或者手动清理

先决条件

  • Spark 2.3+
  • Kubernetes 1.6+
  • 具有 Kubernetes pods 的 list, create, edit 和 delete 权限
  • Kubernetes 集群必须正确配置 Kubernetes DNS[1]

如何集成

Docker 镜像

由于 Spark driver 和 executor 都运行在 Kubernetes pod 中,并且我们使用 Docker 作为 container runtime enviroment,所以首先我们需要建立 Spark 的 Docker 镜像。
在 Spark distribution 中已包含相应脚本和 Dockerfile,可以通过以下命令构建镜像:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag push

提交作业

在构建 Spark 镜像后,我们可以通过以下命令提交作业:

$ bin/spark-submit \
    --master k8s://https://: \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
    --files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image= \
    https://path/to/examples.jar

其中,Spark master 是 Kubernetes api server 的地址,可以通过以下命令获取:

$ kubectl cluster-info
Kubernetes master is running at http://127.0.0.1:6443

Spark 的作业代码和依赖,我们可以在 –jars、–files 和最后位置指定,协议支持 http、https 和 HDFS。
执行提交命令后,会有以下输出:

任务结束,会输出:

访问 Spark Driver UI

我们可以在本地使用 kubectl port-forward 访问 Driver UI:

$ kubectl port-forward <driver-pod-name> 4040:4040

执行完后通过 http://localhost:4040 访问。

访问日志

Spark 的所有日志都可以通过 Kubernetes API 和 kubectl CLI 进行访问:

$ kubectl -n=<namespace> logs -f <driver-pod-name>

如何实现租户和资源隔离

Kubernetes Namespace

在 Kubernetes 中,我们可以使用 namespace 在多用户间实现资源分配、隔离和配额。Spark On Kubernetes 同样支持配置 namespace 创建 Spark 作业。
首先,创建一个 Kubernetes namespace:

$ kubectl create namespace spark

由于我们的 Kubernetes 集群使用了 RBAC,所以还需创建 serviceaccount 和绑定 role:

$ kubectl create serviceaccount spark -n spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark

并在 spark-submit 中新增以下配置:

$ bin/spark-submit \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=spark \
    ...

资源隔离

考虑到我们 Spark 作业的一些特点和计算资源隔离,前期我们还是选择了较稳妥的物理隔离方案。具体做法是为每个组提供单独的 Kubernetes namespace,计算任务都在各自 namespace 里提交。计算资源以物理机为单位,折算成 cpu 和内存,纳入 Kubernetes 统一管理。在 Kubernetes 集群里,通过 node label 和 PodNodeSelector 将计算资源和 namespace 关联。从而实现在提交 Spark 作业时,计算资源总是选择 namespace 关联的 node。
具体做法如下:
1、创建 node label

$ kubectl label nodes <node_name> spark:spark

2、开启 Kubernetes admission controller 
我们是使用 kubeadm 安装 Kubernetes 集群,所以修改 /etc/kubernetes/manifests/kube-apiserver.yaml,在 –admission-control 后添加 PodNodeSelector。

$ cat /etc/kubernetes/manifests/kube-apiserver.yaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    scheduler.alpha.kubernetes.io/critical-pod: ""
  creationTimestamp: null
  labels:
    component: kube-apiserver
    tier: control-plane
  name: kube-apiserver
  namespace: kube-system
spec:
  containers:
  - command:
    - kube-apiserver
    - --secure-port=6443
    - --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt
    - --admission-control=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,ResourceQuota,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,PodNodeSelector
...

3、配置 PodNodeSelector
在 namespace 的 annotations 中添加 scheduler.alpha.kubernetes.io/node-selector: spark=spark。

apiVersion: v1
kind: Namespace
metadata:
 annotations:
   scheduler.alpha.kubernetes.io/node-selector: spark=spark
 name: spark

完成以上配置后,可以通过 spark-submit 测试结果:

$ spark-submit
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark 
--conf spark.kubernetes.namespace=spark 
--master k8s://https://xxxx:6443     
--deploy-mode cluster     
--name spark-pi     
--class org.apache.spark.examples.SparkPi     
--conf spark.executor.instances=5     
--conf spark.kubernetes.container.image=xxxx/library/spark:v2.3     
http://xxxx:81/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar

我们可以看到,Spark 作业全分配到了关联的 hadooptest-001 到 003 三个 node 上。

待解决问题

Kubernetes HA

Kubernetes 的集群状态基本都保存在 etcd 中,所以 etcd 是 HA 的关键所在。由于我们目前还处在半生产状态,HA 这方面未过多考虑。有兴趣的同学可以查看:https://kubernetes.io/docs/setup/independent/high-availability/。

日志

在 Spark On Yarn 下,可以开启 yarn.log-aggregation-enable 将日志收集聚合到 HDFS 中,以供查看。但是在 Spark On Kubernetes 中,则缺少这种日志收集机制,我们只能通过 Kubernetes pod 的日志输出,来查看 Spark 的日志:

$ kubectl -n=<namespace> logs -f <driver-pod-name>

收集和聚合日志,我们后面会和 ES 结合。
监控
我们 TalkingData 内部有自己的监控平台 OWL[2](已开源),未来我们计划编写 metric plugin,将 Kubernetes 接入 OWL 中。
混合部署
为了保证 Spark 作业时刻有可用的计算资源,我们前期采用了物理隔离的方案。显而易见,这种方式大幅降低了物理资源的使用率。下一步我们计划采用混部方案,通过以下三种方式实现:

  • 将 HDFS 和 Kubernetes 混合部署
  • 为 Spark 作业和 Kubernetes node 划分优先级,在低优先级的 node 上同时运行一些无状态的其他生产服务
  • 利用云实现资源水平扩展,以防止资源突增

资源扩展

在采用以下两种方法增加资源使用率时,集群可能会面临资源短缺和可用性的问题:

  • 混合部署
  • 资源超卖

这会导致运行资源大于实际物理资源的情况(我称之为资源挤兑)。一种做法是给资源划分等级,优先保证部分等级的资源供给。另一种做法是实现资源的水平扩展,动态补充可用资源,并在峰值过后自动释放。我在另一篇文章中阐述了这种设计理念:https://xiaoxubeii.github.io/articles/k8s-on-cloud/。
TalkingData 有自研的多云管理平台,我们的解决方法是实现单独的 Kubernetes tdcloud-controller-manager 作为资源的 provider 和 manager,通过 TalkingData OWL 监控告警,实现资源的水平扩展。

原文链接:http://www.ipshop.xyz/7889.html


本文作者:开源大数据 EMR

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

退出移动版