
About Programmers: Python - Combining Multiprocessing and Asyncio to Improve Performance


Introduction

Thanks to the GIL, using multiple threads to perform CPU-bound tasks has never been an option. With the popularity of multi-core CPUs, Python provides a multiprocessing solution for CPU-bound work. However, even now, directly using the process-related APIs still presents some problems.

Before we begin, here is a small piece of code to help with the demonstration:

import time
from multiprocessing import Process


def sum_to_num(final_num: int) -> int:
    start = time.monotonic()

    result = 0
    for i in range(final_num + 1):
        result += i

    print(f"The method with {final_num} completed in {time.monotonic() - start:.2f} second(s).")
    return result

This function takes one argument and accumulates every integer from 0 up to it. It prints its execution time and returns the result.

The Problem with Multiprocessing

def main():
    # We initialize the two processes with two parameters, from largest to smallest
    process_a = Process(target=sum_to_num, args=(200_000_000,))
    process_b = Process(target=sum_to_num, args=(50_000_000,))

    # And then let them start executing
    process_a.start()
    process_b.start()

    # Note that the join method is blocking and gets results sequentially
    start_a = time.monotonic()
    process_a.join()
    print(f"Process_a completed in {time.monotonic() - start_a:.2f} seconds")

    # By the time process_a has joined, the shorter-running process_b
    # has long since finished, so this join returns almost immediately
    # and the timer reads roughly 0 seconds.
    start_b = time.monotonic()
    process_b.join()
    print(f"Process_b completed in {time.monotonic() - start_b:.2f} seconds")

As the code shows, we directly create and start multiple processes, then call each process's start and join methods. However, there are a few problems here:

  1. The join method cannot return the result of the executed task.
  2. The join method blocks the main process and waits for the processes sequentially.

This is true even if the later task finishes faster than the earlier one, as shown in the figure below:
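The first problem, getting a return value out of a Process, is usually worked around with an explicit channel such as multiprocessing.Queue. The sketch below is not from the original article; the simplified worker and the small inputs are only illustrative:

```python
from multiprocessing import Process, Queue


def sum_to_num(final_num: int, queue: Queue) -> None:
    # Same accumulation as before, but the result is pushed onto a queue
    # so the parent process can retrieve it after the workers finish.
    queue.put((final_num, sum(range(final_num + 1))))


if __name__ == "__main__":
    queue: Queue = Queue()
    processes = [Process(target=sum_to_num, args=(n, queue)) for n in (2_000_000, 500_000)]
    for p in processes:
        p.start()
    # Drain the queue before joining to avoid blocking on a full pipe buffer.
    results = dict(queue.get() for _ in processes)
    for p in processes:
        p.join()
    print(results[2_000_000], results[500_000])
```

Note that this only addresses the missing return value; join still blocks the parent and waits for each process in turn.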

The Problem with Using a Pool

If we use multiprocessing.Pool instead, there are also some problems:

from multiprocessing import Pool


def main():
    with Pool() as pool:
        result_a = pool.apply(sum_to_num, args=(200_000_000,))
        result_b = pool.apply(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b}.")

As the code shows, Pool's apply method is synchronous, which means you must wait for the previous apply task to finish before the next apply task can start.

Of course, we can use the apply_async method to create the tasks asynchronously. But then you need to call the blocking get method to fetch each result, and the results still come back in submission order, which brings us back to the problem with the join method:

from multiprocessing import Pool


def main():
    with Pool() as pool:
        result_a = pool.apply_async(sum_to_num, args=(200_000_000,))
        result_b = pool.apply_async(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a.get()}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b.get()}.")
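For completeness, apply_async also accepts a callback parameter that fires in the parent as each task completes, which sidesteps the blocking get calls. This sketch is not from the article, and the square helper is just an illustration:

```python
from multiprocessing import Pool


def square(n: int) -> int:
    return n * n


if __name__ == "__main__":
    results = []
    with Pool(processes=2) as pool:
        for n in (3, 1, 2):
            # The callback runs in the parent as soon as each task finishes,
            # so results arrive in completion order, not submission order.
            pool.apply_async(square, args=(n,), callback=results.append)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all workers to finish
    print(sorted(results))  # -> [1, 4, 9]
```

Callbacks work, but they push you toward callback-style code; the asyncio approach at the end of this article keeps the flow linear.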

The Problem with Using ProcessPoolExecutor Directly

So, what if we use concurrent.futures.ProcessPoolExecutor to execute our CPU-bound tasks?

from concurrent.futures import ProcessPoolExecutor


def main():
    with ProcessPoolExecutor() as executor:
        numbers = [200_000_000, 50_000_000]
        for result in executor.map(sum_to_num, numbers):
            print(f"sum_to_num got a result which is {result}.")

As the code shows, everything looks great, and the call site even resembles asyncio.as_completed. But look at the results: they are still fetched in the order the tasks were started. This is quite different from asyncio.as_completed, which fetches results in the order in which they complete:
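The ordering difference is easy to reproduce with concurrent.futures.as_completed, the pool's own completion-order iterator. This sketch is not from the article; it uses sleep as a stand-in for CPU work so the timing is deterministic:

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def slow_task(seconds: float) -> float:
    time.sleep(seconds)
    return seconds


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        # map() always yields in submission order: the finished 0.2s task
        # waits behind the still-running 1.0s task.
        print(list(executor.map(slow_task, [1.0, 0.2])))    # -> [1.0, 0.2]

        # as_completed() yields futures as they finish.
        futures = [executor.submit(slow_task, s) for s in (1.5, 0.1)]
        print([f.result() for f in as_completed(futures)])  # -> [0.1, 1.5]
```

So the pool can deliver results in completion order; the article's point is that executor.map alone cannot.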

The Fix: asyncio's run_in_executor

Fortunately, asyncio is available for handling IO-bound tasks, and its run_in_executor method lets us invoke multiprocessing tasks the same way we invoke asyncio tasks. This not only unifies the APIs for concurrency and parallelism but also solves the various problems we ran into above:

import asyncio
from concurrent.futures import ProcessPoolExecutor


async def main():
    loop = asyncio.get_running_loop()
    tasks = []

    with ProcessPoolExecutor() as executor:
        for number in [200_000_000, 50_000_000]:
            tasks.append(loop.run_in_executor(executor, sum_to_num, number))

        # Or we can just use asyncio.gather(*tasks) to collect all results at once
        for done in asyncio.as_completed(tasks):
            result = await done
            print(f"sum_to_num got a result which is {result}")


if __name__ == "__main__":
    asyncio.run(main())

Because the example code so far only simulates the methods we would call on concurrent processes, many readers still need help seeing how to apply them in real-world code. So, now that we understand why we need to run CPU-bound parallel tasks inside asyncio, next we will walk through a real-world example of using asyncio to handle IO-bound and CPU-bound tasks at the same time, and appreciate the efficiency asyncio brings to our code.

This article is published across multiple platforms via mdnice.
