前言 在写完 pipe
管道之后,写一写 Subject
正文 什么是 Subject 什么是 Subject
An RxJS Subject
is a special type of Observable
that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable
), Subjects are multicast.
是一种特殊 Observable
可观察对象,它允许将值多播给多个观察者。普通的 Observable
是单播的,即每个 subscribe
的观察者都拥有独立的上下文,而 Subject
在写《 RxJS
的一些使用及简单理解》的时候,提到过 Observable
1 2 3 4 5 6 7 8 9 import { Observable } from "rxjs" ;const observable = new Observable ((subscriber ) => { subscriber.next (Math .random ()); subscriber.complete (); }); observable.subscribe ((val ) => console .log (val)); observable.subscribe ((val ) => console .log (val));
上面的代码会输出不同的值,即每调用一次 subscribe
,都会执行一次传入 Observable
如果使用了 Subject
,那么两次 subscribe
1 2 3 4 5 6 7 8 import { Subject } from "rxjs" ;const subject = new Subject ();subject.subscribe ((val ) => console .log (val)); subject.subscribe ((val ) => console .log (val)); subject.next (Math .random ());
是不是感觉很熟悉,很像 EventEmitter
完全可以看作在注册一个具名的事件, next
就像 emit
,而 subscribe
就像 on
对于每个 Subject
,它都维护了一个 Observer
队列,每当调用 subscribe
时把传入的 Observer
加入到队列中,当产生了新的值时,会遍历这个队列,将值发送给每个 Observer
PS:这里要注意,调用 Subject
的 subscribe
并不会和普通的 Observable
(通过 new Observable
产生的)对象调用 subscribe
一样,执行传入 Observable
回调( Subject
这里可能会有疑问:你这个 Subject
怎么还有 next
是一个可观察对象( Observable
),同时也是一个观察者( Observer
换句话说, Subject
Subject 有什么用 经过上面的分析,我们知道了 Subject
的核心能力就是可以把单播的 Observable
转成多播的 Observable
不过我们很难去想象实际的使用场景,所以我去扒了以下 ant-design 的 angular 版本,找到了比较优雅的应用
使用 Subject 来统一 unsubscribe 已 subscribe 的流 我们知道,当 Observable
被 subscribe
之后,会返回一个 Subscription
,调用它的 unsubscribe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import { Component , OnDestroy , OnInit } from '@angular/core' ;import { Observable , Subscription } from 'rxjs' ;@Component ({ selector : '.test' , template : `<div></div>` , }) export class TestComponent implements OnInit , OnDestroy { subscription1 = Subscription .EMPTY ; subscription2 = Subscription .EMPTY ; observable1; observable2; constructor ( ) { this .observable1 = new Observable ((subscriber ) => { let index = 0 ; const timer = setInterval (() => { subscriber.next (index++); }, 1000 ); return () => { clearInterval (timer); }; }); this .observable2 = new Observable ((subscriber ) => { let index = 0 ; const timer = setInterval (() => { subscriber.next (index++); }, 1000 ); return () => { clearInterval (timer); }; }); } ngOnInit ( ) { this .subscription1 = this .observable1 .subscribe ((val ) => console .log (val)); this .subscription2 = this .observable2 .subscribe ((val ) => console .log (val)); } ngOnDestroy ( ) { this .subscription1 .unsubscribe (); this .subscription2 .unsubscribe (); } }
看起来好像没什么问题,但是如果此时不是两个,而是 20
个 Observable
写 20
个 Subscription
变量,然后去依次调用 unsubscribe
这时候我们就可以使用 Subject
以及 pipe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import { Component , OnDestroy , OnInit } from '@angular/core' ;import { Observable , Subject , Subscription , takeUntil } from 'rxjs' ;@Component ({ selector : '.test' , template : `<div></div>` , }) export class TestComponent implements OnInit , OnDestroy { observable1; observable2; destory$ = new Subject <void >(); constructor ( ) { this .observable1 = new Observable ((subscriber ) => { let index = 0 ; const timer = setInterval (() => { subscriber.next (index++); }, 1000 ); return () => { clearInterval (timer); }; }); this .observable2 = new Observable ((subscriber ) => { let index = 0 ; const timer = setInterval (() => { subscriber.next (index++); }, 1000 ); return () => { clearInterval (timer); }; }); } ngOnInit ( ) { this .observable1 .pipe (takeUntil (this .destory$ )) .subscribe ((val ) => console .log (val)); this .observable2 .pipe (takeUntil (this .destory$ )) .subscribe ((val ) => console .log (val)); } ngOnDestroy ( ) { this .destory$ .next (); this .destory$ .complete (); } }
这里使用了 takeUntil
这个管道,这个管道的作用就是在传入的流发出值时,结束源流的订阅(调用 complete
可以用官网的一张图来解释 takeUntil
不过这似乎和多播特性无关,只是使用了 Subject
作为 Observer
的特性(可以调用 next
模拟 EventBus 事件总线 可以创建一个 Service
,内部使用 Subject
来模拟 EventBus
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import { Injectable , OnDestroy } from '@angular/core' ;import { Subject , Subscription , takeUntil } from 'rxjs' ;@Injectable ({ providedIn : 'root' , }) export class TestService implements OnDestroy { eventMap : Record <string , Subject <any []>> = {}; destroy$ = new Subject <void >(); on<T extends any []>( eventName : string , callback : (...args : T ) => void ): Subscription { if (this .eventMap [eventName] === undefined ) { this .eventMap [eventName] = new Subject <any []>(); } const subject = this .eventMap [eventName]; return subject .pipe (takeUntil (this .destroy$ )) .subscribe ((args ) => callback (...(args as T))); } emit<T extends any []>(eventName : string , ...args : T): void { if (this .eventMap [eventName] === undefined ) { return ; } this .eventMap [eventName].next (args); if (this .eventMap ['*' ] === undefined ) { return ; } this .eventMap ['*' ].next ([...args, eventName]); } ngOnDestroy ( ) { this .destroy$ .next (); this .destroy$ .complete (); } }
和 mitt
库的区别就是没有了 off
函数,而是使用 Subscription
的 unsubscribe
几种 Subject 子类 在官方的实现中,提供了几种 Subject
的子类供我们使用,每种子类 Subject
BehaviorSubject 特征:能够向订阅的 Observer
1 2 3 4 5 6 7 8 9 10 11 import { BehaviorSubject } from "rxjs" ;const subject = new BehaviorSubject (0 );subject.subscribe ((val ) => console .log ("第一个 Observer " , val)); subject.next (1 ); subject.next (2 ); subject.subscribe ((val ) => console .log ("第二个 Observer " , val));
注意,如果不向 BehaviorSubject
的构造函数传入默认的初始值,那么第一个 Observer
会打印 undefined
,也就是默认的初始值被置为了 undefined
1 2 3 4 5 import { BehaviorSubject } from "rxjs" ;const subject = new BehaviorSubject ();subject.subscribe ((val ) => console .log ("第一个 Observer " , val));
ReplaySubject 特征:能够向订阅的 Observer
立即发送“最近”的一些在限制范围内 的值
可以理解为一个特殊的 ReplaySubject
, ReplaySubject
可以通过指定 bufferSize
来获取“最近”的一些 值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import { ReplaySubject } from "rxjs" ;const subject = new ReplaySubject (3 );subject.subscribe ((val ) => console .log ("第一个 Observer " , val)); subject.next (1 ); subject.next (2 ); subject.next (3 ); subject.next (4 ); subject.subscribe ((val ) => console .log ("第二个 Observer " , val)); subject.next (5 )
除了限定 bufferSize
,还可以通过第二个参数指定 windowTime
即在 subscribe
之后,查找往前 windowTime
内,最大 bufferSize
个数的值,然后按顺序发送给 Observer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import { ReplaySubject } from "rxjs" ;const subject = new ReplaySubject (3 , 500 );setTimeout (() => { subject.next (1 ); }, 200 ); setTimeout (() => { subject.next (2 ); }, 400 ); setTimeout (() => { subject.next (3 ); }, 600 ); setTimeout (() => { subject.subscribe ((val ) => console .log (val)); }, 800 );
为什么输出 2 3
,而不是 1 2 3
因为我们在 800ms
时 subscribe
了,这时往前找 500ms
内的值,即 300ms
之后发送的值,而 1
是 200ms
发送的值,所以不会发送给 Observer
和 windowTime
共同限制给 Observer
如果这 bufferSize
个值正好都是 windowTime
内发出的,那么皆大欢喜,全丢给 Observer
如果某些值不在 windowTime
内发出(早于 windowTime
),那么就舍弃这部分值,把在 windowTime
内的值丢给 Observer
AsyncSubject 特征:只取最后一个 值,在 Subject
完成之后(调用 complete
1 2 3 4 5 6 7 8 9 10 import { AsyncSubject } from "rxjs" ;const subject = new AsyncSubject ();subject.next (1 ); subject.next (2 ); subject.subscribe ((val ) => console .log (val)); subject.complete ();
很像普通的 Observable
配合 last
1 2 3 4 5 6 7 8 9 10 import { last, Observable } from "rxjs" ;const observable = new Observable ((subscriber ) => { subscriber.next (1 ); subscriber.next (2 ); subscriber.next (3 ); subscriber.complete (); }); observable.pipe (last ()).subscribe ((val ) => console .log (val));