RxJS 源码解析(一): Observable & Subscription

前言

ReactiveX 是 Reactive Extensions 的缩写,一般简写为 Rx ,最初是 LINQ 的一个扩展,由微软的架构师 Erik Meijer 领导的团队开发,在2012年11月开源。Rx 是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。

首先,官方对于 Rx 的定义。

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

简而言之,就是基于观察者队列实现了对异步和基础事件的编程。

Rxjs 是 Rx 的 JavaScript 的实现。本篇文章将简单的分析一下 Obersvable 和 Subscription 的源码是怎么进行的。

Observable

可观察对象是整个 Rx 的核心,主要的作用就是提供了一个观察者模式,使得调用者可以通过响应式的方式获取数据。

Observable 实际上就是一个单向链表,基本的数据结构如下:

typescript

其构造方法与 Promise 类似,通过传入一个函数包裹操作,并让这个函数来决定数据传递,这个函数的参数包含了一个订阅器。

typescript

订阅器提供了三个主要方法:next,error,complete。订阅器的实现很巧妙,其内部实现是一个链表。

跟 Promise 不同,Observable 不会立刻运行这个函数,而是等到它被订阅后,这个函数才会被执行,这种惰性求值的特性使得 Observable 可以在它仅被需要的地方进行计算。

lift

lift 方法提供了一个这样的功能,传入一个映射函数,并返回一个新的 Observable,这个新的 Observable 的 source 会指向创建它的 Observable。实际上,这种做法就是将这个映射函数用一个外覆类包裹起来,这个外覆类,正是 Observable。那么,看看它是如何实现。

typescript

pipe

Rxjs 跟其他语言实现的 ReactiveX 不一样的地方就是在于,它的映射方法不再是放在 Observable 内部,而是通过参数的形式传入到一个管道函数pipe中,在这个函数中,通过对管道函数的数组进行 reduce 后,就能够得到最终的Observable。这个 reduce 的过程也很巧妙,传入的函数的参数就是上游的 Observable,返回的就是一个给下游接收的 Observable,那么就可以把一个又一个的 Observable 串联起来

typescript

那么在使用过程中,pipe 通过重载给传入的函数提供类型信息。

typescript

// ... 其中 UnaryFunction 表示一元函数,通过这种链式操作,使得链条上的所有函数都可以拿到上一个函数返回的类型作为参数来处理,并返回给下一个函数。

subscribe

当 Observable 一旦调用 subscribe,那么就意味着其开始执行链条中的所有函数。subscribe 传入的参数是一个包含了 next ,error , complete 三个属性的对象;也可以是三个函数,分别对应 next,error,complete。

typescript

其具体实现是通过将传入的函数(对象)参数转化成 Subscriber 对象,而 Subscriber 继承了 Subscription。最后,返回的就是一个 subscription 给到调用者。

typescript

Subscriber 的 add 方法下面会讲。总之,Observable 就像一串或者一个爆竹,只有当它被点燃(subscribe)的时候,才会把一个又一个的 Observable 点着,最终迸发出巨大声响,而 subscribe 就是一个找到引线并点燃它们的过程。

Subscription

Subscription 则是通过一种树结构,它包含了叶节点和一个父节点或者父节点的集合。

typescript

add

add 方法主要的功能是连接不同的订阅,配合注释,其逻辑就是将函数或者订阅对象包裹后放入成员变量 subscriptions 中,并将这个包裹对象的父订阅对象设置为当前对象。

typescript

unsubscribe

取消订阅是订阅对象的主要功能,它为观察者模式提供了终结观察的方法。

typescript

结语

第一篇就先介绍这两个重要的组成类,由这两个类引申出来的一系列组合函数是 Rx 的核心,之后会陆续介绍这些操作函数。