转化数据流
转化类操作符
对每个数据进行转化,映射
不转化单个数据,而是把数据重新组合,比如上游传下来 1、2、3 传给下游 [1, 2, 3],无损回压控制
映射
map
1function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R>
project 中的 this 指向 thisArg,相当于一个 context 对象,当然也可以用闭包产生一个 context
mapTo
1oba.pipe(map(x => 1)) // mapTo(1)
pluck
1oba.pipe(map(x => x.target.tagName)) // pluck('target', 'tagName')
缓存窗口:无损回压控制
支持数组的以 buffer 开头,支持 Observable 对象的以 window 开头
windowTime、bufferTime
第一个参数表示缓存窗口时间长度,第二个参数表示缓存窗口创建时间间隔,第三个参数表示每个缓存窗口存的数据最大个数
windowCount、bufferCount
缓存窗口长度根据数据个数
windowWhen、bufferWhen、windowToggle、bufferToggle
根据 Observable 控制开始结束时机
window、buffer
notifer: An Observable that completes the previous window and starts a new window.
1// On every click, emit array of most recent interval events2const clicks = fromEvent(document, 'click')3const intervalEvents = interval(1000)4const buffered = intervalEvents.pipe(buffer(clicks))
高阶 map
1interval(1000).pipe(map((value) => timer(value * 100)))
concatMap = map + concatAll
mergeMap = map + mergeAll
switchMap = map + switch
exhaustMap = map + exhaust
把 map 产生的高阶 Observable 利用对应的组合操作符合并为一阶的 Observable 对象
concatMap
拖拽:
1const mouseMove$ = fromEvent<MouseEvent>(document, 'mousemove')2const mouseUp$ = fromEvent<MouseEvent>(document, 'mouseup')34const mouseDownOnBtn$ = fromEvent<MouseEvent>(btn!, 'mousedown')5const drag$ = mouseDownOnBtn$.pipe(6 concatMap((startEvent) => {7 const initialLeft = (btn as HTMLElement).offsetLeft8 const initialTop = (btn as HTMLElement).offsetTop9 const stop$ = mouseUp$10 return mouseMove$.pipe(11 takeUntil(stop$),12 map((moveEvent) => ({13 x: moveEvent.x - startEvent.x + initialLeft,14 y: moveEvent.y - startEvent.y + initialTop,15 }))16 )17 })18)1920drag$.subscribe((e) => {21 ;(btn as HTMLElement).style.left = `${e.x}px`22 ;(btn as HTMLElement).style.top = `${e.y}px`23})
mergeMap
其他的 Rx 和之前的版本都叫 flatMap
AJAX:
1fromEvent<MouseEvent>(btn!, 'click').pipe(2 mergeMap(() => ajax(apiUrl))3).subscribe(render)
switchMap
flatMapLatest
可以用来解决 AJAX 竞态问题
exhaustMap
flatMapFirst
高阶 mapTo
concatMapTo
mergeMapTo
switchMapTo
1oba.pipe(concatMap(() => inner$)) // concatMapTo(inner$)
expand
这个操作符类似于 mergeMap,但是,所有 expand 传递给下游的数据,同时也会传递给自己,就像是逐层“展开”所有的数据,expand构成了一个逻辑死循环,除非发生异常,或者 expand 的函数参数 project 代码中限定在某些情况下不再产生数据
2 的 n 次方:
1of(1).pipe(expand(value, index) => of(value * 2).pipe(delay(1000)))
数据分组
groupBy
groupBy 作为一个内容分发器,对上游推送的数据检测 key 值,如果 key 第一次出现,就产生一个新的 Observable 对象,否则就给相应 key 的 Observable 对象
根据奇偶性分组:
1interval(1000).pipe(groupBy((x) => x % 2))
根据 key 产生的 Observable 对象是一个 GroupedObservable,有 key 属性可以分辨
事件委托:
1const clicks$ = fromEvent(document, 'click')2const groupByClass$ = clicks$.pipe(groupBy(e => e.target.className))3groupByClass$.pipe(4 filter(value => value.key === 'foo'),5 mergeAll(),6).subscribe(fooHandler)
partition
It's like filter, but returns two Observables: one like the output of filter, and the other with values that did not pass the condition.
1const observableValues = of(1, 2, 3, 4, 5, 6)2const [evens$, odds$] = partition(observableValues, (value, index) => value % 2 === 0)3odds$.subscribe(x => console.log('odds', x))4evens$.subscribe(x => console.log('evens', x))
累计数据
之前所有操作符,上游的数据之间都不会产生影响,非常符合 FP,但是有时需要依赖之前的数据
scan
参数类似 reduce 的参数,这俩区别在于 scan 对上游每一个数据都会产生一个规约结果,而 reduce 是对上游所有数据进行规约;reduce 最多只给下游传递一个数据,如果上游数据永不完结,那 reduce 也永远不会产生数据,而 scan 完全可以处理一个永不完结的上游 Observable 对象
scan 常用来维持应用状态
1const clicks = fromEvent(document, 'click')2const ones = clicks.pipe(mapTo(1))3const seed = 04const count = ones.pipe(scan((acc, one) => acc + one, seed))5count.subscribe(x => console.log(x))
scan 源码中内部 acc 就是 seed,seed 的赋值同时要改变 private hasSeed: boolean
,通过 set seed()
将改变 hasSeed 和 seed 赋值的逻辑进行耦合
mergeScan
It's like scan, but the Observables returned by the accumulator are merged into the outer Observable.
1const click$ = fromEvent(document, 'click')2const one$ = click$.pipe(mapTo(1))3const seed = 04const count$ = one$.pipe(5 mergeScan((acc, one) => of(acc + one), seed),6)7count$.subscribe(x => console.log(x))
懒加载:
1const result$ = throttledScrollToEnd$.pipe(2 mergeScan((acc, cur) => getData(acc.length).pipe(3 map(newData => acc.pipe(4 concat(newData),5 ))),6 [],7 ),8)