乐趣区

关于大数据:安利一个Python大数据分析神器

作者:东哥腾飞
首发于公众号:Python 数据迷信

对于 Pandas 运行速度的晋升办法,之前曾经介绍过很多回了,外面常常提及Dask,很多敌人没接触过可能不太理解,明天就举荐一下这个神器。

1、什么是 Dask?

PandasNumpy 大家都不生疏了,代码运行后数据都加载到 RAM 中,如果数据集特地大,咱们就会看到内存飙升。但有时要解决的数据并不适宜 RAM,这时候Dask 来了。

Dask是开源收费的。它是与其余社区我的项目(如 Numpy,Pandas 和 Scikit-Learn)协调开发的。

官网:https://dask.org/

Dask反对 PandasDataFrameNumpyArray 的数据结构,并且既可在本地计算机上运行,也能够扩大到在集群上运行。

基本上,只有编写一次代码,应用一般的 Pythonic 语法,就可在本地运行或部署到多节点集群上。这自身就是一个很牛逼的性能了,但这还不是最牛逼的。

我感觉 Dask 的最牛逼的性能是:它兼容大部分咱们曾经在用的工具,并且只需改变大量的代码,就能够利用本人笔记本电脑上已有的解决能力并行运行代码。而并行处理数据就意味着更少的执行工夫,更少的等待时间和更多的剖析工夫。

上面这个就是 Dask 进行数据处理的大抵流程。

2、Dask 反对哪些现有工具?

这一点也是我比拟看中的,因为 Dask 能够与 Python 数据处理和建模的库包兼容,沿用库包的 API,这对于 Python 使用者来说学习老本是极低的。而像 HadoopSpark 这种大数据处理是有很高的学习门槛和工夫老本的。

目前,Dask可反对 pandasNumpySklearnXGBoostXArrayRAPIDS 等等,光是这几项我感觉就足够用了,至多对于罕用的数据处理、建模剖析是齐全笼罩得掉的。

3、Dask 装置

能够应用 conda 或者 pip,或从源代码装置dask

conda install dask

因为 dask 有很多依赖,所以为了疾速装置也可用上面代码,将装置运行 Dask 所需的起码依赖关系集。

conda install dask-core

再有就是通过源来装置。

git clone https://github.com/dask/dask.git
cd dask
python -m pip install .

4、Dask 如何应用?

Numpy、pandas

Dask引入了 3 个并行汇合,它们能够存储大于 RAM 的数据,这些汇合有DataFrameBagsArrays。这些汇合类型中的每一个都可能应用在 RAM 和硬盘之间分区的数据,以及散布在群集中多个节点上的数据。

Dask的应用是十分清晰的,如果你应用 NumPy 数组,就从 Dask 数组开始,如果你应用 Pandas DataFrame,就从Dask DataFrame 开始,依此类推。

import dask.array as da
x = da.random.uniform(low=0, high=10, size=(10000, 10000),  # normal numpy code
                      chunks=(1000, 1000))  # break into chunks of size 1000x1000

y = x + x.T - x.mean(axis=0)  # Use normal syntax for high level algorithms

# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp',  # normal Pandas code
                 blocksize=64000000)  # break text into 64MB chunks

s = df.groupby('name').balance.mean()  # Use normal syntax for high level algorithms

# Bags / lists
import dask.bag as db
b = db.read_text('*.json').map(json.loads)
total = (b.filter(lambda d: d['name'] == 'Alice')
          .map(lambda d: d['balance'])
          .sum())

这些高级接口在稍微变动的状况下复制了标准接口。对于原始我的项目中的大部分 API,这些接口会主动为咱们并行处理较大的数据集,实现上不是很简单,对照 Dask 的 doc 文档即可一步步实现。

Delayed

上面说一下 DaskDelay 性能,十分弱小。

Dask.delayed是一种并行化现有代码的简略而弱小的办法。之所以被叫做 delayed 是因为,它没有立刻计算出后果,而是将要作为工作计算的后果记录在一个图形中,稍后将在并行硬件上运行。

有时问题用已有的 dask.arraydask.dataframe可能都不适宜,在这些状况下,咱们能够应用更简略的 dask.delayed 界面并行化自定义算法。例如上面这个例子。

def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
45

下面代码在单个线程中按程序运行。然而,咱们看到其中很多能够并行执行。Dask delayed函数可润饰 incdouble 这些函数,以便它们可提早运行,而不是立刻执行函数,它将函数及其参数放入计算工作图中。

咱们简略批改代码,用 delayed 函数包装一下。

import dask

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

代码运行后 incdoubleaddsum都还没有产生,而是生成一个计算的工作图交给了 total。而后咱们用visualizatize 看下工作图。

total.visualize()  


上图显著看到了并行的可能性,所以毫不犹豫,应用 compute 进行并行计算,这时才实现了计算。

>>> total.compute()
45

因为数据集较小无奈比拟工夫,这里只介绍下应用办法,具体可本人入手实际下。

Sklearn 机器学习

对于机器学习的并行化执行,因为内容较多,东哥会在另一篇文章开展。这里简略说下一下dask-learn

dask-learn我的项目是与 Sklearn 开发人员合作实现的。当初可实现并行化有 Scikit-learnPipelineGridsearchCVRandomSearchCV 以及这些的变体,它们能够更好地解决嵌套的并行操作。

因而,如果你将 sklearn 替换为dklearn,那么速度将会晋升很多。

# from sklearn.grid_search import GridSearchCV
  from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
  from dklearn.pipeline import Pipeline
上面是一个应用 Pipeline 的示例,其中利用了 PCA 和逻辑回归。from sklearn.datasets import make_classification

X, y = make_classification(n_samples=10000,
                           n_features=500,
                           n_classes=2,
                           n_redundant=250,
                           random_state=42)

from sklearn import linear_model, decomposition
from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline

logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca),
                       ('logistic', logistic)])


grid = dict(pca__n_components=[50, 100, 150, 250],
            logistic__C=[1e-4, 1.0, 10, 1e4],
            logistic__penalty=['l1', 'l2'])

# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV

estimator = GridSearchCV(pipe, grid)

estimator.fit(X, y)

后果是:sklearn会在 40 秒钟左右执行此计算,而 dask-learn 替代品大概须要 10 秒钟。
另外,如果增加以下代码能够连贯到集群,通过 Client 能够展现整个计算过程的 dashboard,由Bokeh 实现。

from dask.distributed import Client
c = Client('scheduler-address:8786')

5、总结

以上就是 Dask 的简略介绍,Dask 的性能是十分弱小的,且阐明文档也十分全,既有示例又有解释。感兴趣的敌人能够自行去官网或者 GitHub 学习,东哥下次分享应用 Dask 进行机器学习的一些实例。

原创不易,感觉不错点个赞。

欢送关注我的集体公众号:Python 数据迷信

数据迷信学习网站:datadeepin

退出移动版