Skip to content

AHABHGK

合并数据流

合并类操作符

concat 首尾相连

concat 先从第一个 Observable 对象获取数据,第一个 Observable 就是调用 concat 的那个对象(类似 [1, 2].concat([3, 4])),第一个 complete 之后 concat 会订阅第二个 Observable 以此类推,直至最后一个 Observable complete,concat 的 Observable 才完结

因此前一个 Observable 不 complete,后一个 Observable 就不会开始

1const oba1$ = of(1, 2, 3)
2const oba2$ = of(4, 5, 6)
3of().pipe(concat(oba1$, oba2$)).subscribe(console.log)

merge 先到先得快速通过

merge 会同时订阅所有的上游对象,一般用于异步数据流

1const source1$ = timer(0, 1000).pipe(map((n) => `A${n}`))
2const source2$ = timer(500, 1000).pipe(map((n) => `B${n}`))
3merge(source1$, source2$).subscribe(console.log)
4// A0
5// B0
6// A1
7// B1
8// ...

merge 同时订阅是相对于异步来说,实际上仍然是依次订阅,由于订阅 of(1, 2, 3) 后还没有来得及订阅后一个,就已经吐出数据,所以输出与 concat 一样

1merge(of(1, 2, 3), of(4, 5, 6)).subscribe(console.log) // 1 2 3 4 5 6

有一个 concurrent: number 可选参数用来限流

1const source1$ = time(0, 1000)
2const source2$ = time(500, 1000)
3const source3$ = time(1000, 1000)
4merge(source1$, source2$, source3$, 2).subscribe(console.log)

这样 source3$ 就不会进行订阅,除非 source1$ 或 source2$ complete

例子:因为在移动设备上 touchend 事件出现得比 click 更早,这两个事件的处理是一模一样的但是 fromEvent 不能同时获得两个事件的数据流,这时候就要借助 merge 的力量

1const click$ = fromEvent(element, 'click')
2const touchend$ = fromEvent(element, 'touchend')
3merge(click$, touchend$).subscribe(eventHandler)

zip 拉链式组合

就像拉链一样组合

1const age$ = of<number>(27, 25, 29)
2const name$ = of<string>('Foo', 'Bar', 'Beer')
3const isDev$ = of<boolean>(true, true, false)
4
5zip(age$, name$, isDev$).subscribe(console.log)
6// [27, 'Foo', true]
7// [25, 'Bar', true]
8// [29, 'Beer', false]

对于步调不一致时或数量不一致时,会把提前的自动延后,多的删减成少的

1const source1$ = interval(3000)
2const source2$ = timer(3000, 1000)
3zip(source1$, source2$).subscribe(console.log)

zip

数据积压问题:如果一个吐出速度过慢,zip 会保存吐出速度过快的数据,等待另一个吐出数据,这时如果保存数据量过大,就会占用大量内存,zip 自身解决不了,会有其他方式解决

combineLatest 合并最后一个数据

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.

combineLatest

1import { combineLatest, timer } from 'rxjs';
2
3const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
4const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0,5s from now
5const combinedTimers = combineLatest(firstTimer, secondTimer);
6combinedTimers.subscribe(value => console.log(value));
7// Logs
8// [0, 0] after 0.5s
9// [1, 0] after 1s
10// [1, 1] after 1.5s
11// [2, 1] after 2s

对于同步的,类似于 merge

1const source1$ = of('a', 'b', 'c')
2const source2$ = of(1, 2, 3)
3const source3$ = of('x', 'y')
4source1$.combineLatest(source2$, source3$)
5 .subscribe(console.log, null, () => console.log('complete')
6// ['c', 3, 'x']
7// ['c', 3, 'y']
8// 'complete'

最后一个参数可以是一个 resultSelector 函数,相当于 .pipe(map(/* codes */))

对于多重依赖的 source,就像 cpp 中的多重继承一样,会出现一些反常识的问题

1const original$ = timer(0, 1000)
2const source1$ = original$.pipe(map(x => `${x}a`))
3const source2$ = original$.pipe(map(x => `${x}b`))
4combineLatest(source1$, source2$).subscribe(
5 console.log,
6 null,
7 () => console.log('complete')
8)
9// ['0a', '0b']
10// ['1a', '0b']
11// ['1a', '1b']
12// ['2a', '1b']
13// ['2a', '2b']
14
15// 对于完全正统的 FRP 的定义,应该是这样
16// ['0a', '0b']
17// ['1a', '1b']
18// ['2a', '2b']

combineLatest-glitch

如果想要纯正的输出,可以使用 withLatestFrom

withLatestFrom

Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.

1const source1$ = timer(0, 2000).pipe(map(x => 100 * x))
2const source2$ = timer(500, 1000)
3source1$.pipe(
4 withLatestFrom(source2$, (a, b) => a + b),
5).subscribe(
6 console.log,
7 null,
8 () => console.log('complete'),
9)
10// 101
11// 203
12// 305
13// ...

withLatestFrom

1const original$ = timer(0, 1000)
2const source1$ = original$.pipe(map(x => `${x}a`))
3const source2$ = original$.pipe(map(x => `${x}b`))
4source1$.pipe(
5 withLatestFrom(source2$),
6).subscribe(
7 console.log,
8 null,
9 () => console.log('complete'),
10)
11// ['0a', '0b']
12// ['1a', '1b']
13// ['2a', '2b']
14// ['3a', '3b']

所以如果要合并完全独立的 Observable 对象,使用 combineLatest,如何要把一个Observable对象“映射”成新的数据流,同时要从其他 Observable 对象获取“最新数据”,就是用 withLatestFrom

race 胜者通吃

1race(
2 from(sleep(500).then(() => 1)),
3 from(sleep(1000).then(() => 2)),
4).subscribe(
5 console.log,
6 null,
7 () => console.log('on complete'),
8)
9// 1
10// 'on complete'

startWith

1of("from source")
2 .pipe(startWith("first", "second"))
3 .subscribe(x => console.log(x))
4// 'first'
5// 'second'
6// 'from source'

startWith 的所有参数都是同步吐出的,如果需要异步吐出参数,那还是只能利用 concat

forkJoin

1const observable = forkJoin({
2 foo: of(1, 2, 3, 4),
3 bar: Promise.resolve(8),
4 baz: timer(4000),
5})
6observable.subscribe({
7 next: value => console.log(value),
8 complete: () => console.log('This is how it ends!'),
9})
10
11// Logs:
12// { foo: 4, bar: 8, baz: 0 } after 4 seconds
13// "This is how it ends!" immediately after
1const observable = forkJoin([
2 of(1, 2, 3, 4),
3 Promise.resolve(8),
4 timer(4000),
5])
6observable.subscribe({
7 next: value => console.log(value),
8 complete: () => console.log('This is how it ends!'),
9})
10
11// Logs:
12// [4, 8, 0] after 4 seconds
13// "This is how it ends!" immediately after

高阶 Observable

HO$::$ -> $

1interval(1000).pipe(
2 take(2),
3 map(x => interval(2000).pipe(
4 take(2),
5 map(y => `${x}:${y}`),
6 )),
7)

每产生一个数据,就对应产成一个 $

HO$

HO$ 的意义

用 Observable 来管理多个 Observable

数据流管理的是数据,但数据流本身也可以看作一种数据,使 Observable 对象通过 Observable 对象来管理

过程对数据进行抽象,同时过程也是数据,通过高阶过程对过程进行抽象

操作高阶 Observable 的合并类操作符

  • concatAll

    Flattens an Observable-of-Observables by putting one inner Observable after the other.

    1interval(1000).pipe(
    2 take(2),
    3 map(x => interval(2000).pipe(
    4 take(2),
    5 map(y => `${x}:${y}`),
    6 )),
    7).pipe(concatAll())

    concatAll

    当第一个 Observable 对象完结的时候,才会去订阅第二个内部 Observable 对象。虽然高阶 Observable 对象已经产生了第二个 Observable 对象,不代表 concatAll 会立刻去订阅它

  • mergeAll

    Flattens an Observable-of-Observables.

    1interval(1000).pipe(
    2 take(2),
    3 map(x => interval(2000).pipe(
    4 take(2),
    5 map(y => `${x}:${y}`),
    6 )),
    7).pipe(mergeAll())

    mergeAll

  • zipAll

    1interval(1000).pipe(
    2 take(2),
    3 map(x => interval(2000).pipe(
    4 take(2),
    5 map(y => `${x}:${y}`),
    6 )),
    7).pipe(zipAll())

    zipAll

    1const ho$ = interval(1000).pipe(
    2 take(2),
    3 concat(Observable.never()),
    4 map(x => interval(1500).pipe(
    5 map(y => x+':'+y),
    6 take(2),
    7 )),
    8)
    9const concated$ = ho$.pipe(zipAll())

    现在,zipAll 的上游是一个永不完结的 Observable,当它拿到 2 个内部 Observable 的时候,无法确定是不是还有新的内部 Observable 产生,而根 据“拉链”的工作方式,来自不同数据源的数据要一对一配对,这样一来,zipAll就只能等待,等待上游高阶 Observable 完结,这样才能确定内部 Observable对象的数量。如果上游的高阶 Observable 不完结,那么 zipAll 就不会开始工作

  • combineAll(combineLatestAll 有点啰嗦)

    1interval(1000).pipe(
    2 take(2),
    3 map(x => interval(2000).pipe(
    4 take(2),
    5 map(y => `${x}:${y}`),
    6 )),
    7).pipe(combineAll())

    combineAll

    combineAll 和 zipAll 一样,必须上游高阶 Observable 完结之后才能开始给下游产生数据,因为只有确定了作为输入的内部 Observable 对象的个数,才能拼凑出第一个传给下游的数据

没有 withLatestFromAll 是因为以上 All 后缀的操作符中所有 Observable 地位都是平等的,而 withLatestFrom 会有一个 Observable 来控制节奏,其他的只是提供数据,并不是平等的

进化的高阶 Observable 处理

  • switchAll

    总是切换到最新的内部 Observable 对象获取数据。每当 switch 的上游高阶 Observable 产生一个内部 Observable 对象,switch 都会立刻订阅最新的内部 Observable 对象上,如果已经订阅了之前的内部 Observable 对象,就会退订那个过时的内部 Observable 对象

    1interval(1000).pipe(
    2 take(3),
    3 map(x => interval(700).pipe(
    4 take(2),
    5 map(y => `${x}:${y}`),
    6 )),
    7).pipe(switchAll())

    switchAll

  • exhaust

    在耗尽当前内部 Observable 的数据之前不会切换到下一个内部 Observable 对象

    1interval(1000).pipe(
    2 take(3),
    3 map(x => interval(700).pipe(
    4 take(2),
    5 map(y => `${x}:${y}`),
    6 )),
    7).pipe(exhaust())

    exhaust

上游高阶 Observable 完结并且当前内部 Observable 也完结,switchAll、exhaust 产生的 Observable 对象才会完结

如果上游高阶 Observable 对象没有完结,意味着可能会有新的内部 Observable 产生;如果内部 Observable 没有完结,毫无疑问应该继续产生数据

产生的数据依然是 Observable 对象的 Observable,称为高阶 Observable,RxJS 提供了合并高阶 Observable 对象中数据的操作符,实际上只是把多个 Observable 对象参数改成了一个高阶 Observable 对象