如何落地一个算法

38次阅读

共计 4603 个字符,预计需要花费 12 分钟才能阅读完成。

简介: 在解决实际问题的时候,很多人认为只要有机器学习算法就可以了,实际上要把一个算法落地还需要解决很多工程上的难题。本文将和大家分享如何从零开始搭建一个 GPU 加速的分布式机器学习系统,介绍在搭建过程中遇到的问题和解决方法。

一 背景

在云计算环境下,虚拟机的负载均衡、自动伸缩、绿色节能以及宿主机升级等需求使得我们需要利用虚拟机 (VM) 迁移技术,尤其是虚拟机热迁移技术,对于 down time(停机时间)要求比较高,停机时间越短,客户业务中断时间就越短,影响就越小。如果能够根据 VM 的历史工作负载预测其未来的工作负载趋势,就能够寻找到最合适的时间窗口完成虚拟机热迁移的操作。

于是我们开始探索如何用机器学习算法预测 ECS 虚拟机的负载以及热迁移的停机时间,但是机器学习算法要在生产环境发挥作用,还需要很多配套系统去支持。为了能快速将现有算法在实际生产环境落地,并能利用 GPU 加速实现大规模计算,我们自己搭建了一个 GPU 加速的大规模分布式机器学习系统,取名小诸葛,作为 ECS 数据中台的异构机器学习算法加速引擎。搭载以上算法的小诸葛已经在生产环境上线,支撑阿里云全网规模的虚拟机的大规模热迁移预测。

二 方案

那么一套完整大规模分布式系统机器学习系统需要哪些组成部分呢?

1 总体架构

阿里云全网如此大规模的虚拟机数量,要实现 24 小时之内完成预测,需要在端到端整个流程的每一个环节做优化。所以这必然是一个复杂的工程实现,为了高效的搭建这个平台,大量使用了现有阿里云上的产品服务来搭建。

整个平台包含:Web 服务、MQ 消息队列、Redis 数据库、SLS/MaxComputer/HybridDB 数据获取、OSS 模型仓库的上传下载、GPU 云服务器、DASK 分布式框架、RAPIDS 加速库。

1)架构

下图是小诸葛的总体架构图。

小诸葛是基于 RAPIDS+DASK 搭建的一个端到端的 GPU 加速的机器学习平台,整个平台都是基于阿里云上的产品和服务来搭建的。

我们在前端提供了一个基于 Tengine+Flask 的 Web 服务用于接受客户端发送来的数据计算请求,并利用消息队列与后端的大规模计算集群解耦。

Dask 分布式框架则提供了数据准备和模型训练以及预测的计算节点的管理和调度,同时我们使用了阿里云的 MaxComputer 做训练阶段离线数据的处理,使用 Blink 等实时计算引擎做预测阶段的在线数据处理,使用 HybridDB 分析型数据库存放处理过的在线数据用于实时预测的数据拉取,并使用阿里云的对象存储服务 OSS 来获取训练数据和保存训练模型,使用 GPU 云服务器加速机器学习的运算。

2)设计思考

下面讲下平台的核心设计思考。

一个是分布式消息队列的使用:

  • 首先可以实现前端业务平台与后端计算系统的解耦,实现业务处理异步化。
  • 还可以实现高并发:使得系统支持百万以上规模的高并发读写。
  • 另外,如果后端系统出现故障,消息可以在队列里堆积且不丢失,待后端系统恢复后可以继续处理请求,满足高可用。
  • 消息队列的消费者可以是多套计算系统,而且多套系统可以做轮转升级,不影响前端业务,实现了高扩展。

另一个是 GPU 加速的分布式并行计算后端的设计:

  • 计算资源选择的是阿里云的 GPU 云服务器。分布式并行计算框架我选择了轻量级的 DASK,它更易用更灵活,可以写出自由度更高的并行计算代码,且可以与 GPU 机器学习加速库 RAPIDS 很好的结合。
  • 同时通过与 MaxComputer、HybridDB 等多个数仓的数据链路打通,实现了一个从数据准备、离线训练到在线预测的端到端的计算平台。
  • 我们在数据仓库的选择上做了很多评估和相应的优化设计工作,MaxComputer 因其实时性较差用于离线训练数据仓库,SLS 实时性不错但不适合大规模并发访问,对于实时预测其数据读取性能也无法满足需求,所以实时预测选择了性能和并发规模更好的 Cstore(HybridDB for MySQL,现已升级为 AnalyticDB)。

整个平台的搭建涉及内部多个业务团队的合作,就业务需求的分析从而确定了最终算法,以及在数据 ETL 和数据源性能和稳定性方面的方案确定,和就预测结果如何应用于热迁移任务执行的方案确定,最终实现了一个端到端的平台达成了业务目标。

2 消息队列

消息队列使用的是阿里云的 RocketMQ。

消息队列的使用需要考虑以下几个问题:

1)消息幂等

用于解决投递时消息重复的问题。

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

我们使用 Redis 数据库记录每条消息的 Message Key 用于幂等性,在消费时如果发现有重复投递的消息会丢弃掉,避免消息被重复消费执行。

2)消息是无序还是顺序

对于全网预测的批量消息处理,是不需要考虑消息的顺序的,所以为了保证消息的处理性能我们选择无序消息。

3 数据处理及数据平台的选择

数据是一切的根本,首先需要解决海量数据的存储、分析和处理。

  • 我们需要处理的数据可以是如下的不同种类:
  • 实时数据和非实时数据
  • 格式化数据和非格式化数据
  • 需要索引的数据和只需要计算的数据
  • 全量数据和抽样数据
  • 可视化数据和告警数据

每一个分类都对应一种或者多种数据处理、分析和存储方式。

多维度和多数据源是另一个要考虑的问题。对于相对复杂的业务场景,往往需要不同维度的数据(比如我们做热迁移预测使用了实时的 CPU 利用率数据,还有虚拟机规格等其它多个维度的数据)综合起来考虑。同样,负载场景下也不会只产生一种类型的数据,不是所有数据都是使用统一的方式处理和存储,所以具体实践中往往会使用多个不同的数据源。

公有云上的海量数据都达到了 TB、PB 以上的级别,传统的数据存储方式已经满足不了需求,因此针对大数据的存储诞生了 Hadoop 生态。传统的系统数据存储方式在数据量达到一定规模后会带来一系列问题:性能问题、成本问题、单点故障问题、数据准确性问题等等。取而代之的是以 HDFS 为代表的分布式存储系统。

除了数据存储的问题,实时数据的采集也很重要。业务系统都有各自的实时日志,日志收集工具都和业务服务部署在一起,为了不和线上服务抢占资源,日志收集必须要严格控制占用的资源。同时也不能在收集端进行日志清洗和分析操作,而应该集中收集到一个地方后再处理。

就我们使用的数据仓库而言,初期选择的是 ODPS(即 MaxCompute,类似于开源的 Hadoop 系统)和 SLS(阿里云的日志服务)。ODPS 可作为离线数据仓库存储海量的历史数据,而 SLS 则存放了海量的实时监控数据(比如我们使用的 ECS 虚拟机的 CPU 利用率数据)。

但是数据太多了又会出现信息过载的情况,所以往往需要对数据做聚合后再使用(比如我们 CPU 利用率的预测是对原始的分钟级采样数据分别做了 5 分钟平均和 1 小时平均的聚合)。因为我们发现 SLS 自带的聚合计算因为计算量太大导致速度非常的慢而无法满足实际计算需求。所以数据中台使用实时计算平台 Blink 将聚合好的数据存放在了新的 SLS 仓库里供我们实际计算使用。Blink 是集团内部基于 Apache 开源的实时计算流处理平台 Flink 进行定制开发和优化后的流计算平台。

在大规模的线上预测时我们又发现,SLS 根本无法满足高并发、低延迟的预测数据的拉取,常常因为排队拉不到数据或者拉取速度太慢导致大幅增加预测延迟,在经过评估测试后,我们选择了 ECS 数据中台提供的 Cstore 数仓存放聚合后的数据,从 Cstore 拉取预测需要的数据,从而解决了高并发、低延迟预测数据的拉取问题。

4 GPU 加速的分布式并行计算后端的搭建

整个分布式并行计算后端的核心是并行计算框架的选择以及 GPU 加速。

1)框架选择

在分布式并行计算框架的选择上,有如下一些考虑,SPARK 是目前大数据计算的主流分布式并行计算框架,但受限于 CPU 的性能和成本及 SPARK 任务无法获得 GPU 加速(当时搭建小诸葛的时候,SPARK 还没有提供 GPU 加速的完善支持,后来发布的 SPARK 3.0 预览版开始已经提供了 GPU 加速的支持,这块的工作我们一直在保持关注和投入,后续会更新相关进展),无法满足全网大规模预测的需求,我们选择了 DASK 这个轻量级的分布式计算框架结合 GPU 加速库 RAPIDS 在 GPU 云服务器加速我们的算法。

我们利用 DASK 并行框架惰性计算的特点及提供的代码打包分发所有 Dask Worker 能力,将 Worker 执行代码通过 Dask Scheduler 分发到各个 Worker 节点,并在后端消息队列消费者收到计算任务后再通过 Dask Client 将执行任务递交到 Dask Scheduler,由 Dask Scheduler 负责将计算任务调度到指定的 Worker 节点上完成相应的计算任务。可以看到 DASK 框架的架构和执行流程跟 Spark 是很像的,只不过 Dask 更轻量级一些,且是 Python 语言生态的框架,适合集成 RAPIDS。

根据业务需求,我们设计了以下几种计算任务:数据准备任务、训练任务、预测任务,并为不同的任务配置了相应的 Dask Worker 完成相应计算。与此相适应的消息队列也设计了相应的消息 Topic,Web Server 也设计了相应的统一的 HTTP 报文格式。

训练和计算任务的 Worker 部署在 GPU 服务器上,而数据准备阶段目前没有 GPU 加速则部署在 CPU 服务器上。

针对每种任务,设计了丰富的参数选择,可以灵活支持预测目标(集群维度、NC 维度、VM 维度)、算法模型(ARIMA、LSTM、XGBoost 等)、算法任务(回归任务、分类任务等)等不同的计算任务。

计算后端与多个数据源打通,实现离线训练数据(ODPS)和在线预测数据(CStore)的自动拉取,模型的自动保存和拉取(OSS),构成了一个闭环的端到端的计算平台。

2)GPU 加速

为了提升计算的效率,我们采用了 RAPIDS 加速库实现了核心算法的 GPU 加速。

RAPIDS,全称 Real-time Acceleration Platform for Integrated Data Science。顾名思义,RAPIDS 是一个针对数据科学和机器学习的 GPU 加速库。借助 RAPIDS,我们可以使用 GPU 来加速大数据和机器学习算法。

在项目过程中,我们对算法、计算任务流程做了大量的优化,最终只用了 8 台小规格 GPU 服务器就实现了原本需要 50 台 + 大规格 CPU 服务器(2000+ vCPU)才能完成的预测任务,成本大幅下降为之前的 1 /10。

5 模型更新及评估发布系统

一个完整的机器学习平台还需要提供自动的离线训练系统和模型评估和发布系统。

目前线上运行的小诸葛实现了自动化的在线实时预测,但是模型的评估、更新及发布还未完全实现自动化,这也是目前正在补充和完善的工作。

目前,小诸葛已经提供了线上测试评估数据的自动化生成和采集,后续结合自动化的模型评估系统和模型发布系统,将可以实现真正意义上的全流程自动化。

三 总结

云原生背景下,越来越多的业务系统选择在云上构建自己的业务平台,借助于公有云完善的技术生态,使得搭建一个可用于生产环境的企业级平台变得不再那么困难。

同时通过小诸葛平台,实现了 GPU 加速机器学习的工程落地,实际的业务效果来看,也证明了 GPU 在加速数据科学领域的巨大价值潜力。

正文完
 0