关于程序员:使用这些方法让你的-Python-并发任务执行得更好

56次阅读

共计 5520 个字符,预计需要花费 14 分钟才能阅读完成。

动动发财的小手,点个赞吧!

问题

始终以来,Python 的多线程性能因为 GIL 而始终没有达到预期。

所以从 3.4 版本开始,Python 引入了 asyncio 包,通过并发的形式并发执行 IO-bound 工作。通过屡次迭代,asyncio API 的成果十分好,并发工作的性能相比多线程版本有了很大的晋升。

然而,程序员在应用 asyncio 时还是会犯很多谬误:

一个谬误如下图所示,间接应用 await 协程办法,将对并发工作的调用从异步变为同步,最终失去并发个性。

async def main():
    result_1 = await some_coro("name-1")
    result_2 = await some_coro("name-2")

另一个谬误如下图所示,尽管程序员意识到他须要应用 create_task 创立一个工作在后盾执行。而上面这种一个一个期待工作的形式,将不同时序的工作变成了有序的期待。

async def main():
    task_1 = asyncio.create_task(some_coro("name-1"))
    task_2 = asyncio.create_task(some_coro("name-2"))
    
    result_1 = await task_1
    result_2 = await task_2

此代码将期待 task_1 先实现,而不论 task_2 是否先实现。

什么是并发工作执行?

那么,什么是真正的并发工作呢?咱们用一张图来阐明:

如图所示,一个并发流程应该由两局部组成:启动后台任务,将后台任务重新加入主函数,并获取后果。

大多数读者曾经晓得如何应用 create_task 启动后台任务。明天,我将介绍几种期待后台任务实现的办法以及每种办法的最佳实际。

开始

在开始介绍明天的配角之前,咱们须要筹备一个示例 async 办法来模仿 IO 绑定的办法调用,以及一个自定义的 AsyncException,能够用来在测试抛出异样时敌对地提醒异样信息:

from random import random, randint
import asyncio


class AsyncException(Exception):
    def __init__(self, message, *args, **kwargs):
        self.message = message
        super(*args, **kwargs)

    def __str__(self):
        return self.message


async def some_coro(name):
    print(f"Coroutine {name} begin to run")
    value = random()

    delay = randint(1, 4)
    await asyncio.sleep(delay)
    if value > 0.5:
        raise AsyncException(f"Something bad happen after delay {delay} second(s)")
    print(f"Coro {name} is Done. with delay {delay} second(s)")
    return value

并发执行办法比拟

1. asyncio.gather

asyncio.gather 可用于启动一组后台任务,期待它们实现执行,并获取后果列表:

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    results = await asyncio.gather(*aws)  # need to unpack the list
    for result in results:
        print(f">got : {result}")

asyncio.run(main())

asyncio.gather 尽管组成了一组后台任务,但不能间接承受一个列表或汇合作为参数。如果须要传入蕴含后台任务的列表,请解包。

asyncio.gather 承受一个 return_exceptions 参数。当 return_exception 的值为 False 时,任何后台任务抛出异样,都会抛给 gather 办法的调用者。而 gather 办法的后果列表是空的。

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    try:
        results = await asyncio.gather(*aws, return_exceptions=False)  # need to unpack the list
    except AsyncException as e:
        print(e)
    for result in results:
        print(f">got : {result}")

asyncio.run(main())

当 return_exception 的值为 True 时,后台任务抛出的异样不会影响其余工作的执行,最终会合并到后果列表中一起返回。

results = await asyncio.gather(*aws, return_exceptions=True)

接下来咱们看看为什么 gather 办法不能间接承受一个列表,而是要对列表进行解包。因为当一个列表被填满并执行时,咱们很难在期待工作实现时向列表中增加新工作。然而 gather 办法能够应用嵌套组将现有工作与新工作混合,解决了两头无奈增加新工作的问题:

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))
    group_1 = asyncio.gather(*aws)  # note we don't use await now
    # when some situation happen, we may add a new task
    group_2 = asyncio.gather(group_1, asyncio.create_task(some_coro("a new task")))
    results = await group_2
    for result in results:
        print(f">got : {result}")

asyncio.run(main())

然而 gather 不能间接设置 timeout 参数。如果须要为所有正在运行的工作设置超时工夫,就用这个姿态,不够优雅。

async def main():
    aws, results = [], []
    for i in range(3):
        aws.append(asyncio.create_task(some_coro(f'name-{i}')))

    results = await asyncio.wait_for(asyncio.gather(*aws), timeout=2)
    for result in results:
        print(f">got : {result}")

asyncio.run(main())

2. asyncio.as_completed

有时,咱们必须在实现一个后台任务后立刻开始上面的动作。比方咱们爬取一些数据,马上调用机器学习模型进行计算,gather 办法不能满足咱们的需要,然而咱们能够应用 as_completed 办法。

在应用 asyncio.as_completed 办法之前,咱们先看一下这个办法的源码。

# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
  # ...
  for f in todo:
      f.add_done_callback(_on_completion)
  if todo and timeout is not None:
      timeout_handle = loop.call_later(timeout, _on_timeout)
  for _ in range(len(todo)):
      yield _wait_for_one()

源码显示 as_completed 不是并发办法,返回一个带有 yield 语句的迭代器。所以咱们能够间接遍历每个实现的后台任务,咱们能够对每个工作独自解决异样,而不影响其余工作的执行:

async def main():
    aws = []
    for i in range(5):
        aws.append(asyncio.create_task(some_coro(f"name-{i}")))

    for done in asyncio.as_completed(aws):  # we don't need to unpack the list
        try:
            result = await done
            print(f">got : {result}")
        except AsyncException as e:
            print(e)

asyncio.run(main())

as_completed 承受超时参数,超时后以后迭代的工作会抛出 asyncio.TimeoutError:

async def main():
    aws = []
    for i in range(5):
        aws.append(asyncio.create_task(some_coro(f"name-{i}")))

    for done in asyncio.as_completed(aws, timeout=2):  # we don't need to unpack the list
        try:
            result = await done
            print(f">got : {result}")
        except AsyncException as e:
            print(e)
        except asyncio.TimeoutError: # we need to handle the TimeoutError
            print("time out.")

asyncio.run(main())

as_complete 在解决工作执行的后果方面比 gather 灵便很多,然而在期待的时候很难往原来的工作列表中增加新的工作。

3. asyncio.wait

asyncio.wait 的调用形式与 as_completed 雷同,但返回一个蕴含两个汇合的元组:done 和 pending。done 保留已实现执行的工作,而 pending 保留仍在运行的工作。

asyncio.wait 承受一个 return_when 参数,它能够取三个枚举值:

  • 当 return_when 为 asyncio.ALL_COMPLETED 时,done 寄存所有实现的工作,pending 为空。
  • 当 return_when 为 asyncio.FIRST_COMPLETED 时,done 持有所有已实现的工作,而 pending 持有仍在运行的工作。
async def main():
    aws = set()
    for i in range(5):
        aws.add(asyncio.create_task(some_coro(f"name-{i}")))

    done, pending = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        try:
            result = await task
            print(f">got : {result}")
        except AsyncException as e:
            print(e)
    print(f"the length of pending is {len(pending)}")

asyncio.run(main())

  • 当 return_when 为 asyncio.FIRST_EXCEPTION 时,done 寄存抛出异样并执行结束的工作,pending 寄存仍在运行的工作。

当 return_when 为 asyncio.FIRST_COMPLETED 或 asyncio.FIRST_EXECEPTION 时,咱们能够递归调用 asyncio.wait,这样咱们就能够增加新的工作,并依据状况始终期待所有工作实现。

async def main():
    pending = set()
    for i in range(5):
        pending.add(asyncio.create_task(some_coro(f"name-{i}")))  # note the type and name of the task list

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in done:
            try:
                result = await task
                print(f">got : {result}")
            except AsyncException as e:
                print(e)
                pending.add(asyncio.create_task(some_coro("a new task")))
    print(f"the length of pending is {len(pending)}")

asyncio.run(main())

4. asyncio.TaskGroup

在 Python 3.11 中,asyncio 引入了新的 TaskGroup API,正式让 Python 反对结构化并发。此性能容许您以更 Pythonic 的形式治理并发工作的生命周期。

总结

本文介绍了 asyncio.gather、asyncio.as_completed 和 asyncio.wait API,还回顾了 Python 3.11 中引入的新 asyncio.TaskGroup 个性。

依据理论须要应用这些后台任务治理形式能够让咱们的 asyncio 并发编程更加灵便。

本文由 mdnice 多平台公布

正文完
 0