在这之前,我一直都没有讲过 Scheduler 的作用,那么本章就开始讲解 Scheduler 的设计思路和基本结构。RxJS 的存在是为了处理异步 IO,而异步 IO 所包含的一系列 API 肯定也是要经过进一步的封装才能让 RxJS 中的异步操作使用。

可以看到,它主要还是根据 JS 的所能够提供的异步能力来设计这些基本结构。
- AsyncScheduler:异步调度器,使用
setInterval实现。 - QueueScheduler: 队列异步调度器,继承了AsyncScheduler,但是
QueueAction是一种链式结构,使得调度以迭代器的形式进行。 - AnimationFrameScheduler:使用
requestAnimationFrame实现了帧调度器。 - AsapScheduler:使用
Promise.resolve().then()实现的微任务调度器。
SchedulerLike 、 Scheduler & Action
首先,SchedulerLike 提供了以下两个接口。
typescript
Scheduler 则实现了这些接口。
typescript
Scheduler 为后续的继承它的调度器定义了创建方式,通过传入一个 Action 工厂,使得内部可以构造特定的 Action 。而 Action 继承了 Subscription,意味着 Action 实际上是一种的订阅器。
typescript
上面的设计是一种名为Template Method的设计模式,这种方法有效地约束了后续的不同的 Scheduler 的实现。 定义一个操作中的算法的骨架,而将一些步骤延迟到子类中。它使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。
异步调度器
先来了解一下 Scheduler 的子类 AsyncScheduler,余下所有的 Scheduler 都会继承它。在这里,先不急着进行源码分析,我们需要先为了弄清楚调度器的运行原理,了解调度器是如何对异步 API 进行封装的。
首先,调度器本身也是基于观察者模式来进行设计,但是它又独立于 Rxjs 的 Observable。一般来说, AsyncScheduler 是这样调用的。
typescript
它的调用栈是这样的。
- AsyncScheduler.schedule
- AsyncAction.schedule
- AsyncAction.requestAsyncId
- listOnTimeout // 原生事件
- processTimers // 原生事件
- AsyncScheduler.flush
- AsyncAction.execute
- AsyncAction._execute
- AsyncAction.work
AsyncAction.schedule
跟着调用栈分析源码来溯源,在 AsyncScheduler 的schedule方法中,它先构造了 AsyncAction ,然后调用它的schedule。在这个方法中,实际上是对 Action 的内部状态进行更新,所以此处关注的地方就是在于schedule如何触发异步 API。
typescript
可以看到,从 scheduler 传入的回调函数最终会被 Action 持有,所以调用栈最终执行的work实际上就是回调函数。
AsyncAction.requestAsyncId
requestAsyncId是调用异步 API 的方法,这个方法在 AsyncAction 最终触发了setInterval这一异步 API。那么实际上,根据 Template Method 的设计,所有继承 AsyncAction 的 Action 都会通过这个方法实现相对应的异步 API 。至于 AsyncAction 为什么会使用setInterval而不是setTimeout,源代码里是这样说明的。
Actions only execute once by default, unless rescheduled from within the scheduled callback. This allows us to implement single and repeat actions via the same code path, without adding API surface area, as well as mimic traditional recursion but across asynchronous boundaries. However, JS runtimes and timers distinguish between intervals achieved by serial
setTimeoutcalls vs. a singlesetIntervalcall. An interval of serialsetTimeoutcalls can be individufenally delayed, which delays scheduling the nextsetTimeout, and so on.setIntervalattempts to guarantee the interval callback will be invoked more precisely to the interval period, regardless of load. Therefore, we usesetIntervalto schedule single and repeat actions. If the action reschedules itself with the same delay, the interval is not canceled. If the action doesn't reschedule, or reschedules with a different delay, the interval will be canceled after scheduled callback execution.
对于某一个 Action 来说,除非它在调度的回调中被重新调度,那么它默认只会执行一次。这样的方式可以使得我们通过统一的代码实现调度单一或重复的 Actions,而无需添加 API,并且可以模仿传统递归来扩展异步。然而, JS 的运行时或者计时器分别通过串行的
setTimout或者是单个setInterval来获取调用的定时器。串行的setTimout定时器可以单独延迟,这样做会延迟c下一个setTimout的调度,以此类推。而setInterval则不管程序运行的负载如何,它总是尝试去确保每一次定时器的回调更加精准的安排到合适的间隔时间。因此,我们使用setInterval来安排单一或重复的 Actions,如果 action 以相同的时延调度本身,那么当前定时器不会被取消。如果 action 只没有重新调度或者以不同的时延重新调度,则安排的回调执行后,改定时器会被取消。
typescript
AsyncScheduler.flush
所以,在 AsyncScheduler 中,新增的flush方法实际上是为 setInterval 服务的,它作为异步 API 的回调函数,主要步骤如下。
- 如果存在运行中的 Action ,它会保存所用调用它的 Action。
- 如果不存在运行中的 Action,它会执行所有调用队列中的
Action.execute。 - 处理
Action.execute的运行错误。
typescript
AsyncAction.execute
上述的 flush 调用了 action 的 execute 方法。该方法也是通过处理 action 的内部状态来获得执行结果,其中会调用 _execute 这一内部方法,这个内部方法主要作用是调用 AsyncAction.work ,并处理它出现的异常。
typescript
AsyncAction.recycleAsyncId
在分析到 Action.schedule 的时候,引用了源码内部的注释,其中有一句话很重要,那就是如果 action 以相同的时延调度本身,那么当前定时器不会被取消”,所以recycleAsyncId这个方法是需要处理这种情况。
typescript
运用 Template Method
AsyncScheduler 可以说已经把所有的地基都打好了,它可以直接拿来用,也可以继承并重写一些相关的接口把相应的异步 API 进行替换。
队列调度器
队列调度器根据调用者传入的时延来决定使用同步方式的调度还是setInterval方式的调度。
QueueScheduler 单纯继承了 AsyncScheduler,其主要实现在 QueueAction 中,通过重写schedule、execute以及requestAsyncId等方法来实现这种功能。
typescript
帧调度器 与 微任务调度器
帧调度器根据调用者传入的时延来决定使用requestAnimationFrame还是setInterval,微任务调度器则是根据时延来决定使用Promise.reslove().then()还是setInterval。
两者的调用类似,以至于可以结合起来分析。
Action
它们的 action 方法均重写了requestAsyncId和recycleAsyncId, 主要还是为了处理不同异步 API 。
typescript
Scheduler
它们的 flush,跟 AsyncScheduler 的 flush 实现思路差不多,依旧是轮询 actions 队列调用 action.execute ,只是它们的 flush 需要去处理额外的以下细节。
- action 传入可能为空。
- 处理 actions 的状态。
- 清空 scheduled,使得 scheduler 能够进行下一次调度。
typescript
Immediate
这里很有意思的一点, AsapScheduler 并没有直接通过Promise.reslove().then()来实现。而是把它封装成Immediate,形成setImmediate和clearImmediate两个 API ,这样就使得微任务的调用其他的定时 API 无异。
内部实现是通过一个 Map 保存标记当前的是第几个微任务,这里并不直接保存 Promise,因为 Promise 执行完毕后就自行释放了,所以它需要的只是一个标记。
typescript
总结
本篇分析了 RxJS 的调度器相关的一系列内容,通过封装 JS 异步 API ,调度器实现相对应的异步功能,增强了 RxJS 对异步 IO 的掌控。