Skip to content

AHABHGK

转化数据流

转化类操作符

  • 对每个数据进行转化,映射

  • 不转化单个数据,而是把数据重新组合,比如上游传下来 1、2、3 传给下游 [1, 2, 3],无损回压控制

映射

map

1function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R>

project 中的 this 指向 thisArg,相当于一个 context 对象,当然也可以用闭包产生一个 context

mapTo

1oba.pipe(map(x => 1)) // mapTo(1)

pluck

1oba.pipe(map(x => x.target.tagName)) // pluck('target', 'tagName')

缓存窗口:无损回压控制

支持数组的以 buffer 开头,支持 Observable 对象的以 window 开头

windowTime、bufferTime

windowTime

bufferTime

第一个参数表示缓存窗口时间长度,第二个参数表示缓存窗口创建时间间隔,第三个参数表示每个缓存窗口存的数据最大个数

windowCount、bufferCount

缓存窗口长度根据数据个数

windowWhen、bufferWhen、windowToggle、bufferToggle

根据 Observable 控制开始结束时机

window、buffer

notifer: An Observable that completes the previous window and starts a new window.

1// On every click, emit array of most recent interval events
2const clicks = fromEvent(document, 'click')
3const intervalEvents = interval(1000)
4const buffered = intervalEvents.pipe(buffer(clicks))

高阶 map

1interval(1000).pipe(map((value) => timer(value * 100)))

HOmap

  • concatMap = map + concatAll

  • mergeMap = map + mergeAll

  • switchMap = map + switch

  • exhaustMap = map + exhaust

把 map 产生的高阶 Observable 利用对应的组合操作符合并为一阶的 Observable 对象

concatMap

拖拽:

1const mouseMove$ = fromEvent<MouseEvent>(document, 'mousemove')
2const mouseUp$ = fromEvent<MouseEvent>(document, 'mouseup')
3
4const mouseDownOnBtn$ = fromEvent<MouseEvent>(btn!, 'mousedown')
5const drag$ = mouseDownOnBtn$.pipe(
6 concatMap((startEvent) => {
7 const initialLeft = (btn as HTMLElement).offsetLeft
8 const initialTop = (btn as HTMLElement).offsetTop
9 const stop$ = mouseUp$
10 return mouseMove$.pipe(
11 takeUntil(stop$),
12 map((moveEvent) => ({
13 x: moveEvent.x - startEvent.x + initialLeft,
14 y: moveEvent.y - startEvent.y + initialTop,
15 }))
16 )
17 })
18)
19
20drag$.subscribe((e) => {
21 ;(btn as HTMLElement).style.left = `${e.x}px`
22 ;(btn as HTMLElement).style.top = `${e.y}px`
23})

mergeMap

其他的 Rx 和之前的版本都叫 flatMap

AJAX:

1fromEvent<MouseEvent>(btn!, 'click').pipe(
2 mergeMap(() => ajax(apiUrl))
3).subscribe(render)

switchMap

flatMapLatest

可以用来解决 AJAX 竞态问题

exhaustMap

flatMapFirst

高阶 mapTo

  • concatMapTo

  • mergeMapTo

  • switchMapTo

1oba.pipe(concatMap(() => inner$)) // concatMapTo(inner$)

expand

这个操作符类似于 mergeMap,但是,所有 expand 传递给下游的数据,同时也会传递给自己,就像是逐层“展开”所有的数据,expand构成了一个逻辑死循环,除非发生异常,或者 expand 的函数参数 project 代码中限定在某些情况下不再产生数据

2 的 n 次方:

1of(1).pipe(expand(value, index) => of(value * 2).pipe(delay(1000)))

数据分组

groupBy

groupBy 作为一个内容分发器,对上游推送的数据检测 key 值,如果 key 第一次出现,就产生一个新的 Observable 对象,否则就给相应 key 的 Observable 对象

根据奇偶性分组:

1interval(1000).pipe(groupBy((x) => x % 2))

groupBy

根据 key 产生的 Observable 对象是一个 GroupedObservable,有 key 属性可以分辨

事件委托:

1const clicks$ = fromEvent(document, 'click')
2const groupByClass$ = clicks$.pipe(groupBy(e => e.target.className))
3groupByClass$.pipe(
4 filter(value => value.key === 'foo'),
5 mergeAll(),
6).subscribe(fooHandler)

partition

It's like filter, but returns two Observables: one like the output of filter, and the other with values that did not pass the condition.

1const observableValues = of(1, 2, 3, 4, 5, 6)
2const [evens$, odds$] = partition(observableValues, (value, index) => value % 2 === 0)
3odds$.subscribe(x => console.log('odds', x))
4evens$.subscribe(x => console.log('evens', x))

累计数据

之前所有操作符,上游的数据之间都不会产生影响,非常符合 FP,但是有时需要依赖之前的数据

scan

参数类似 reduce 的参数,这俩区别在于 scan 对上游每一个数据都会产生一个规约结果,而 reduce 是对上游所有数据进行规约;reduce 最多只给下游传递一个数据,如果上游数据永不完结,那 reduce 也永远不会产生数据,而 scan 完全可以处理一个永不完结的上游 Observable 对象

scan 常用来维持应用状态

1const clicks = fromEvent(document, 'click')
2const ones = clicks.pipe(mapTo(1))
3const seed = 0
4const count = ones.pipe(scan((acc, one) => acc + one, seed))
5count.subscribe(x => console.log(x))

scan 源码中内部 acc 就是 seed,seed 的赋值同时要改变 private hasSeed: boolean,通过 set seed() 将改变 hasSeed 和 seed 赋值的逻辑进行耦合

mergeScan

It's like scan, but the Observables returned by the accumulator are merged into the outer Observable.

1const click$ = fromEvent(document, 'click')
2const one$ = click$.pipe(mapTo(1))
3const seed = 0
4const count$ = one$.pipe(
5 mergeScan((acc, one) => of(acc + one), seed),
6)
7count$.subscribe(x => console.log(x))

懒加载:

1const result$ = throttledScrollToEnd$.pipe(
2 mergeScan((acc, cur) => getData(acc.length).pipe(
3 map(newData => acc.pipe(
4 concat(newData),
5 ))),
6 [],
7 ),
8)