RxPy是十分风行的响应式框架Reactive X的Python版本,其实这些版本都是一样的,只不过是各个语言的实现不同而已。因而,如果学会了其中一种,那么应用其余的响应式版本也是轻而易举的。之前我就据说过这个框架,最近决定好好钻研一下。
基本概念
Reactive X中有几个外围的概念,先来简略介绍一下。
Observable和Observer(可察看对象和观察者)
首先是Observable和Observer,它们别离是可察看对象和观察者。Observable能够了解为一个异步的数据源,会发送一系列的值。Observer则相似于消费者,须要先订阅Observable,而后才能够接管到其发射的值。能够说这组概念是设计模式中的观察者模式和生产者-消费者模式的综合体。
Operator(操作符)
另外一个十分重要的概念就是操作符了。操作符作用于Observable的数据流上,能够对其施加各种各样的操作。更重要的是,操作符还能够链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。
Single(单例)
在RxJava和其变体中,还有一个比拟非凡的概念叫做Single,它是一种只会发射同一个值的Observable,说白了就是单例。当然如果你对Java等语言比拟相熟,那么单例想必也很相熟。
Subject(主体)
主体这个概念十分非凡,它既是Observable又是Observer。正是因为这个特点,所以Subject能够订阅其余Observable,也能够将发射对象给其余Observer。在某些场景中,Subject会有很大的作用。
Scheduler(调度器)
默认状况下Reactive X只运行在以后线程下,然而如果有需要的话,也能够用调度器来让Reactive X运行在多线程环境下。有很多调度器和对应的操作符,能够解决多线程场景下的各种要求。
Observer和Observable
先来看看一个最简略的例子,运行的后果会顺次打印这些数字。这里的of是一个操作符,能够依据给定的参数创立一个新的Observable。创立之后,就能够订阅Observable,三个回调办法在对应的机会执行。一旦Observer订阅了Observable,就会接管到后续Observable发射的各项值。
from rx import ofob = of(1, 2, 34, 5, 6, 7, 7)ob.subscribe( on_next=lambda i: print(f'Received: {i}'), on_error=lambda e: print(f'Error: {e}'), on_completed=lambda: print('Completed'))
这个例子看起来如同很简略,并且看起来没什么用。然而当你理解了Rx的一些外围概念,就会了解到这是一个如许弱小的工具。更重要的是,Observable生成数据和订阅的过程是异步的,如果你相熟的话,就能够利用这个个性做很多事件。
操作符
在RxPy中另一个十分重要的概念就是操作符了,甚至能够说操作符就是最重要的一个概念了。简直所有的性能都能够通过组合各个操作符来实现。熟练掌握操作符就是学好RxPy的要害了。操作符之间也能够用pipe函数连接起来,形成简单的操作链。
from rx import of, operators as opimport rxob = of(1, 2, 34, 5, 6, 7, 7)ob.pipe( op.map(lambda i: i ** 2), op.filter(lambda i: i >= 10)).subscribe(lambda i: print(f'Received: {i}'))
在RxPy中有大量操作符,能够实现各种各样的性能。咱们来简略看看其中一些罕用的操作符。如果你相熟Java8的流类库或者其余函数式编程类库的话,应该对这些操作符感到十分亲切。
创立型操作符
首先是创立Observable的操作符,列举了一些比拟罕用的创立型操作符。
操作符 | 作用 | |
---|---|---|
just(n) | 只蕴含1个值的Observable | |
repeated_value(v,n) | 反复n次值为v的Observable | |
of(a,b,c,d) | 蕴含所有参数的Observable | |
empty() | 一个空的Observable | |
from_iterable(iter) | 用iterable创立一个Observable | |
generate(0, lambda x: x < 10, lambda x: x + 1) | 用初始值和循环条件生成Observable | |
interval(n) | 以n秒为距离定时发送整数序列的Observable |
过滤型操作符
过滤型操作符的次要作用是对Observable进行筛选和过滤。
操作符 | 作用 |
---|---|
debounce | 按工夫距离过滤,在范畴内的值会被疏忽 |
distinct | 疏忽反复的值 |
elementAt | 只发射第n位的值 |
filter | 按条件过滤值 |
first/last | 发射首/尾值 |
skip | 跳过前n个值 |
take | 只取前n个值 |
转换型操作符
操作符 | 作用 |
---|---|
flatMap | 转换多个Observable的值并将它们合并为一个Observable |
groupBy | 对值进行分组,返回多个Observable |
map | 将Observable映射为另一个Observable |
scan | 将函数利用到Observable的每个值上,而后返回前面的值 |
算术操作符
操作符 | 作用 |
---|---|
average | 平均数 |
count | 个数 |
max | 最大值 |
min | 最小值 |
reduce | 将函数利用到每个值上,而后返回最终的计算结果 |
sum | 求和 |
Subject
Subject是一种非凡的对象,它既是Observer又是Observable。不过这个对象个别不太罕用,然而如果某些用处还是很有用的。所以还是要介绍一下。上面的代码,因为订阅的时候第一个值曾经发射进来了,所以只会打印订阅之后才发射的值。
from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject# Subject同时是Observer和Observableprint('--------Subject---------')subject = Subject()subject.on_next(1)subject.subscribe(lambda i: print(i))subject.on_next(2)subject.on_next(3)subject.on_next(4)subject.on_completed()# 2 3 4
另外还有几个非凡的Subject,上面来介绍一下。
ReplaySubject
ReplaySubject是一个非凡的Subject,它会记录所有发射过的值,不论什么时候订阅的。所以它能够用来当做缓存来应用。ReplaySubject还能够承受一个bufferSize参数,指定能够缓存的最近数据数,默认状况下是全副。
上面的代码和下面的代码简直齐全一样,然而因为应用了ReplaySubject,所以所有的值都会被打印。当然大家也能够试试把订阅语句放到其余地位,看看输入是否会产生变动。
# ReplaySubject会缓存所有值,如果指定参数的话只会缓存最近的几个值print('--------ReplaySubject---------')subject = ReplaySubject()subject.on_next(1)subject.subscribe(lambda i: print(i))subject.on_next(2)subject.on_next(3)subject.on_next(4)subject.on_completed()# 1 2 3 4
BehaviorSubject
BehaviorSubject是一个非凡的Subject,它只会记录最近一次发射的值。而且在创立它的时候,必须指定一个初始值,所有订阅它的对象都能够接管到这个初始值。当然如果订阅的晚了,这个初始值同样会被前面发射的值笼罩,这一点要留神。
# BehaviorSubject会缓存上次发射的值,除非Observable曾经敞开print('--------BehaviorSubject---------')subject = BehaviorSubject(0)subject.on_next(1)subject.on_next(2)subject.subscribe(lambda i: print(i))subject.on_next(3)subject.on_next(4)subject.on_completed()# 2 3 4
AsyncSubject
AsyncSubject是一个非凡的Subject,顾名思义它是一个异步的Subject,它只会在Observer实现的时候发射数据,而且只会发射最初一个数据。因而上面的代码仅仅会输入4.如果正文掉最初一行co_completed调用,那么什么也不会输入。
# AsyncSubject会缓存上次发射的值,而且仅会在Observable敞开后开始发射print('--------AsyncSubject---------')subject = AsyncSubject()subject.on_next(1)subject.on_next(2)subject.subscribe(lambda i: print(i))subject.on_next(3)subject.on_next(4)subject.on_completed()# 4
Scheduler
尽管RxPy算是异步的框架,然而其实它默认还是运行在单个线程之上的,因而如果应用了某些会妨碍线程运行的操作,那么程序就会卡死。当然针对这些状况,咱们就能够应用其余的Scheduler来调度工作,保障程序可能高效运行。
上面的例子创立了一个ThreadPoolScheduler,它是基于线程池的调度器。两个Observable用subscribe_on办法指定了调度器,因而它们会应用不同的线程来工作。
import rxfrom rx.scheduler import ThreadPoolSchedulerfrom rx import operators as opimport multiprocessingimport timeimport threadingimport randomdef long_work(value): time.sleep(random.randint(5, 20) / 10) return valuepool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())rx.range(5).pipe( op.map(lambda i: long_work(i + 1)), op.subscribe_on(pool_schedular)).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))rx.of(1, 2, 3, 4, 5).pipe( op.map(lambda i: i * 2), op.subscribe_on(pool_schedular)).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))
如果你察看过各个操作符的API的话,能够发现大部分操作符都反对可选的Scheduler参数,为操作符指定一个调度器。如果操作符上指定了调度器的话,会优先应用这个调度器;其次的话,会应用subscribe办法上指定的调度器;如果以上都没有指定的话,就会应用默认的调度器。
利用场景
好了,介绍了一些Reactive X的常识之后,上面来看看如何来应用Reactive X。在很多利用场景下,都能够利用Reactive X来形象数据处理,把概念简单化。
避免反复发送
很多状况下咱们都须要管制事件的产生距离,比方有一个按钮不小心按了好几次,只心愿第一次按钮失效。这种状况下能够应用debounce操作符,它会过滤Observable,小于指定工夫距离的数据会被过滤掉。debounce操作符会期待一段时间,直到过了间隔时间,才会发射最初一次的数据。如果想要过滤前面的数据,发送第一次的数据,则要应用throttle_first操作符。
上面的代码能够比拟好的演示这个操作符,疾速按回车键发送数据,留神察看按键和数据显示之间的关系,还能够把throttle_first操作符换成debounce操作符,而后再看看输入会产生什么变动,还能够齐全正文掉pipe中的操作符,再看看输入会有什么变动。
import rxfrom rx import operators as opfrom rx.subject import Subjectimport datetime# debounce操作符,仅在工夫距离之外的能够发射ob = Subject()ob.pipe( op.throttle_first(3) # op.debounce(3)).subscribe( on_next=lambda i: print(i), on_completed=lambda: print('Completed'))print('press enter to print, press other key to exit')while True: s = input() if s == '': ob.on_next(datetime.datetime.now().time()) else: ob.on_completed() break
操作数据流
如果须要对一些数据进行操作,那么同样有一大堆操作符能够满足需要。当然这部分性能并不是Reactive X独有的,如果你对Java 8的流类库有所理解,会发现这两者这方面的性能简直是齐全一样的。
上面是个简略的例子,将两个数据源联合起来,而后找进去其中所有的偶数。
import rxfrom rx import operators as opfrom rx.subject import Subjectimport datetime# 操作数据流some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)some_data2 = rx.from_iterable(range(10, 20))some_data.pipe( op.merge(some_data2), op.filter(lambda i: i % 2 == 0), # op.map(lambda i: i * 2)).subscribe(lambda i: print(i))
再或者一个利用reduce的简略例子,求1-100的整数和。
import rxfrom rx import operators as opfrom rx.subject import Subjectimport datetimerx.range(1, 101).pipe( op.reduce(lambda acc, i: acc + i, 0)).subscribe(lambda i: print(i))