原文转载自「刘悦的技术博客」https://v3u.cn/a_id_221

个别状况下,大家对Python原生的并发/并行工作形式:过程、线程和协程的关系与区别都能讲清楚。甚至具体的对象名称、内置办法都能够一五一十,这显然是极好的,但咱们其实都疏忽了一个问题,就是具体利用场景,三者的应用目标是一样的,话句话说,应用后果是一样的,都能够进步程序运行的效率,但到底那种场景用那种形式更好一点?

这就好比,目前支流的汽车发动机变速箱无外乎三种:双离合、CVT以及传统AT。主机厂把它们搭载到不同的发动机和车型上,它们都是变速箱,都能够将发动机产生的能源作用到车轮上,但不同应用场景下到底该抉择那种变速箱?这显然也是一个问题。

所谓“无场景,不性能”,本次咱们来讨论一下,具体的并发编程场景有哪些,并且对应到具体场景,应该怎么抉择并发伎俩和形式。

什么是并发和并行?

在探讨场景之前,咱们须要将多任务执行的形式进行一下分类,那就是并发形式和并行形式。教科书上通知咱们:并行是指两个或者多个事件在同一时刻产生;而并发是指两个或多个事件在同一时间距离内产生。 在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理机零碎中,每一时刻却仅能有一道程序执行,故宏观上这些程序只能是分时地交替执行。

如同有那么一点形象,好吧,让咱们求实一点,因为GIL全局解释器锁的存在,在Python编程畛域,咱们能够简略粗犷地将并发和并行用程序通过是否应用多核CPU来辨别,能应用多核CPU就是并行,不能应用多核CPU,只能单核解决的,就是并发。就这么简略,是的,Python的GIL全局解释器锁帮咱们把问题简化了, 这是Python的大幸?还是可怜?

Python中并发工作实现形式蕴含:多线程threading和协程asyncio,它们的共同点都是交替执行,而区别是多线程threading是抢占式的,而协程asyncio是合作式的,原理也很简略,只有一颗CPU能够用,而一颗CPU一次只能做一件事,所以只能靠不停地切换能力实现并发工作。

Python中并行任务的实现形式是多过程multiprocessing,通过multiprocessing库,Python能够在程序主过程中创立新的子过程。这里的一个过程能够被认为是一个简直齐全不同的程序,只管从技术上讲,它们通常被定义为资源汇合,其中资源包含内存、文件句柄等。换一种说法是,每个子过程都领有本人的Python解释器,因而,Python中的并行任务能够应用一颗以上的CPU,每一颗CPU都能够跑一个过程,是真正的同时运行,而不须要切换,如此Python就能够实现并行任务。

什么时候应用并发?IO密集型工作

当初咱们搞清楚了,Python里的并发运行形式就是多线程threading和协程asyncio,那么什么场景下应用它们?

个别状况下,工作场景,或者说的更精确一些,工作类型,无非两种:CPU密集型工作和IO密集型工作。

什么是IO密集型工作?IO就是Input-Output的缩写,说白了就是程序的输出和输入,想一想的确就是这样,您的电脑,它不就是这两种性能吗?用键盘、麦克风、摄像头输出数据,而后再用屏幕和音箱进行输入操作。

但输出和输入操作要比电脑中的CPU运行速度慢,换句话说,CPU得等着这些比它慢的输出和输入操作,说白了就是CPU运算一会,就得等这些IO操作,等IO操作完了,CPU能力持续运算一会,而后再等着IO操作,如图所示:

由此可知,并发适宜这种IO操作密集和频繁的工作,因为就算CPU是苹果最新ARM架构的M2芯片,也没有用武之地。

另外,如果把IO密集型工作具象化,那就是咱们常常操作的:硬盘读写(数据库读写)、网络申请、文件的打印等等。

并发形式的抉择:多线程threading还是协程asyncio?

既然波及硬盘读写(数据库读写)、网络申请、文件打印等工作都算并发工作,那咱们就真正地实际一下,看看不同的并发形式到底能晋升多少效率?

一个简略的小需要,对本站数据进行反复抓取操作,并计算首页数据文本的行数:

import requests  import time      def download_site(url, session):      with session.get(url) as response:          print(f"下载了{len(response.content)}行数据")      def download_all_sites(sites):      with requests.Session() as session:          for url in sites:              download_site(url, session)      if __name__ == "__main__":        sites = ["https://v3u.cn"] * 50      start_time = time.time()      download_all_sites(sites)      duration = time.time() - start_time      print(f"下载了 {len(sites)}次,执行了{duration}秒")

在不应用任何并发伎俩的前提下,程序返回:

下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了76347行数据  下载了 50 次数据,执行了8.781155824661255秒  [Finished in 9.6s]

这里程序的每一步都是同步操作,也就是说当第一次抓取网站首页时,剩下的49次都在期待。

接着应用多线程threading来革新程序:

import concurrent.futures  import requests  import threading  import time      thread_local = threading.local()      def get_session():      if not hasattr(thread_local, "session"):          thread_local.session = requests.Session()      return thread_local.session      def download_site(url):      session = get_session()      with session.get(url) as response:          print(f"下载了{len(response.content)}行数据")      def download_all_sites(sites):      with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:          executor.map(download_site, sites)      if __name__ == "__main__":        sites = ["https://v3u.cn"] * 50      start_time = time.time()      download_all_sites(sites)      duration = time.time() - start_time      print(f"下载了 {len(sites)}次,执行了{duration}秒")

这里通过with关键词开启线程池上下文管理器,并发8个线程进行下载,程序返回:

下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76161行数据  下载了76424行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了 50次,执行了7.680492877960205秒

很显著,效率上有所晋升,事实上,每个线程其实是在不停“切换”着运行,这就节俭了单线程每次期待爬取后果的工夫:

由此带来了另外一个问题:上下文切换的工夫开销。

让咱们持续革新,用协程来一试矛头,首先装置异步web申请库aiohttp:

pip3 install aiohttp

改写逻辑:

import asyncio  import time  import aiohttp      async def download_site(session, url):      async with session.get(url) as response:          print(f"下载了{response.content_length}行数据")      async def download_all_sites(sites):      async with aiohttp.ClientSession() as session:          tasks = []          for url in sites:              task = asyncio.ensure_future(download_site(session, url))              tasks.append(task)          await asyncio.gather(*tasks, return_exceptions=True)      if __name__ == "__main__":      sites = ["https://v3u.cn"] * 50      start_time = time.time()      asyncio.run(download_all_sites(sites))      duration = time.time() - start_time      print(f"下载了 {len(sites)}次,执行了{duration}秒")

程序返回:

下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76424行数据  下载了76161行数据  下载了76424行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据  下载了76161行数据下载了 50次,执行了6.893810987472534秒

效率上百尺竿头更进一步,同样的应用with关键字操作上下文管理器,协程应用asyncio.ensure\_future()创立工作列表,该列表还负责启动它们。创立所有工作后,应用asyncio.gather()来放弃会话上下文的实例,直到所有爬取工作实现。和多线程threading的区别是,协程并不需要切换上下文,因而每个工作所需的资源和创立工夫要少得多,因而创立和运行更多的工作效率更高:

综上,并发逻辑归根结底是缩小CPU期待的工夫,也就是让CPU少等一会儿,而协程的工作形式显然让CPU期待的工夫起码。

并行形式:多过程multiprocessing

再来试试多过程multiprocessing,并行能不能干并发的事?

import requests  import multiprocessing  import time    session = None      def set_global_session():      global session      if not session:          session = requests.Session()      def download_site(url):      with session.get(url) as response:          name = multiprocessing.current_process().name          print(f"读了{len(response.content)}行")      def download_all_sites(sites):      with multiprocessing.Pool(initializer=set_global_session) as pool:          pool.map(download_site, sites)      if __name__ == "__main__":      sites = ["https://v3u.cn"] * 50      start_time = time.time()      download_all_sites(sites)      duration = time.time() - start_time      print(f"下载了 {len(sites)}次,执行了{duration}秒")

这里咱们仍然应用上下文管理器开启过程池,默认过程数匹配以后计算机的CPU外围数,也就是有几核就开启几个过程,程序返回:

读了76000行  读了76241行  读了76044行  读了75894行  读了76290行  读了76312行  读了76419行  读了76753行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  读了76290行  下载了 50次,执行了8.195281982421875秒

尽管比同步程序要快,但无疑的,效率上要低于多线程和协程。为什么?因为多过程不适宜IO密集型工作,尽管能够利用多核资源,但没有任何意义:

无论开多少过程,CPU都没有用武之地,少数状况下CPU都在期待IO操作,也就是说,多核反而连累了IO程序的执行。

并行形式的抉择:CPU密集型工作

什么是CPU密集型工作?这里咱们能够应用逆定理:所有不波及硬盘读写(数据库读写)、网络申请、文件打印等工作都算CPU密集型工作工作,说白了就是,计算型工作。

以求平方和为例子:

import time      def cpu_bound(number):      return sum(i * i for i in range(number))      def find_sums(numbers):      for number in numbers:          cpu_bound(number)      if __name__ == "__main__":      numbers = [5_000_000 + x for x in range(20)]      start_time = time.time()      find_sums(numbers)      duration = time.time() - start_time      print(f"{duration}秒")

同步执行20次,须要破费多少工夫?

4.466595888137817秒

再来试试并行形式:

import multiprocessing  import time      def cpu_bound(number):      return sum(i * i for i in range(number))      def find_sums(numbers):      with multiprocessing.Pool() as pool:          pool.map(cpu_bound, numbers)      if __name__ == "__main__":      numbers = [5_000_000 + x for x in range(20)]        start_time = time.time()      find_sums(numbers)      duration = time.time() - start_time      print(f"{duration}秒")

八核处理器,开八个过程开始跑:

1.1755797863006592秒

显而易见,并行形式无效进步了计算效率。

最初,既然之前用并行形式运行了IO密集型工作,咱们就再来试试用并发的形式运行CPU密集型工作:

import concurrent.futures  import time      def cpu_bound(number):      return sum(i * i for i in range(number))      def find_sums(numbers):      with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:          executor.map(cpu_bound, numbers)      if __name__ == "__main__":      numbers = [5_000_000 + x for x in range(20)]        start_time = time.time()      find_sums(numbers)      duration = time.time() - start_time      print(f"{duration}秒")

单过程开8个线程,走起:

4.452666759490967秒

如何?和并行形式运行IO密集型工作一样,能够运行,然而没有任何意义。为什么?因为没有任何IO操作了,CPU不须要期待了,CPU只有全力运算即可,所以你上多线程或者协程,无非就是画龙点睛、多此一举。

结语

有教训的汽修徒弟会通知你,想省油就选CVT和双离合,想品质稳固就选AT,常常高速上强烈驾驶就选双离合,常常市区内堵车就选CVT;同样地,作为经验丰富的后盾研发,你也能够通知汽修徒弟,任何不须要CPU期待的工作就抉择并行(multiprocessing)的解决形式,而须要CPU等待时间过长的工作,抉择并发(threading/asyncio)。反过来,我就想用CVT在高速上飙车,用双离合在市区堵车,行不行?行,但没有意义,或者说的更精确一些,没有任何额定的收益;而用并发形式执行CPU密集型工作,用并行形式执行IO密集型工作行不行?也行,但仍然没有任何额定的收益, 无他,唯物无定味,适口者珍矣。

原文转载自「刘悦的技术博客」 https://v3u.cn/a_id_221