共计 5314 个字符,预计需要花费 14 分钟才能阅读完成。
本文首发于:行者 AI
为什么要用协程,通常在 Python 中咱们进行并发编程都是应用多线程或者多过程来实现的,对于计算型工作因为 GIL 的存在咱们通常应用多过程来实现,而对于 IO 型工作咱们能够通过线程调度来让线程在执行 IO 工作时让出 GIL,从而实现外表上的并发。
协程是运行在单线程当中的“并发”,协程相比多线程一大劣势就是省去了多线程之间的切换开销,取得了更大的运行效率。本文不会对 python 协程的实现机制展开讨论,只是通过简略的示例展现协程最常见的用法,能够很快上手一些基于协程的高性能的 web 框架,比方 FastAPI。
1. 家务活
假如要干三样家务活,别离是烧水、洗衣服和扫地,作为一个程序员,干活前我总是布局的很有条理,上面是我布局的具体任务线:
- 烧水壶接水
- 期待烧水壶烧水
- 洗衣机放衣服和退出热水
- 期待洗衣机洗衣服
- 晾衣服
- 扫地
想一想我就像那个勤奋的 CPU,不过我还有很多机器能够帮忙我干这些活,或者对于 CPU 来说,网卡就像烧水壶,硬盘就像洗衣机。再来剖析一下,烧水和洗衣服是同一类活,咱们须要做的是把水接到水壶里或者把衣服放进洗衣机里而后关上开关,具体的细节有机器帮咱们实现。扫地又是另外一类活,因为没有机器人帮我做,所以须要我本人扫。
如果将烧水和洗衣服类比为 IO 型工作,扫地就是计算密集型工作。
2. 程序形容
上一节中的家务活用程序模仿一下:
程序 | 家务活 |
---|---|
计算 1 +2 | 烧水壶接水 |
通过网络读取另一台计算机上保留的一个加数 | 期待烧水壶烧水 |
累加求和失去磁盘上保留乘数的文件名 | 洗衣机放衣服和退出热水 |
从磁盘文件读取保留的一个乘数 | 期待洗衣机洗衣服 |
累加求的后果和乘数做乘积 | 晾衣服 |
计算 0 到 10000 的累加和 | 扫地 |
def get_network_number() -> int:
"""通过网络获取一个整数"""
...
def get_file_number(filename: str) -> int:
"""读取磁盘文件获取一个整数"""
...
def cumulative_sum(start: int, end: int) -> int:
"""累加求和"""
sum = 0
for number in range(start, end):
sum += number
return sum
def task():
"""工作"""
result = 1 + 2
network_number = get_network_number()
result += network_number
file_number = get_file_number(f"{result}.txt")
result *= file_number
sum = cumulative_sum(0, 10000) task()
3. 问题剖析及程序改良
母亲作为咱们的管家看不下去了,一看我就在偷懒,扫地和另外的两个工作齐全没有先后关系,烧水壶烧水和洗衣机工作的时候还能够去扫地,于是就开始指挥我干活,比方在水烧到一半的时候安顿我去扫地,地还没扫完又安顿我去烧水。
在管家的安顿下我这个人力资源被高效的利用了起来,很难有机会闲下来。
这个管家就如同咱们的操作系统,于是就有了如下优化后的代码:
from threading import Thread
def get_network_number() -> int:
"""通过网络获取一个整数"""
...
def get_file_number(filename: str) -> int:
"""读取磁盘文件获取一个整数"""
...
def cumulative_sum(start: int, end: int) -> int:
"""累加求和"""
sum = 0
for number in range(start, end):
sum += number
return sum
def task1():
""工作 1"""
result = 1 + 2
network_number = get_network_number()
result += network_number
file_number = get_file_number()
result *= file_number
def task2():
"""工作 2"""
sum = cumulative_sum(0, 10000)
t1 = Thread(target=task1)
t2 = Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()
扫地跟烧水洗衣服没多大关系,是一个须要咱们另外执行的工作,这两个工作是并发的关系,所以咱们能够将这个工作安顿到另一个线程中。于是 CPU 就会在两个线程之间来回的切换,同时执行两个工作。
这种由操作系统指挥的形式有一个很大的弊病,须要频繁的切换工作,这节约了很多的工夫。
4. 引入协程
像我这样聪慧的人是不须要管家指挥的,烧水壶接完水关上开关之后,我间接拿起扫帚开始扫地,再也不会傻傻的等了,于是就有了上面这个运行逻辑:
工作 1 | 工作 2 |
---|---|
烧水壶接水 | |
期待烧水壶烧水 | |
实现扫地 | |
洗衣机放衣服和退出热水 | |
期待洗衣机洗衣服 | |
晾衣服 |
这样干活比管家指挥好多了,也不用浪费来回的工作切换工夫,本人依据状况本人安顿,上面是最新的协程代码实现:
import asyncio
# io 工作改为协程
async def get_network_number() -> int:
"""通过网络获取一个整数"""
...
# io 工作改为协程
async def get_file_number(filename) -> int:
"""读取磁盘文件获取一个整数"""
...
# 计算密集型工作不必改协程
def cumulative_sum(start: int, end: int) -> int:
"""累加求和"""
sum = 0
for number in range(start, end):
sum += number
return sum
async def task1():
"""工作 1"""
result = 1 + 2
network_number = await get_network_number()
result += network_number
file_number = await get_file_number(f'{result}.txt')
result *= file_number
async def task2():
"""工作 2"""
sum = cumulative_sum(0, 10000)
async def main():
task1 = asyncio.create_task(task1())
task2 = asyncio.create_task(task2())
await task1
await task2
asyncio.run(main())
协程版本咱们会发现这样一个问题,扫地是一个计算密集型工作,所以干起活来就停不下来,水可能曾经烧好了,然而必须干完扫地这个活能力回去洗衣服。
为了解决这个问题,扫地的途中能够被动停一下,能够把扫地的活分几次干,这样不就能够去看看有没有其余活能够干了吗。在 python 协程中咱们能够用 asyncio.sleep 让咱们停下手头的活去干其它的活,上面是对计算密集型工作的革新:
async def cumulative_sum(start: int, end: int):
result = 0
for i in range(start, end):
if i % 100 == 0:
await asyncio.sleep(1)
result += i
return result
async def task2():
"""工作 2"""
sum = await cumulative_sum(0, 10000)
咱们每加 100 次就去看一下还有没有其它活能够干,就好比水烧好了咱们就能够去洗衣服了,衣服放到洗衣机里咱们再回来,如果啥活都没有就劳动会,工夫到了持续干(sleep 是有工夫长度的)。
上述的思路是咱们把扫地一个工作分成了好几件事件做。
5. 工作和事件
通过前几节的剖析,咱们从家务活中找到了两个很重要的概念:
- 工作
- 事件
咱们发现工作是由很多有程序关系的事件组成的,咱们实现各类工作的时候都是在做一件一件的事件。
回头剖析一下 python 程序,咱们从中找一下哪些能够对应日常生活中的事件。
对于 task1
async def task1():
"""工作 1"""
result = 1 + 2
network_number = await get_network_number()
result += network_number
file_number = await get_file_number(f"{result}.txt")
result *= file_number
咱们发现了这些 CPU 要干的事件(简化起见疏忽网络申请和读取文件时 CPU 应用,同样疏忽 netwok_number 和 file_number 的赋值操作):
1) result = 1 + 2
2) result += network_number
3) result *= file_number
对于 task2
async def task2():
"""工作 2"""
sum = await cumulative_sum(0, 10000)
因为 task2 执行的逻辑在 cumulative_sum 中,所以咱们还要持续剖析 cumulative_sum 这个协程产生的 事件。
async def cumulative_sum(start: int, end: int):
result = 0
for i in range(start, end):
if i % 100 == 0:
await asyncio.sleep(1)
result += i
return result
累加 100 次咱们看作一个事件,那么 task2 这个工作就是由很多累加 100 次的事件组成的。咱们通过下面的剖析能够看到,生存中的事件就是 python 协程中的事件,await 就是很显著的事件宰割点。
咱们的程序能够由很多并发的工作组成,这些工作当中又蕴含着大量的事件,程序理论执行过程中的最小单位是这些事件。
6. 事件循环
咱们按执行事件的思路实现程序的整个执行过程,应该怎么实现呢?
咱们能够创立一个循环,这个循环就是用来执行事件的。最开始这个循环外面什么都没有,之后咱们创立了一个工作,这个工作外面有好几个事件,于是咱们先把这个工作里的第一个事件放到事件循环中,于是事件循环执行咱们放入的这个事件,当这个事件完结的时候咱们再把之后须要执行的事件再放到事件循环中,就这样通过有序的屡次事件增加之后事件循环执行完了咱们工作里所有的事件,工作完结。
因为事件循环一次只能执行一个事件,当咱们有好几个工作的时候,事件就会排起队顺次期待执行。
7. 细节探讨
咱们来看看文件读取整数的操作,失常的读取是这样的:
async def get_file_number(filename):
with open(filename) as f:
number = int(f.read())
return number
咱们发现读取操作中没有 await,它的执行和咱们未退出 asyncio.sleep 的 cumulative_sum 是一样的,所以主程序即使是在做磁盘 IO 的时候也是在期待状态的,不会去执行其它事件,咱们须要对磁盘 IO 操作也做革新解决,以最大化利用 CPU 资源。
这个时候线程就派上用场了,python 提供的革新是这样的:
import asyncio
def _read_file(fd):
return fd.read()
async def get_file_number(filename):
loop = asyncio.get_event_loop()
with open(filename) as f:
number = await loop.run_in_executor(None, _read_file, f)
return int(number)
通过操作系统线程的调度,咱们将磁盘 IO 的操作宰割进来,给其它事件让出肯定的执行权,就好比两个事件能够抢占 CPU 资源,具体哪个执行,操作系统来裁决。同样的 time.sleep 也会阻塞事件循环,所以在应用协程的时候要用 asyncio.sleep。上述的革新形式同样能够用于 cumulative_sum 的革新,替换掉原来的 asyncio.sleep 的革新形式,改用线程执行在 python3.9 中有了更好用的 asyncio.to_thread,协程应用的细节还是得仔细阅读 python 官网文档。
8. 协程的用武之地
通过上一节的细节探讨又引出了两个问题:
1)为什么磁盘 IO 须要用线程调度而网络 IO 不须要?
2)引入线程革新后的协程不是还会存在频繁的工作切换节约 CPU 工夫吗,这样做效率会比多线程形式高吗?
了解上述两个问题是咱们灵便应用 python 协程的要害,以下几点是我集体的了解心得,并未剖析源码,仅供参考:
- 网络编程中有同步和异步的形式,异步的形式就是 IO 多路复用。
- IO 多路复用反对的文件描述符类型和操作系统无关。
- python 协程中工作的切换依赖于 IO 多路复用。
- Windows 下磁盘 IO 不反对 IO 多路复用,即使有操作系统反对,如果规范库未做封装,须要咱们本人封装。
- 如果程序中未波及网络 IO,那么应用协程并不能无效的升高工作切换的开销,但协程良好的同步编程形式仍旧可用。
- 不同的编程语言对协程的实现有所不同,应用办法和利用场景也不尽相同。