本文是 Rxjs 响应式编程 - 第二章:序列的深入研究这篇文章的学习笔记。
示例代码托管在:http://www.github.com/dashnowords/blogs
更多博文:《大史住在大前端》目录
一. 划重点
文中使用到的一些基本运算符:
-
map
- 映射 -
filter
- 过滤 -
reduce
- 有限列聚合 -
scan
- 无限列聚合 -
flatMap
- 拉平操作(重点) -
catch
- 捕获错误 -
retry
- 序列重试 -
from
- 生成可观测序列 -
range
- 生成有限的可观测序列 -
interval
- 每隔指定时间发出一次顺序整数 -
distinct
- 去除出现过的重复值
建议自己动手尝试一下,记住就可以了,有过 lodash
使用经验的开发者来说并不难。原文中使用 flatMap
转换序列时有一处应该是手误:
二. flatMap 功能解析
原文中在 http
请求拿到获取到数据后,最初使用了 forEach
实现了手动流程管理,于是原文提出了优化设想,试图探究如何依赖响应式编程的特性 将手动的数据加工转换改造为对流的转换,好让最终的消费者能够拿到直接可用的数据,而不是得到一个响应后手动进行很多后处理。在代码层面需要解决的问题就是,如何在不使用手动遍历的前提下将一个有限序列中的数据逐个发给订阅者,而不是一次性将整个数据集发过去。
假设我们现在并不知道有 flatMap
这样一个可以使用的方法,那么先来做一些尝试:
var quakes = Rx.Observable.create(function(observer) {
// 模拟得到的响应流
var response = {
features:[{earth:1},{earth:2}],
test:1
}
/* 最初的手动遍历代码
var quakes = response.features;
quakes.forEach(function(quake) {observer.onNext(quake);
});*/
observer.onNext(response);
})
// 为了能将 features 数组中的元素逐个发送给订阅者,需要构建新的流
.map(dataset){return Rx.Observable.from(dataset.features)
}
当我们订阅 quakes
这个事件流的时候,每次都会得到另一个Observable
,它是因为数据源经过了映射变换,从数据变成了可观测对象。那么为了得到最终的序列值,就需要再次订阅这个Observable
,这里需要注意的是可观测对象被订阅前是不启动的,所以不用担心它的时序问题。
quakes.subscribe(function(data){data.subscribe(function(quake){console.log(quake);
})
});
如果将 Observable
看成一个盒子,那么每一层盒子只是实现了流程控制功能性的封装,为了取得真正需要使用的数据,最终的订阅者不得不像剥洋葱似的通过 subscribe
一层层打开盒子拿到最里面的数据,这样的封装性对于数据在流中的传递具有很好的隔离性,但是对最终的数据消费者而言,却是一件很麻烦的事情。
这时 flatMap
运算符就派上用场了,它可以将冗余的包裹除掉,从而在主流被订阅时直接拿到要使用的数据,从大理石图来直观感受一下flatMap
:
乍看之下会觉得它和 merge
好像是一样的,其实还是有一些区别的。merge
的作用是将多个不同的流合并成为一个流,而上图中 A1,A2,A3 这三个流都是当主流 A 返回数据时新生成的,可以将他们想象为 A 的支流,如果你想在支流里捞鱼,就需要在每个支流里布网,而 flatMap
相当于提供了一张大网,将所有 A 的支流里的鱼都给捞上来。
所以在使用了 flatMap
后,就可以直接在一级订阅中拿到需要的数据了:
var quakes = Rx.Observable.create(function(observer) {
var response = {
features:[{earth:1},{earth:2}],
test:1
}
observer.onNext(response);
}).flatMap((data)=>{return Rx.Observable.from(data.features);
});
quakes.subscribe(function(quake) {console.log(quake)
});
三. flatMap 的推演
3.1 函数式编程基础知识回顾
如果本节的基本知识你尚不熟悉,可以通过 javascript 基础修炼(8)——指向 FP 世界的箭头函数这篇文章来简单回顾一下函数式编程的基本知识,然后再继续后续的部分。
/*map 运算符的作用
* 对所有容器类而言,它相当于打开容器,进行操作,然后把容器再盖上。*Container 在这里只是一个抽象定义,为了看清楚它对于容器中包含的值意味着什么。* 你会发现它其实就是 Observable 的抽象原型。*/
Container.prototype.map = function(f){return Container.of(f(this.__value))
}
// 基本的科里化函数
var curry = function(fn){args = [].slice.call(arguments, 1);
return function(){[].push.apply(args, arguments);
return fn.apply(this, args);
}
}
//map pointfree 风格的 map 运算符
var map = curry(function(f, any_functor_at_all) {return any_functor_at_all.map(f);
});
/*compose 函数组合方法
* 运行后返回一个新函数,这个函数接受一个参数。* 函数科里化的基本应用,也是函数式编程中运算管道构建的基本方法。*/
var compose = function (f, g) {return function (x) {return f(g(x));
}
};
/*IO 容器
* 一个简单的 Container 实现,用来做流程管理
* 这里需要注意,IO 实现的作用是函数的缓存,且总是返回新的 IO 实例
* 可以看做一个简化的 Promise, 重点是直观感受一下它作为函数的
* 容器是如何被使用的,对于理解 Observable 有很大帮助
*/
var IO = function(f) {this.__value = f;}
IO.of = function(x) {return new IO(function() {return x;});
}
IO.prototype.map = function(f) {return new IO(compose(f, this.__value));
}
如果上面的基本知识没有问题,那么就继续。
3.2 从一个容器的例子开始
现在来实现这样一个功能,读入一个文件的内容,将其中的 a
字符全部换成 b
字符,接着存入另一个文件,完成后在控制台输出一个消息,为了更明显地看到数据容器的作用,我们使用同步方法并将其包裹在 IO
容器中,然后利用函数式编程:
var fs = require('fs');
// 读取文件
var readFile = (filename)=>IO.of(fs.readFileSync(filename,'utf-8'));
// 转换字符
var transContent = (content)=>IO.of((content)=>content.replace('a','b'));
// 写入字符串
var writeFile = (content)=>IO.of(fs.writeFileSync('dest.txt',content));
当具体的函数被 IO
容器包裹起来而实现延迟执行的效果时,就无法按原来的方式使用 compose()
运算符直接对功能进行组合,因为 readFile
函数运行时的输出结果(一个 io
容器实例)和 transContent
函数需要的参数类型(字符串)不再匹配,在不修改原有函数定义的前提下,函数式编程中采用的做法是使用 map
操作符来预置一个参数:
/*
*map(transContent)是一个高阶函数,它的返回函数就可以接收一个容器实例,* 并对容器中的内容执行 map 操作。*/
var taskStep12 = compose(map(transContent), readFile);
这里比较晦涩,涉及到很多功能性函数的嵌套,建议手动推导一下 taskStep12
这个变量的值,它的结构是这样一种形式:
io{
__value:io{__value:someComposedFnExpression}
}
如果试图一次性将所有的步骤组合在一起,就需要采用下面的形式:
var task = compose(map(map(writeFile)),map(transContent),readFile);
// 组合后的 task 形式就是
//io{io{io{__value:someComposedFnExpression}}}
问题已经浮出水面了,每多加一个针对容器操作的步骤,书写时就需要多包裹一层map
,而运行时就需要多进入一层才能触及组合好的可以实现真正功能的函数表达式,真的是很麻烦。
提示一:
现在来回想一下原示例中的 Observable 对象,将其看做是一个容器(含有 map 类方法),那么如果 map 方法调用时传入的参数是一个运行时会生成新的 Observable 对象的方法时,就会产生 Observable 嵌套,得到 observable{observable{…..}}这样的结构,那么在最终的数据消费者通过
subscribe
方法订阅数据时,就不得不用很多个subscribe
才能拿到实际需要的数据。提示二:
没有相关经验的读者在使用 pointfree 风格的
map
操作符时可能会感到非常不适应,如果你觉得它很难理解,也可以尝试直接使用IO.prototype.map
这种 链式调用 风格的写法将上例中的三个步骤组合在一起来查看最后的结果,毕竟在Rxjs
中常使用的也就是Observable
这一个容器类。
3.3 Monad 登场
当我们看到问题所在后就不难发现,其实这个问题的解决方法并不复杂,我们要做的不过就是在必要的时候合并内容的容器,为此来定义两个合并运算的方法:
// 链式调用风格
IO.prototype.join = function(){return this.isNothing() ? IO.of(null):this.__value;
}
//pointfree 风格运算符
var join = (m)=>m.join();
这里引入一个新的概念 Monad
,它的定义是可以被展平的容器,也就是说拥有join
和of
方法并遵循一定规则的容器,都是Monad
,在这种设定下,3.1 中的示例就可以被改写为下面的形式:
var task = compose(join,map(writeFile),join,map(transContent),readFile);
不难发现 map
和join
总是需要成对出现的,那么再利用函数科里化的技巧将 map
和join
连起来:
var chain = curry(function(f,m){return m.map(f).join();})
那么组合后的函数就变成了下面的形式:
var task = compose(chain(writeFile),chain(transContent),readFile);
这里的chain
,就是FlatMap
。
3.4 对比总结
最后将上面几种形式放在一起再来回顾一下:
// 原有形式
var task = compose(map(map(writeFile)),map(transContent),readFile);
//map-join 形式
var task = compose(join,map(writeFile),join,map(transContent),readFile);
//chain 形式(flatMap)
var task = compose(chain(writeFile),chain(transContent),readFile);
如果理解了这几种形式,就不难理解 flatMap
的拉平效应了,所谓flatMap
,说白了其实就是将容器展开的一种操作。
3.5 一点疑问
flatMap
所解决问题,是在函数式编程引入了 Functor
的概念将逻辑函数包裹在容器中后才产生的,那么这种容器概念的引入对函数式编程到底有什么意义,笔者尚未搞清楚,相关内容留作以后补充。
四. 资料参考
《javascript 函数式编程指南》https://llh911001.gitbooks.io/mostly-adequate-guide-chinese/content/