RxJs——map,filter第二种实现
上一节我们实现了map和filter函数,我们将这些函数都挂载在MyObservable对象上,这里存在一个问题,类似map和filter这样的操作型函数很多,所以不可能将他们都挂载在MyObservable对象上,因此,这里出现了第二种实现。
这些操作函数能串联起来的本质就是能够形成嵌套调用,因此我想到了使用pipe,pipe的本质是接收一个 RxJS 操作符的运行结果作为参数,并返回一个 Observable
实例。
代码实例
map实现
<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">map</span>(<span class="hljs">fn</span>) {
<span class="hljs">return</span> <span class="hljs">(observable)=></span>{
<span class="hljs">return</span> <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
observable.<span class="hljs">subscribe</span>({
<span class="hljs">next</span>: <span class="hljs">val=></span> observer.<span class="hljs">next</span>(<span class="hljs">fn</span>(val)),
<span class="hljs">error</span>: <span class="hljs">err =></span> observer.<span class="hljs">error</span>(err),
<span class="hljs">complete</span>: <span class="hljs">() =></span> observer.<span class="hljs">complete</span>()
});
});
}
}
filter实现
<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">filter</span>(<span class="hljs">fn</span>) {
<span class="hljs">return</span> <span class="hljs">(observable)=></span>{
<span class="hljs">return</span> <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
observable.<span class="hljs">subscribe</span>({
<span class="hljs">next</span>: <span class="hljs">val=></span> <span class="hljs">fn</span>(val)? observer.<span class="hljs">next</span>(val): <span class="hljs">()=></span>{},
<span class="hljs">error</span>: <span class="hljs">err =></span> observer.<span class="hljs">error</span>(err),
<span class="hljs">complete</span>: <span class="hljs">() =></span> observer.<span class="hljs">complete</span>()
});
});
}
}
从这里我们可以看出 RxJS 操作符的运行结果就是map或者filter执行后的返回函数,返回值就是内部的一个MyObservable对象实例。
pipe单参数实现
<span class="hljs">pipe</span>(<span class="hljs">operation</span>) {
<span class="hljs">return</span> <span class="hljs">operation</span>(<span class="hljs">this</span>);
}
注意这里的this实际上就是对应函数中的(observable)这个参数。然后调用operation(this)后返回的是一个新的Observable,同时这个参数observable会执行subscribe方法,这个方法会将这些Observable串起来调用。
pipe多参数实现
<span class="hljs">pipe</span>(<span class="hljs">...operations</span>) {
<span class="hljs">return</span> operations.<span class="hljs">reduce</span>(<span class="hljs">(prev, fn) =></span> <span class="hljs">fn</span>(prev), <span class="hljs">this</span>);
}
以上这个函数实现的具体功能就是形成一个函数嵌套调用,并且方向是从左向右的。
这个函数的实现最经典的算是在redux中的一段源码啦,有兴趣的可以看看这个框架,本身代码不多,但是阅读起来不容易理解,感兴趣的可以去看看。
下面我用测试代码验证下这个函数:
<span class="hljs">const</span> letObservable = <span class="hljs">of</span>(<span class="hljs">1</span>,<span class="hljs">2</span>,<span class="hljs">3</span>);
<span class="hljs">const</span> a = <span class="hljs">interval</span>(<span class="hljs">500</span>).<span class="hljs">pipe</span>(<span class="hljs">map</span>(<span class="hljs">(v) =></span> <span class="hljs">'a'</span> + v), <span class="hljs">take</span>(<span class="hljs">3</span>));
<span class="hljs">const</span> b = <span class="hljs">interval</span>(<span class="hljs">500</span>).<span class="hljs">pipe</span>(<span class="hljs">map</span>(<span class="hljs">(v) =></span> <span class="hljs">'b'</span> + v), <span class="hljs">take</span>(<span class="hljs">3</span>));
letObservable.<span class="hljs">pipe</span>(<span class="hljs">merge</span>(a, b)).<span class="hljs">subscribe</span>(<span class="hljs">(value) =></span> <span class="hljs">console</span>.<span class="hljs">log</span>(value));
日志信息如下:
这里我实现了另外的take和merge方法,调用情况可以知道和RxJs的效果一致。这里我也贴出他们的实现。
take实现
<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">take</span>(<span class="hljs">num</span>) {
<span class="hljs">return</span> <span class="hljs">(observable) =></span> (
<span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
<span class="hljs">let</span> times = <span class="hljs">0</span>;
<span class="hljs">let</span> subscription = observable.<span class="hljs">subscribe</span>({
<span class="hljs">next</span>: <span class="hljs">val =></span> {
times++;
<span class="hljs">if</span> (num >= times) {
observer.<span class="hljs">next</span>(val)
} <span class="hljs">else</span> {
observer.<span class="hljs">complete</span>()
<span class="hljs">//if (subscription)subscription.unsubscribe() </span>
}
},
<span class="hljs">error</span>: <span class="hljs">err =></span> observer.<span class="hljs">error</span>(err),
<span class="hljs">complete</span>: <span class="hljs">() =></span> observer.<span class="hljs">complete</span>(),
});
})
)
}
tap实现
<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">tap</span>(<span class="hljs">fn</span>) {
<span class="hljs">return</span> <span class="hljs">(observable) =></span> {
<span class="hljs">return</span> <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
observable.<span class="hljs">subscribe</span>({
<span class="hljs">next</span>: <span class="hljs">val =></span> {
<span class="hljs">fn</span>(val);
observer.<span class="hljs">next</span>(val);
},
<span class="hljs">error</span>: <span class="hljs">err =></span> observer.<span class="hljs">error</span>(err),
<span class="hljs">complete</span>: <span class="hljs">() =></span> observer.<span class="hljs">complete</span>(),
});
});
};
}
merge实现
<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">merge</span>(<span class="hljs">...observables</span>) {
<span class="hljs">return</span> <span class="hljs">(observable) =></span> {
<span class="hljs">let</span> completeNum = <span class="hljs">0</span>;
<span class="hljs">if</span> (observable) {
observables = [observable,...observables];
}
<span class="hljs">return</span> <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
observables.<span class="hljs">forEach</span>(<span class="hljs">observable =></span> {
observable.<span class="hljs">subscribe</span>({
<span class="hljs">next</span>: <span class="hljs">val =></span> observer.<span class="hljs">next</span>(val),
<span class="hljs">error</span>: <span class="hljs">err =></span> {
observables.<span class="hljs">forEach</span>(observable.<span class="hljs">unsubscribe</span>);
observer.<span class="hljs">error</span>(err)
} ,
<span class="hljs">complete</span>: <span class="hljs">() =></span> {
completeNum++;
<span class="hljs">if</span> (completeNum === observables.<span class="hljs">length</span>) {
observer.<span class="hljs">complete</span>();
}
},
});
});
});
};
}
总结:
1、一些基本的操作(比如map或者filter)都是返回一个函数类型,而函数的参数实际上就是一个Observable 对象,并且返回一个新的Observable对象。
2、pipe管道符操作的参数是一个函数数组,而这个函数数组中的对象都是rxjs中的一些基本操作,这些操作 可以进行组合。
3、接以上第一点,返回的新的Observable对象与后续的Observable对象构成一个链式结构,这些对象接收 订阅后开始执行代码逻辑