RxJS 源码解析(二): Multicast Observable

(文中 RxJS 所使用的源码版本为 6.6.0)

上一篇,我们分析了 Oberservable 和 Subscription 的具体实现方法。这一篇,将会了解一系列不同的 多播观察源(Muticasted Observable)。。这些 Observable 在 RxJS 中主要是以 Subject 命名,它们有以下几种不同的实现:

  • Subject
  • AnonymousSubject
  • BehaviorSubject
  • ReplaySubject -AsyncSubject

所谓 Muticasted Observable,就是这个 Observable 可以持续的发送数据给到订阅它的订阅者们。

Subject

Subject 是最基础的 Muticasted Observable,订阅者对其进行订阅后,将会拿到 Subject 之后发送的数据。但是,如果订阅者在数据发送后再订阅,那么它将永远都拿不到这条数据。用一下例子简单说明一下:

typescript

Subject 的实现通过将观察员们放入数组中,如果有事件即将到来,通知当前所有已经在位的观察员们。

typescript

通过重写_subscribe,让客户在订阅时,将其回调函数保存到observers数组中。

typescript

Subject 通过创建一个新的指向于它的observable,完成和Observable之间的转换。

typescript

AnonymousSubject

AnonymousSubject 是 Subject 的一个轻量级壳,只负责把事件转发给预先指定的 destination Observer,自身并不维护 observers 列表,因此也不会出现重复订阅的问题。它主要被 Subject 在 lift 时,用来生成同构对象,既保留 Subject 的对外接口,又避免冗余状态。

typescript

它重载 _subscribe,移除了 Subject 本身具备的保存订阅者的功能了。

typescript

通过阅读源码使用到 AnonymousSubject 的地方,我认为 AnonymousSubject 主要的功能还是为 Subject 的 lift 方法提供一个封装,lift 需要返回的是一个符合当前类的同构对象。

typescript

如果直接重新构造一个 Subject 虽然符合同构,但是,在订阅时,会重复地把回调函数添加到新的 Subject 实例的 observers 中,导致订阅者触发多次;如果直接使用 Observable ,那么又不符合同构,因为 Observable 并不具备 next,error 和 complete 等功能,那么AnonymousSubject 就是这么一个承前启后的对象,通过重载复写 Subject 的一些方法,使得其既具备同构,也不会有冗余状态。

BehaviorSubject

BehaviorSubject 为 Subject 提供了数据持久化(相对于 Subject 本身)功能,它本身存储了已经到来的数据,可以看看以下例子。

typescript

BehaviorSubject 拥有一个 _value 成员,每次调用 next 发送数据的时候,BehaviorSubject 都会将数据保存到 _value 中。

typescript

调用 next 的时候,会把传入的 value 保存起来,并交由 Subject 的 next 来处理。

typescript

当 BehaviorSubject 被订阅的时候,也会把当前存储的数据发送给订阅者,通过重写 _subscribe 实现这个功能。

typescript

AsyncSubject

AsyncSubject 并没有提供相应的异步操作,而是把控制最终数据到来的权力交给调用者,订阅者只会接收到 AsyncSubject 最终的数据。正如官方例子所展示的的,当它单独调用 next 的时候,订阅者并不会接收到数据,而只有当它调用 complete 的时候,订阅者才会接收到最终到来的消息。以下例子可以说明 AsyncSubject 的运作方式。

typescript

AsyncSubject 通过保留发送状态和完成状态,来达到以上目的。

typescript

AsyncSubject 的 next 不会调用 Subject 的 next,而是保存未完成状态下最新到来的数据。

typescript

那么 Subject 的 next 会在 AsyncSubject 的 complete 方法中调用。

typescript

ReplaySubject

ReplaySubject 的作用是在给定的时间内,发送所有的已经收到的缓冲区数据,当时间过期后,将销毁之前已经收到的数据,重新收集即将到来的数据。所以在构造的时候,需要给定两个值,一个是缓冲区的大小(bufferSize),一个是给定缓冲区存活的窗口时间(windowTime),需要注意的是 ReplaySubject 所使用的缓冲区的策略是 FIFO。

下面举出两个例子,可以先感受一下 ReplaySubject 的行为。第一个如下:

typescript

下面是第二个例子,这个 ReplaySubject 带有一个窗口时间。

typescript

其实 ReplaySubject 跟 BehaviorSubject 很类似,但是不同的点在于,ReplaySubject 多了缓冲区和窗口时间,也算是扩展了 BehaviorSubject 的使用场景。

在源码中,还有第三个参数,那就是调度器(scheduler),一般来说,使用默认调度器已经可以覆盖大部分需求,关于调度器的部分会在之后讲到。

typescript

上面的源码中,ReplaySubject 在构造时会根据不同的窗口时间来设置 next 具体的运行内容,主要以下两种方式。

  • nextInfiniteTimeWindow
  • nextTimeWindow

nextInfiniteTimeWindow

如果窗口时间是无限的,那么就意味着缓冲区数据的约束条件只会是将来的数据。

typescript

nextTimeWindow

如果窗口时间是有限的,那么缓冲区的约束条件就由两条组成:窗口时间和将来的数据。这时,缓冲区数据就由 ReplayEvent 组成。ReplayEvent 保存了到来的数据的内容和其当前的时间戳。

typescript

那么通过 _trimBufferThenGetEvents 对缓冲区数据进行生死判断后,再把完整的数据交由 Subject 的 next 发送出去。

typescript

_trimBufferThenGetEvents

这个方法是根据不同的 event 对象中的时间戳与当前的时间戳进行判断,同时根据缓冲区的大小,从而得到这个对象中的数据是否能够保留的凭证。

typescript

订阅过程

ReplaySubject 的订阅过程比较特殊,因为订阅的时候需要发送缓冲区数据,而且在不同时间进行订阅也会使得缓冲区中的数据变化,所以订阅是需要考虑的问题会比较多。那么,抓住 _infiniteTimeWindow 这个变量来看代码会变得很容易。

typescript

最后

本章我主要简单分析了 5 种主要的 Subject,这些 Subject 实现了不同类型的 多播观察者(Muticasted Observable),对 Observable 进行了扩展。

限于本人能力水平有限,如有错误,欢迎指出。