动动发财的小手,点个赞吧!

简介

多亏了 GIL,应用多个线程来执行 CPU 密集型工作素来都不是一种抉择。随着多核 CPU 的遍及,Python 提供了一种多解决解决方案来执行 CPU 密集型工作。然而直到现在,间接应用多过程相干的API还是存在一些问题。

在本文开始之前,咱们还有一小段代码来帮忙演示:

import timefrom multiprocessing import Processdef sum_to_num(final_num: int) -> int:    start = time.monotonic()    result = 0    for i in range(0, final_num+1, 1):        result += i    print(f"The method with {final_num} completed in {time.monotonic() - start:.2f} second(s).")    return result

该办法承受一个参数并从 0 开始累加到该参数。打印办法执行工夫并返回后果。

多过程存在的问题

def main():    # We initialize the two processes with two parameters, from largest to smallest    process_a = Process(target=sum_to_num, args=(200_000_000,))    process_b = Process(target=sum_to_num, args=(50_000_000,))    # And then let them start executing    process_a.start()    process_b.start()    # Note that the join method is blocking and gets results sequentially    start_a = time.monotonic()    process_a.join()    print(f"Process_a completed in {time.monotonic() - start_a:.2f} seconds")    # Because when we wait process_a for join. The process_b has joined already.    # so the time counter is 0 seconds.    start_b = time.monotonic()    process_b.join()    print(f"Process_b completed in {time.monotonic() - start_b:.2f} seconds")

如代码所示,咱们间接创立并启动多个过程,调用每个过程的start和join办法。然而,这里存在一些问题:

  1. join 办法不能返回工作执行的后果。
  2. join 办法阻塞主过程并按程序执行它。

即便前面的工作比后面的工作执行得更快,如下图所示:

应用池的问题

如果咱们应用multiprocessing.Pool,也会存在一些问题:

def main():    with Pool() as pool:        result_a = pool.apply(sum_to_num, args=(200_000_000,))        result_b = pool.apply(sum_to_num, args=(50_000_000,))        print(f"sum_to_num with 200_000_000 got a result of {result_a}.")        print(f"sum_to_num with 50_000_000 got a result of {result_b}.")

如代码所示,Pool 的 apply 办法是同步的,这意味着您必须期待之前的 apply 工作实现能力开始执行下一个 apply 工作。

当然,咱们能够应用 apply_async 办法异步创立工作。然而同样,您须要应用 get 办法来阻塞地获取后果。它让咱们回到 join 办法的问题:

def main():    with Pool() as pool:        result_a = pool.apply_async(sum_to_num, args=(200_000_000,))        result_b = pool.apply_async(sum_to_num, args=(50_000_000,))        print(f"sum_to_num with 200_000_000 got a result of {result_a.get()}.")        print(f"sum_to_num with 50_000_000 got a result of {result_b.get()}.")

间接应用ProcessPoolExecutor的问题

那么,如果咱们应用 concurrent.futures.ProcesssPoolExecutor 来执行咱们的 CPU 绑定工作呢?

def main():    with ProcessPoolExecutor() as executor:        numbers = [200_000_000, 50_000_000]        for result in executor.map(sum_to_num, numbers):            print(f"sum_to_num got a result which is {result}.")

如代码所示,所有看起来都很棒,并且就像 asyncio.as_completed 一样被调用。然而看看后果;它们仍按启动程序获取。这与 asyncio.as_completed 齐全不同,后者依照执行程序获取后果:

应用 asyncio 的 run_in_executor 修复

侥幸的是,咱们能够应用 asyncio 来解决 IO-bound 工作,它的 run_in_executor 办法能够像 asyncio 一样调用多过程工作。不仅对立了并发和并行的API,还解决了咱们下面遇到的各种问题:

async def main():    loop = asyncio.get_running_loop()    tasks = []    with ProcessPoolExecutor() as executor:        for number in [200_000_000, 50_000_000]:            tasks.append(loop.run_in_executor(executor, sum_to_num, number))                # Or we can just use the method asyncio.gather(*tasks)        for done in asyncio.as_completed(tasks):            result = await done            print(f"sum_to_num got a result which is {result}")

因为上一篇的示例代码都是模仿咱们应该调用的并发过程的办法,所以很多读者在学习之后在理论编码中还是须要帮忙了解如何应用。所以在理解了为什么咱们须要在asyncio中执行CPU-bound并行任务之后,明天咱们将通过一个真实世界的例子来解释如何应用asyncio同时解决IO-bound和CPU-bound工作,并领略asyncio对咱们的效率代码。

本文由mdnice多平台公布