关于python3.x:物无定味适口者珍Python3并发场景CPU密集IO密集任务的并发方式的场景抉择多线程多进程协程asyncio

3次阅读

共计 8401 个字符,预计需要花费 22 分钟才能阅读完成。

原文转载自「刘悦的技术博客」https://v3u.cn/a_id_221

个别状况下,大家对 Python 原生的并发 / 并行工作形式:过程、线程和协程的关系与区别都能讲清楚。甚至具体的对象名称、内置办法都能够一五一十,这显然是极好的,但咱们其实都疏忽了一个问题,就是具体利用场景,三者的应用目标是一样的,话句话说,应用后果是一样的,都能够进步程序运行的效率,但到底那种场景用那种形式更好一点?

这就好比,目前支流的汽车发动机变速箱无外乎三种:双离合、CVT 以及传统 AT。主机厂把它们搭载到不同的发动机和车型上,它们都是变速箱,都能够将发动机产生的能源作用到车轮上,但不同应用场景下到底该抉择那种变速箱?这显然也是一个问题。

所谓“无场景,不性能”,本次咱们来讨论一下,具体的并发编程场景有哪些,并且对应到具体场景,应该怎么抉择并发伎俩和形式。

什么是并发和并行?

在探讨场景之前,咱们须要将多任务执行的形式进行一下分类,那就是并发形式和并行形式。教科书上通知咱们:并行是指两个或者多个事件在同一时刻产生;而并发是指两个或多个事件在同一时间距离内产生。在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理机零碎中,每一时刻却仅能有一道程序执行,故宏观上这些程序只能是分时地交替执行。

如同有那么一点形象,好吧,让咱们求实一点,因为 GIL 全局解释器锁的存在,在 Python 编程畛域,咱们能够简略粗犷地将并发和并行用程序通过是否应用多核 CPU 来辨别,能应用多核 CPU 就是并行,不能应用多核 CPU,只能单核解决的,就是并发。就这么简略,是的,Python 的 GIL 全局解释器锁帮咱们把问题简化了,这是 Python 的大幸?还是可怜?

Python 中并发工作实现形式蕴含:多线程 threading 和协程 asyncio,它们的共同点都是交替执行,而区别是多线程 threading 是抢占式的,而协程 asyncio 是合作式的,原理也很简略,只有一颗 CPU 能够用,而一颗 CPU 一次只能做一件事,所以只能靠不停地切换能力实现并发工作。

Python 中并行任务的实现形式是多过程 multiprocessing,通过 multiprocessing 库,Python 能够在程序主过程中创立新的子过程。这里的一个过程能够被认为是一个简直齐全不同的程序,只管从技术上讲,它们通常被定义为资源汇合,其中资源包含内存、文件句柄等。换一种说法是,每个子过程都领有本人的 Python 解释器,因而,Python 中的并行任务能够应用一颗以上的 CPU,每一颗 CPU 都能够跑一个过程,是真正的同时运行,而不须要切换,如此 Python 就能够实现并行任务。

什么时候应用并发?IO 密集型工作

当初咱们搞清楚了,Python 里的并发运行形式就是多线程 threading 和协程 asyncio,那么什么场景下应用它们?

个别状况下,工作场景,或者说的更精确一些,工作类型,无非两种:CPU 密集型工作和 IO 密集型工作。

什么是 IO 密集型工作?IO 就是 Input-Output 的缩写,说白了就是程序的输出和输入,想一想的确就是这样,您的电脑,它不就是这两种性能吗?用键盘、麦克风、摄像头输出数据,而后再用屏幕和音箱进行输入操作。

但输出和输入操作要比电脑中的 CPU 运行速度慢,换句话说,CPU 得等着这些比它慢的输出和输入操作,说白了就是 CPU 运算一会,就得等这些 IO 操作,等 IO 操作完了,CPU 能力持续运算一会,而后再等着 IO 操作,如图所示:

由此可知,并发适宜这种 IO 操作密集和频繁的工作,因为就算 CPU 是苹果最新 ARM 架构的 M2 芯片,也没有用武之地。

另外,如果把 IO 密集型工作具象化,那就是咱们常常操作的:硬盘读写(数据库读写)、网络申请、文件的打印等等。

并发形式的抉择:多线程 threading 还是协程 asyncio?

既然波及硬盘读写(数据库读写)、网络申请、文件打印等工作都算并发工作,那咱们就真正地实际一下,看看不同的并发形式到底能晋升多少效率?

一个简略的小需要,对本站数据进行反复抓取操作,并计算首页数据文本的行数:

import requests  
import time  
  
  
def download_site(url, session):  
    with session.get(url) as response:  
        print(f"下载了 {len(response.content)} 行数据")  
  
  
def download_all_sites(sites):  
    with requests.Session() as session:  
        for url in sites:  
            download_site(url, session)  
  
  
if __name__ == "__main__":  
  
    sites = ["https://v3u.cn"] * 50  
    start_time = time.time()  
    download_all_sites(sites)  
    duration = time.time() - start_time  
    print(f"下载了 {len(sites)}次,执行了 {duration} 秒")

在不应用任何并发伎俩的前提下,程序返回:

下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 76347 行数据  
下载了 50 次数据,执行了 8.781155824661255 秒  
[Finished in 9.6s]

这里程序的每一步都是同步操作,也就是说当第一次抓取网站首页时,剩下的 49 次都在期待。

接着应用多线程 threading 来革新程序:

import concurrent.futures  
import requests  
import threading  
import time  
  
  
thread_local = threading.local()  
  
  
def get_session():  
    if not hasattr(thread_local, "session"):  
        thread_local.session = requests.Session()  
    return thread_local.session  
  
  
def download_site(url):  
    session = get_session()  
    with session.get(url) as response:  
        print(f"下载了 {len(response.content)} 行数据")  
  
  
def download_all_sites(sites):  
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:  
        executor.map(download_site, sites)  
  
  
if __name__ == "__main__":  
  
    sites = ["https://v3u.cn"] * 50  
    start_time = time.time()  
    download_all_sites(sites)  
    duration = time.time() - start_time  
    print(f"下载了 {len(sites)}次,执行了 {duration} 秒")

这里通过 with 关键词开启线程池上下文管理器,并发 8 个线程进行下载,程序返回:

下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76161 行数据  
下载了 76424 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 50 次,执行了 7.680492877960205 秒

很显著,效率上有所晋升,事实上,每个线程其实是在不停“切换”着运行,这就节俭了单线程每次期待爬取后果的工夫:

由此带来了另外一个问题:上下文切换的工夫开销。

让咱们持续革新,用协程来一试矛头,首先装置异步 web 申请库 aiohttp:

pip3 install aiohttp

改写逻辑:

import asyncio  
import time  
import aiohttp  
  
  
async def download_site(session, url):  
    async with session.get(url) as response:  
        print(f"下载了 {response.content_length} 行数据")  
  
  
async def download_all_sites(sites):  
    async with aiohttp.ClientSession() as session:  
        tasks = []  
        for url in sites:  
            task = asyncio.ensure_future(download_site(session, url))  
            tasks.append(task)  
        await asyncio.gather(*tasks, return_exceptions=True)  
  
  
if __name__ == "__main__":  
    sites = ["https://v3u.cn"] * 50  
    start_time = time.time()  
    asyncio.run(download_all_sites(sites))  
    duration = time.time() - start_time  
    print(f"下载了 {len(sites)}次,执行了 {duration} 秒")

程序返回:



下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76424 行数据  
下载了 76161 行数据  
下载了 76424 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据  
下载了 76161 行数据

下载了 50 次,执行了 6.893810987472534 秒

效率上百尺竿头更进一步,同样的应用 with 关键字操作上下文管理器,协程应用 asyncio.ensure\_future()创立工作列表,该列表还负责启动它们。创立所有工作后,应用 asyncio.gather()来放弃会话上下文的实例,直到所有爬取工作实现。和多线程 threading 的区别是,协程并不需要切换上下文,因而每个工作所需的资源和创立工夫要少得多,因而创立和运行更多的工作效率更高:

综上,并发逻辑归根结底是缩小 CPU 期待的工夫,也就是让 CPU 少等一会儿,而协程的工作形式显然让 CPU 期待的工夫起码。

并行形式:多过程 multiprocessing

再来试试多过程 multiprocessing,并行能不能干并发的事?

import requests  
import multiprocessing  
import time  
  
session = None  
  
  
def set_global_session():  
    global session  
    if not session:  
        session = requests.Session()  
  
  
def download_site(url):  
    with session.get(url) as response:  
        name = multiprocessing.current_process().name  
        print(f"读了 {len(response.content)} 行")  
  
  
def download_all_sites(sites):  
    with multiprocessing.Pool(initializer=set_global_session) as pool:  
        pool.map(download_site, sites)  
  
  
if __name__ == "__main__":  
    sites = ["https://v3u.cn"] * 50  
    start_time = time.time()  
    download_all_sites(sites)  
    duration = time.time() - start_time  
    print(f"下载了 {len(sites)}次,执行了 {duration} 秒")

这里咱们仍然应用上下文管理器开启过程池,默认过程数匹配以后计算机的 CPU 外围数,也就是有几核就开启几个过程,程序返回:

读了 76000 行  
读了 76241 行  
读了 76044 行  
读了 75894 行  
读了 76290 行  
读了 76312 行  
读了 76419 行  
读了 76753 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
读了 76290 行  
下载了 50 次,执行了 8.195281982421875 秒

尽管比同步程序要快,但无疑的,效率上要低于多线程和协程。为什么?因为多过程不适宜 IO 密集型工作,尽管能够利用多核资源,但没有任何意义:

无论开多少过程,CPU 都没有用武之地,少数状况下 CPU 都在期待 IO 操作,也就是说,多核反而连累了 IO 程序的执行。

并行形式的抉择:CPU 密集型工作

什么是 CPU 密集型工作?这里咱们能够应用逆定理:所有不波及硬盘读写(数据库读写)、网络申请、文件打印等工作都算 CPU 密集型工作工作,说白了就是,计算型工作。

以求平方和为例子:

import time  
  
  
def cpu_bound(number):  
    return sum(i * i for i in range(number))  
  
  
def find_sums(numbers):  
    for number in numbers:  
        cpu_bound(number)  
  
  
if __name__ == "__main__":  
    numbers = [5_000_000 + x for x in range(20)]  
    start_time = time.time()  
    find_sums(numbers)  
    duration = time.time() - start_time  
    print(f"{duration}秒")

同步执行 20 次,须要破费多少工夫?

4.466595888137817 秒

再来试试并行形式:

import multiprocessing  
import time  
  
  
def cpu_bound(number):  
    return sum(i * i for i in range(number))  
  
  
def find_sums(numbers):  
    with multiprocessing.Pool() as pool:  
        pool.map(cpu_bound, numbers)  
  
  
if __name__ == "__main__":  
    numbers = [5_000_000 + x for x in range(20)]  
  
    start_time = time.time()  
    find_sums(numbers)  
    duration = time.time() - start_time  
    print(f"{duration}秒")

八核处理器,开八个过程开始跑:

1.1755797863006592 秒

显而易见,并行形式无效进步了计算效率。

最初,既然之前用并行形式运行了 IO 密集型工作,咱们就再来试试用并发的形式运行 CPU 密集型工作:

import concurrent.futures  
import time  
  
  
def cpu_bound(number):  
    return sum(i * i for i in range(number))  
  
  
def find_sums(numbers):  
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:  
        executor.map(cpu_bound, numbers)  
  
  
if __name__ == "__main__":  
    numbers = [5_000_000 + x for x in range(20)]  
  
    start_time = time.time()  
    find_sums(numbers)  
    duration = time.time() - start_time  
    print(f"{duration}秒")

单过程开 8 个线程,走起:

4.452666759490967 秒

如何?和并行形式运行 IO 密集型工作一样,能够运行,然而没有任何意义。为什么?因为没有任何 IO 操作了,CPU 不须要期待了,CPU 只有全力运算即可,所以你上多线程或者协程,无非就是画龙点睛、多此一举。

结语

有教训的汽修徒弟会通知你,想省油就选 CVT 和双离合,想品质稳固就选 AT,常常高速上强烈驾驶就选双离合,常常市区内堵车就选 CVT;同样地,作为经验丰富的后盾研发,你也能够通知汽修徒弟,任何不须要 CPU 期待的工作就抉择并行 (multiprocessing) 的解决形式,而须要 CPU 等待时间过长的工作,抉择并发(threading/asyncio)。反过来,我就想用 CVT 在高速上飙车,用双离合在市区堵车,行不行?行,但没有意义,或者说的更精确一些,没有任何额定的收益;而用并发形式执行 CPU 密集型工作,用并行形式执行 IO 密集型工作行不行?也行,但仍然没有任何额定的收益,无他,唯物无定味,适口者珍矣。

原文转载自「刘悦的技术博客」https://v3u.cn/a_id_221

正文完
 0