RxJS 一些有趣的 operate 操作符
前言
RxJS
一些有趣的 operate
操作符
之前我们写了关于 RxJS
中的几个概念,Observable
, Subject
, pipe
, Scheduler
等
而 RxJS
之所以很方便,很大程度上取决于它封装好的一些 operate
操作符
前文的 pipe
管道也属于操作符的一种
正文
RxJS
中,个人理解其实就是两大类的操作符,一类是创建类操作符,一类就是管道类
不过文档中给它分类的更加详细
There are operators for different purposes, and they may be categorized as: creation, transformation, filtering, joining, multicasting, error handling, utility, etc. In the following list you will find all the operators organized in categories.
创建类操作符
在 RxJS
中,创建类的操作符其实不多,有几个比较常见的
fromEvent
fromEvent
使得我们能够进行 dom
元素事件的绑定
1 | import { fromEvent } from "rxjs"; |
这样子每次我们点击就可以输出一次值,该值为事件 event
对象
如果我们想要解绑该事件
我们可以调用返回的 Subscription
的 unsubscribe
方法
1 | import { fromEvent } from "rxjs"; |
interval
setInterval
的一个包装,使得我们可以启用一个定时器
1 | import { interval } from "rxjs"; |
每次会输出该次对应的索引(从 0
开始)
取消该定时器,同样调用 Subscription
的 unsubscribe
方法
1 | import { interval } from "rxjs"; |
of
把传入的每个值都经过 Observable
发送出来
1 | import { of } from "rxjs"; |
传入 1, 2, 3, 4
,那么输出就是 1, 2, 3, 4
了
range
生成一个序列,然后经由 Observable
发送出来
1 | import { range } from "rxjs"; |
注意,这里的参数不是起始和结束,而是起始和个数
即输出不是 2, 3, 4, 5
而是 2, 3, 4, 5, 6
generate
能够像 for
那样生成值
1 | import { generate } from "rxjs"; |
上面的代码表示初始值为 1
, 结束条件为值小于 5
, 然后每次都会自增 1
和下面的代码等价
1 | new Observable((subscriber) => { |
这样子我们就能从 1
输出到 4
merge
合并多个 Observable
为单个 Observable
,然后一一输出对应的值
1 | import { fromEvent, interval, map, merge } from "rxjs"; |
经过 merge
之后,新的 Observable
会在每次点击和间隔 1s
输出一次对应的值
forkJoin
合并多个 Observable
为单个 Observable
,收集每个 Observable
完成前的一个值,然后输出
1 | import { forkJoin, fromEvent, interval, map, take } from "rxjs"; |
在点击两次且等待超过三秒之后,会输出一个数组,这个数组包含二者的值
concat
合并多个 Observable
为单个 Observable
,和 merge
不同的是,concat
会在第一个 Observable
完成之后再订阅第二个 Observable
,以此类推
1 | import { concat, fromEvent, interval, map, take } from "rxjs"; |
这里我们必须点击两次之后,定时器才会启动,如果点击事件不使用 take(2)
限定次数的话,那么定时器永远不会开始
因为点击事件永远无法完成
race
合并多个 Observable
为单个 Observable
,只会输出一个发出值的 Observable
1 | import { fromEvent, interval, map, race, take } from "rxjs"; |
此时如果立马进行点击,那么定时器就不会输出,如果等到定时器输出第一个值,那么点击事件就不会生效
定时器“赢”
点击事件“赢”
zip
合并多个 Observable
为单个 Observable
,每个 Observable
发出的每个值会组成一个数组作为新的值发出
1 | import { fromEvent, interval, map, take, zip } from "rxjs"; |
每个点击和每次定时器执行作为值发送出去,新值数量由这些 Observable
发出的值的最少的个数来确定
在这里点击事件只会发生两次,所以新的 Observable
只会发出两个值
定时器的第三个值无法被匹配,故不会发送到新的 Observable
对象中
一旦某个传入的 Observable
完成,其他传入的 Observable
会被 unsubscribe
,所以不用担心资源释放问题
combineLatest
合并多个 Observable
为单个 Observable
,与 zip
类似,但是 combineLatest
会取最近的一次值组成一个新值然后发出
1 | import { combineLatest, fromEvent, interval, map, take, zip } from "rxjs"; |
这里假如我们等三秒之后在点击,那么输出的值是 [event, '定时器:2']
而 zip
这里会输出 [event, '定时器:0']
除了上面这些常用的, RxJS
还封装了 ajax
,不过前端一般都是用 axios
来进行 http
请求
所以感觉用处不大
还有一些比较少见的创建操作符,比如 defer
, timer
, iif
等,都不难,看一下文档一下子就懂了
管道操作符
在之前的文章 RxJS 使用之 pipe 管道 中,我们已经列举了一些常见的管道操作符了
比如 map
, filter
, first
, last
等
这些一般都比较容易理解
现在我们来列举一些高级一点的管道
takeUntil
传入一个 Observable
,在这个 Observable
完成时,源 Observable
则完成
1 | import { fromEvent, interval, map, take, takeUntil } from "rxjs"; |
这里如果我们等 3s
之后再点击,那么点击事件不会生效,因为定时器已经完成了
在 3s
内的点击事件都是可以发出的
之前我们在 Subject
那一章讲过这个管道,在 antd
( angular
版本) 中非常常见
在需要对全局可观察对象订阅的地方,加上 pipe(takeUntil(destroy$))
,然后在 ngOnDestroy
钩子中完成 destroy$
即可比较优雅的管理这些订阅过程
distinct
对于源 Observable
发送的一系列的值,会经过去重之后再发送出来
对于基本类型来说,使用起来非常简单
1 | import { distinct, of} from "rxjs"; |
这里源 Observable
发出 1, 2, 3, 3, 4, 4
经过去重之后会输出 1, 2, 3, 4
对于引用类型,可以通过一个函数来提取唯一的部分进行比较,一般都是以对象的某个属性来去重,比如唯一 id
等
当然如果不传,那么就是按照引用的地址进行比较,即 obj1 === obj2
这种形式
1 | import { distinct, of } from "rxjs"; |
由于 s2
和 s3
的 id
是一样的,所以最终只会输出 s1
和 s2
distinctUntilChanged
这个就比较有意思了,从名字上看,也是去重,但是去重的模式不一样
这个管道的去重指和前一个值进行比较,如果相同,那么该值就不会被输出
1 | import { distinctUntilChanged, of } from "rxjs"; |
如果我们使用 distinct
,上面的代码输出为 1, 2
而 distinctUntilChanged
会输出 1, 2, 1, 2
当然这个管道也支持传入函数来确定唯一的值
比如,现在我们希望在窗口水平 resize
的时候进行一些操作,可以这么写
1 | import { distinctUntilChanged, fromEvent, map } from "rxjs"; |
效果如下:
当然,如果你想换成垂直 resize
,只要更改为 distinctUntilChanged((pre, cur) => pre.height === cur.height)
即可
可以想象一下如果我们不使用 RxJS
的话,这段逻辑写起来还是挺复杂的,而且免不得得产生一些局部变量(记录 pre
值)
retry
在 Observable
发生错误的时候可以重新订阅
1 | import { Observable, retry } from "rxjs"; |
在这里我们使用 retry(2)
表示如果源 Observable
发生了错误,那么重新订阅,最大次数为 2
次
也就是两次后,如果成功了,那无事发生,如果还是报错,那么错误就会通过 error
回调抛出
可以看到前两次的 error
并没有被抛出,只有最后一次的 error
被抛出
这个管道对于 http
请求还是挺有用的,直接就可以设置重拾次数
elementAt
直接取某个索引的元素,很像 js
中通过 []
取值
1 | import { elementAt, range } from "rxjs"; |
取索引 2
的值,对于 1, 2, 3, 4, 5
,也就是数字 3
效果如下:
single
只有在匹配到一个值的时候才会输出,其他情况下,没有值匹配或者匹配了多个值,则会报错
1 | import { of, single } from "rxjs"; |
效果如下:
combineLatestAll, concatAll, mergeAll
和 combineLatest
的效果一样,不过 combineLatestAll
作为管道主要用于高阶 Observable
1 | import { fromEvent, map, interval, take, combineLatestAll } from "rxjs"; |
这里每次点击产生一个定时器,每个定时器只输出两个值,只响应两次点击事件
意味着会有两个定时器,然后通过 combineLatestAll
结合两个定时器,完全可以看成如下代码
1 | import { combineLatest } from "rxjs"; |
效果如下:
除了 combineLatestAll
,其他的一些创建类操作符也有其对应的管道操作符
concat
对应concatAll
merge
对应mergeAll
exhaustAll
用于高阶 Observable
,当订阅第一个 Observable
,接下来在这个 Observable
完成前就订阅的 Observable
被抛弃
感觉很绕,我们可以写个例子
1 | import { fromEvent, interval, take, map, exhaustAll } from "rxjs"; |
这里我们使用 map
让生成的 Observable
成为高阶的,然后使用 exhaustAll
当我们点击的时候,会输出 0, 1, 2
,如果在 0, 1, 2
输出还没完成的时候,继续点击,那么这个 Observable
是会被抛弃的
也就是只有等到 0, 1, 2
输出完成,我们再次点击,那么才会继续输出 0, 1, 2
效果如下:
switchAll
用于高阶 Observable
,这个和 exhaustAll
相反,如果在前一个 Observable
还未完成时,得到了下一个 Observable
那么当前订阅的 Observable
会被完成,然后去订阅这个新的 Observable
1 | import { fromEvent, interval, take, map, tap, switchAll } from "rxjs"; |
这个例子和 exhaustAll
的一样,只不过我们使用 switchAll
来合并 Observable
在输出 0, 1, 2
输出未完成之前,继续点击的话,那么会重新输出 0, 1, 2
效果如下:
后记
RxJS
的系列基本就结束了,接下来学习 Angular
,在 RxJS
方面应该就问题不大了