发布于 1年前

RxJs——map,filter第二种实现

上一节我们实现了map和filter函数,我们将这些函数都挂载在MyObservable对象上,这里存在一个问题,类似map和filter这样的操作型函数很多,所以不可能将他们都挂载在MyObservable对象上,因此,这里出现了第二种实现。

这些操作函数能串联起来的本质就是能够形成嵌套调用,因此我想到了使用pipe,pipe的本质是接收一个 RxJS 操作符的运行结果作为参数,并返回一个 Observable 实例。

代码实例

map实现

<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">map</span>(<span class="hljs">fn</span>) {
    <span class="hljs">return</span> <span class="hljs">(observable)=></span>{
        <span class="hljs">return</span> <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
            observable.<span class="hljs">subscribe</span>({
                <span class="hljs">next</span>: <span class="hljs">val=></span> observer.<span class="hljs">next</span>(<span class="hljs">fn</span>(val)),
                <span class="hljs">error</span>: <span class="hljs">err =></span> observer.<span class="hljs">error</span>(err),
                <span class="hljs">complete</span>: <span class="hljs">() =></span> observer.<span class="hljs">complete</span>()
            });
        });
    }
}

filter实现

<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">filter</span>(<span class="hljs">fn</span>) {
    <span class="hljs">return</span> <span class="hljs">(observable)=></span>{
        <span class="hljs">return</span> <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
            observable.<span class="hljs">subscribe</span>({
                <span class="hljs">next</span>: <span class="hljs">val=></span> <span class="hljs">fn</span>(val)? observer.<span class="hljs">next</span>(val): <span class="hljs">()=></span>{},
                <span class="hljs">error</span>: <span class="hljs">err =></span> observer.<span class="hljs">error</span>(err),
                <span class="hljs">complete</span>: <span class="hljs">() =></span> observer.<span class="hljs">complete</span>()
            });
        });
    }
}

从这里我们可以看出 RxJS 操作符的运行结果就是map或者filter执行后的返回函数,返回值就是内部的一个MyObservable对象实例。

pipe单参数实现

<span class="hljs">pipe</span>(<span class="hljs">operation</span>) {
  <span class="hljs">return</span> <span class="hljs">operation</span>(<span class="hljs">this</span>);
}

注意这里的this实际上就是对应函数中的(observable)这个参数。然后调用operation(this)后返回的是一个新的Observable,同时这个参数observable会执行subscribe方法,这个方法会将这些Observable串起来调用。

pipe多参数实现

<span class="hljs">pipe</span>(<span class="hljs">...operations</span>) {
    <span class="hljs">return</span> operations.<span class="hljs">reduce</span>(<span class="hljs">(prev, fn) =></span> <span class="hljs">fn</span>(prev), <span class="hljs">this</span>);
}

以上这个函数实现的具体功能就是形成一个函数嵌套调用,并且方向是从左向右的。

这个函数的实现最经典的算是在redux中的一段源码啦,有兴趣的可以看看这个框架,本身代码不多,但是阅读起来不容易理解,感兴趣的可以去看看。

下面我用测试代码验证下这个函数:

<span class="hljs">const</span> letObservable = <span class="hljs">of</span>(<span class="hljs">1</span>,<span class="hljs">2</span>,<span class="hljs">3</span>);
<span class="hljs">const</span> a = <span class="hljs">interval</span>(<span class="hljs">500</span>).<span class="hljs">pipe</span>(<span class="hljs">map</span>(<span class="hljs">(v) =></span> <span class="hljs">'a'</span> + v), <span class="hljs">take</span>(<span class="hljs">3</span>));
<span class="hljs">const</span> b = <span class="hljs">interval</span>(<span class="hljs">500</span>).<span class="hljs">pipe</span>(<span class="hljs">map</span>(<span class="hljs">(v) =></span> <span class="hljs">'b'</span> + v), <span class="hljs">take</span>(<span class="hljs">3</span>));
letObservable.<span class="hljs">pipe</span>(<span class="hljs">merge</span>(a, b)).<span class="hljs">subscribe</span>(<span class="hljs">(value) =></span> <span class="hljs">console</span>.<span class="hljs">log</span>(value));

日志信息如下:

这里我实现了另外的take和merge方法,调用情况可以知道和RxJs的效果一致。这里我也贴出他们的实现。

take实现

<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">take</span>(<span class="hljs">num</span>) {
    <span class="hljs">return</span> <span class="hljs">(observable) =></span> (
      <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
        <span class="hljs">let</span> times = <span class="hljs">0</span>;
        <span class="hljs">let</span> subscription = observable.<span class="hljs">subscribe</span>({
          <span class="hljs">next</span>: <span class="hljs">val =></span> {
            times++;
            <span class="hljs">if</span> (num >= times) {
              observer.<span class="hljs">next</span>(val)
            } <span class="hljs">else</span> {
              observer.<span class="hljs">complete</span>()
              <span class="hljs">//if (subscription)subscription.unsubscribe()         </span>
            }
          },
          <span class="hljs">error</span>: <span class="hljs">err =></span> observer.<span class="hljs">error</span>(err),
          <span class="hljs">complete</span>: <span class="hljs">() =></span> observer.<span class="hljs">complete</span>(),
        });
      })
    )
}

tap实现

<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">tap</span>(<span class="hljs">fn</span>) {
    <span class="hljs">return</span> <span class="hljs">(observable) =></span> {
        <span class="hljs">return</span> <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
            observable.<span class="hljs">subscribe</span>({
                <span class="hljs">next</span>: <span class="hljs">val =></span> {
                    <span class="hljs">fn</span>(val);
                    observer.<span class="hljs">next</span>(val);
                },
                <span class="hljs">error</span>: <span class="hljs">err =></span> observer.<span class="hljs">error</span>(err),
                <span class="hljs">complete</span>: <span class="hljs">() =></span> observer.<span class="hljs">complete</span>(),
            });
        });
    };
}

merge实现

<span class="hljs">export</span> <span class="hljs">function</span> <span class="hljs">merge</span>(<span class="hljs">...observables</span>) {
    <span class="hljs">return</span> <span class="hljs">(observable) =></span> {
        <span class="hljs">let</span> completeNum = <span class="hljs">0</span>;
        <span class="hljs">if</span> (observable) {
            observables = [observable,...observables];
        }
        <span class="hljs">return</span> <span class="hljs">new</span> <span class="hljs">MyObservable</span>(<span class="hljs">observer =></span> {
            observables.<span class="hljs">forEach</span>(<span class="hljs">observable =></span> {

                observable.<span class="hljs">subscribe</span>({
                    <span class="hljs">next</span>: <span class="hljs">val =></span> observer.<span class="hljs">next</span>(val),
                    <span class="hljs">error</span>: <span class="hljs">err =></span> {
                        observables.<span class="hljs">forEach</span>(observable.<span class="hljs">unsubscribe</span>);
                        observer.<span class="hljs">error</span>(err)
                    } ,
                    <span class="hljs">complete</span>: <span class="hljs">() =></span> {
                        completeNum++;
                        <span class="hljs">if</span> (completeNum === observables.<span class="hljs">length</span>) {
                            observer.<span class="hljs">complete</span>();
                        }

                    },
                });
            });
        });
    };
}

总结:

1、一些基本的操作(比如map或者filter)都是返回一个函数类型,而函数的参数实际上就是一个Observable 对象,并且返回一个新的Observable对象。

2、pipe管道符操作的参数是一个函数数组,而这个函数数组中的对象都是rxjs中的一些基本操作,这些操作 可以进行组合。

3、接以上第一点,返回的新的Observable对象与后续的Observable对象构成一个链式结构,这些对象接收 订阅后开始执行代码逻辑

©2020 edoou.com   京ICP备16001874号-3