Python响应式类库RxPy简介

RxPy是十分风行的响应式框架Reactive X的Python版本,其实这些版本都是一样的,只不过是各个语言的实现不同而已。因而,如果学会了其中一种,那么应用其余的响应式版本也是轻而易举的。之前我就据说过这个框架,最近决定好好钻研一下。

基本概念

Reactive X中有几个外围的概念,先来简略介绍一下。

Observable和Observer(可察看对象和观察者)

首先是Observable和Observer,它们别离是可察看对象和观察者。Observable能够了解为一个异步的数据源,会发送一系列的值。Observer则相似于消费者,须要先订阅Observable,而后才能够接管到其发射的值。能够说这组概念是设计模式中的观察者模式和生产者-消费者模式的综合体。

Operator(操作符)

另外一个十分重要的概念就是操作符了。操作符作用于Observable的数据流上,能够对其施加各种各样的操作。更重要的是,操作符还能够链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。

Single(单例)

在RxJava和其变体中,还有一个比拟非凡的概念叫做Single,它是一种只会发射同一个值的Observable,说白了就是单例。当然如果你对Java等语言比拟相熟,那么单例想必也很相熟。

Subject(主体)

主体这个概念十分非凡,它既是Observable又是Observer。正是因为这个特点,所以Subject能够订阅其余Observable,也能够将发射对象给其余Observer。在某些场景中,Subject会有很大的作用。

Scheduler(调度器)

默认状况下Reactive X只运行在以后线程下,然而如果有需要的话,也能够用调度器来让Reactive X运行在多线程环境下。有很多调度器和对应的操作符,能够解决多线程场景下的各种要求。

Observer和Observable

先来看看一个最简略的例子,运行的后果会顺次打印这些数字。这里的of是一个操作符,能够依据给定的参数创立一个新的Observable。创立之后,就能够订阅Observable,三个回调办法在对应的机会执行。一旦Observer订阅了Observable,就会接管到后续Observable发射的各项值。

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)

这个例子看起来如同很简略,并且看起来没什么用。然而当你理解了Rx的一些外围概念,就会了解到这是一个如许弱小的工具。更重要的是,Observable生成数据和订阅的过程是异步的,如果你相熟的话,就能够利用这个个性做很多事件。

操作符

在RxPy中另一个十分重要的概念就是操作符了,甚至能够说操作符就是最重要的一个概念了。简直所有的性能都能够通过组合各个操作符来实现。熟练掌握操作符就是学好RxPy的要害了。操作符之间也能够用pipe函数连接起来,形成简单的操作链。

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

在RxPy中有大量操作符,能够实现各种各样的性能。咱们来简略看看其中一些罕用的操作符。如果你相熟Java8的流类库或者其余函数式编程类库的话,应该对这些操作符感到十分亲切。

创立型操作符

首先是创立Observable的操作符,列举了一些比拟罕用的创立型操作符。

操作符 作用
just(n) 只蕴含1个值的Observable
repeated_value(v,n) 反复n次值为v的Observable
of(a,b,c,d) 蕴含所有参数的Observable
empty() 一个空的Observable
from_iterable(iter) 用iterable创立一个Observable
generate(0, lambda x: x < 10, lambda x: x + 1) 用初始值和循环条件生成Observable
interval(n) 以n秒为距离定时发送整数序列的Observable

过滤型操作符

过滤型操作符的次要作用是对Observable进行筛选和过滤。

操作符 作用
debounce 按工夫距离过滤,在范畴内的值会被疏忽
distinct 疏忽反复的值
elementAt 只发射第n位的值
filter 按条件过滤值
first/last 发射首/尾值
skip 跳过前n个值
take 只取前n个值

转换型操作符

操作符 作用
flatMap 转换多个Observable的值并将它们合并为一个Observable
groupBy 对值进行分组,返回多个Observable
map 将Observable映射为另一个Observable
scan 将函数利用到Observable的每个值上,而后返回前面的值

算术操作符

操作符 作用
average 平均数
count 个数
max 最大值
min 最小值
reduce 将函数利用到每个值上,而后返回最终的计算结果
sum 求和

Subject

Subject是一种非凡的对象,它既是Observer又是Observable。不过这个对象个别不太罕用,然而如果某些用处还是很有用的。所以还是要介绍一下。上面的代码,因为订阅的时候第一个值曾经发射进来了,所以只会打印订阅之后才发射的值。

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject同时是Observer和Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

另外还有几个非凡的Subject,上面来介绍一下。

ReplaySubject

ReplaySubject是一个非凡的Subject,它会记录所有发射过的值,不论什么时候订阅的。所以它能够用来当做缓存来应用。ReplaySubject还能够承受一个bufferSize参数,指定能够缓存的最近数据数,默认状况下是全副。

上面的代码和下面的代码简直齐全一样,然而因为应用了ReplaySubject,所以所有的值都会被打印。当然大家也能够试试把订阅语句放到其余地位,看看输入是否会产生变动。

# ReplaySubject会缓存所有值,如果指定参数的话只会缓存最近的几个值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

BehaviorSubject

BehaviorSubject是一个非凡的Subject,它只会记录最近一次发射的值。而且在创立它的时候,必须指定一个初始值,所有订阅它的对象都能够接管到这个初始值。当然如果订阅的晚了,这个初始值同样会被前面发射的值笼罩,这一点要留神。

# BehaviorSubject会缓存上次发射的值,除非Observable曾经敞开
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

AsyncSubject

AsyncSubject是一个非凡的Subject,顾名思义它是一个异步的Subject,它只会在Observer实现的时候发射数据,而且只会发射最初一个数据。因而上面的代码仅仅会输入4.如果正文掉最初一行co_completed调用,那么什么也不会输入。

# AsyncSubject会缓存上次发射的值,而且仅会在Observable敞开后开始发射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

Scheduler

尽管RxPy算是异步的框架,然而其实它默认还是运行在单个线程之上的,因而如果应用了某些会妨碍线程运行的操作,那么程序就会卡死。当然针对这些状况,咱们就能够应用其余的Scheduler来调度工作,保障程序可能高效运行。

上面的例子创立了一个ThreadPoolScheduler,它是基于线程池的调度器。两个Observable用subscribe_on办法指定了调度器,因而它们会应用不同的线程来工作。

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

如果你察看过各个操作符的API的话,能够发现大部分操作符都反对可选的Scheduler参数,为操作符指定一个调度器。如果操作符上指定了调度器的话,会优先应用这个调度器;其次的话,会应用subscribe办法上指定的调度器;如果以上都没有指定的话,就会应用默认的调度器。

利用场景

好了,介绍了一些Reactive X的常识之后,上面来看看如何来应用Reactive X。在很多利用场景下,都能够利用Reactive X来形象数据处理,把概念简单化。

避免反复发送

很多状况下咱们都须要管制事件的产生距离,比方有一个按钮不小心按了好几次,只心愿第一次按钮失效。这种状况下能够应用debounce操作符,它会过滤Observable,小于指定工夫距离的数据会被过滤掉。debounce操作符会期待一段时间,直到过了间隔时间,才会发射最初一次的数据。如果想要过滤前面的数据,发送第一次的数据,则要应用throttle_first操作符。

上面的代码能够比拟好的演示这个操作符,疾速按回车键发送数据,留神察看按键和数据显示之间的关系,还能够把throttle_first操作符换成debounce操作符,而后再看看输入会产生什么变动,还能够齐全正文掉pipe中的操作符,再看看输入会有什么变动。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce操作符,仅在工夫距离之外的能够发射

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

操作数据流

如果须要对一些数据进行操作,那么同样有一大堆操作符能够满足需要。当然这部分性能并不是Reactive X独有的,如果你对Java 8的流类库有所理解,会发现这两者这方面的性能简直是齐全一样的。

上面是个简略的例子,将两个数据源联合起来,而后找进去其中所有的偶数。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# 操作数据流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

再或者一个利用reduce的简略例子,求1-100的整数和。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理