乐趣区

关于python:Python-协程Coroutine体验

概述

本文通过运行一段 Python 小程序,模仿一个实在的工作。比拟在多线程(Multi-thread)和在多协程(Coroutine)环境下的编程实现。发现和解释一些乏味的景象。以期为大家带来一些对协程的直观感触,加深对这种陈腐事物的了解。

Python 中的协程

协程(coroutine)是一个有很长历史的概念,它是计算机程序的一类组件,推广了合作式多任务的子程序。其具体的概念和历史请参照维基百科中的条目:https://en.wikipedia.org/wiki…
Python 天生反对的生成器 (generator) 其实就是协程的一种实现,生成器容许执行被挂起与被复原。然而因为不足更多语法上的反对,以及不足利用生成器实现异步编程的成熟模式,限度了生成器作为协程参加合作式多任务编程的用处。不过当初状况产生的扭转,Python 自 3.6 版本开始增加了 async/await 的语法间接反对协程的异步编程,同时在 asyncio 库中提供了协程编程的接口以及必要的根底实现。社区也在一直致力为现有的 IO 库提供异步的版本以便用于协程开发环境,例如 http client 目前至多在 aiohttp 以及 tornado 中都提供了可用于协程的异步版本。咱们晓得 IO 操作天生是异步的,为了适应广泛应用的同步编程模式,很多的 IO 库都采纳阻塞调用者的形式来实现同步。这样尽管简化了编程,可也带来的并行度不高的问题。在一些有大量耗时 IO 操作的环境里,利用不得不忍耐串行操作造成的漫长期待,或是转向多过程(Multi-Process)多线程编程以期进步并行水平。而多过程多线程编程又会引入争用、通信,同步、爱护等辣手的问题。而且咱们晓得即便是作为轻量级的线程也会对应一个独立的运行栈。线程的调度和切换不可避免地包含运行栈的切换和加载。如果在一个过程中有成千盈百的线程,那么相应的调度开销会急剧回升到难以忍受的水平。而且线程之间的同步和互锁也将成为一个噩梦。除去 boss 级别的死锁问题,其余任何的 bug 或是缺点在多线程环境下都难于重现和追踪,这是因为线程的调度有很大的随机性。

一个 Python 小程序

上面是一个 Python 的小程序,能够在 Python3.8 或者更新的版本上运行。

import threading
import time
import asyncio


def gen():
    s = 0
    while s < 1000:
        yield s
        s += 1


def unsafe_thread_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            time.sleep(0.01)
            t += v
    except StopIteration:
        print(f"{t}")


async def wrong_coroutine_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            time.sleep(0.01)
            t += v
    except StopIteration:
        print(f"{t}")
        

async def starter_with_wrong_workers():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(wrong_coroutine_worker(g))
        tasks.append(task)
    await asyncio.gather(*tasks)
    

async def right_coroutine_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            await asyncio.sleep(0.01)
            t += v
    except StopIteration:
        print(f"{t}")
        

async def starter_with_right_workers():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(right_coroutine_worker(g))
        tasks.append(task)
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    
    print('----------------- Sequence  -----------------')
    g = gen()
    started_at = time.monotonic()
    t = 0
    for v in g:
        time.sleep(0.01)
        t += v
    print(t)
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')
    
    print('----------------- Unsafe threading  -----------------')
    g = gen()
    started_at = time.monotonic()
    threads =[]
    for _ in range(10):
        w = threading.Thread(target=unsafe_thread_worker, args=[g])
        w.start()
        threads.append(w)
    for w in threads:
        w.join()
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')

    print('----------------- Async with wrong coroutine  -----------------')
    g = gen()
    started_at = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(starter_with_wrong_workers())
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')
            
    print('----------------- Async with right coroutine  -----------------')
    g = gen()
    started_at = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(starter_with_right_workers())
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')

一个典型的运行输入看起来像是这个样子的:

----------------- Sequence  -----------------
499500
total time consumed: 10.53 seconds
----------------- Unsafe threading  -----------------
 49804  49609 

 50033 
 49682 
 49574 
 50005 
 50143 
 50069  50219 

 50362 
total time consumed: 1.09 seconds
----------------- Async with wrong coroutine  -----------------
 499500 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
total time consumed: 10.55 seconds
----------------- Async with right coroutine  -----------------
 49500 
 49967 
 49973 
 50100 
 49965 
 50000 
 49968 
 49963 
 49964 
 50100 
total time consumed: 1.16 seconds

这个小程序实际上模仿了一个常见的实在工作。构想咱们通过一个 http 的数据 API 一页一页地获取一个比拟大地数据集。每页数据通过一个带有页号或是起始地位的 URL 予以标识,而后通过向 API 服务器发送一个 http request,并解析返回的 http response 中所蕴含的数据。其中的 http 拜访显然是一个耗时的 IO 操作。返回数据的解析和解决是一个计算密集型的操作,相比 IO 期待,其耗费的工夫不值一提。那个生成器 gen 能够看作是一个数据页面 URL 的生成器,也就是工作生成器。而后咱们应用 sleep 来模仿一个耗时的 IO 操作,应用加法来模仿数据的合并与剖析。你也能够把这个小程序设想成为一个网络爬虫,咱们在一个全局的列表里保留了所有指标网站的地址,而后或串行或并行地拜访所有地指标,取回咱们感兴趣的数据存储并合并剖析。
总之,咱们有 1000 个比拟独立的小工作,每个工作有一个耗时 0.01 秒的 IO 操作。

退出移动版