摘要: 本文主要谈了一些分布式计算框架方面的心得。
如果问 mapreduce 和 spark 什么关系,或者说有什么共同属性,你可能会回答他们都是大数据处理引擎。如果问 spark 与 tensorflow 呢,就可能有点迷糊,这俩关注的领域不太一样啊。但是再问 spark 与 MPI 呢?这个就更远了。虽然这样问多少有些不严谨,但是它们都有共同的一部分,这就是我们今天谈论的一个话题,一个比较大的话题:分布式计算框架。
不管是 mapreduce,还是 spark 亦或 tensorflow,它们都是利用分布式的能力,运行某些计算,解决一些特定的问题。从这个 level 讲,它们都定义了一种“分布式计算模型”,即提出了一种计算的方法,通过这种计算方法,就能够解决大量数据的分布式计算问题。它们的区别在于提出的分布式计算模型不同。Mapreduce 正如其名,是一个很基本的 map-reduce 式的计算模型(好像没说一样)。Spark 定义了一套 RDD 模型,本质上是一系列的 map/reduce 组成的一个 DAG 图。Tensorflow 的计算模型也是一张图,但是 tensorflow 的图比起 spark 来,显得更“复杂”一点。你需要为图中的每个节点和边作出定义。根据这些定义,可以指导 tensorflow 如何计算这张图。Tensorflow 的这种具体化的定义使它比较适合处理特定类型的的计算,对 tensorflow 来讲就是神经网络。而 spark 的 RDD 模型使它比较适合那种没有相互关联的的数据并行任务。那么有没有一种通用的、简单的、性能还高的分布式计算模型?我觉着挺难。通用往往意味着性能不能针对具体情形作出优化。而为专门任务写的分布式任务又做不到通用,当然也做不到简单。
插一句题外话,分布式计算模型有一块伴随的内容,就是调度。虽然不怎么受关注,但这是分布式计算引擎必备的东西。mapreduce 的调度是 yarn,spark 的调度有自己内嵌的调度器,tensorflow 也一样。MPI 呢?它的调度就是几乎没有调度,一切假设集群有资源,靠 ssh 把所有任务拉起来。调度实际上应当分为资源调度器和任务调度器。前者用于向一些资源管理者申请一些硬件资源,后者用于将计算图中的任务下发到这些远程资源进行计算,其实也就是所谓的两阶段调度。近年来有一些 TensorflowOnSpark 之类的项目。这类项目的本质实际上是用 spark 的资源调度,加上 tensorflow 的计算模型。
当我们写完一个单机程序,而面临数据量上的问题的时候,一个自然的想法就是,我能不能让它运行在分布式的环境中?如果能够不加改动或稍加改动就能让它分布式化,那就太好了。当然现实是比较残酷的。通常情况下,对于一个一般性的程序,用户需要自己手动编写它的分布式版本,利用比如 MPI 之类的框架,自己控制数据的分发、汇总,自己对任务的失败做容灾(通常没有容灾)。如果要处理的目标是恰好是对一批数据进行批量化处理,那么 可以用 mapreduce 或者 spark 预定义的 api。对于这一类任务,计算框架已经帮我们把业务之外的部分(脚手架代码)做好了。同样的,如果我们的任务是训练一个神经网络,那么用 tensorflow pytorch 之类的框架就好了。这段话的意思是,如果你要处理的问题已经有了对应框架,那么拿来用就好了。但是如果没有呢?除了自己实现之外有没有什么别的办法呢?
今天注意到一个项目,Ray,声称你只需要稍微修改一下你的代码,就能让它变为分布式的(实际上这个项目早就发布了,只是一直没有刻意关注它)。当然这个代码仅局限于 python,比如下面这个例子,
| **Basic Python** | **Distributed with Ray** |
+------------------------------------------------+----------------------------------------------------+
| | |
| # Execute f serially. | # Execute f in parallel. |
| | |
| | @ray.remote |
| def f(): | def f(): |
| time.sleep(1) | time.sleep(1) |
| return 1 | return 1 |
| | |
| | |
| | ray.init() |
| results = [f() for i in range(4)] | results = ray.get([f.remote() for i in range(4)]) |
+------------------------------------------------+----------------------------------------------------+
这么简单?这样笔者想到了 openmp(注意不是 openmpi)。来看看,
#include<iostream>
#include"omp.h"
using namespace std;
void main() {
#pragma omp parallel for
for(int i = 0; i < 10; ++i) {cout << "Test" << endl;}
system("pause");
}
把头文件导入,添加一行预处理指令就可以了,这段代码立马变为并行执行。当然 openmp 不是分布式,只是借助编译器将代码中需要并行化的部分编译为多线程运行,本身还是一个进程,因此其并行度收到 CPU 线程数量所限。如果 CPU 是双线程,那只能 2 倍加速。在一些服务器上,CPU 可以是单核 32 线程,自然能够享受到 32 倍加速(被并行化的部分)。不过这些都不重要,在用户看来,Ray 的这个做法和 openmp 是不是有几分相似之处?你不需要做过多的代码改动,就能将代码变为分布式执行(当然 openmp 要更绝一点,因为对于不支持 openmp 的编译器它就是一行注释而已)。
那么 Ray 是怎么做到这一点的呢?其实 Ray 的做法说起来也比较简单,就是定义了一些 API,类似于 MPI 中的定义的通信原语。使用的时候,将这些 API“注入”到代码合适的位置,那么代码就变成了用户代码夹杂着一些 Ray 框架层的 API 调用,整个代码实际上就形成了一张计算图。接下来的事情就是等待 Ray 把这张计算图完成返回就好了。Ray 的论文给了个例子:
@ray.remote
def create_policy():
# Initialize the policy randomly.
return policy
@ray.remote(num_gpus=1)
class Simulator(object):
def __init__(self):
# Initialize the environment.
self.env = Environment()
def rollout(self, policy, num_steps):
observations = []
observation = self.env.current_state()
for _ in range(num_steps):
action = policy(observation)
observation = self.env.step(action)
observations.append(observation)
return observations
@ray.remote(num_gpus=2)
def update_policy(policy, *rollouts):
# Update the policy.
return policy
@ray.remote
def train_policy():
# Create a policy.
policy_id = create_policy.remote()
# Create 10 actors.
simulators = [Simulator.remote() for _ in range(10)]
# Do 100 steps of training.
for _ in range(100):
# Perform one rollout on each actor.
rollout_ids = [s.rollout.remote(policy_id)
for s in simulators]
# Update the policy with the rollouts.
policy_id = update_policy.remote(policy_id, *rollout_ids)
return ray.get(policy_id)
生成的计算图为
所以,用户要做的事情,就是在自己的代码里加入适当的 Ray API 调用,然后自己的代码就实际上变成了一张分布式计算图了。作为对比,我们再来看看 tensorflow 对图的定义,
import tensorflow as tf
# 创建数据流图:y = W * x + b,其中 W 和 b 为存储节点,x 为数据节点。x = tf.placeholder(tf.float32)
W = tf.Variable(1.0)
b = tf.Variable(1.0)
y = W * x + b
with tf.Session() as sess:
tf.global_variables_initializer().run() # Operation.run
fetch = y.eval(feed_dict={x: 3.0}) # Tensor.eval
print(fetch) # fetch = 1.0 * 3.0 + 1.0
'''
输出:4.0
'''
可以看出,tensorflow 中是自己需要自己显式的、明确的定义出图的节点,placeholder Variable 等等(这些都是图节点的具体类型),而 Ray 中图是以一种隐式的方式定义的。我认为后者是一种更自然的方式,站在开发者的角度看问题,而前者更像是为了使用 tensorflow 把自己代码逻辑去适配这个轮子。
那么 ray 是不是就我们要寻找的那个即通用、又简单、还灵活的分布式计算框架呢?由于笔者没有太多的 ray 的使用经验,这个问题不太好说。从官方介绍来看,有限的几个 API 确实是足够简单的。仅靠这几个 API 能不能达成通用且灵活的目的还不好讲。本质上来说,Tensorflow 对图的定义也足够 General,但是它并不是一个通用的分布式计算框架。由于某些问题不在于框架,而在于问题本身的分布式化就存在困难,所以试图寻求一种通用分布式计算框架解决单机问题可能是个伪命题。
话扯远了。假设 ray 能够让我们以一种比较容易的方式分布式地执行程序,那么会怎么样呢?前不久 Databricks 开源了一个新项目,Koalas,试图以 RDD 的框架并行化 pandas。由于 pandas 的场景是数据分析,和 spark 面对的场景类似,两者的底层存储结构、概念也是很相似的,因此用 RDD 来分布式化 pandas 也是可行的。我想,如果 ray 足够简单好用,在 pandas 里加一些 ray 的 api 调用花费的时间精力可能会远远小于开发一套 koalas。但是在 pandas 里加 ray 就把 pandas 绑定到了 ray 上,即便单机也是这样,因为 ray 做不到像 openmp 那样如果支持,很好,不支持也不影响代码运行。
啰嗦这么多,其实就想从这么多引擎的细节中跳出来,思考一下到底什么是分布式计算框架,每种框架又是设计的,解决什么问题,有什么优缺点。最后拿大佬的一个观点结束本文。David Patterson 在演讲“New Golden Age For Computer Architecture”中提到,通用硬件越来越逼近极限,要想要达到更高的效率,我们需要设计面向领域的架构(Domain Specific Architectures)。这是一个计算架构层出不穷的时代,每种架构都是为了解决其面对的领域问题出现的,必然包含对其问题的特殊优化。通用性不是用户解决问题的出发点,而更多的是框架设计者的“一厢情愿”,用户关注的永远是领域问题。从这个意义上讲,面向领域的计算架构应该才是正确的方向。
声明:限于本人水平有限,文中陈述内容可能有误。欢迎批评指正。
本文作者:EMR
阅读原文
本文为云栖社区原创内容,未经允许不得转载。