关于python:一篇文章搞定Python多进程

7次阅读

共计 5748 个字符,预计需要花费 15 分钟才能阅读完成。

1.Python 多过程模块
Python 中的多过程是通过 multiprocessing 包来实现的,和多线程的 threading.Thread 差不多,它能够利用 multiprocessing.Process 对象来创立一个过程对象。这个过程对象的办法和线程对象的办法差不多也有 start(), run(), join() 等办法,其中有一个办法不同 Thread 线程对象中的守护线程办法是 setDeamon,而 Process 过程对象的守护过程是通过设置 daemon 属性来实现的。
上面说说 Python 多过程的实现办法,和多线程相似
2.Python 多过程实现办法一

from multiprocessing import  Process

def fun1(name):
    print('测试 %s 多过程' %name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):  #开启 5 个子过程执行 fun1 函数
        p = Process(target=fun1,args=('Python',)) #实例化过程对象
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('完结测试')

后果

测试 Python 多过程
测试 Python 多过程
测试 Python 多过程
测试 Python 多过程
测试 Python 多过程
完结测试

Process finished with exit code 0

下面的代码开启了 5 个子过程去执行函数,咱们能够察看后果,是同时打印的,这里实现了真正的并行操作,就是多个 CPU 同时执行工作。咱们晓得过程是 python 中最小的资源分配单元,也就是过程两头的数据,内存是不共享的,每启动一个过程,都要独立分配资源和拷贝拜访的数据,所以过程的启动和销毁的代价是比拟大了,所以在理论中应用多过程,要依据服务器的配置来设定。
3.Python 多过程实现办法二
还记得 python 多线程的第二种实现办法吗? 是通过类继承的办法来实现的,python 多过程的第二种实现形式也是一样的

from multiprocessing import  Process

class MyProcess(Process): #继承 Process 类
    def __init__(self,name):
        super(MyProcess,self).__init__()
        self.name = name

    def run(self):
        print('测试 %s 多过程' % self.name)


if __name__ == '__main__':
    process_list = []
    for i in range(5):  #开启 5 个子过程执行 fun1 函数
        p = MyProcess('Python') #实例化过程对象
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('完结测试')

后果

测试 Python 多过程
测试 Python 多过程
测试 Python 多过程
测试 Python 多过程
测试 Python 多过程
完结测试

Process finished with exit code 0

成果和第一种形式一样。
咱们能够看到 Python 多过程的实现形式和多线程的实现形式简直一样。
Process 类的其余办法

构造方法:Process([group [, target [, name [, args [, kwargs]]]]])
  group: 线程组 
  target: 要执行的办法
  name: 过程名
  args/kwargs: 要传入办法的参数

实例办法:  is_alive():返回过程是否在运行,bool 类型。  join([timeout]):阻塞以后上下文环境的过程程,直到调用此办法的过程终止或达到指定的 timeout(可选参数)。  start():过程准备就绪,期待 CPU 调度
  run():strat()调用 run 办法,如果实例过程时未制订传入 target,这 star 执行 t 默认 run()办法。  terminate():不论工作是否实现,立刻进行工作过程

属性:  daemon:和线程的 setDeamon 性能一样
  name:过程名字
  pid:过程号

对于 join,daemon 的应用和 python 多线程一样,这里就不在复述了,大家能够看看以前的 python 多线程系列文章。
4.Python 多线程的通信
过程是零碎独立调度核调配系统资源(CPU、内存)的根本单位,过程之间是互相独立的,每启动一个新的过程相当于把数据进行了一次克隆,子过程里的数据批改无奈影响到主过程中的数据,不同子过程之间的数据也不能共享,这是多过程在应用中与多线程最显著的区别。然而难道 Python 多过程两头难道就是孤立的吗?当然不是,python 也提供了多种办法实现了多过程两头的通信和数据共享(能够批改一份数据)
过程对列 Queue
Queue 在多线程中也说到过,在生成者消费者模式中应用,是线程平安的,是生产者和消费者两头的数据管道,那在 python 多过程中,它其实就是过程之间的数据管道,实现过程通信。

from multiprocessing import Process,Queue


def fun1(q,i):
    print('子过程 %s 开始 put 数据' %i)
    q.put('我是 %s 通过 Queue 通信' %i)

if __name__ == '__main__':
    q = Queue()

    process_list = []
    for i in range(3):
        p = Process(target=fun1,args=(q,i,))  #留神 args 外面要把 q 对象传给咱们要执行的办法,这样子过程能力和主过程用 Queue 来通信
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('主过程获取 Queue 数据')
    print(q.get())
    print(q.get())
    print(q.get())
    print('完结测试')

后果

子过程 0 开始 put 数据
子过程 1 开始 put 数据
子过程 2 开始 put 数据
主过程获取 Queue 数据
我是 0 通过 Queue 通信
我是 1 通过 Queue 通信
我是 2 通过 Queue 通信
完结测试

Process finished with exit code 0

下面的代码后果能够看到咱们主过程中能够通过 Queue 获取子过程中 put 的数据,实现过程间的通信。
管道 Pipe
管道 Pipe 和 Queue 的作用大抵差不多,也是实现过程间的通信,上面之间看怎么应用吧

from multiprocessing import Process, Pipe
def fun1(conn):
    print('子过程发送音讯:')
    conn.send('你好主过程')
    print('子过程承受音讯:')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe() #关键点,pipe 实例化生成一个双向管
    p = Process(target=fun1, args=(conn2,)) #conn2 传给子过程
    p.start()
    print('主过程承受音讯:')
    print(conn1.recv())
    print('主过程发送音讯:')
    conn1.send("你好子过程")
    p.join()
    print('完结测试')

后果

主过程承受音讯:子过程发送音讯:子过程承受音讯:你好主过程
主过程发送音讯:你好子过程
完结测试

Process finished with exit code 0

下面能够看到主过程和子过程能够互相发送音讯
Managers
Queue 和 Pipe 只是实现了数据交互,并没实现数据共享,即一个过程去更改另一个过程的数据。那么久要用到 Managers

from multiprocessing import Process, Manager

def fun1(dic,lis,index):

    dic[index] = 'a'
    dic['2'] = 'b'    
    lis.append(index)    #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
    #print(l)

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()# 留神字典的申明形式,不能间接通过 {} 来定义
        l = manager.list(range(5))#[0,1,2,3,4]

        process_list = []
        for i in range(10):
            p = Process(target=fun1, args=(dic,l,i))
            p.start()
            process_list.append(p)

        for res in process_list:
            res.join()
        print(dic)
        print(l)

后果:

{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]

能够看到主过程定义了一个字典和一个列表,在子过程中,能够增加和批改字典的内容,在列表中插入新的数据,实现过程间的数据共享,即能够独特批改同一份数据
5. 过程池
过程池外部保护一个过程序列,当应用时,则去过程池中获取一个过程,如果过程池序列中没有可供使用的进过程,那么程序就会期待,直到过程池中有可用过程为止。就是固定有几个过程能够应用。
过程池中有两个办法:
apply:同步,个别不应用
apply_async:异步

from  multiprocessing import Process,Pool
import os, time, random

def fun1(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    pool = Pool(5) #创立一个 5 个过程的过程池

    for i in range(10):
        pool.apply_async(func=fun1, args=(i,))

    pool.close()
    pool.join()
    print('完结测试')

后果

Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
完结测试

对 Pool 对象调用 join()办法会期待所有子过程执行结束,调用 join()之前必须先调用 close(),调用 close()之后就不能持续增加新的 Process 了。
过程池 map 办法
案例来源于网络,侵权请告知,谢谢
因为网上看到这个例子感觉不错,所以这里就不本人写案例,这个案例比拟有说服力

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = \'thumbs\'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if \'jpeg\' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == \'__main__\':
    folder = os.path.abspath(\'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images) #关键点,images 是一个可迭代对象
    pool.close()
    pool.join()

上边这段代码的次要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保留到特定文件夹中。这我的机器上,用这一程序处理 6000 张图片须要破费 27.9 秒。map 函数并不反对手动线程治理,反而使得相干的 debug 工作也变得异样简略。
map 在爬虫的畛域里也能够应用,比方多个 URL 的内容爬取,能够把 URL 放入元祖里,而后传给执行函数。

正文完
 0