乐趣区

关于python:使用协程池Coroutine-Pool作为RxPY的数据源Observable

对于协程与 RxPY

协程(coroutine)是一个有很长历史的概念,它是计算机程序的一类组件,推广了合作式多任务的子程序。其具体的概念和历史请参照维基百科中的条目:https://en.wikipedia.org/wiki/Coroutine。
Python 天生反对的生成器 (generator) 其实就是协程的一种实现,生成器容许执行被挂起与被复原。然而因为不足更多语法上的反对,以及不足利用生成器实现异步编程的成熟模式,限度了生成器作为协程参加合作式多任务编程的用处。不过当初状况产生了扭转,Python 自 3.6 版本开始增加了 async/await 的语法间接反对协程的异步编程,同时在 asyncio 库中提供了协程编程的接口以及必要的根底实现。在 Python 里,多个协程是通过音讯循环(Evet Loop)的调度从而异步执行的。请参见笔者的另一篇文章,感受一下协程与传统的程序执行以及多线程之间的分割和区别:Python 协程(Coroutine)体验
RxPY 是响应式编程(Reactive programming)在 Python 中的实现。响应式编程或反应式编程是一种面向数据流和变动流传的申明式编程范式。其具体的信息请参照维基百科中的条目:https://en.wikipedia.org/wiki/Reactive_programming。在 RxPY 中,数据流事实上是以同步的形式执行的。组成数据处理管道(Data Pipe)的所有函数均以同步的运行栈的形式调度。只是通过全面的依赖反转使得开发者能以不便灵便的申明形式来组装和更改数据处理管道。从而取得极大的灵活性和可扩展性。
在一些特定的状况下,如何联合这两种编程范式,使得它们可能相互协作就变得十分乏味。这样做既能利用 Reactive 申明编程的形式来简化程序,又能利用协程异步运行的形式来进步 IO 的并行水平,从而进步程序的执行速度和效率。例如咱们有数据处理管道,在其数据源或是数据指标反对异步 IO 的状况下,就能够思考二者的联合使用。
这个帖子 https://blog.oakbits.com/rxpy-and-asyncio.html 提供了一些对于二者联合应用的用例和代码实现。在 RxPY 文档 里也提供了一个对于在 RxPY 中应用 asyncio 作为数据源的例子。

对于协程池

跟线程相比,协程自身以及其调度都要轻量很多。但这并不意味着协程能够不受限制地部署和运行。其限度次要来自于内外两个方向。外部的限度次要是无限的运行资源例如内存,处理器等。咱们晓得一个协程对应于一个内存对象,过多的流动协程将有可能会耗尽内存,也可能会导致期待处理器的工夫超过 IO 的等待时间,从而不再能进步程序运行的速度,使得减少的内存资源耗费变得毫无意义。内部的限度次要来自于数据源或数据指标的并行性限度。例如咱们要通过数据 API 分页地获取一个大数据集,而作为数据源的 API 很可能对可能平安拜访的并行度有着或明或暗的限度。超过限度数目的并行申请将有可能被回绝,甚至会导致 API 自身运行 / 数据产生凌乱,直至进行服务。协程池是一种在协程环境中限度并行度的简略无效的办法。跟多线程环境下的线程池的概念类似,协程池领有肯定数量的协程,每个协程独立地支付并执行工作。当某个协程实现一个工作时,该协程将持续支付下一个工作直至所有工作实现。除此以外,asynio 中提供的同步原语也能够被用来限度并行的水平和流动协程的数量,同步原语的应用不在本文的探讨范畴。

程序实列

以下程序能够在 Python 3.6+ 上运行。

import asyncio
import random
import rx
import functools
import selectors
import time


def job_generator():              # 1
    job_id = 1
    while True:
        yield job_id
        job_id += 1


async def worker(worker_ame, job_gen, observer):   # 2
    for job_id in job_gen:
        if job_id >= 200:                          # 3
            job_gen.close()
            break
        await asyncio.sleep(random.uniform(0.01, 0.1))  # 4
        observer.on_next(f'{worker_ame}:  {job_id}')    # 5


def data_source():                                      # 6
    
    def on_subscribe(observer, scheduler):              # 7
        
        async def _aio_sub(loop):                       # 8
            tasks = []                                  # 9
            job_gen = job_generator()
            for i in range(3):
                task = asyncio.create_task(worker(f'Worker-{i}', job_gen, observer))
                tasks.append(task)
            
            # Wait until all worker tasks are finished/cancelled.
            try: 
                await asyncio.gather(*tasks)            # 10
                loop.call_soon(observer.on_completed)
            except Exception as e:                      # 11
                loop.call_soon(functools.partial(observer.on_error, e))
                for task in tasks:
                    task.cancel()
                raise e
            
        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(selector)
        asyncio.set_event_loop(loop)
        loop.run_until_complete(_aio_sub(loop))

    return rx.create(on_subscribe)


if __name__ == '__main__':
    started_at = time.monotonic()
    source = data_source()                          # 12
    source.subscribe(                               # 13
        on_next = lambda i: print("Received {0}".format(i)),
        on_error = lambda e: print("Error Occurred: {0}".format(e)),
        on_completed = lambda: print("Done!"),
    )
    total_time = time.monotonic() - started_at      # 14
    print('====')
    print(f'Used {total_time:.2f} seconds')

一个可能的输入看起来像是这个样子:

c:\PortableApps>python RxPYWithAsyncIO.py
Received Worker-0:  1
Received Worker-0:  4
Received Worker-0:  5
Received Worker-1:  2
Received Worker-2:  3
Received Worker-0:  6
Received Worker-2:  8
Received Worker-1:  7
Received Worker-1:  11
Received Worker-0:  9

..........

Received Worker-1:  188
Received Worker-0:  184
Received Worker-2:  190
Received Worker-0:  192
Received Worker-1:  191
Received Worker-1:  195
Received Worker-0:  194
Received Worker-2:  193
Received Worker-1:  196
Received Worker-1:  199
Received Worker-2:  198
Received Worker-0:  197
Done!
====
Used 3.52 seconds

留神在输入中咱们冀望看到的几处要害内容:

  1. 200 行型如 Recieved <CoroutineName>: <Job_Id>。其中 CoroutineName 为 Worker-0,1,2 之一。Job_Id 的值为 0 -199。特地的,输入的 Job_Id 不会是依照程序的,这是因为在代码中,每个 Job 的运行工夫是一个随机值。不同 job 的运行工夫有长有短。另外能够察看到所有的协程均参加了工作的解决,它们在输入中交替呈现。
  2. 所有 Job 实现后输入的“Done!”。这是在所有协程实现当前,Observable 发送 on_complete 到注册(Subscribe)的 Observer,由 Observer 打印出的音讯。
  3. 最初输入的总的耗费的工夫,通常在应用 3 个协程的状况下是 3 秒多。能够看到本次运行的工夫是 3.52 秒
    上面让咱们沿着代码中标注的序号来做具体的解读。
  4. job_generator 是工作生成器,其实例为所有协程共享。工作协程 worker 从这里支付工作并实现。之后支付下一个工作。从一个惟一的工作生成器支付工作保障了工作不会被反复散发。在拜访数据 API 并且分页获取数据的利用中,工作生成器所产生的内容可能是带有页号的 URL;在网络爬虫中,其产生的内容可能是指标网站的地址。留神到这里应用了 job 而不是 task 作为其名称,这是为了跟 asyncio 中的 task(对应于一个可并行运行的协程)有所区别。
  5. worker 是一个异步函数,运行起来就是一个工作协程。多个 worker 组成了本例中数据源的协程池。Worker 承受一个名字用于输入日志,一个工作生成器 job_generator 的实例,以及作为数据指标的 observer。Worker 继续地从工作生成器中支付工作,实现工作,将后果发送到 observer。因为协程都是轮流调度的,不会产生多个协程同时运行的状况,因而在协程间共享的数据资源例如 job_generator 和 observer 不须要做任何的爱护。
  6. 抉择在 worker 中判断完结条件是有现实意义的。很多时候咱们不太可能在一开始就确定所有的工作,很可能须要在一个或者多个工作的后果中去判断是否还有余下的工作。例如在分页获取数据的利用中,经常须要在返回的数据集中查看是否有下一页的 URL 或是判断当前页号是否曾经达到或超过了数据集的总页数。当发现完结条件达到的时候,除了完结本协程外,还要告诉其余协程尽快完结工作,为此敞开工作生成器是一个好办法。当其余协程实现手上的工作再次尝试支付时会发现曾经没有更多的工作了。
  7. 这行代码模仿了一个异步的 IO,期待一段随机的工夫。在事实环境中通常会应用异步 IO 获取数据,例如应用 aiohttp 拜访 http 数据 API。
  8. 通过调用 observer 的 on_next 办法将工作的后果放入数据处理管道。这里只是简略地将 job_id 加上解决协程的名字作为后果。事实环境中大多须要解决和转换 IO 获取的后果数据。留神这里的 on_next 办法是同步办法,如果管道中的数据处理过于耗时的话,会重大阻塞整个协程池的运行。有必要的话,这里须要应用并行化的办法来保障 on_next 调用尽快返回,这部分内容不在本文的探讨范畴之内。
  9. 一个简略的 factory 函数,应用 RxPY 的 create 办法将函数 on_subscribe 包装成 observable 并返回。这是 RxPY 中罕用的手法,详情请参见其文档和实例。
  10. on_subscribe 函数是逾越同步和异步世界的桥梁,它自身实现了 RxPY 对于 Observable 的接口协议,是一个传统的同步函数。在其外部定义了顶层的异步函数_aio_sub,被驱动运行时首先创立并启动音讯循环(Event Loop),应用 loop.run_until_complete 将异步函数_aio_sub 搁置并运行在音讯循环上。这一步同时也将本人的线程能源(就是流动的运行栈)交到音讯循环,并期待_aio_sub 运行完结,最初以传统同步的形式完结并退出。
  11. _aio_sub 是异步世界的顶层入口。当被音讯循环驱动运行的时候,它首先创立协程池,而后期待所有在协程池中的协程运行完结,收集运行后果并返回。
  12. 在协程池中,每个协程 worker 都将被创立成一个 asyncio 的 task,这使得它们可能被音讯循环交替地调度运行,并由创建者期待收集运行后果。本例地代码中能够看出咱们部署了 3 个协程。读者能够自行调整协程和工作的数量,察看总的运行工夫。
  13. asyncio.gather 有三个作用。期待所有 task 运行完结;收集所有 task 的返回值,本例中因为 worker 自身没有返回值,所以当所有运行顺利的话,收集到的返回值将会全副是空值 None;当协程运行中有任何异样抛出时,将会抛出第一个产生的异样。
  14. 本例中当第一个异样被捕捉时,程序将异样传递到数据流上从而终止数据流。而后勾销所有协程的运行并抛出异样,这事实上终止了数据管道的运行。这样的行为对于分页获取数据的利用是正当的。对于须要爬大量网站的网络爬虫来说,多数网站抛出异样是失常的,在这种状况下应该只将捕捉的异样记入日志,而后持续期待协程池的运行直至完结。这须要首先改变协程 worker 的运行逻辑。
  15. 创立 observable 作为数据源。RxPY 的常见代码。
  16. 在数据源上注册一个最简略的 observer。该 observer 仅仅将收到的数据和事件打印进去。在这一步,简单的数据处理管道也能够被组装。其最初的注册调用 source.subscribe 实际上通过运行 on_subscribe 函数而启动了整个数据管道的运行。咱们曾经晓得 RxPY 的调用是同步的,当 source.subscribe 运行完结返回的时候,咱们可能确定的是数据管道的运行曾经完结,要么所有冀望的数据都曾经通过管道顺利解决,要么在解决中产生并抛出了异样。
  17. 计算并输入运行工夫的简略办法,没有什么须要多解释的。
退出移动版