Hello 大家好,我是来自 BOSS 直聘的基础架构工程师周佩洁。次要负责 BOSS 直聘算法平台的数据流链路的架构和设计。上面由我介绍 Alluxio+Fluid 在 BOSS 直聘算法平台的落地实际,咱们本期的分享次要分为以下几个内容:
首先,我会介绍一下 Alluxio 在咱们这边应用的背景,另外我会介绍一下咱们在应用过程中遇到的挑战。再之后我会介绍咱们的整个架构设计,最初我会介绍一下应用 Fluid 治理 Alluxio 在 k8s 集群上的落地,以及咱们实现的 Alluxio 在 k8s 集群上的动静扩缩容的实际。
Arsenal 深度学习平台是一个集数据预处理,大规模分布式训练,模型优化,模型在线推理等多位一体的一站式算法平台,咱们也始终致力于打造更高效的一体化算法链路,这其中也包含数据局部的建设和减速,Alluxio 正是作为训练链路减速的一个重要组件而被引入到算法平台中来的。在模型训练之前,咱们会应用 spark 将训练数据从其余异构数据源加载到算法平台的 ceph 中,作为模型的训练数据, 数据的导入速度间接决定了日更模型的上线速度。
这里就会有一个问题,如何更快,更稳固地将训练数据导入到算法平台的环境中以便于疾速地进行后续的训练。
咱们发现,咱们应用 Spark 在进行算法平台数据导入的过程中,Spark 自身的文件在写入的时候,为了保障一致性,做了一个两段式或者三段式的提交协定,比方在 spark v1 版本的提交协定中,在 Spark 写入的时候,会创立一个双重的 temporary 的目录,在 commit task 的时候会将目录移到外层,而后在 driver commit job 的时候再往最外层挪动到咱们理论的目录里,那在 v2 的 Spark 版本的提交协定外面,它也是会最开始写两层 temporary 目录,最初会把这个目录移到最外层。
其实 v1 和 v2 版本在 Spark 写入 ceph 都有一些问题是无奈躲避的,次要就是 ceph 并不像文件系统一样能够间接进行 rename 的操作,对 ceph 来说 rename 意味着数据的迁徙,也就是如果我把 3 个 TB 的数据写入到 ceph 里,那实际上在我挪动目录的时候,ceph 的 worker 节点会呈现 3TB*n(n 依照 spark v1,v2 的提交协定会有不同的值) 的大规模的数据传输。这样岂但会给 ceph 整个集群带来比拟大的网络负载,同时也会使整个 ceph 集群处于一个极度不稳固的状态。
除了这样一个问题外,咱们在原有的架构 Spark 写入 ceph 的过程中还发现了一些其余的问题:
比如说没有方法做流量的管制,因为 Spark 是以比拟快的速度对 ceph 进行写入的,如果在没有流量管制的状况下,就会导致整个集群的稳定性受到影响。
另外咱们的算法平台次要是反对多种的大数据处理框架,比如说 Spark/Flink/Ppresto 等等,同时也会反对多种的机器学习和深度学习的框架,比如说 TensorFlow,PyTorch 和其余用户自研的学习框架,这个时候咱们就不足一套对立的数据读取和数据写入的 API。
应用 ceph 大集群,咱们也没有方法很好的做到网络隔离以及读写隔离。网络隔离的话,比如说咱们的 ceph 可能是一个 n 个节点的 ceph,那在写入的时候就会遇到一个零碎瓶颈,这个瓶颈在于这个 n 台 ceph 节点的网络带宽是多少,比如说有一个用户的写入工作把 ceph 的网络带宽或者磁盘的 IOPS 全部打满,这个时候就会影响其余用户的读写和写入。
如何从一个大集群里为不同用户拆分权限和容量也成了一个比拟困哪的问题。
咱们心愿数据能够尽快地进入到算法平台的环境用于训练,然而对存储底层的组件来说,咱们心愿能够有流控,而流控和写入速度自身就是互相矛盾的指标。
因而咱们就引入了 Alluxio 来解决这样的一些问题。
当初大家看到的就是咱们调整之后的架构,咱们会把整个的数据算法平台的数据层分成三层:
最上层是咱们的数据计算层,这部分咱们应用 Spark 和 Flink 框架,把数据从异构的数据源写入到咱们的算法平台,而后数据计算层其余的,比如说像 TensorFlow,PyTorch 或者是其余的计算框架,会通过 Alluxio 的 fuse 从咱们的缓存层读取数据;
缓存层咱们次要是应用 Alluxio 进行实现,而后对同名的读取 API 进行了封装。
最上层是咱们算法平台的数据存储层,包含 ceph,TiDB 等等。
应用这样的模式就解决了我方才提到的一些问题。
首先就是一个流控的问题,在流控局部的话,咱们应用了 Alluxio 去进行数据的缓冲,通过管制 Alluxio 的 job worker 应用限度的 CPU 和内存,异步写的形式对写入到 ceph 的流量进行了无效的限度。
如上图所示,咱们能够看到上侧是进入到 Alluxio 的流量,下侧是 Alluxio 写到 ceph 的流量,咱们能够看到这里有比拟显著的限流,写入到 ceph 的数据流速是一个比较稳定的状态,这样能够最大水平保障 ceph 集群的稳定性。
咱们说到了 ceph 具备稳定性,它是一个大集群,对于多个组的用于算法训练的同学,它的应用并不是敌对的,所以咱们在 k8s 集群里会依据用户的需要搭建多种独立的 Alluxio 的小集群,而 CNCF 的 Sandbox 我的项目 Fluid 中 Dataset 和 Runtime 的形象正好和咱们的初衷不约而同。
咱们依照用户的需要去创立 Alluxio cluster 的集群,并且在 k8s 上的同一个节点会部署不同集群的不同 worker,用于 spark 写入数据到 Alluxio,以及 Alluxio 同步数据到 ceph 里,这样就能够最大限度的保障用户想要的容量和权限的限度。权限限度的话,咱们对立在 Alluxio 上进行了管制,并对一部分权限源码进行了革新。容量局部咱们应用 Fluid 联合 k8s HPA 实现,后续会在 HPA 里具体的讲到。
上面我次要介绍一下应用 Fluid 在 k8s 治理 Alluxio 集群生命周期的实际,首先咱们会有一些相应的 pod 固定地部署在 k8s 集群里用于 allxuio 集群的局部调度和治理,咱们应用 Fluid 在集群中创立了一个 dataset controller 和一个 AlluxioRuntime controller,并对 Fluid 的调度层和 Csi Plugin 局部进行了二次开发,新增了 Fluid HPA 动静扩缩容逻辑。当用户尝试去创立一个 Alluxio 集群的时候,咱们会创立 AlluxioRuntime CR,AlluxioRuntime controller 在接管到这个 CR 之后,会对 alluxio values 进行渲染,并通过 helm install 装置 alluxio 的 master 和 worker 节点,同时咱们为了实现 Alluxio 每一个小集群在 k8s 上的动静扩缩容和调度,咱们将原 Alluxio 的 worker 的 DaemonSet 更改为 Deployment,并在下面嵌套了 k8s HPA。
另外在 k8s 集群的每一个节点上,咱们都部署了一个 csi plugin,次要是用于当 Tensorflow 作业或者是 PyTorch 的作业读取 Alluxio 数据的时候,应用 csi plugin 做近程挂载。
上面的两张图次要介绍了 csi plugin 以及训练作业在进行挂载时的解决流程。
咱们的 TensorFlow 或者 PyTorch 去应用这个数据集的时候,咱们会动静为 TensorFlow 的 pod 或者是 PyTorch 的 pod 里默认地去绑定 Alluxio 的 pvc。
当 TensorFlow pod 被调度到集群的某一个节点的时候,csi plugin 会相应的去调用 NodeStageVolume 和 NodePublishVolume,而后去动静地创立 Alluxio fuse pod,fuse pod 会建设 Alluxio 集群的近程连贯,从而去开始一个训练。在发稿时,这种 Lazy 启动的能力 Fluid 也曾经反对了。比如说咱们有第二个训练,被调到这个节点上之后,咱们将会复用这个曾经创立的 fuse pod。当这个节点上没有任何应用 Alluxio 集群的训练任务的时候,csi plugin 会尝试去把 fuse 停掉,咱们通过这样的形式就保障了多个 alluxio 集群和训练节点在 k8s 集群调度最大的可控性以及灵活性。
为了实现 Alluxio 集群在 k8s 上的动静扩缩容,咱们还引入了一些其余的组件,首先咱们引入了 prometheus,对整个 Alluxio 以及 Alluxio 相干的指标进行了采集,作为 HPA 的判断条件,在调度层面上咱们通过二次开发引进了 volcano 调度器,定制了咱们外部的磁盘调度逻辑,资源的预留逻辑以及 OOM Kill pervent,在 fuse 的调度流程里,比方 TensorFlow pod 可能应用的是 20 个 GB 的一个存储,fuse 自身也须要 10GB 的存储,所以咱们应用 volcano 为这个节点进行一部分的资源预留。
在上面这个 PPT 外面,咱们次要说一下应用 Alluxio 集群动静扩缩容的一些指标,以及具体的 HPA。如上图所示左侧, 咱们能够看到有一些指标,指标来自于两局部,一部分是 AlluxioRuntime 默认提供的一些 metrics,这部分 metrics 被咱们应用 prometheus 收集到,通过 cluster metrics 能够进行 HPA 的管制。
如何在不失落数据的状况下对 alluxio 集群进行缩容是咱们比拟关注的,为实现缩容咱们对 Alluxio java client 和 Fluid 缩容逻辑也进行了二次开发,从而在 Fluid 中开发了一个指标叫 dataset client number,这个指标是通过 prometheus 的 exporter 动静地去监控整个集群里有多少利用在应用某一个数据集。如果是有一个利用在应用的话,咱们就会把这个数量进行固定的加 1,而后咱们对下面提到的这些指标进行一些加权计算,组成了一个固定的算法从新生成了 capacity_used_rate 这个 metrics,用于管制 HPA 的扩缩容。在右侧,咱们当初看到的 HPA 的 CR 里会有一个 min replicas 和 max replicas。当用户去申请一个固定容量的 Alluxio 集群的时候,比如说他申请了 1 个 10T 的 Alluxio 集群,咱们会用 10T 除以每个节点能够调配的内存,求得 1 个最大的 worker 正本数。比如说是每个节点 1 个 T,那这样的话将会有 10 个节点,就是说 Alluxio 集群最大能够有 10 个 worker,最小的容量个别设置为 1 或者 0,也就是说在没有人应用的时候,Alluxio 集群将会被缩容到 1 个正本或 0 个正本,这样能够最大限度的节俭集群的资源,也能够最大限度地保障所有的 Alluxio 集群的节点都能够在 k8s 上进行动静的扩缩容,同时能够保证数据不会产生失落。
在上面的 behavior 里咱们做了一些非凡的解决,为了保障当用户应用 Alluxio 的时候,Alluxio 的 worker 能够尽快扩起来,咱们应用了一个速率,比如说 300,每次扩 3 个正本。在缩容的局部咱们是选用了每 60 秒缩容一个正本,比如说有两个利用在应用 Alluxio 的集群,可能两个利用都停掉了,在一分钟之内又有别的利用被调度上,这个时候 Alluxio 集群是不会进行缩容的。
上图咱们做了一个比拟大的图,显示了 HPA 的整体流程,首先在集群的最上层咱们应用 worker deployment 替换了 daemonset,管控 Alluxio worker pod,同时会有一个 master statefulsets.apps.kruise.io 用于管控 Alluxio 的 master,这里咱们将 Fluid 原有的 statefulset 替换为 kruise 的 statefulset, 进步 master HA 模式在滚动重启时的稳定性。另外还会有相应的一个 controller,AlluxioRuntime controller 会应用 prometheus 的 exporter 把自定义的指标汇报给 prometheus,最初通过 prometheus adapter 做为 HPA 的 metrics。
通过这样的形式,能够保障比拟高的数据写入以及模型训练的速度,且在没有数据失落的状况下,实现了 worker 在集群中动静进行扩容和缩容
上面咱们次要介绍后续想要对 Alluxio 进行的一些扩大。
在咱们整个的生产实践中,咱们发现 Alluxio worker 在 Spark 进行大规模的写入的时候,它会占用比拟多的堆外内存,对咱们的集群会有比拟大的累赘。因而咱们下一步会和社区进行单干,缩小每一个 Alluxio worker 的间接内存的应用。
另外从 fuse 的角度来说,因为 fuse 的一些不稳固状况会间接导致 TensorFlow 训练任务或者 PyTorch 训练任务的失败,因而咱们正在尝试调度层面以及 csi plugin 的层面做一些 fuse pod 的 remount,比如说 fuse pod 呈现一些异样行为的时候,咱们会在 TensorFlow 或者 PyTorch 的 pod 里从新把操作系统进行一些挂载,使训练能够持续往下进行。
另外,因为咱们数据的流向是通过 Spark 到 Alluxio,再到 ceph 的这样一个流向,咱们目前在写入的过程中会把 Alluxio 数据 pin˙住,保证数据肯定是全副写到了 ceph 后才会进行驱赶,因而咱们这里将会拓展数据的驱赶策略,也就是说数据在保留到 ceph 里之后,就能够进行驱赶。
最初,非常感激 Alluxio 社区范斌老师,邱露老师,朱禹老师,Fluid 社区车漾老师对咱们的大力支持及披星戴月的致力工作。对 Alluxio 和 Fluid 的批改咱们将尽快反馈给社区, 期待和 Alluxio,Fluid 社区的进一步深度单干。