动动发财的小手,点个赞吧!
简介
多亏了 GIL,应用多个线程来执行 CPU 密集型工作素来都不是一种抉择。随着多核 CPU 的遍及,Python 提供了一种多解决解决方案来执行 CPU 密集型工作。然而直到现在,间接应用多过程相干的 API 还是存在一些问题。
在本文开始之前,咱们还有一小段代码来帮忙演示:
import time
from multiprocessing import Process
def sum_to_num(final_num: int) -> int:
start = time.monotonic()
result = 0
for i in range(0, final_num+1, 1):
result += i
print(f"The method with {final_num} completed in {time.monotonic() - start:.2f} second(s).")
return result
该办法承受一个参数并从 0 开始累加到该参数。打印办法执行工夫并返回后果。
多过程存在的问题
def main():
# We initialize the two processes with two parameters, from largest to smallest
process_a = Process(target=sum_to_num, args=(200_000_000,))
process_b = Process(target=sum_to_num, args=(50_000_000,))
# And then let them start executing
process_a.start()
process_b.start()
# Note that the join method is blocking and gets results sequentially
start_a = time.monotonic()
process_a.join()
print(f"Process_a completed in {time.monotonic() - start_a:.2f} seconds")
# Because when we wait process_a for join. The process_b has joined already.
# so the time counter is 0 seconds.
start_b = time.monotonic()
process_b.join()
print(f"Process_b completed in {time.monotonic() - start_b:.2f} seconds")
如代码所示,咱们间接创立并启动多个过程,调用每个过程的 start 和 join 办法。然而,这里存在一些问题:
- join 办法不能返回工作执行的后果。
- join 办法阻塞主过程并按程序执行它。
即便前面的工作比后面的工作执行得更快,如下图所示:
应用池的问题
如果咱们应用 multiprocessing.Pool,也会存在一些问题:
def main():
with Pool() as pool:
result_a = pool.apply(sum_to_num, args=(200_000_000,))
result_b = pool.apply(sum_to_num, args=(50_000_000,))
print(f"sum_to_num with 200_000_000 got a result of {result_a}.")
print(f"sum_to_num with 50_000_000 got a result of {result_b}.")
如代码所示,Pool 的 apply 办法是同步的,这意味着您必须期待之前的 apply 工作实现能力开始执行下一个 apply 工作。
当然,咱们能够应用 apply_async 办法异步创立工作。然而同样,您须要应用 get 办法来阻塞地获取后果。它让咱们回到 join 办法的问题:
def main():
with Pool() as pool:
result_a = pool.apply_async(sum_to_num, args=(200_000_000,))
result_b = pool.apply_async(sum_to_num, args=(50_000_000,))
print(f"sum_to_num with 200_000_000 got a result of {result_a.get()}.")
print(f"sum_to_num with 50_000_000 got a result of {result_b.get()}.")
间接应用 ProcessPoolExecutor 的问题
那么,如果咱们应用 concurrent.futures.ProcesssPoolExecutor 来执行咱们的 CPU 绑定工作呢?
def main():
with ProcessPoolExecutor() as executor:
numbers = [200_000_000, 50_000_000]
for result in executor.map(sum_to_num, numbers):
print(f"sum_to_num got a result which is {result}.")
如代码所示,所有看起来都很棒,并且就像 asyncio.as_completed 一样被调用。然而看看后果;它们仍按启动程序获取。这与 asyncio.as_completed 齐全不同,后者依照执行程序获取后果:
应用 asyncio 的 run_in_executor 修复
侥幸的是,咱们能够应用 asyncio 来解决 IO-bound 工作,它的 run_in_executor 办法能够像 asyncio 一样调用多过程工作。不仅对立了并发和并行的 API,还解决了咱们下面遇到的各种问题:
async def main():
loop = asyncio.get_running_loop()
tasks = []
with ProcessPoolExecutor() as executor:
for number in [200_000_000, 50_000_000]:
tasks.append(loop.run_in_executor(executor, sum_to_num, number))
# Or we can just use the method asyncio.gather(*tasks)
for done in asyncio.as_completed(tasks):
result = await done
print(f"sum_to_num got a result which is {result}")
因为上一篇的示例代码都是模仿咱们应该调用的并发过程的办法,所以很多读者在学习之后在理论编码中还是须要帮忙了解如何应用。所以在理解了为什么咱们须要在 asyncio 中执行 CPU-bound 并行任务之后,明天咱们将通过一个真实世界的例子来解释如何应用 asyncio 同时解决 IO-bound 和 CPU-bound 工作,并领略 asyncio 对咱们的效率代码。
本文由 mdnice 多平台公布