Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机减速。这其中,并行和分布式 Python 函数次要利用 Mars Remote API。
启动 Mars 分布式环境能够参考:
- 命令行形式在集群中部署。
- Kubernetes 中部署。
- MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的能够间接应用。
如何应用 Mars Remote API
应用 Mars Remote API 非常简单,只须要对原有的代码做少许改变,就能够分布式执行。
拿用蒙特卡洛办法计算 π 为例。代码如下,咱们编写了两个函数,calc_chunk
用来计算每个分片内落在圆内的点的个数,calc_pi
用来把多个分片 calc_chunk
计算的后果汇总最初得出 π 值。
from typing import List
import numpy as np
def calc_chunk(n: int, i: int):
_# 计算 n 个随机点(x 和 y 轴落在 - 1 到 1 之间)到原点间隔小于 1 的点的个数_
rs = np.random.RandomState(i)
a = rs.uniform(-1, 1, size=(n, 2))
d = np.linalg.norm(a, axis=1)
return (d < 1).sum()
def calc_pi(fs: List[int], N: int):
_# 将若干次 calc_chunk 计算的后果汇总,计算 pi 的值_
return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
fs = [calc_chunk(n, i)
for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)
%%time
下能够看到后果:
3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s
在单机须要 12.3 s。
要让这个计算应用 Mars Remote API 并行起来,咱们不须要对函数做任何改变,须要变动的仅仅是最初局部。
import mars.remote as mr
_# 函数调用改成 mars.remote.spawn_
fs = [mr.spawn(calc_chunk, args=(n, i))
for i in range(N // n)]
_# 把 spawn 的列表传入作为参数,再 spawn 新的函数_
pi = mr.spawn(calc_pi, args=(fs, N))
_# 通过 execute() 触发执行,fetch() 获取后果_
print(pi.execute().fetch())
%%time
下看到后果:
3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s
后果截然不同,然而却有数倍的性能晋升。
能够看到,对已有的 Python 代码,Mars remote API 简直不须要做多少改变,就能无效并行和分布式来减速执行过程。
一个例子
为了让读者了解 Mars Remote API 的作用,咱们从另一个例子开始。当初咱们有一个数据集,咱们心愿对它们做一个分类工作。要做分类,咱们有很多算法和库能够抉择,这里咱们用 RandomForest、LogisticRegression,以及 XGBoost。
艰难的中央是,除了有多个模型抉择,这些模型也会蕴含多个超参,那哪个超参成果最好呢?对于调参不那么有教训的同学,跑过了才晓得。所以,咱们心愿能生成一堆可选的超参,而后把他们都跑一遍,看看成果。
筹备数据
这个例子里咱们应用 otto 数据集。
首先,咱们筹备数据。读取数据后,咱们按 2:1 的比例把数据分成训练集和测试集。
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
def gen_data():
df = pd.read_csv('otto/train.csv')
X = df.drop(['target', 'id'], axis=1)
y = df['target']
label_encoder = LabelEncoder()
label_encoder.fit(y)
y = label_encoder.transform(y)
return train_test_split(X, y, test_size=0.33, random_state=123)
X_train, X_test, y_train, y_test = gen_data()
模型
接着,咱们应用 scikit-learn 的 RandomForest 和 LogisticRegression 来解决分类。
RandomForest:
from sklearn.ensemble import RandomForestClassifier
def random_forest(X_train: pd.DataFrame,
y_train: pd.Series,
verbose: bool = False,
**kw):
model = RandomForestClassifier(verbose=verbose, **kw)
model.fit(X_train, y_train)
return model
接着,咱们生成供 RandomForest 应用的超参,咱们用 yield 的形式来迭代返回。
def gen_random_forest_parameters():
for n_estimators in [50, 100, 600]:
for max_depth in [None, 3, 15]:
for criterion in ['gini', 'entropy']:
yield {
'n_estimators': n_estimators,
'max_depth': max_depth,
'criterion': criterion
}
LogisticRegression 也是这个过程。咱们先定义模型。
from sklearn.linear_model import LogisticRegression
def logistic_regression(X_train: pd.DataFrame,
y_train: pd.Series,
verbose: bool = False,
**kw):
model = LogisticRegression(verbose=verbose, **kw)
model.fit(X_train, y_train)
return model
接着生成供 LogisticRegression 应用的超参。
def gen_lr_parameters():
for penalty in ['l2', 'none']:
for tol in [0.1, 0.01, 1e-4]:
yield {
'penalty': penalty,
'tol': tol
}
XGBoost 也是一样,咱们用 XGBClassifier
来执行分类工作。
from xgboost import XGBClassifier
def xgb(X_train: pd.DataFrame,
y_train: pd.Series,
verbose: bool = False,
**kw):
model = XGBClassifier(verbosity=int(verbose), **kw)
model.fit(X_train, y_train)
return model
生成一系列超参。
def gen_xgb_parameters():
for n_estimators in [100, 600]:
for criterion in ['gini', 'entropy']:
for learning_rate in [0.001, 0.1, 0.5]:
yield {
'n_estimators': n_estimators,
'criterion': criterion,
'learning_rate': learning_rate
}
验证
接着咱们编写验证逻辑,这里咱们应用 log_loss
来作为评估函数。
from sklearn.metrics import log_loss
def metric_model(model,
X_test: pd.DataFrame,
y_test: pd.Series) -> float:
if isinstance(model, bytes):
model = pickle.loads(model)
y_pred = model.predict_proba(X_test)
return log_loss(y_test, y_pred)
def train_and_metric(train_func,
train_params: dict,
X_train: pd.DataFrame,
y_train: pd.Series,
X_test: pd.DataFrame,
y_test: pd.Series,
verbose: bool = False
):
_# 把训练和验证封装到一起_
model = train_func(X_train, y_train, verbose=verbose, **train_params)
metric = metric_model(model, X_test, y_test)
return model, metric
找出最好的模型
做好筹备工作后,咱们就开始来跑模型了。针对每个模型,咱们把每次生成的超参们送进去训练,除了这些超参,咱们还把 n_jobs
设成 -1,这样能更好利用单机的多核。
results = []
_# -------------_
_# Random Forest_
_# -------------_
for params in gen_random_forest_parameters():
print(f'calculating on {params}')
_# fixed random_state_
params['random_state'] = 123
_# use all CPU cores_
params['n_jobs'] = -1
model, metric = train_and_metric(random_forest, params,
X_train, y_train,
X_test, y_test)
print(f'metric: {metric}')
results.append({'model': model,
'metric': metric})
_# -------------------_
_# Logistic Regression_
_# -------------------_
for params in gen_lr_parameters():
print(f'calculating on {params}')
_# fixed random_state_
params['random_state'] = 123
_# use all CPU cores_
params['n_jobs'] = -1
model, metric = train_and_metric(logistic_regression, params,
X_train, y_train,
X_test, y_test)
print(f'metric: {metric}')
results.append({'model': model,
'metric': metric})
_# -------_
_# XGBoost_
_# -------_
for params in gen_xgb_parameters():
print(f'calculating on {params}')
_# fixed random_state_
params['random_state'] = 123
_# use all CPU cores_
params['n_jobs'] = -1
model, metric = train_and_metric(xgb, params,
X_train, y_train,
X_test, y_test)
print(f'metric: {metric}')
results.append({'model': model,
'metric': metric})
运行一下,须要相当长时间,咱们省略掉一部分输入内容。
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'}
metric: 0.6964123781828575
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'}
metric: 0.6912312790832288
_# 省略其余模型的输入后果_
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s
从 CPU 工夫和 Wall 工夫,能看进去这些训练还是充分利用了多核的性能。但整个过程还是破费了 31 分钟。
应用 Remote API 分布式减速
当初咱们尝试应用 Remote API 通过分布式形式减速整个过程。
集群方面,咱们应用最开始说的第三种形式,间接在 MaxCompute 上拉起一个集群。大家能够抉择其余形式,成果是一样的。
n_cores = 8
mem = 2 * n_cores _# 16G_
_# o 是 MaxCompute 入口,这里创立 10 个 worker 的集群,每个 worker 8 核 16G_
cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')
为了不便在分布式读取数据,咱们对数据处理稍作改变,把数据上传到 MaxCompute 资源。对于其余环境,用户能够思考 HDFS、Aliyun OSS 或者 Amazon S3 等存储。
if not o.exist_resource('otto_train.csv'):
with open('otto/train.csv') as f:
_# 上传资源_
o.create_resource('otto_train.csv', 'file', fileobj=f)
def gen_data():
_# 改成从资源读取_
df = pd.read_csv(o.open_resource('otto_train.csv'))
X = df.drop(['target', 'id'], axis=1)
y = df['target']
label_encoder = LabelEncoder()
label_encoder.fit(y)
y = label_encoder.transform(y)
return train_test_split(X, y, test_size=0.33, random_state=123)
稍作改变之后,咱们应用 mars.remote.spawn
办法来让 gen_data
调度到集群上运行。
import mars.remote as mr
_# n_output 阐明是 4 输入_
_# execute() 执行后,数据会读取到 Mars 集群外部_
data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
_# remote_ 结尾的都是 Mars 对象,这时候数据在集群内,这些对象只是援用_
remote_X_train, remote_X_test, remote_y_train, remote_y_test = data
目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,咱们要对 train_and_metric
稍作改变,把模型 pickle 了之后再返回。
def distributed_train_and_metric(train_func,
train_params: dict,
X_train: pd.DataFrame,
y_train: pd.Series,
X_test: pd.DataFrame,
y_test: pd.Series,
verbose: bool = False
):
model, metric = train_and_metric(train_func, train_params,
X_train, y_train,
X_test, y_test, verbose=verbose)
return pickle.dumps(model), metric
后续 Mars 反对了序列化模型后能够间接 spawn 本来的函数。
接着咱们就对后面的执行过程稍作改变,把函数调用全副都用 mars.remote.spawn
来改写。
import numpy as np
tasks = []
models = []
metrics = []
_# -------------_
_# Random Forest_
_# -------------_
for params in gen_random_forest_parameters():
_# fixed random_state_
params['random_state'] = 123
task = mr.spawn(distributed_train_and_metric,
args=(random_forest, params,
remote_X_train, remote_y_train,
remote_X_test, remote_y_test),
kwargs={'verbose': 2},
n_output=2
)
tasks.extend(task)
_# 把模型和评估别离存储_
models.append(task[0])
metrics.append(task[1])
_# -------------------_
_# Logistic Regression_
_# -------------------_
for params in gen_lr_parameters():
_# fixed random_state_
params['random_state'] = 123
task = mr.spawn(distributed_train_and_metric,
args=(logistic_regression, params,
remote_X_train, remote_y_train,
remote_X_test, remote_y_test),
kwargs={'verbose': 2},
n_output=2
)
tasks.extend(task)
_# 把模型和评估别离存储_
models.append(task[0])
metrics.append(task[1])
_# -------_
_# XGBoost_
_# -------_
for params in gen_xgb_parameters():
_# fixed random_state_
params['random_state'] = 123
_# 再指定并发为核的个数_
params['n_jobs'] = n_cores
task = mr.spawn(distributed_train_and_metric,
args=(xgb, params,
remote_X_train, remote_y_train,
remote_X_test, remote_y_test),
kwargs={'verbose': 2},
n_output=2
)
tasks.extend(task)
_# 把模型和评估别离存储_
models.append(task[0])
metrics.append(task[1])
_# 把程序打乱,目标是能扩散到 worker 上均匀一点_
shuffled_tasks = np.random.permutation(tasks)
_ = mr.ExecutableTuple(shuffled_tasks).execute()
能够看到代码简直统一。
运行查看后果:
CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s
工夫一下子从 31 分钟多来到了 2 分钟,晋升 15x+。但代码批改的代价能够忽略不计。
仔细的读者可能留神到了,分布式运行的代码中,咱们把模型的 verbose 给关上了,在分布式环境下,因为这些函数近程执行,打印的内容只会输入到 worker 的规范输入流,咱们在客户端不会看到打印的后果,但 Mars 提供了一个十分有用的接口来让咱们查看每个模型运行时的输入。
以第 0 个模型为例,咱们能够在 Mars 对象上间接调用 fetch_log
办法。
print(models[0].fetch_log())
输入咱们简略一部分。
building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
_# 两头省略_
building tree 49 of 50
building tree 50 of 50
要看哪个模型都能够通过这种形式。试想下,如果没有 fetch_log
API,你确想看两头过程的输入有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;而后,即使晓得是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输入就可能混淆在一起,甚至被宏大日志吞没了。fetch_log
接口让用户不须要关怀在哪个 worker 上执行,也不必放心日志混合在一起。
想要理解 fetch_log
接口,能够查看 文档。
还有更多
Mars Remote API 的能力其实不止这些,举个例子,在 remote 外部能够 spawn 新的函数;也能够调用 Mars tensor、DataFrame 或者 learn 的算法。这些内容,读者们能够后行摸索,后续咱们再写别的文章介绍。
总结
Mars Remote API 通过并行和分布式 Python 函数,用很小的批改代价,极大晋升了执行效率。
原文链接
本文为阿里云原创内容,未经容许不得转载。