作者:东哥腾飞
首发于公众号:Python 数据迷信
对于 Pandas
运行速度的晋升办法,之前曾经介绍过很多回了,外面常常提及Dask
,很多敌人没接触过可能不太理解,明天就举荐一下这个神器。
1、什么是 Dask?
Pandas
和 Numpy
大家都不生疏了,代码运行后数据都加载到 RAM 中,如果数据集特地大,咱们就会看到内存飙升。但有时要解决的数据并不适宜 RAM
,这时候Dask
来了。
Dask
是开源收费的。它是与其余社区我的项目(如 Numpy,Pandas 和 Scikit-Learn)协调开发的。
官网:https://dask.org/
Dask
反对 Pandas
的DataFrame
和 NumpyArray
的数据结构,并且既可在本地计算机上运行,也能够扩大到在集群上运行。
基本上,只有编写一次代码,应用一般的 Pythonic
语法,就可在本地运行或部署到多节点集群上。这自身就是一个很牛逼的性能了,但这还不是最牛逼的。
我感觉 Dask
的最牛逼的性能是:它兼容大部分咱们曾经在用的工具,并且只需改变大量的代码,就能够利用本人笔记本电脑上已有的解决能力并行运行代码。而并行处理数据就意味着更少的执行工夫,更少的等待时间和更多的剖析工夫。
上面这个就是 Dask
进行数据处理的大抵流程。
2、Dask 反对哪些现有工具?
这一点也是我比拟看中的,因为 Dask
能够与 Python
数据处理和建模的库包兼容,沿用库包的 API,这对于 Python 使用者来说学习老本是极低的。而像 Hadoop
、Spark
这种大数据处理是有很高的学习门槛和工夫老本的。
目前,Dask
可反对 pandas
、Numpy
、Sklearn
、XGBoost
、XArray
、RAPIDS
等等,光是这几项我感觉就足够用了,至多对于罕用的数据处理、建模剖析是齐全笼罩得掉的。
3、Dask 装置
能够应用 conda
或者 pip
,或从源代码装置dask
。
conda install dask
因为 dask
有很多依赖,所以为了疾速装置也可用上面代码,将装置运行 Dask
所需的起码依赖关系集。
conda install dask-core
再有就是通过源来装置。
git clone https://github.com/dask/dask.git
cd dask
python -m pip install .
4、Dask 如何应用?
Numpy、pandas
Dask
引入了 3 个并行汇合,它们能够存储大于 RAM 的数据,这些汇合有DataFrame
、Bags
、Arrays
。这些汇合类型中的每一个都可能应用在 RAM 和硬盘之间分区的数据,以及散布在群集中多个节点上的数据。
Dask
的应用是十分清晰的,如果你应用 NumPy
数组,就从 Dask
数组开始,如果你应用 Pandas DataFrame
,就从Dask DataFrame
开始,依此类推。
import dask.array as da
x = da.random.uniform(low=0, high=10, size=(10000, 10000), # normal numpy code
chunks=(1000, 1000)) # break into chunks of size 1000x1000
y = x + x.T - x.mean(axis=0) # Use normal syntax for high level algorithms
# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp', # normal Pandas code
blocksize=64000000) # break text into 64MB chunks
s = df.groupby('name').balance.mean() # Use normal syntax for high level algorithms
# Bags / lists
import dask.bag as db
b = db.read_text('*.json').map(json.loads)
total = (b.filter(lambda d: d['name'] == 'Alice')
.map(lambda d: d['balance'])
.sum())
这些高级接口在稍微变动的状况下复制了标准接口。对于原始我的项目中的大部分 API,这些接口会主动为咱们并行处理较大的数据集,实现上不是很简单,对照 Dask
的 doc 文档即可一步步实现。
Delayed
上面说一下 Dask
的 Delay
性能,十分弱小。
Dask.delayed
是一种并行化现有代码的简略而弱小的办法。之所以被叫做 delayed
是因为,它没有立刻计算出后果,而是将要作为工作计算的后果记录在一个图形中,稍后将在并行硬件上运行。
有时问题用已有的 dask.array
或dask.dataframe
可能都不适宜,在这些状况下,咱们能够应用更简略的 dask.delayed
界面并行化自定义算法。例如上面这个例子。
def inc(x):
return x + 1
def double(x):
return x * 2
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = sum(output)
45
下面代码在单个线程中按程序运行。然而,咱们看到其中很多能够并行执行。Dask delayed
函数可润饰 inc
、double
这些函数,以便它们可提早运行,而不是立刻执行函数,它将函数及其参数放入计算工作图中。
咱们简略批改代码,用 delayed
函数包装一下。
import dask
output = []
for x in data:
a = dask.delayed(inc)(x)
b = dask.delayed(double)(x)
c = dask.delayed(add)(a, b)
output.append(c)
total = dask.delayed(sum)(output)
代码运行后 inc
、double
、add
和sum
都还没有产生,而是生成一个计算的工作图交给了 total
。而后咱们用visualizatize
看下工作图。
total.visualize()
上图显著看到了并行的可能性,所以毫不犹豫,应用 compute
进行并行计算,这时才实现了计算。
>>> total.compute()
45
因为数据集较小无奈比拟工夫,这里只介绍下应用办法,具体可本人入手实际下。
Sklearn 机器学习
对于机器学习的并行化执行,因为内容较多,东哥会在另一篇文章开展。这里简略说下一下dask-learn
。
dask-learn
我的项目是与 Sklearn
开发人员合作实现的。当初可实现并行化有 Scikit-learn
的Pipeline
、GridsearchCV
和 RandomSearchCV
以及这些的变体,它们能够更好地解决嵌套的并行操作。
因而,如果你将 sklearn
替换为dklearn
,那么速度将会晋升很多。
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline
上面是一个应用 Pipeline 的示例,其中利用了 PCA 和逻辑回归。from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000,
n_features=500,
n_classes=2,
n_redundant=250,
random_state=42)
from sklearn import linear_model, decomposition
from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline
logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca),
('logistic', logistic)])
grid = dict(pca__n_components=[50, 100, 150, 250],
logistic__C=[1e-4, 1.0, 10, 1e4],
logistic__penalty=['l1', 'l2'])
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
estimator = GridSearchCV(pipe, grid)
estimator.fit(X, y)
后果是:sklearn
会在 40 秒钟左右执行此计算,而 dask-learn
替代品大概须要 10 秒钟。
另外,如果增加以下代码能够连贯到集群,通过 Client
能够展现整个计算过程的 dashboard
,由Bokeh
实现。
from dask.distributed import Client
c = Client('scheduler-address:8786')
5、总结
以上就是 Dask
的简略介绍,Dask 的性能是十分弱小的,且阐明文档也十分全,既有示例又有解释。感兴趣的敌人能够自行去官网或者 GitHub
学习,东哥下次分享应用 Dask
进行机器学习的一些实例。
原创不易,感觉不错点个赞。
欢送关注我的集体公众号:Python 数据迷信
数据迷信学习网站:datadeepin