RxJS 源码解析(三)—— Operator I

本篇是 RxJS 源码解析的第三篇文章,使用源码的版本是 6.6.0 。本篇文章的内容可能会比较多,请耐心阅读。为了方便阅读,文中的相关代码均经过裁剪和处理。如有不妥,还请指正。

在 RxJS 中,Creation Operator 主要分为以下两类:

  • 执行一般创建操作的 Normal Creation Operator。
  • 执行复杂的创建操作的 Join Creation Operator。

在 pipe 中使用的 operator ,我称之为 Pipe Operator ,它主要分为以下几类:

  • 用于数据映射的 Transformation Operators
  • 过滤用的 Filtering Operators
  • 将当前的 Observable 多播化的 Multicasting Operators
  • 处理错误的 Error Handling Operators
  • 工具操作函数 Utility Operators
  • Conditional and Boolean Operators
  • Mathematical and Aggregate Operators

限于篇幅本篇将先介绍 Normal Creation Operator ,它的主要作用是帮助开发者快速创建所需要的 Observable。

of, empty & throwError

of 、empty 、throwError ,首先讲这三个 operator 的重要原因是,它提供了一系列基础的操作:next、complete、以及 error。

typescript

of source code

它的构建方式如下,其中,调度器是最后一个参数。

typescript

of 由两个函数 fromArray 和 scheduleArray。fromArray 是一个简单循环的函数,它将数据循环发送给 Observable 的订阅者。

typescript

empty source code

这部分的代码很简单,scheduler 部分可以忽略。实际上就是在 Observable 中调用 subscriber.complete()。

typescript

throwError source code

throwError 跟 empty 的实现是一致的,只不过 complete 换成了 error 。

typescript

iif & defer

iif 和 defer 的表现是一致的。

  • defer 的主要作用是延后了具体 Observable 的生成,是一个 Lazy Observable Factory。
  • iif 则是缩小了 defer 的表达范围,主要作用是增强了Rx 的命令式的语义。
typescript

iif Source Code

看到 iif 的源码的那一刻我震惊了,什么叫大道至简。

typescript

defer Source Code

defer 原理上比较简单:在构造 Observable 的时候,在传入的订阅函数中返回一个 Subscription。那么在这个传入的订阅函数中,defer 的过程分为以下三步:

  • 调用工厂,获取输入数据。
  • 调用 from 将数据转换成一个 observable
  • 返回这个 observable 的订阅。
typescript

其中的 ObservedValueOf 是这样定义的,使用了 ts 的 infer 来推导出 ObservableInput<T>T 的具体类型。

typescript

from source code

from 提供了一种映射的功能,可以将传入的数据映射成 Observables 。它可以接受以下参数:

  • 原生数组 和 Iterable<T>
  • dom 迭代器
  • Promise<T>
  • Observable<T>

稍微的修剪了一下,源码如下:

typescript

它直接创建一个新的 Observable,并且调用了 subscribeTo ,根据输入类型,对输入进行不同的处理。

  • 如果输入是 Observable,调用 subscribeToObservable.
  • 如果输入是原生数组,调用 subscribeToArray.
  • 如果输入是 Promise,调用 subscribeToPromise.
  • 如果输入是生成器,调用 subscribeToIterable.

subscribeToArray

如果输入是原生数组或者是实现了数组功能的数据结构,那么直接调用 subscriber.next 把所有数据依次发送给订阅者。

typescript

subscribeToObservable

如果输入是 Obervable,那么要通过一个特定的 Symbol 取出 Observable,然后再订阅它。

(基于 Symbol 的特性,当前很多项目都会使用一个固定的 Symbol 对特定数据取值,来验证这个数据是不是符合类型)。

typescript

subscribeToPromise

如果输入是一个 Promise,那么通过 then 获取到 Promise 的内容,并将内容发送给订阅者。

typescript

subscribeToIterable

生成器跟数组的方式类似,也是通过循环的方式将数据发送给订阅者。

typescript

generate

generate 可以让你用一种类似 for 循环的方式获得数据流。不过,我目前还没有遇到过非常需要这种方式生成流的方式,如果你遇到这种情况,欢迎交流。一般来说,我习惯于这样调用它。

typescript

原来的源码包含了较多的参数判断,把内部逻辑梳理一下,实际上就是分为三个大步骤:

  • 判断结束条件, 如果为假代表已经结束,则应该完成订阅,否则进行下一步。
  • 发送数据订阅给到订阅者。
  • 调用迭代方法,生成下一组数据,重复第一步。
typescript

其中 GenerateOptions 包含了三个成员,initialState,condition 以及 iterate 。

typescript

range

range 可以创建一个给定范围的数字流。这个主要就是提供了一个简单的语义化函数,主要就是通过循环给订阅者喂数据。

typescript

fromEvent & fromEventPattern

fromEvent source code fromEvent 使得 Observable 可以封装一系列的系统事件。既可以接受 NodeJS EventEmitter,也可以接受 DOM EventTarget, JQuery-like event target, NodeList 或者 HTMLCollection 等浏览器对象。

typescript

上面的代码可以分解成这三个步骤:

  • 在闭包中创建一个 handler 函数,handler 函数最终会调用 subscriber.next。
  • 为 target 添加指定事件监听。
  • 为 subscriber 添加一个销毁 target 事件监听的逻辑。

对于其他的事件监听,不再赘述,流程完全是一样。

fromEventPattern source code

fromEventPattern 则是对 fromEvent 的泛化。

typescript

它的源码的与 fromEvent 类似。

typescript

bindCallback, bindNodeCallback

它们都是一种特殊的 Operator ,思路应该是源于 Function.bind ,提供一种转换操作,将带有回调的函数转换成 Observable Factory。

  • bindCallback 是适配普通的回调函数;
  • bindNodeCallback 是为了 NodeJS 环境下的回调函数而设计,能把回调中的错误处理。
typescript

以下是 bindNodeCallback 的例子。

typescript

bindCallback 和 bindNodeCallback 的源码非常类似。

typescript

bindCallback 和 bindNodeCallback 的源码唯一不同的地方就是在于 handler 这个函数处理的内容不同,bindNodeCallback 传入的函数的回调,第一个参数为是错误信息。

typescript

源码中比较有趣的地方在于,创建的时候,返回的工厂函数包含了一个AsyncSubject。这个 AsyncSubject 保存了已经到来数据,可以看看例子中,ob1 被订阅了2次,第二次订阅后实际上是立刻就能拿到返回值;而 ob2 仍要执行一次 setTimeoutWithCallback。这种设计与这个 bind 的语义相吻合。

interval & timer 上面的 operators 中,我已经把 scheduler 相关的内容进行了裁剪,基本上与 scheduler 无关。而 interval 和 timer 都必须通过 scheduler 来相应的定时操作,所以这部分放到了最后。它们是用于创建定时数据源的 operators 。

•interval:传入的参数表示每隔指定毫秒发送一条数据。 •timer:传入的第一个参数是指第一条发送数据的时间间隔,第二个参数是指后续数据发送的间隔。

typescript

interval 和 timer 都使用了一个默认的异步调度器,这个异步调度器主要是通过 setInterval 来实现相应的功能,实际上 Rx 把异步调度器通过 interval 和 timer 转化成 Observable 的形式提供到给用户。

timer Source Code

timer 的实现如下图所示。它首先创建了一个 Observable ,然后在订阅函数中,返回调度器的订阅。在这里, scheduler 的 schedule 函数返回了一个 Subscription。

typescript

dispatch 实际上是一个递归函数,这个函数绑定了 SchedulerAction ,通过传入订阅者,使得 Action 内部的 setInterval 能够一直调用 suscriber.next。

typescript

interval Source Code

以下是 interval 的源码。

typescript

仔细的分析上面的代码,我发现 interval 的实现实际上就是 timer 的一个约束版本,它可以改写成这样。

typescript

本篇小结

整体而言,这部分的源码并没有写得很绕,删去了 scheduler 相关的内容后,逻辑立刻就变得清晰了起来。同时,从源码的风格上可以看到它们由不同的人来编写。

最后,限于本人的水平,文章可能会有错误的地方,欢迎指正。