RxPy 是十分风行的响应式框架 Reactive X 的 Python 版本,其实这些版本都是一样的,只不过是各个语言的实现不同而已。因而,如果学会了其中一种,那么应用其余的响应式版本也是轻而易举的。之前我就据说过这个框架,最近决定好好钻研一下。
基本概念
Reactive X 中有几个外围的概念,先来简略介绍一下。
Observable 和 Observer(可察看对象和观察者)
首先是 Observable 和 Observer,它们别离是可察看对象和观察者。Observable 能够了解为一个异步的数据源,会发送一系列的值。Observer 则相似于消费者,须要先订阅 Observable,而后才能够接管到其发射的值。能够说这组概念是设计模式中的观察者模式和生产者 - 消费者模式的综合体。
Operator(操作符)
另外一个十分重要的概念就是操作符了。操作符作用于 Observable 的数据流上,能够对其施加各种各样的操作。更重要的是,操作符还能够链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。
Single(单例)
在 RxJava 和其变体中,还有一个比拟非凡的概念叫做 Single,它是一种只会发射同一个值的 Observable,说白了就是单例。当然如果你对 Java 等语言比拟相熟,那么单例想必也很相熟。
Subject(主体)
主体这个概念十分非凡,它既是 Observable 又是 Observer。正是因为这个特点,所以 Subject 能够订阅其余 Observable,也能够将发射对象给其余 Observer。在某些场景中,Subject 会有很大的作用。
Scheduler(调度器)
默认状况下 Reactive X 只运行在以后线程下,然而如果有需要的话,也能够用调度器来让 Reactive X 运行在多线程环境下。有很多调度器和对应的操作符,能够解决多线程场景下的各种要求。
Observer 和 Observable
先来看看一个最简略的例子,运行的后果会顺次打印这些数字。这里的 of 是一个操作符,能够依据给定的参数创立一个新的 Observable。创立之后,就能够订阅 Observable,三个回调办法在对应的机会执行。一旦 Observer 订阅了 Observable,就会接管到后续 Observable 发射的各项值。
from rx import of
ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(on_next=lambda i: print(f'Received: {i}'),
on_error=lambda e: print(f'Error: {e}'),
on_completed=lambda: print('Completed')
)
这个例子看起来如同很简略,并且看起来没什么用。然而当你理解了 Rx 的一些外围概念,就会了解到这是一个如许弱小的工具。更重要的是,Observable 生成数据和订阅的过程是异步的,如果你相熟的话,就能够利用这个个性做很多事件。
操作符
在 RxPy 中另一个十分重要的概念就是操作符了,甚至能够说操作符就是最重要的一个概念了。简直所有的性能都能够通过组合各个操作符来实现。熟练掌握操作符就是学好 RxPy 的要害了。操作符之间也能够用 pipe 函数连接起来,形成简单的操作链。
from rx import of, operators as op
import rx
ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(op.map(lambda i: i ** 2),
op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))
在 RxPy 中有大量操作符,能够实现各种各样的性能。咱们来简略看看其中一些罕用的操作符。如果你相熟 Java8 的流类库或者其余函数式编程类库的话,应该对这些操作符感到十分亲切。
创立型操作符
首先是创立 Observable 的操作符,列举了一些比拟罕用的创立型操作符。
操作符 | 作用 | |
---|---|---|
just(n) | 只蕴含 1 个值的 Observable | |
repeated_value(v,n) | 反复 n 次值为 v 的 Observable | |
of(a,b,c,d) | 蕴含所有参数的 Observable | |
empty() | 一个空的 Observable | |
from_iterable(iter) | 用 iterable 创立一个 Observable | |
generate(0, lambda x: x < 10, lambda x: x + 1) | 用初始值和循环条件生成 Observable | |
interval(n) | 以 n 秒为距离定时发送整数序列的 Observable |
过滤型操作符
过滤型操作符的次要作用是对 Observable 进行筛选和过滤。
操作符 | 作用 |
---|---|
debounce | 按工夫距离过滤,在范畴内的值会被疏忽 |
distinct | 疏忽反复的值 |
elementAt | 只发射第 n 位的值 |
filter | 按条件过滤值 |
first/last | 发射首 / 尾值 |
skip | 跳过前 n 个值 |
take | 只取前 n 个值 |
转换型操作符
操作符 | 作用 |
---|---|
flatMap | 转换多个 Observable 的值并将它们合并为一个 Observable |
groupBy | 对值进行分组,返回多个 Observable |
map | 将 Observable 映射为另一个 Observable |
scan | 将函数利用到 Observable 的每个值上,而后返回前面的值 |
算术操作符
操作符 | 作用 |
---|---|
average | 平均数 |
count | 个数 |
max | 最大值 |
min | 最小值 |
reduce | 将函数利用到每个值上,而后返回最终的计算结果 |
sum | 求和 |
Subject
Subject 是一种非凡的对象,它既是 Observer 又是 Observable。不过这个对象个别不太罕用,然而如果某些用处还是很有用的。所以还是要介绍一下。上面的代码,因为订阅的时候第一个值曾经发射进来了,所以只会打印订阅之后才发射的值。
from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject
# Subject 同时是 Observer 和 Observable
print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4
另外还有几个非凡的 Subject,上面来介绍一下。
ReplaySubject
ReplaySubject 是一个非凡的 Subject,它会记录所有发射过的值,不论什么时候订阅的。所以它能够用来当做缓存来应用。ReplaySubject 还能够承受一个 bufferSize 参数,指定能够缓存的最近数据数,默认状况下是全副。
上面的代码和下面的代码简直齐全一样,然而因为应用了 ReplaySubject,所以所有的值都会被打印。当然大家也能够试试把订阅语句放到其余地位,看看输入是否会产生变动。
# ReplaySubject 会缓存所有值,如果指定参数的话只会缓存最近的几个值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4
BehaviorSubject
BehaviorSubject 是一个非凡的 Subject,它只会记录最近一次发射的值。而且在创立它的时候,必须指定一个初始值,所有订阅它的对象都能够接管到这个初始值。当然如果订阅的晚了,这个初始值同样会被前面发射的值笼罩,这一点要留神。
# BehaviorSubject 会缓存上次发射的值,除非 Observable 曾经敞开
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4
AsyncSubject
AsyncSubject 是一个非凡的 Subject,顾名思义它是一个异步的 Subject,它只会在 Observer 实现的时候发射数据,而且只会发射最初一个数据。因而上面的代码仅仅会输入 4. 如果正文掉最初一行 co_completed 调用,那么什么也不会输入。
# AsyncSubject 会缓存上次发射的值,而且仅会在 Observable 敞开后开始发射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4
Scheduler
尽管 RxPy 算是异步的框架,然而其实它默认还是运行在单个线程之上的,因而如果应用了某些会妨碍线程运行的操作,那么程序就会卡死。当然针对这些状况,咱们就能够应用其余的 Scheduler 来调度工作,保障程序可能高效运行。
上面的例子创立了一个 ThreadPoolScheduler,它是基于线程池的调度器。两个 Observable 用 subscribe_on 办法指定了调度器,因而它们会应用不同的线程来工作。
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op
import multiprocessing
import time
import threading
import random
def long_work(value):
time.sleep(random.randint(5, 20) / 10)
return value
pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())
rx.range(5).pipe(op.map(lambda i: long_work(i + 1)),
op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))
rx.of(1, 2, 3, 4, 5).pipe(op.map(lambda i: i * 2),
op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))
如果你察看过各个操作符的 API 的话,能够发现大部分操作符都反对可选的 Scheduler 参数,为操作符指定一个调度器。如果操作符上指定了调度器的话,会优先应用这个调度器;其次的话,会应用 subscribe 办法上指定的调度器;如果以上都没有指定的话,就会应用默认的调度器。
利用场景
好了,介绍了一些 Reactive X 的常识之后,上面来看看如何来应用 Reactive X。在很多利用场景下,都能够利用 Reactive X 来形象数据处理,把概念简单化。
避免反复发送
很多状况下咱们都须要管制事件的产生距离,比方有一个按钮不小心按了好几次,只心愿第一次按钮失效。这种状况下能够应用 debounce 操作符,它会过滤 Observable,小于指定工夫距离的数据会被过滤掉。debounce 操作符会期待一段时间,直到过了间隔时间,才会发射最初一次的数据。如果想要过滤前面的数据,发送第一次的数据,则要应用 throttle_first 操作符。
上面的代码能够比拟好的演示这个操作符,疾速按回车键发送数据,留神察看按键和数据显示之间的关系,还能够把 throttle_first 操作符换成 debounce 操作符,而后再看看输入会产生什么变动,还能够齐全正文掉 pipe 中的操作符,再看看输入会有什么变动。
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
# debounce 操作符,仅在工夫距离之外的能够发射
ob = Subject()
ob.pipe(op.throttle_first(3)
# op.debounce(3)
).subscribe(on_next=lambda i: print(i),
on_completed=lambda: print('Completed')
)
print('press enter to print, press other key to exit')
while True:
s = input()
if s == '':
ob.on_next(datetime.datetime.now().time())
else:
ob.on_completed()
break
操作数据流
如果须要对一些数据进行操作,那么同样有一大堆操作符能够满足需要。当然这部分性能并不是 Reactive X 独有的,如果你对 Java 8 的流类库有所理解,会发现这两者这方面的性能简直是齐全一样的。
上面是个简略的例子,将两个数据源联合起来,而后找进去其中所有的偶数。
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
# 操作数据流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(op.merge(some_data2),
op.filter(lambda i: i % 2 == 0),
# op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))
再或者一个利用 reduce 的简略例子,求 1 -100 的整数和。
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
rx.range(1, 101).pipe(op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))