乐趣区

Python响应式类库RxPy简介

RxPy 是十分风行的响应式框架 Reactive X 的 Python 版本,其实这些版本都是一样的,只不过是各个语言的实现不同而已。因而,如果学会了其中一种,那么应用其余的响应式版本也是轻而易举的。之前我就据说过这个框架,最近决定好好钻研一下。

基本概念

Reactive X 中有几个外围的概念,先来简略介绍一下。

Observable 和 Observer(可察看对象和观察者)

首先是 Observable 和 Observer,它们别离是可察看对象和观察者。Observable 能够了解为一个异步的数据源,会发送一系列的值。Observer 则相似于消费者,须要先订阅 Observable,而后才能够接管到其发射的值。能够说这组概念是设计模式中的观察者模式和生产者 - 消费者模式的综合体。

Operator(操作符)

另外一个十分重要的概念就是操作符了。操作符作用于 Observable 的数据流上,能够对其施加各种各样的操作。更重要的是,操作符还能够链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。

Single(单例)

在 RxJava 和其变体中,还有一个比拟非凡的概念叫做 Single,它是一种只会发射同一个值的 Observable,说白了就是单例。当然如果你对 Java 等语言比拟相熟,那么单例想必也很相熟。

Subject(主体)

主体这个概念十分非凡,它既是 Observable 又是 Observer。正是因为这个特点,所以 Subject 能够订阅其余 Observable,也能够将发射对象给其余 Observer。在某些场景中,Subject 会有很大的作用。

Scheduler(调度器)

默认状况下 Reactive X 只运行在以后线程下,然而如果有需要的话,也能够用调度器来让 Reactive X 运行在多线程环境下。有很多调度器和对应的操作符,能够解决多线程场景下的各种要求。

Observer 和 Observable

先来看看一个最简略的例子,运行的后果会顺次打印这些数字。这里的 of 是一个操作符,能够依据给定的参数创立一个新的 Observable。创立之后,就能够订阅 Observable,三个回调办法在对应的机会执行。一旦 Observer 订阅了 Observable,就会接管到后续 Observable 发射的各项值。

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)

这个例子看起来如同很简略,并且看起来没什么用。然而当你理解了 Rx 的一些外围概念,就会了解到这是一个如许弱小的工具。更重要的是,Observable 生成数据和订阅的过程是异步的,如果你相熟的话,就能够利用这个个性做很多事件。

操作符

在 RxPy 中另一个十分重要的概念就是操作符了,甚至能够说操作符就是最重要的一个概念了。简直所有的性能都能够通过组合各个操作符来实现。熟练掌握操作符就是学好 RxPy 的要害了。操作符之间也能够用 pipe 函数连接起来,形成简单的操作链。

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

在 RxPy 中有大量操作符,能够实现各种各样的性能。咱们来简略看看其中一些罕用的操作符。如果你相熟 Java8 的流类库或者其余函数式编程类库的话,应该对这些操作符感到十分亲切。

创立型操作符

首先是创立 Observable 的操作符,列举了一些比拟罕用的创立型操作符。

操作符 作用
just(n) 只蕴含 1 个值的 Observable
repeated_value(v,n) 反复 n 次值为 v 的 Observable
of(a,b,c,d) 蕴含所有参数的 Observable
empty() 一个空的 Observable
from_iterable(iter) 用 iterable 创立一个 Observable
generate(0, lambda x: x < 10, lambda x: x + 1) 用初始值和循环条件生成 Observable
interval(n) 以 n 秒为距离定时发送整数序列的 Observable

过滤型操作符

过滤型操作符的次要作用是对 Observable 进行筛选和过滤。

操作符 作用
debounce 按工夫距离过滤,在范畴内的值会被疏忽
distinct 疏忽反复的值
elementAt 只发射第 n 位的值
filter 按条件过滤值
first/last 发射首 / 尾值
skip 跳过前 n 个值
take 只取前 n 个值

转换型操作符

操作符 作用
flatMap 转换多个 Observable 的值并将它们合并为一个 Observable
groupBy 对值进行分组,返回多个 Observable
map 将 Observable 映射为另一个 Observable
scan 将函数利用到 Observable 的每个值上,而后返回前面的值

算术操作符

操作符 作用
average 平均数
count 个数
max 最大值
min 最小值
reduce 将函数利用到每个值上,而后返回最终的计算结果
sum 求和

Subject

Subject 是一种非凡的对象,它既是 Observer 又是 Observable。不过这个对象个别不太罕用,然而如果某些用处还是很有用的。所以还是要介绍一下。上面的代码,因为订阅的时候第一个值曾经发射进来了,所以只会打印订阅之后才发射的值。

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject 同时是 Observer 和 Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

另外还有几个非凡的 Subject,上面来介绍一下。

ReplaySubject

ReplaySubject 是一个非凡的 Subject,它会记录所有发射过的值,不论什么时候订阅的。所以它能够用来当做缓存来应用。ReplaySubject 还能够承受一个 bufferSize 参数,指定能够缓存的最近数据数,默认状况下是全副。

上面的代码和下面的代码简直齐全一样,然而因为应用了 ReplaySubject,所以所有的值都会被打印。当然大家也能够试试把订阅语句放到其余地位,看看输入是否会产生变动。

# ReplaySubject 会缓存所有值,如果指定参数的话只会缓存最近的几个值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

BehaviorSubject

BehaviorSubject 是一个非凡的 Subject,它只会记录最近一次发射的值。而且在创立它的时候,必须指定一个初始值,所有订阅它的对象都能够接管到这个初始值。当然如果订阅的晚了,这个初始值同样会被前面发射的值笼罩,这一点要留神。

# BehaviorSubject 会缓存上次发射的值,除非 Observable 曾经敞开
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

AsyncSubject

AsyncSubject 是一个非凡的 Subject,顾名思义它是一个异步的 Subject,它只会在 Observer 实现的时候发射数据,而且只会发射最初一个数据。因而上面的代码仅仅会输入 4. 如果正文掉最初一行 co_completed 调用,那么什么也不会输入。

# AsyncSubject 会缓存上次发射的值,而且仅会在 Observable 敞开后开始发射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

Scheduler

尽管 RxPy 算是异步的框架,然而其实它默认还是运行在单个线程之上的,因而如果应用了某些会妨碍线程运行的操作,那么程序就会卡死。当然针对这些状况,咱们就能够应用其余的 Scheduler 来调度工作,保障程序可能高效运行。

上面的例子创立了一个 ThreadPoolScheduler,它是基于线程池的调度器。两个 Observable 用 subscribe_on 办法指定了调度器,因而它们会应用不同的线程来工作。

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

如果你察看过各个操作符的 API 的话,能够发现大部分操作符都反对可选的 Scheduler 参数,为操作符指定一个调度器。如果操作符上指定了调度器的话,会优先应用这个调度器;其次的话,会应用 subscribe 办法上指定的调度器;如果以上都没有指定的话,就会应用默认的调度器。

利用场景

好了,介绍了一些 Reactive X 的常识之后,上面来看看如何来应用 Reactive X。在很多利用场景下,都能够利用 Reactive X 来形象数据处理,把概念简单化。

避免反复发送

很多状况下咱们都须要管制事件的产生距离,比方有一个按钮不小心按了好几次,只心愿第一次按钮失效。这种状况下能够应用 debounce 操作符,它会过滤 Observable,小于指定工夫距离的数据会被过滤掉。debounce 操作符会期待一段时间,直到过了间隔时间,才会发射最初一次的数据。如果想要过滤前面的数据,发送第一次的数据,则要应用 throttle_first 操作符。

上面的代码能够比拟好的演示这个操作符,疾速按回车键发送数据,留神察看按键和数据显示之间的关系,还能够把 throttle_first 操作符换成 debounce 操作符,而后再看看输入会产生什么变动,还能够齐全正文掉 pipe 中的操作符,再看看输入会有什么变动。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce 操作符,仅在工夫距离之外的能够发射

ob = Subject()
ob.pipe(op.throttle_first(3)
    # op.debounce(3)
).subscribe(on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

操作数据流

如果须要对一些数据进行操作,那么同样有一大堆操作符能够满足需要。当然这部分性能并不是 Reactive X 独有的,如果你对 Java 8 的流类库有所理解,会发现这两者这方面的性能简直是齐全一样的。

上面是个简略的例子,将两个数据源联合起来,而后找进去其中所有的偶数。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# 操作数据流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

再或者一个利用 reduce 的简略例子,求 1 -100 的整数和。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))
退出移动版