RxJS 源码解析(五)—— Operator III
在本文开始之前,先定义一些自定义术语,方便阅读。
- 顶流:调用了操作符的流。
- 上游流:操作符的内部订阅器所订阅的流。
- 下游流:由操作符的内部订阅器管理的流。
- 下游订阅:订阅了操作符生成的流的订阅者。
在上一篇中,我描述了 OuterSubscriber 和 InnerSubscriber 的作用,并将几个 Join Creation Operator 的源码解析了一遍。下面,我们将进入的是 Transformation Operator 的源码分析。
在知道了 OuterSubscriber 和 InnerSubscriber 是一种通过委托模式实现管理下游流订阅的方法后,我发现其实这种实现技巧用于很多的 operators。那么本篇及下一篇将会介绍更多这种类似的设计。
(PS:为了方便描述,我拿了官网的相关的图片,时序图的阅读顺序是从左到右。)
基础映射
map

map 是最为基础的映射,他将上游流的值经过运算,转发给下游订阅。这部分源码不是很复杂,实际上就是做了一层转发。
typescript
scan

Scan 和 的作用跟 reduce 一样,传入一个函数把一组数据打平,但是跟 reduce 不一样的点在于,每一次结合完毕都会立刻返回结果。
typescript
这部分的实现同样也不是很复杂,在拿到上游流数据后,使用 accumulator 对数据进行累加操作。
typescript
五种基础复合映射
所谓复合映射,意思就是这些操作符接收的参数是一个带有上游流数据作为参数并返回 Observable 的函数,同时把其中的订阅数据转发给下游订阅。
mergeMap,switchMap,exhaustMap,concatMap,mergeScan 是五种复合映射操作符,它使得上游流的数据可以传递给下游流,并交由其处理。concatMap 和 mergeScan 是 mergeMap 的特殊情况,所以我们只需要关注剩余的三种。
mergeMap,switchMap,exhaustMap,这三种操作符的源码结构分为这三个部分:
- 通过 lift 操作,将原有的流映射成新的流。
- 实现 Operator 接口,通过 call 返回一个 Subscriber。
- 通过继承 OuterSubscriber 实现这个 Subscriber。
其中,前两个部分都拥有非常类似的结构,都是通过这种样板代码来进行编写。
typescript
通过 _innerSub 提供的内部注册方法,在里面创建 InnerSubscriber,并传入当前的 OuterSubscriber 。
typescript
最终,交由 subscribeToResult 创建一个内部订阅来管理下游流。
mergeMap

mergeMap 提供的是一种合并操作,通过在内部维护了多个下游流的订阅,使得上游流可以将数据下发给多个下游流。它提供了一个并发数限制的参数,主要用于控制下游流并发的数量。
typescript
下面,我们关注的点将转移到 MergeMapSubscriber 。首先看看它的数据结构。
typescript
mergeMap的Subscriber
MergeMap的Subscriber,在 _next 成员函数调用的时候,会比较 active (下游流的数量) 与 concurrent (最大并发数)的大小,active 小于 concurrent 则调用_tryNext,否则将已经到来的数据放入缓冲区中,但是你知道的, JavaScript 并没有真正的并发,这就是一个异步队列。而每一次进行 _tryNext,都会通过 project 来创建一个下游流,同时让更新 active,将下游流传入并触发_innerSub。
typescript
在上游流完成时,会触发 _complete。
typescript
如果所有的下游流都已经完成,且缓冲区中没有数据,则通知下游订阅数据已经输出完毕。
mergeMap的notify
notifyNext 就是单纯的将结果传递给下游订阅,而 notifyComplete 则有意思多了。
通过 notifyComplete ,可以得知哪些流已经完成任务并且关闭。如果 buffer 中存在数据,那么将数据交由 _next 发送出去并创建新的下游流。过这种递归操作,可以将所有 buffer 中的数据都发送出去。最后判断上游流和下游流是不是都已经结束了,如果已经结束了,则通知下游订阅数据已经输出完毕。
typescript
switchMap

switchMap 提供的是一个上游流为主的映射操作,当上游流的订阅数据到来的时候,旧的下游流会被取消订阅,然后重新订阅一组新的下游流。
typescript
switchMap的Subscriber
innerSubscription 保存了当前下游流的订阅,所以这个操作符只会维护一个下游流的订阅。
typescript
当进行 next 操作的时候,会先创建新的下游流,如果旧的下游流存在,那么会被取消订阅。
typescript
该 Subscriber 重写了_complete 。这里意味着上游流已经输出完毕,那么如果下游订阅还在进行中,那么就需要取消订阅下游流。
typescript
switchMap的notify
跟之前一样, notifyNext 依旧是将下游流中的数据转发出去。主要关注点还是在于 notifyComplete。因为 innerSubscription 被置为空了,所以调用 this._complete 无意义,不会触发到其父类函数。
typescript
如果当前的下游流已经完成了,那么就要将它从下游订阅(destination)中移除,如果上游流已经停止(error 或者 complete 被调用,或者被取消订阅),那么还得调用 super._complete 表示已经完成。
exhaustMap

跟 switchMap 相反, exhaustMap 提供了一种以下游流为主的映射操作。如果下游流已经开启,那么上游流之后到来的订阅数据都将会被抛弃,直到该下游流完成订阅。下游流完成订阅后,上游流的数据才会继续跟新的下游流结合,并形成新的订阅。
typescript
exhaustMap的Subscriber
exhaustMap 的实现很简单,通过维护 hasSubscription 这样一个内部状态,标记下游流是否被订阅了。hasCompleted 则是上游流完成情况的标记。
typescript
订阅会调用 _next,标记下游流是否已经开启(订阅是否已经存在),如果未开启,则构建新的下游流,并标记 hasSubscription 为 true。
typescript
上游流和下游流的数据都已经输出完毕了,那么把完成信号传递给下游订阅。
typescript
exhaustMap的notify
如果下游流的数据输出完毕,那么就应该要将 hasSubscription 标记为 false。
typescript
concatMap
concatMap 是 mergeMap 的一种特殊形式。
typescript
mergeScan
mergeScan 的源码跟 mergeMap 类似。只不过就是把传入的函数替换了一下,并且在内部缓存了上一个结合后的值。
typescript
concat & merge
上一篇中,关于 concat 和 merge 两个相关的 operators 并没有讲到,因为这它们其实最终都是调用 mergeMap。
高阶操作符小结
通过这三个不同的映射操作符,使得上游流可以通过一定的方式跟下游流结合。那么,结合一张图,可以看看相关操作符的关系。

对这些操作符分一下类。
属于 Transformation Operators 的有:concatMap, concatMapTo, mergeMap, mergeMapTo, switchMap,switchMapTo,exhaustMap,exhaustMapTo。
属于 Join Creation Operators 的有:merge, concat。
属于 Join Operators 的有:mergeAll, concatAll, switchAll, startWith,endWith。
零散的高阶操作符
expand

expand 将传入的 Observable 工厂进行递归操作。与上面的复合映射类似,expand 也是一种复合映射,只不过,他会不断的去复合下游流的数据,也就是类似上图的模式。
expand的Subscriber
为了实现相对应的功能,expand 定义了以下数据结构。
typescript
上游流数据到来的时候,跟 mergeMap 比较类似,也会比较 active 和 concurrent,如果 active 大于 concurrent ,那么便会用buffer缓存上游流的数据,如果 active 小于 concurrent ,那么直接发送数据给到下游订阅,并订阅一个新的下游流。需要注意的一点,为了防止爆栈,expand 在这里加了一个判断条件,在 notify 中,将利用这一条件,来结束递归。
typescript
当上游流完成时,需要标记 hasComplete 为 true。这一步是结束递归的重要标志。
typescript
expand的notify
那么 expand 是怎么构成递归的呢,当下游流有数据到来的时候,他会直接调用 _next。最终形成了_next -> subscribeToProjection -> next -> notifyNext -> _next 这样的一条递归链。
typescript
下游流完成时,需要根据 hasCompleted 和 buffer 的状态来决定是否结束递归。在这里,也形成了一条这样的递归链: _next -> subscribeToProjection -> next -> notifyComplete -> _next。
typescript
exhaust

exhaust 是一种打平操作,它的源码并没有调用 exhaustMap。它的实现思路很简单,通过判断当前是否存在前一个下游流订阅(hasSubscription),来决定当前到来的下游流是否开启。
typescript
总结
本篇主要的内容集中在分析操作符是如何进行数据的映射,那么下一篇将讲解的是 buffer 和 window 相关的缓存操作符是如何运行和实现的。