Rxjs expand的用法分析
Rxjs的expand()函数声明:
public expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable
expand()会递归调用project函数,project函数把源值映射为一个Observable,每次递归调用都是把前一次调用输出的Observable的源作为输入。最后把所有的Observable合并为一个Observable作为结果输出。
它接收三个参数:
- project:映射函数,以前一次调用输出额Observable作为输入,返回一个新的Observable。
- concurrent:最大并发次数,默认值为Number.POSITIVE_INFINITY,即无限次递归调用
- scheduler:默认为null,表示立即执行
单个源值
示例:
const source = Rx.Observable.of(1);
const powerTow = source
// 递归调用提供的函数
.expand(val => {
console.log(`输入: ${val}`);
return Rx.Observable.of(2*val);
})
.take(5);
const subscribe = powerTow.subscribe(val => console.log(`输出: ${val}`));
示例输出源值1,每次对源值乘以2。take(5)限定了只输出前5个Observable,包括初始的源值1。
结果为:
"输出: 1"
"输入: 1"
"输出: 2"
"输入: 2"
"输出: 4"
"输入: 4"
"输出: 8"
"输入: 8"
"输出: 16"
"输入: 16"
多源值
单个源值是比较容易理解,递归调用project函数是顺序的。当源值产生是异步且是多个的,每一个源值都会单独递归调用project映射函数。最后把所有产生的Observable合并为一个Observable作为结果。
示例:
var clicks = Rx.Observable.fromEvent(document, 'click');
var powersOfTwo = clicks
.map(e => 1)
.expand(x => {
console.log(`输入:${x}`);
return Rx.Observable.of(2 * x).delay(1000)})
.take(5);
powersOfTwo.subscribe(x => console.log(`输出:${x}`));
示例监听页面的点击事件,每次点击发送源值1,并对源值执行expand,expand的映射函数也是将源值乘以2,并延时1000毫秒。
点击一次,输出结果是和上面的例子一样。连续点击两次,输出结果:
输出:1
输入:1
输出:1
输入:1
输出:2
输入:2
输出:2
输入:2
输出:4
输入:4
由于例子里做了延时,多次点击的时间不同,输出的结果也是不同的。
递归调用的终止
默认情况下,expand递归调用时不限次数的,那如何终止呢。可以有以下一些可以终止递归调用:
1、使用take()操作符
2、映射函数project,返回一个empty Observable
var source = new Rx.Observable.of(3);
source.expand(function(x) {
console.log('count: ' + x);
x--;
return (x >= 0) ? Rx.Observable.just(x) : Rx.Observable.empty()
})
.subscribe(
function (x) {
console.log('Next: ' , x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});