合并数据流
合并类操作符
concat 首尾相连
concat 先从第一个 Observable 对象获取数据,第一个 Observable 就是调用 concat 的那个对象(类似 [1, 2].concat([3, 4])
),第一个 complete 之后 concat 会订阅第二个 Observable 以此类推,直至最后一个 Observable complete,concat 的 Observable 才完结
因此前一个 Observable 不 complete,后一个 Observable 就不会开始
1const oba1$ = of(1, 2, 3)2const oba2$ = of(4, 5, 6)3of().pipe(concat(oba1$, oba2$)).subscribe(console.log)
merge 先到先得快速通过
merge 会同时订阅所有的上游对象,一般用于异步数据流
1const source1$ = timer(0, 1000).pipe(map((n) => `A${n}`))2const source2$ = timer(500, 1000).pipe(map((n) => `B${n}`))3merge(source1$, source2$).subscribe(console.log)4// A05// B06// A17// B18// ...
merge 同时订阅是相对于异步来说,实际上仍然是依次订阅,由于订阅 of(1, 2, 3)
后还没有来得及订阅后一个,就已经吐出数据,所以输出与 concat 一样
1merge(of(1, 2, 3), of(4, 5, 6)).subscribe(console.log) // 1 2 3 4 5 6
有一个 concurrent: number 可选参数用来限流
1const source1$ = time(0, 1000)2const source2$ = time(500, 1000)3const source3$ = time(1000, 1000)4merge(source1$, source2$, source3$, 2).subscribe(console.log)
这样 source3$ 就不会进行订阅,除非 source1$ 或 source2$ complete
例子:因为在移动设备上 touchend 事件出现得比 click 更早,这两个事件的处理是一模一样的但是 fromEvent 不能同时获得两个事件的数据流,这时候就要借助 merge 的力量
1const click$ = fromEvent(element, 'click')2const touchend$ = fromEvent(element, 'touchend')3merge(click$, touchend$).subscribe(eventHandler)
zip 拉链式组合
就像拉链一样组合
1const age$ = of<number>(27, 25, 29)2const name$ = of<string>('Foo', 'Bar', 'Beer')3const isDev$ = of<boolean>(true, true, false)45zip(age$, name$, isDev$).subscribe(console.log)6// [27, 'Foo', true]7// [25, 'Bar', true]8// [29, 'Beer', false]
对于步调不一致时或数量不一致时,会把提前的自动延后,多的删减成少的
1const source1$ = interval(3000)2const source2$ = timer(3000, 1000)3zip(source1$, source2$).subscribe(console.log)
数据积压问题:如果一个吐出速度过慢,zip 会保存吐出速度过快的数据,等待另一个吐出数据,这时如果保存数据量过大,就会占用大量内存,zip 自身解决不了,会有其他方式解决
combineLatest 合并最后一个数据
Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.
1import { combineLatest, timer } from 'rxjs';23const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now4const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0,5s from now5const combinedTimers = combineLatest(firstTimer, secondTimer);6combinedTimers.subscribe(value => console.log(value));7// Logs8// [0, 0] after 0.5s9// [1, 0] after 1s10// [1, 1] after 1.5s11// [2, 1] after 2s
对于同步的,类似于 merge
1const source1$ = of('a', 'b', 'c')2const source2$ = of(1, 2, 3)3const source3$ = of('x', 'y')4source1$.combineLatest(source2$, source3$)5 .subscribe(console.log, null, () => console.log('complete')6// ['c', 3, 'x']7// ['c', 3, 'y']8// 'complete'
最后一个参数可以是一个 resultSelector 函数,相当于 .pipe(map(/* codes */))
对于多重依赖的 source,就像 cpp 中的多重继承一样,会出现一些反常识的问题
1const original$ = timer(0, 1000)2const source1$ = original$.pipe(map(x => `${x}a`))3const source2$ = original$.pipe(map(x => `${x}b`))4combineLatest(source1$, source2$).subscribe(5 console.log,6 null,7 () => console.log('complete')8)9// ['0a', '0b']10// ['1a', '0b']11// ['1a', '1b']12// ['2a', '1b']13// ['2a', '2b']1415// 对于完全正统的 FRP 的定义,应该是这样16// ['0a', '0b']17// ['1a', '1b']18// ['2a', '2b']
如果想要纯正的输出,可以使用 withLatestFrom
withLatestFrom
Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.
1const source1$ = timer(0, 2000).pipe(map(x => 100 * x))2const source2$ = timer(500, 1000)3source1$.pipe(4 withLatestFrom(source2$, (a, b) => a + b),5).subscribe(6 console.log,7 null,8 () => console.log('complete'),9)10// 10111// 20312// 30513// ...
1const original$ = timer(0, 1000)2const source1$ = original$.pipe(map(x => `${x}a`))3const source2$ = original$.pipe(map(x => `${x}b`))4source1$.pipe(5 withLatestFrom(source2$),6).subscribe(7 console.log,8 null,9 () => console.log('complete'),10)11// ['0a', '0b']12// ['1a', '1b']13// ['2a', '2b']14// ['3a', '3b']
所以如果要合并完全独立的 Observable 对象,使用 combineLatest,如何要把一个Observable对象“映射”成新的数据流,同时要从其他 Observable 对象获取“最新数据”,就是用 withLatestFrom
race 胜者通吃
1race(2 from(sleep(500).then(() => 1)),3 from(sleep(1000).then(() => 2)),4).subscribe(5 console.log,6 null,7 () => console.log('on complete'),8)9// 110// 'on complete'
startWith
1of("from source")2 .pipe(startWith("first", "second"))3 .subscribe(x => console.log(x))4// 'first'5// 'second'6// 'from source'
startWith 的所有参数都是同步吐出的,如果需要异步吐出参数,那还是只能利用 concat
forkJoin
1const observable = forkJoin({2 foo: of(1, 2, 3, 4),3 bar: Promise.resolve(8),4 baz: timer(4000),5})6observable.subscribe({7 next: value => console.log(value),8 complete: () => console.log('This is how it ends!'),9})1011// Logs:12// { foo: 4, bar: 8, baz: 0 } after 4 seconds13// "This is how it ends!" immediately after
1const observable = forkJoin([2 of(1, 2, 3, 4),3 Promise.resolve(8),4 timer(4000),5])6observable.subscribe({7 next: value => console.log(value),8 complete: () => console.log('This is how it ends!'),9})1011// Logs:12// [4, 8, 0] after 4 seconds13// "This is how it ends!" immediately after
高阶 Observable
HO$::$ -> $
1interval(1000).pipe(2 take(2),3 map(x => interval(2000).pipe(4 take(2),5 map(y => `${x}:${y}`),6 )),7)
每产生一个数据,就对应产成一个 $
HO$ 的意义
用 Observable 来管理多个 Observable
数据流管理的是数据,但数据流本身也可以看作一种数据,使 Observable 对象通过 Observable 对象来管理
过程对数据进行抽象,同时过程也是数据,通过高阶过程对过程进行抽象
操作高阶 Observable 的合并类操作符
concatAll
Flattens an Observable-of-Observables by putting one inner Observable after the other.
1interval(1000).pipe(2 take(2),3 map(x => interval(2000).pipe(4 take(2),5 map(y => `${x}:${y}`),6 )),7).pipe(concatAll())当第一个 Observable 对象完结的时候,才会去订阅第二个内部 Observable 对象。虽然高阶 Observable 对象已经产生了第二个 Observable 对象,不代表 concatAll 会立刻去订阅它
mergeAll
Flattens an Observable-of-Observables.
1interval(1000).pipe(2 take(2),3 map(x => interval(2000).pipe(4 take(2),5 map(y => `${x}:${y}`),6 )),7).pipe(mergeAll())zipAll
1interval(1000).pipe(2 take(2),3 map(x => interval(2000).pipe(4 take(2),5 map(y => `${x}:${y}`),6 )),7).pipe(zipAll())1const ho$ = interval(1000).pipe(2 take(2),3 concat(Observable.never()),4 map(x => interval(1500).pipe(5 map(y => x+':'+y),6 take(2),7 )),8)9const concated$ = ho$.pipe(zipAll())现在,zipAll 的上游是一个永不完结的 Observable,当它拿到 2 个内部 Observable 的时候,无法确定是不是还有新的内部 Observable 产生,而根 据“拉链”的工作方式,来自不同数据源的数据要一对一配对,这样一来,zipAll就只能等待,等待上游高阶 Observable 完结,这样才能确定内部 Observable对象的数量。如果上游的高阶 Observable 不完结,那么 zipAll 就不会开始工作
combineAll(combineLatestAll 有点啰嗦)
1interval(1000).pipe(2 take(2),3 map(x => interval(2000).pipe(4 take(2),5 map(y => `${x}:${y}`),6 )),7).pipe(combineAll())combineAll 和 zipAll 一样,必须上游高阶 Observable 完结之后才能开始给下游产生数据,因为只有确定了作为输入的内部 Observable 对象的个数,才能拼凑出第一个传给下游的数据
没有 withLatestFromAll 是因为以上 All 后缀的操作符中所有 Observable 地位都是平等的,而 withLatestFrom 会有一个 Observable 来控制节奏,其他的只是提供数据,并不是平等的
进化的高阶 Observable 处理
switchAll
总是切换到最新的内部 Observable 对象获取数据。每当 switch 的上游高阶 Observable 产生一个内部 Observable 对象,switch 都会立刻订阅最新的内部 Observable 对象上,如果已经订阅了之前的内部 Observable 对象,就会退订那个过时的内部 Observable 对象
1interval(1000).pipe(2 take(3),3 map(x => interval(700).pipe(4 take(2),5 map(y => `${x}:${y}`),6 )),7).pipe(switchAll())exhaust
在耗尽当前内部 Observable 的数据之前不会切换到下一个内部 Observable 对象
1interval(1000).pipe(2 take(3),3 map(x => interval(700).pipe(4 take(2),5 map(y => `${x}:${y}`),6 )),7).pipe(exhaust())
上游高阶 Observable 完结并且当前内部 Observable 也完结,switchAll、exhaust 产生的 Observable 对象才会完结
如果上游高阶 Observable 对象没有完结,意味着可能会有新的内部 Observable 产生;如果内部 Observable 没有完结,毫无疑问应该继续产生数据
产生的数据依然是 Observable 对象的 Observable,称为高阶 Observable,RxJS 提供了合并高阶 Observable 对象中数据的操作符,实际上只是把多个 Observable 对象参数改成了一个高阶 Observable 对象