关于python:如何用-Python-实现分布式计算

53次阅读

共计 3174 个字符,预计需要花费 8 分钟才能阅读完成。

面对计算密集型的工作,除了多过程,就是分布式计算,如何用 Python 实现分布式计算呢?明天分享一个很简略的办法,那就是借助于 Ray。

什么是 Ray

Ray 是基于 Python 的分布式计算框架,采纳动态图计算模型,提供简略、通用的 API 来创立分布式应用。应用起来很不便,你能够通过装璜器的形式,仅需批改极少的的代码,让本来运行在单机的 Python 代码轻松实现分布式计算,目前多用于机器学习。

Ray 的特色:

1、提供用于构建和运行分布式应用程序的简略原语。

2、使用户可能并行化单机代码,代码更改很少甚至为零。

3、Ray Core 包含一个由应用程序、库和工具组成的大型生态系统,以反对简单的应用程序。比方 Tune、RLlib、RaySGD、Serve、Datasets、Workflows。

装置 Ray

最简略的装置官网版本的形式:

pip install -U ray
pip install 'ray[default]'

如果是 Windows 零碎,要求必须装置 Visual C++ runtime

其余装置形式见官网文档。

应用 Ray

一个装璜器就搞定分布式计算:

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures)) # [0, 1, 4, 9]

先执行 ray.init(),而后在要执行分布式工作的函数前加一个装璜器 @ray.remote 就实现了分布式计算。装璜器 @ray.remote 也能够装璜一个类:

import ray
ray.init()

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
tmp1 = [c.increment.remote() for c in counters]
tmp2 = [c.increment.remote() for c in counters]
tmp3 = [c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [3, 3, 3, 3]

当然了,上述的分布式计算仍然是在本人的电脑上进行的,只不过是以分布式的模式。 程序执行的过程中 ,你能够输出 http://127.0.0.1:8265/#/ 查看分布式工作的执行状况:

那么如何实现 Ray 集群计算呢?接着往下看。

应用 Ray 集群

Ray 的劣势之一是可能在同一程序中利用多台机器。当然,Ray 能够在一台机器上运行,因为通常状况下,你只有一台机器。但真正的力量是在一组机器上应用 Ray。

Ray 集群由一个头节点和一组工作节点组成。须要先启动头节点,给 worker 节点赋予头节点地址,组成集群:

你能够应用 Ray Cluster Launcher 来配置机器并启动多节点 Ray 集群。你能够在 AWS、GCP、Azure、Kubernetes、阿里云、外部部署和 Staroid 上甚至在你的自定义节点提供商上应用集群启动器。

Ray 集群还能够利用 Ray Autoscaler,它容许 Ray 与云提供商交互,以依据标准和应用程序工作负载申请或公布实例。

当初,咱们来疾速演示下 Ray 集群的性能,这里是用 Docker 来启动两个 Ubuntu 容器来模仿集群:

  • 环境 1: 172.17.0.2 作为 head 节点
  • 环境 2: 172.17.0.3 作为 worker 节点,能够有多个 worker 节点

具体步骤:

1. 下载 ubuntu 镜像

docker pull ubuntu

2. 启动 ubuntu 容器,装置依赖

启动第一个

docker run -it --name ubuntu-01 ubuntu bash

启动第二个

docker run -it --name ubuntu-02 ubuntu bash

查看下它们的 IP 地址:

$ docker inspect -f "{{.NetworkSettings.IPAddress}}" ubuntu-01
172.17.0.2
$ docker inspect -f "{{.NetworkSettings.IPAddress}}" ubuntu-02
172.17.0.3

而后别离在容器外部装置 python、pip、ray

apt update && apt install python3 
apt install python3-pip
pip3 install ray

3. 启动 head 节点和 worker 节点

抉择在其中一个容器作为 head 节点,这里抉择 172.17.0.2,执行:

ray start --head --node-ip-address 172.17.0.2

默认端口是 6379,你能够应用 --port 参数来批改默认端口,启动后的后果如下:

疏忽掉正告,能够看到给出了一个提醒,如果要把其余节点绑定到该 head,能够这样:

ray start --address='172.17.0.2:6379' --redis-password='5241590000000000'

在另一个节点执行上述命令,即可启动 worker 节点:

如果要敞开,执行:

ray stop

4、执行工作

轻易抉择一个节点,执行上面的脚本,批改下 ray.init() 函数的参数:

from collections import Counter
import socket
import time

import ray

ray.init(address='172.17.0.2:6379', _redis_password='5241590000000000')

print('''This cluster consists o    f
    {} nodes in total
    {} CPU resources in total
'''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))

@ray.remote
def f():
    time.sleep(0.001)
    # Return IP address.
    return socket.gethostbyname(socket.gethostname())

object_ids = [f.remote() for _ in range(10000)]
ip_addresses = ray.get(object_ids)

print('Tasks executed')
for ip_address, num_tasks in Counter(ip_addresses).items():
    print('{} tasks on {}'.format(num_tasks, ip_address))

执行后果如下:

能够看到 172.17.0.2 执行了 4751 个工作,172.17.0.3 执行了 5249 个工作,实现了分布式计算的成果。

最初的话

有了 Ray,你能够不应用 Python 的多过程就能够实现并行计算。明天的机器学习次要就是计算密集型工作,不借助分布式计算速度会十分慢,Ray 提供了简略实现分布式计算的解决方案。

正文完
 0