共计 2995 个字符,预计需要花费 8 分钟才能阅读完成。
动动发财的小手,点个赞吧!
简介
困扰
在 Python 中应用并发编程来提高效率对于数据科学家来说并不常见。在后盾察看各种子过程或并发线程以放弃我的计算或 IO 绑定工作的程序总是令人满意的。
然而还有一点困扰我的是,当我在后盾并发解决成千盈百个文件或者执行成千盈百个过程时,我总是放心会不会有几个工作偷偷挂了,整个代码永远跑不完。我也很难晓得代码当初在哪里执行。
最蹩脚的是,当我看着一个空白屏幕时,很难说出我的代码须要多长时间能力执行或 ETA 是多少。这对我安顿工作日程的能力十分不利。
因而,我想要一种办法让我晓得代码执行到了哪里。
已有办法
比拟传统的做法是工作之间共享一块内存区域,在这块内存区域放一个计数器,当一个工作完结的时候让这个计数器 +1,而后用一个线程不停的打印这个计数器的值。
这素来都不是一个好的解决方案:一方面,我须要在你现有的业务逻辑中增加一段用于计数的代码,这违反了“低耦合,高内聚”的准则。另一方面,因为线程平安问题,我必须十分小心锁定机制,这会导致不必要的性能问题。
tqdm
有一天,我发现了 tqdm 库,它应用进度条来可视化我的代码进度。我能够应用进度条来可视化我的 asyncio 工作的实现和预计达到工夫吗?
那么本文我把这个办法分享给大家,让每个程序员都有机会监控本人并发工作的进度。
异步
在咱们开始之前,我心愿您理解一些 Python asyncio 的背景常识。我的文章形容了 asyncio 的一些罕用 API 的用法,这将有助于咱们更好地了解 tqdm 的设计:
tqdm 概述
如官方网站所述,tqdm 是一个显示循环进度条的工具。它应用简略、高度可定制并且占用资源少。
一个典型的用法是将一个可迭代对象传递给 tqdm 构造函数,而后你会失去一个如下所示的进度条:
from time import sleep | |
from tqdm import tqdm | |
def main(): | |
for _ in tqdm(range(100)): | |
# do something in the loop | |
sleep(0.1) | |
if __name__ == "__main__": | |
main() |
或者您能够在读取文件时手动浏览并更新进度条的进度:
import os | |
from tqdm import tqdm | |
def main(): | |
filename = "../data/large-dataset" | |
with (tqdm(total=os.path.getsize(filename)) as bar, | |
open(filename, "r", encoding="utf-8") as f): | |
for line in f: | |
bar.update(len(line)) | |
if __name__ == "__main__": | |
main() |
将 tqdm 与异步集成
总体而言,tqdm 十分易于应用。然而,GitHub 上须要更多对于将 tqdm 与 asyncio 集成的信息。所以我深入研究了源代码,看看 tqdm 是否反对 asyncio。
侥幸的是,最新版本的 tqdm 提供了包 tqdm.asyncio,它提供了类 tqdm_asyncio。
tqdm_asyncio 类有两个相干的办法。一个是 tqdm_asyncio.as_completed。从源码能够看出,它是对 asyncio.as_completed 的包装:
@classmethod | |
def as_completed(cls, fs, *, loop=None, timeout=None, total=None, **tqdm_kwargs): | |
"""Wrapper for `asyncio.as_completed`.""" | |
if total is None: | |
total = len(fs) | |
kwargs = {} | |
if version_info[:2] < (3, 10): | |
kwargs['loop'] = loop | |
yield from cls(asyncio.as_completed(fs, timeout=timeout, **kwargs), | |
total=total, **tqdm_kwargs) |
另一个是 tqdm_asyncio.gather,从源代码能够看出,它基于模仿 asyncio.gather 性能的 tqdm_asyncio.as_completed 的实现:
@classmethod | |
async def gather(cls, *fs, loop=None, timeout=None, total=None, **tqdm_kwargs): | |
"""Wrapper for `asyncio.gather`.""" | |
async def wrap_awaitable(i, f): | |
return i, await f | |
ifs = [wrap_awaitable(i, f) for i, f in enumerate(fs)] | |
res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout, | |
total=total, **tqdm_kwargs)] | |
return [i for _, i in sorted(res)] |
所以,接下来,我将形容这两个 API 的用法。在开始之前,咱们还须要做一些筹备工作。在这里,我写了一个简略的办法来模仿一个随机休眠工夫的并发工作:
import asyncio | |
import random | |
from tqdm.asyncio import tqdm_asyncio | |
class AsyncException(Exception): | |
def __int__(self, message): | |
super.__init__(self, message) | |
async def some_coro(simu_exception=False): | |
delay = round(random.uniform(1.0, 5.0), 2) | |
# We will simulate throwing an exception if simu_exception is True | |
if delay > 4 and simu_exception: | |
raise AsyncException("something wrong!") | |
await asyncio.sleep(delay) | |
return delay |
紧接着,咱们将创立 2000 个并发工作,而后应用 tqdm_asyncio.gather 而不是相熟的 asyncio.gather 办法来查看进度条是否失常工作:
async def main(): | |
tasks = [] | |
for _ in range(2000): | |
tasks.append(some_coro()) | |
await tqdm_asyncio.gather(*tasks) | |
print(f"All tasks done.") | |
if __name__ == "__main__": | |
asyncio.run(main()) |
或者让咱们用 tqdm_asyncio.as_completed 替换 tqdm_asyncio.gather 并重试:
async def main(): | |
tasks = [] | |
for _ in range(2000): | |
tasks.append(some_coro()) | |
for done in tqdm_asyncio.as_completed(tasks): | |
await done | |
print(f"The tqdm_asyncio.as_completed also works fine.") | |
if __name__ == "__main__": | |
asyncio.run(main()) |
本文由 mdnice 多平台公布