RxJS 源码解读之派生 Subject
前言
RxJS 源码解读之派生 Subject
,即 BehaviorSubject
、 ReplySubject
、 AsyncSubject
、 VoidSubject
。
在上篇中,我们介绍了 Subject 的核心实现,在 RxJS 中, Subject 派生出了其他几种具有特殊功能的 Subject :
- BehaviorSubject
- ReplySubject
- AsyncSubject
- VoidSubject
这些派生的 Subject 都继承自 Subject ( VoidSubject 除外)。
正文
VoidSubject
VoidSubject ,其实就是原生的 Subject ,但是它发出的值为 void ,即发出空值的 Subject 。
1 | const subject$ = new Subject<void>(); |
VoidSubject 的实现就是 Subject 的实现,所以这里直接跳过。
BehaviorSubject
BehaviorSubject 可以让观察者拿到最近的一个值。最近的一个值的意思是,当一个观察者订阅这个 Subject 时,会立马得到一个“上次发出的值”,如果该 Subject 还未发出值,则使用构造时的初始值。
1 | const subject$ = new BehaviorSubject<string>("init value"); |
为了实现这个特性, BehaviorSubject 在内部挂在了一个 _value
属性,在每次发出值的时候顺便把该值记录下来:
1 | export class BehaviorSubject<T> extends Subject<T> { |
接着在每次有新的观察者加入的时候,使用 _value
属性来执行 next
函数:
1 | export class BehaviorSubject<T> extends Subject<T> { |
重写 _subscribe
内的核心其实就一句 !subscription.closed && subscriber.next(this._value)
。
ReplaySubject
ReplaySubject ,它的功能比 BehaviorSubject 更“广泛”。简单点讲 BehaviorSubject 只能保存一个历史值,而 ReplaySubject 能保存多个历史值,并且能以次数和时间维度来进行约束。
虽然 BehaviorSubject 可以看作是一个特殊的 ReplaySubject ,但是 ReplaySubject 并不能保证加入的订阅一定能够接收到一个值,也就是说 ReplaySubject 的构造函数不会保存一个默认的初始值。 ReplaySubject 的本质是“回放”历史值。
在 ReplaySubject 中,为了从次数和时间方面来限制回放的个数,它新增了一个数组用来保存历史值,并且在构造的时候会指定回放的数量大小和最长有效时间:
1 | export class ReplaySubject<T> extends Subject<T> { |
可以在构造函数中看到:
- 默认情况下回放个数为无限大,即不会限制回放的个数。
- 默认情况下有效时间也是无限大,即所有的回放项都不会过期。
所以,如果不指定参数,那么 ReplaySubject 发出的值都会保存下来,当新的订阅者加入的时候,这些历史的值就会发送给这个新的订阅者。
_infiniteTimeWindow
属性用来标志是否开启了时间限制。
这里还有一个注意的点是 _buffer
属性的类型是 Array<T | number>
,这是因为在指定了回放时间的情况下, _buffer
的类型其实是 [T, number, T, number , ...]
,而如果不指定回放时间则 _buffer
的类型为 Array<T>
。这可以在重写的 next
方法中看到。
1 | export class ReplaySubject<T> extends Subject<T> { |
从 next
函数的代码中可以看到,在 _buffer
中存的是 Date.now() + _windowTime
,这表示的是该回放项的过期时间,接着调用了一个 _trimBuffer
函数,这个函数的作用是更新 _buffer
,把过期的回放项删除掉:
1 | export class ReplaySubject<T> extends Subject<T> { |
可以看到在清理 _buffer
的时候先按大小进行清理,然后如果开启了时间限制的情况下,再根据时间来把过期的项清理掉。
为了在观察者订阅的时候能够接收到需要回放的值, ReplaySubject 和 BehaviorSubject 一样重写了 _subscribe
方法:
1 | export class ReplaySubject<T> extends Subject<T> { |
这里可以看到它调用父类的两个方法 _innerSubscribe
和 _checkFinalizedStatuses
,在 Subject 中是先 _checkFinalizedStatuses
再 _innerSubscribe
,如果 Subject 出错或者完成,在 _checkFinalizedStatuses
中会调用相应的 complete
和 error
,而 _innerSubscribe
只是把 Subscriber 放到 Map 中而已,而由于 ReplaySubject 存在缓存,我们肯定希望能完整的回放整个流程,如果一上来就调用 _checkFinalizedStatuses
的话,就变成先得到了一个完成的通知,再接收到了缓存的值,这和 ReplaySubject (回放主题)的名字不对应。
这里还使用了一个 _buffer
的副本,防止在调用中出现修改 _buffer
的情况,即当一个订阅者加入时, _buffer
就已经确定,无法在观察过程中改变 _buffer
。
看源码的时候感觉这个根据时间来判断最右的过期项的过程使用遍历不会导致性能问题吗,为什么不用二分搜索呢?毕竟每次 next
和 subscribe
都会清理一次缓存…
AsyncSubject
AsyncSubject 看名字你可能会觉得可能和异步有什么关系,但其实关系不大, AsyncSubject 的核心就是只发出完成前的一个值,比如一个 AsyncSubject 通过发出了 99 个 1 ,然后发出了一个 0 ,接着调用 complete
表示完成,这样它的所有订阅者都只会收到一个 0 和一个完成的通知。
1 | const subject$ = new AsyncSubject<number>(); |
所以 AsyncSubject 第一个就要重写 next
函数,让它不要再通知所有的订阅者了,而是把每次发出的值记录下来:
1 | export class AsyncSubject<T> extends Subject<T> { |
接着当一个 AsyncSubject 完成之后,把记录的值发出去(如果存在的话):
1 | export class AsyncSubject<T> extends Subject<T> { |
当然,重写这两个只是解决了在 AsyncSubject 完成前对已存在的订阅者进行通知,而如果此时有新的订阅者添加进来,那么是无法收到值的,这和预期的功能不符,所以还需要重写 _checkFinalizedStatuses
方法,让它在检测到完成的时候把值发送给当前的订阅者:
1 | export class AsyncSubject<T> extends Subject<T> { |
这样新的订阅者加入的时候就会调用一次 _checkFinalizedStatuses
来进行判断了。
后记
至此, RxJS 的 Observable 、 Subscriber 、 Subscription 和 Subject 都讲完了。
下文我们会讲 RxJS 中的 Scheduler ,它和前面这四个的关系不是很大,它只是定义了订阅者获取值的时机。