乐趣区

关于python:Python的多任务编程

文章转载自东凌阁
GoodMai 好买网

前言
Python 程序代码都是按自上而下的程序加载并执行的,但理论须要代码解决的工作并不都是须要循序渐进的程序执行,通常为进步代码执行的效率,须要多个代码执行工作同时执行,也就是多任务编程的需要。
根本的计算机模型是由 CPU、RAM 及各种资源 (键盘、硬盘、显卡、网卡等) 组成,代码的执行过程,理论就是 CPU 和相干寄存器及 RAM 之间的相干解决的过程。在单核 CPU 场景下,一段代码交由 CPU 执行前,都会处于就绪队列中,CPU 执行时很快就会返回该段代码的后果,所以不同过程的代码是轮流由 CPU 执行的,因为 CPU 执行速度很快,在体现上仍会被感觉是同时执行的。不同就绪队列间的读入与后果保留被称之为上下文切换,因为过程间切换会产生肯定的工夫期待及资源的耗费,所以为了缩小等待时间和资源的耗费,就引入了线程的设计。线程是当过程的队列被受权占用 CPU 时,该过程的所有线程队列在共享该过程资源的环境下按优先级由 CPU 执行。无论是过程还是线程,其队列及资源切换都是由操作系统进行管制的,同时线程的切换也是十分耗费性能的,为了使各线程的调度更节约资源,就呈现了协程的设计。协程是在过程或线程环境下执行的,其领有本人的寄存器上下文和栈,调度是齐全由用户管制的,相当于函数办法的调度。对于多任务编程,若要实现代码的多任务高效率执行,咱们要清晰如下这几个概念的特点及其区别,能力依据理论需要,选用最佳的多任务编程办法。

并行
指在同一时刻有多个过程的指令在多个处理器上同时执行。
并发
是指在同一时刻只能有一个过程的指令执行,但多个过程指令被疾速轮换执行,使得在宏观上具备多个过程同时执行的成果。
过程
过程是程序的运行态,过程间数据共享须要借助内部存储空间。
线程
线程是过程的组成部分, 一个过程能够蕴含一个或多个线程,同一过程内线程间数据共享属于外部共享。
协程
协程是一种用户态的轻量级线程,一个过程能够蕴含一个或多个协程,也能够在一个线程蕴含一个或多个协程。协程的调度齐全由用户管制,同一过程内协程间数据共享属于外部共享。
多线程解决
因为 Python 是动静编译的语言,与 C /C++、Java 等动态语言不同,它是在运行时一句一句代码地边编译边执行的。用 C 语言实现的 Python 解释器,通常称为 CPython,也是 Python 环境默认的编译器。在 Cpython 解释器中,为避免多个线程同时执行同一 Python 的代码段,确保线程数据安全,引入了全局解释器锁 (GIL, Global Interpreter Lock) 的解决机制, 该机制相当于一个互斥锁,所以即使一个过程下开启了多线程,但同一时刻只能有一个线程被执行。所以 Python 的多线程是伪线程,性能并不高,也无奈利用 CPU 多核的劣势。

另,GIL 并不是 Python 的个性,他是在实现 Python 解释器(Cpython)时所引入的一个概念,GIL 爱护的是解释器级的数据,爱护用户本人的数据仍须要本人加锁解决。在默认状况下,因为 GIL 的存在,为了使多线程 (threading) 执行效率更高,须要应用 join 办法对无序的线程进行阻塞,如下代码能够看到区别。

from multiprocessing import Process
import threading
import os,time

l=[]
stop=time.time()
def work():
    global stop
    time.sleep(2)
    print('===>',threading.current_thread().name)
    stop=time.time()

def test1():
    for i in range(400):
        p=threading.Thread(target=work,name="test"+str(i)) 
        l.append(p)
        p.start()

def test2():
    for i in range(400):
        p=threading.Thread(target=work,name="test"+str(i)) 
        l.append(p)
        p.start()

    for p in l:
        p.join()

if __name__ == '__main__':
    print("CPU Core:",os.cpu_count()) #本机为 4 核
    print("Worker: 400") #测试线程数

    start=time.time()
    test1()
    active_count=threading.active_count()
    while (active_count>1):
        active_count=threading.active_count()
        continue
    test1_result=stop-start

    start=time.time()
    l=[]
    test2()
    active_count=threading.active_count()
    while (active_count>1):
        active_count=threading.active_count()
        continue
    
    print('Thread run time is %s' %(test1_result))
print('Thread join run time is %s' %(stop-start))

执行后果如下:

Thread run time is 4.829492807388306 
Thread join run time is 2.053645372390747

由上后果能够看到, 多线程时 join 阻塞后执行效率进步了很多。
多过程与多线程
多任务编程的实质是 CPU 占用办法的调度解决,对于 python 下多任务处理有多种编程办法可供选择,别离有多过程 (multiprocessing)、多线程(threading) 及异步协程(Asyncio),在理论应用中该如何抉择呢?咱们先看如下一段程序的执行成果。

from multiprocessing import Process
from threading import Thread
import os,time

l=[]
def work():
    res=0
    for i in range(100000000):
        res*=i

def test1():
    for i in range(4):
        p=Process(target=work) 
        l.append(p)
        p.start()

def test2():
    for i in range(4):
        p=Thread(target=work) 
        l.append(p)
        p.start()

    for p in l:
        p.join()

if __name__ == '__main__':
    print("CPU Core:",os.cpu_count()) #本机为 4 核
    print("Worker: 4") #工作线程或子过程数

    start=time.time()
    test1()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    print('Process run time is %s' %(stop-start))
    
    start=time.time()
    l=[]
    test2()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    print('Thread run time is %s' %(stop-start))

执行后果如下:

CPU Core: 4 
Worker: 4 
Process run time is 11.030176877975464 
Thread run time is 17.0117769241333

从下面的后果,咱们能够看到同一个函数用 Process 及 Thread 不同的办法,执行的工夫是不同的,为什么会产生这样的差别?
多过程 (multiprocessing) 办法应用子过程而非线程,其无效地绕过了全局解释器锁 GIL(Global Interpreter Lock), 并充分利用了多核 CPU 的性能,所以在多核 CPU 环境下,其比多线程形式效率要高。

协程
又称为微线程,协程也可被看作是被标注的函数,不同被表注函数的执行和切换就是协程的切换,其齐全由编程者自行管制。协程个别是应用 gevent 库,在晚期这个库用起来比拟麻烦,所以在 python 3.7 当前的版本,对协程的应用办法做了优化。执行代码如下:

import asyncio
import time

async def work(i):
    await asyncio.sleep(2)
    print('===>',i)

async def main():
    start=time.time()
    l=[]
    for i in range(400):
        p=asyncio.create_task(work(i))
        l.append(p)

    for p in l:
        await p

    stop=time.time()
    print('run time is %s' %(stop-start))

asyncio.run(main())

执行后果如下:
run time is 2.0228068828582764
另,默认环境下,协程是在单线程模式下执行的异步操作,其并不能施展多处理器的性能。为了晋升执行效率,能够在多过程中执行协程调用办法,代码用例如下:

from multiprocessing import Process
import asyncio
import os,time

l=[]
async_result=0
async def work1():
    res=0
    for i in range(100000000):
        res*=i

# 协程入口
async def async_test():
    m=[]
    for i in range(4):
        p=asyncio.create_task(work1())
        m.append(p)

    for p in m:
        await p

async def async_test1():
    await asyncio.create_task(work1())

def async_run():
    asyncio.run(async_test1())

# 多过程入口
def test1():
    for i in range(4):
        p=Process(target=async_run) 
        l.append(p)
        p.start()

if __name__ == '__main__':
    print("CPU Core:",os.cpu_count()) #本机为 4 核
    print("Worker: 4") #工作线程或子过程数
    start=time.time()
    asyncio.run(async_test())
    stop=time.time()
    
    print('Asyncio run time is %s' %(stop-start))

    start=time.time()
    test1()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()

    print('Process Asyncio run time is %s' %(stop-start))

执行后果如下:

CPU Core: 4 
Worker: 4 
Asyncio run time is 18.89663052558899 
Process Asyncio run time is 10.865562438964844

如上后果,在多过程中调用多协程的办法,执行效率明显提高。

多任务编程抉择
如上所后果是否是就决定肯定要抉择多过程 (multiprocessing) 模式呢?咱们再看下如下代码:

from multiprocessing import Process
from threading import Thread
import os,time

l=[]
# 仅计算
def work1():
    res=0
    for i in range(100000000):
        res*=i

# 仅输入
def work2():
    time.sleep(2)
    print('===>')

# 多过程,仅计算
def test1():
    for i in range(4):
        p=Process(target=work1) 
        l.append(p)
        p.start()

# 多过程,仅输入
def test_1():
    for i in range(400):
        p=Process(target=work2) 
        l.append(p)
        p.start()

# 多线程,仅计算
def test2():
    for i in range(4):
        p=Thread(target=work1) 
        l.append(p)
        p.start()

    for p in l:
        p.join()

# 多线程,仅输入
def test_2():
    for i in range(400):
        p=Thread(target=work2) 
        l.append(p)
        p.start()

    for p in l:
        p.join()

if __name__ == '__main__':
    print("CPU Core:",os.cpu_count()) #本机为 4 核

    start=time.time()
    test1()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    test_result=stop-start
    
    start=time.time()
    l=[]
    test_1()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    test1_result=stop-start

    start=time.time()
    l=[]
    test2()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    test2_result=stop-start
    start=time.time()
    l=[]
    test_2()
    while (l[len(l)-1].is_alive()):
        continue
    stop=time.time()
    test3_result=stop-start
    print('Process run time is %s' %(test_result))
    print('Process I/O run time is %s' %(test1_result))
    print('Thread run time is %s' %(test2_result))
    print('Thread I/O run time is %s' %(stop-start))

执行后果如下:

Process run time is 10.77662968635559 
Process I/O run time is 2.9869778156280518 
Thread run time is 16.842355012893677 
Thread I/O run time is 2.024587869644165

由后果可看,在仅计算的操作时,多过程效率比拟高,在仅输入的操作时,多线程的效率比拟高,所以在理论应用中要依据理论状况测试决定。通用的倡议如下:
多线程 (threading) 用于 IO 密集型,如 socket,爬虫,web
多过程 (multiprocessing) 用于计算密集型,如数据分析

退出移动版