分布式过程:
分布式过程是指的是将 Process 过程散布到多台机器上,充分利用多台机器的性能实现简单的工作。在 Thread 和 Process 中,该当优选 Process,因为 Process 更稳固,而且,Process 能够散布到多台机器上,而 Thread 最多只能散布到同一台机器的多个 CPU 上。
Python 的 multiprocessing 模块岂但反对多过程,其中 managers 子模块还反对把多过程散布到多台机器上。一个服务过程能够作为调度者,将工作散布到其余多个过程中,依附网络通信。因为 managers 模块封装很好,不用理解网络通信的细节,就能够很容易地编写分布式多过程程序。
举个例子:做爬虫程序时,经常会遇到这样的场景,咱们想抓取图片的链接地址,将链接地址寄存到 Queue 中,另外的过程负责从 Queue 中读取链接地址进行下载和存储到本地。当初把这个过程做成分布式,一台机器上的过程负责抓取链接,其它机器上的过程负责下载存储,那么遇到的次要问题是将 Queue 裸露到网络中,让其它机器过程都能够拜访,分布式过程就是将这一个过程进行了封装,咱们能够将这个过程称为本队列的网络化。
创立分布式过程须要一个服务过程与工作过程:
服务过程创立:
建设队列 Queue,用来进行过程间的通信。服务过程创立工作队列 task_queue, 用来作为 传递工作给工作过程的通道; 服务过程创立后果队列 result_queue,作为工作过程实现工作后回复服务过程的通道。在分布式多过程环境下,必须通过由 Queuemanager 取得 Queue 接口来增加工作.
把第一步中建设的队列在网络上注册,裸露给其它过程 (主机),注册后取得网络队列,相当于本队队列的映像.
建设一个险象 (Queuemanager(BaseManager)) 实例 manager, 绑定端口和验证口令。
启动第三步中建设的实例,即启动治理 manager, 监管信息通道
通过治理实例的办法取得通过网络拜访的 Queue 对象,即再把网络队列实体化成能够应用的本地队列.
创立工作到 ” 本地 ” 队列中,主动上传工作到网络队列中,调配给工作过程进行解决。
留神:我这里是基于 window 操作系统的,linux 零碎会有所不同
coding:utf-8
taskManager.py for win
> import Queue
> from multiprocessing.managers import BaseManager
> from multiprocessing import freeze_support
工作个数
task_num = 10
定义收发队列
task_queue = Queue.Queue(task_num)
result_queue = Queue.Queue(task_num)
def get_task():
return task_queue
def get_result():
return result_queue
创立相似的 QueueManager
class QueueManager(BaseManager):
pass
def win_run():
# windows 下绑定调用接口不能应用 lambda, 所以只能先定义函数再绑定
QueueManager.register('get_task_queue', callable=get_task)
QueueManager.register('get_result_queue', callable=get_result)
# 绑定端口并设置验证口令,windows 下须要填写 IP 地址,Linux 下不填,默认为本地
manager = QueueManager(address=('127.0.0.1', 4000), authkey='qty')
# 启动
manager.start()
# 通过网络获取工作队列和后果队列
task = manager.get_task_queue()
result = manager.get_result_queue()
try:
# 增加工作
for i in range(10):
print 'put task %s...' % i
task.put(i)
print 'try get result...'
for i in range(10):
print 'result is %s' % result.get(timeout=10)
except:
print 'manage error'
finally:
# 肯定要敞开,否则会报治理未敞开的谬误
manager.shutdown()
print 'master exit!'
if name == ‘__main__’:
# windows 下多过程可能会呈现问题,增加这句能够缓解
freeze_support()
win_run()
工作过程
应用 QueueManager 注册用于获取 Queue 的办法名称,工作过程只能通过名称来在网络上获取 Queue
连贯服务器中,端口和验证口令留神放弃与服务过程中完全一致
从网络上获取 Queue,进行本地化
从 Task 队列获取工作,并把后果 result 队列
coding:utf-8
import time
from multiprocessing.managers import BaseManager
创立相似的 QueueManager:
class QueueManager(BaseManager):
pass
第一步:应用 QueueManager 注册用于获取 Queue 的办法名称
QueueManager.register(‘get_task_queue’)
QueueManager.register(‘get_result_queue’)
第二步:连贯服务器
server_addr = ‘127.0.0.1’
print “Connect to server %s” % server_addr
端口和验证口令留神放弃与服务过程完全一致
m = QueueManager(address=(server_addr, 4000), authkey=’qty’)
从网络连接
m.connect()
第三步:获取 Queue 的对象
task = m.get_task_queue()
result = m.get_result_queue()
第四步: 从 task 队列获取工作,并把后果写入 result 队列:
while not task.empty():
index = task.get(True, timeout=10)
print 'run task download %s' % str(index)
result.put('%s---->success' % str(index))
解决完结
print ‘worker exit.’
执行后果
先运行: 服务过程失去后果
put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...
再立刻运行:工作过程失去后果,避免过程走完后得不到后果,这里肯定要立刻执行
Connect to server 127.0.0.1
run task download 0
run task download 1
run task download 2
run task download 3
run task download 4
run task download 5
run task download 6
run task download 7
run task download 8
run task download 9
worker exit.
最初再回头看服务过程窗口的后果
put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...
result is 0---->success
result is 1---->success
result is 2---->success
result is 3---->success
result is 4---->success
result is 5---->success
result is 6---->success
result is 7---->success
result is 8---->success
result is 9---->success
master exit!