Hello大家好,我是来自BOSS直聘的基础架构工程师周佩洁。次要负责BOSS直聘算法平台的数据流链路的架构和设计。上面由我介绍Alluxio+Fluid在BOSS直聘算法平台的落地实际,咱们本期的分享次要分为以下几个内容:

首先,我会介绍一下Alluxio在咱们这边应用的背景,另外我会介绍一下咱们在应用过程中遇到的挑战。再之后我会介绍咱们的整个架构设计,最初我会介绍一下应用Fluid治理Alluxio在k8s集群上的落地,以及咱们实现的Alluxio在k8s集群上的动静扩缩容的实际。

Arsenal深度学习平台是一个集数据预处理,大规模分布式训练,模型优化,模型在线推理等多位一体的一站式算法平台,咱们也始终致力于打造更高效的一体化算法链路,这其中也包含数据局部的建设和减速,Alluxio正是作为训练链路减速的一个重要组件而被引入到算法平台中来的。在模型训练之前,咱们会应用spark将训练数据从其余异构数据源加载到算法平台的ceph中,作为模型的训练数据,数据的导入速度间接决定了日更模型的上线速度。

这里就会有一个问题,如何更快,更稳固地将训练数据导入到算法平台的环境中以便于疾速地进行后续的训练。


咱们发现,咱们应用Spark在进行算法平台数据导入的过程中,Spark自身的文件在写入的时候,为了保障一致性,做了一个两段式或者三段式的提交协定,比方在 spark v1版本的提交协定中,在Spark写入的时候,会创立一个双重的temporary的目录,在commit task的时候会将目录移到外层,而后在driver commit job的时候再往最外层挪动到咱们理论的目录里,那在v2的Spark版本的提交协定外面,它也是会最开始写两层temporary目录,最初会把这个目录移到最外层。

其实v1和v2版本在Spark写入ceph都有一些问题是无奈躲避的,次要就是ceph并不像文件系统一样能够间接进行rename的操作,对ceph来说rename意味着数据的迁徙,也就是如果我把3个TB的数据写入到ceph里,那实际上在我挪动目录的时候,ceph的worker节点会呈现3TB*n(n依照spark v1,v2的提交协定会有不同的值)的大规模的数据传输。这样岂但会给ceph整个集群带来比拟大的网络负载,同时也会使整个ceph集群处于一个极度不稳固的状态。

除了这样一个问题外,咱们在原有的架构Spark写入ceph的过程中还发现了一些其余的问题:

比如说没有方法做流量的管制,因为Spark是以比拟快的速度对ceph进行写入的,如果在没有流量管制的状况下,就会导致整个集群的稳定性受到影响。

另外咱们的算法平台次要是反对多种的大数据处理框架,比如说Spark/Flink/Ppresto等等,同时也会反对多种的机器学习和深度学习的框架,比如说TensorFlow,PyTorch和其余用户自研的学习框架,这个时候咱们就不足一套对立的数据读取和数据写入的API。

应用ceph大集群,咱们也没有方法很好的做到网络隔离以及读写隔离。网络隔离的话,比如说咱们的ceph可能是一个n个节点的ceph,那在写入的时候就会遇到一个零碎瓶颈,这个瓶颈在于这个n台ceph节点的网络带宽是多少,比如说有一个用户的写入工作把ceph的网络带宽或者磁盘的IOPS全部打满,这个时候就会影响其余用户的读写和写入。

如何从一个大集群里为不同用户拆分权限和容量也成了一个比拟困哪的问题。

咱们心愿数据能够尽快地进入到算法平台的环境用于训练,然而对存储底层的组件来说,咱们心愿能够有流控,而流控和写入速度自身就是互相矛盾的指标。

因而咱们就引入了Alluxio来解决这样的一些问题。


当初大家看到的就是咱们调整之后的架构,咱们会把整个的数据算法平台的数据层分成三层:

最上层是咱们的数据计算层,这部分咱们应用Spark和Flink框架,把数据从异构的数据源写入到咱们的算法平台,而后数据计算层其余的,比如说像TensorFlow,PyTorch或者是其余的计算框架,会通过Alluxio的fuse从咱们的缓存层读取数据;

缓存层咱们次要是应用Alluxio进行实现,而后对同名的读取API进行了封装。

最上层是咱们算法平台的数据存储层,包含ceph,TiDB等等。

应用这样的模式就解决了我方才提到的一些问题。

首先就是一个流控的问题,在流控局部的话,咱们应用了Alluxio去进行数据的缓冲,通过管制Alluxio的job worker应用限度的CPU和内存,异步写的形式对写入到ceph的流量进行了无效的限度。

如上图所示,咱们能够看到上侧是进入到Alluxio的流量,下侧是Alluxio写到ceph的流量,咱们能够看到这里有比拟显著的限流,写入到ceph的数据流速是一个比较稳定的状态,这样能够最大水平保障ceph集群的稳定性。

咱们说到了ceph具备稳定性,它是一个大集群,对于多个组的用于算法训练的同学,它的应用并不是敌对的,所以咱们在k8s集群里会依据用户的需要搭建多种独立的Alluxio的小集群,而CNCF的Sandbox我的项目Fluid中Dataset和Runtime的形象正好和咱们的初衷不约而同。

咱们依照用户的需要去创立Alluxio cluster的集群,并且在k8s上的同一个节点会部署不同集群的不同worker,用于spark写入数据到Alluxio,以及Alluxio同步数据到ceph里,这样就能够最大限度的保障用户想要的容量和权限的限度。权限限度的话,咱们对立在Alluxio上进行了管制,并对一部分权限源码进行了革新。容量局部咱们应用Fluid联合k8s HPA实现,后续会在HPA里具体的讲到。


上面我次要介绍一下应用Fluid在k8s治理Alluxio集群生命周期的实际,首先咱们会有一些相应的pod固定地部署在k8s集群里用于allxuio集群的局部调度和治理,咱们应用Fluid在集群中创立了一个dataset controller和一个AlluxioRuntime controller,并对Fluid的调度层和Csi Plugin局部进行了二次开发,新增了Fluid HPA动静扩缩容逻辑。当用户尝试去创立一个Alluxio集群的时候,咱们会创立AlluxioRuntime CR, AlluxioRuntime controller在接管到这个 CR之后,会对alluxio values进行渲染,并通过helm install装置alluxio的 master和worker节点,同时咱们为了实现Alluxio每一个小集群在k8s上的动静扩缩容和调度,咱们将原Alluxio的worker的DaemonSet更改为Deployment,并在下面嵌套了k8s HPA。

另外在k8s集群的每一个节点上,咱们都部署了一个csi plugin,次要是用于当Tensorflow作业或者是PyTorch的作业读取Alluxio数据的时候,应用csi plugin做近程挂载。


上面的两张图次要介绍了csi plugin以及训练作业在进行挂载时的解决流程。

咱们的TensorFlow或者PyTorch去应用这个数据集的时候,咱们会动静为TensorFlow的pod或者是PyTorch的pod里默认地去绑定Alluxio的pvc。

当TensorFlow pod被调度到集群的某一个节点的时候,csi plugin会相应的去调用NodeStageVolume和NodePublishVolume,而后去动静地创立Alluxio fuse pod, fuse pod会建设Alluxio集群的近程连贯,从而去开始一个训练。在发稿时,这种Lazy启动的能力Fluid也曾经反对了。比如说咱们有第二个训练, 被调到这个节点上之后,咱们将会复用这个曾经创立的fuse pod。当这个节点上没有任何应用Alluxio集群的训练任务的时候,csi plugin会尝试去把fuse停掉,咱们通过这样的形式就保障了多个alluxio集群和训练节点在k8s集群调度最大的可控性以及灵活性。

为了实现Alluxio集群在k8s上的动静扩缩容,咱们还引入了一些其余的组件,首先咱们引入了prometheus,对整个Alluxio以及Alluxio相干的指标进行了采集,作为HPA的判断条件,在调度层面上咱们通过二次开发引进了volcano调度器,定制了咱们外部的磁盘调度逻辑,资源的预留逻辑以及OOM Kill pervent,在fuse的调度流程里,比方TensorFlow pod可能应用的是20个GB的一个存储,fuse自身也须要10GB的存储,所以咱们应用volcano为这个节点进行一部分的资源预留。

在上面这个PPT外面,咱们次要说一下应用Alluxio集群动静扩缩容的一些指标,以及具体的HPA。如上图所示左侧,咱们能够看到有一些指标,指标来自于两局部,一部分是AlluxioRuntime默认提供的一些metrics,这部分metrics被咱们应用prometheus收集到,通过cluster metrics能够进行HPA的管制。

如何在不失落数据的状况下对alluxio集群进行缩容是咱们比拟关注的,为实现缩容咱们对Alluxio java client和Fluid缩容逻辑也进行了二次开发,从而在Fluid中开发了一个指标叫dataset client number,这个指标是通过prometheus的exporter动静地去监控整个集群里有多少利用在应用某一个数据集。如果是有一个利用在应用的话,咱们就会把这个数量进行固定的加1,而后咱们对下面提到的这些指标进行一些加权计算,组成了一个固定的算法从新生成了capacity_used_rate这个metrics,用于管制 HPA的扩缩容。在右侧,咱们当初看到的HPA的CR里会有一个min replicas和max replicas。当用户去申请一个固定容量的Alluxio集群的时候,比如说他申请了1个10T的Alluxio集群,咱们会用10T除以每个节点能够调配的内存,求得1个最大的worker正本数。比如说是每个节点1个T,那这样的话将会有10个节点,就是说Alluxio集群最大能够有10个worker,最小的容量个别设置为1或者0,也就是说在没有人应用的时候,Alluxio集群将会被缩容到1个正本或0个正本,这样能够最大限度的节俭集群的资源,也能够最大限度地保障所有的Alluxio集群的节点都能够在k8s上进行动静的扩缩容,同时能够保证数据不会产生失落。

在上面的behavior里咱们做了一些非凡的解决,为了保障当用户应用Alluxio的时候,Alluxio的worker能够尽快扩起来,咱们应用了一个速率,比如说300,每次扩3个正本。在缩容的局部咱们是选用了每60秒缩容一个正本,比如说有两个利用在应用Alluxio的集群,可能两个利用都停掉了,在一分钟之内又有别的利用被调度上,这个时候Alluxio集群是不会进行缩容的。

上图咱们做了一个比拟大的图,显示了HPA的整体流程,首先在集群的最上层咱们应用worker deployment替换了daemonset,管控Alluxio worker pod,同时会有一个master statefulsets.apps.kruise.io用于管控Alluxio的master,这里咱们将Fluid原有的statefulset替换为kruise的statefulset,进步master HA模式在滚动重启时的稳定性。另外还会有相应的一个controller,AlluxioRuntime controller会应用prometheus的exporter把自定义的指标汇报给prometheus,最初通过prometheus adapter做为HPA的metrics。

通过这样的形式,能够保障比拟高的数据写入以及模型训练的速度,且在没有数据失落的状况下,实现了worker 在集群中动静进行扩容和缩容

上面咱们次要介绍后续想要对Alluxio进行的一些扩大。


在咱们整个的生产实践中,咱们发现Alluxio worker在Spark进行大规模的写入的时候,它会占用比拟多的堆外内存,对咱们的集群会有比拟大的累赘。因而咱们下一步会和社区进行单干,缩小每一个Alluxio worker的间接内存的应用。

另外从fuse的角度来说,因为fuse的一些不稳固状况会间接导致TensorFlow训练任务或者PyTorch训练任务的失败,因而咱们正在尝试调度层面以及csi plugin的层面做一些fuse pod的remount,比如说fuse pod呈现一些异样行为的时候,咱们会在TensorFlow或者PyTorch的pod里从新把操作系统进行一些挂载,使训练能够持续往下进行。

另外,因为咱们数据的流向是通过Spark到Alluxio,再到ceph的这样一个流向,咱们目前在写入的过程中会把Alluxio数据pin住,保证数据肯定是全副写到了ceph后才会进行驱赶,因而咱们这里将会拓展数据的驱赶策略,也就是说数据在保留到ceph里之后,就能够进行驱赶。

最初,非常感激Alluxio社区范斌老师,邱露老师,朱禹老师,Fluid社区车漾老师对咱们的大力支持及披星戴月的致力工作。对Alluxio和Fluid的批改咱们将尽快反馈给社区,期待和Alluxio,Fluid社区的进一步深度单干。