Skip to content

AHABHGK

多播

多播就是让一个数据流的内容被多个 Observer 订阅

  • Subject

  • BehaviorSubject

  • ReplaySubject

  • AsyncSubject

数据流的多播

  • 单播

  • 多播

  • 广播

广播会影响全局环境,筛选消息的责任在接收方,同时多个广播时容易混乱,所以 RxJS 不支持多播

Cold Observable 无法实现多播,每次订阅都会是彼此之间不同的 Observable 对象

1const oba$ = interval(1000)
2oba$.subscribe(console.log)
3sleep(3000).then(() => oba$.subscribe(console.log))

Hot 数据流

fromPromise、fromEvent、fromEventPattern 产生 Hot Observable,这些数据源都来自外部(DOM、Promise、EventPattern、WebSocket……),与 Observable 无关

Hot Observable 实现多播,但有时 Cold Observable 也需要实现多播,这时就需要 Subject,将 Cold Observable 变为 Hot Observable

Subject

为了保证 RxJS 的 Immutable 特性,Cold Observable 转换为 Hot Observable 时不能改变本身,而是产生一个新的对象,这个对象订阅 Cold Observable,同时被订阅,所以 Subject 需要有订阅和被订阅的能力(是一个 Observable 同时是一个 Observer,因此 next*(error|complete)?

1// 实现类似观察者模式,通过 new 出来的实例中的 next、error、complete 方法订阅 Observable
2export declare class Subject<T> extends Observable<T> implements SubscriptionLike {
3 // ...
4 observers: Observer<T>[] = []
5
6 next(value: T) {
7 this.observers.forEach((ob) => ob.next(T))
8 }
9 // error complete 类似
10}

实现多播

1const subject = new Subject<number>()
2
3subject.subscribe({
4 next: (v) => console.log(`observerA: ${v}`)
5})
6subject.subscribe({
7 next: (v) => console.log(`observerB: ${v}`)
8})
9
10subject.next(1)
11subject.next(2)
12
13// Logs:
14// observerA: 1
15// observerB: 1
16// observerA: 2
17// observerB: 2
1const subject = new Subject<number>()
2const oba$ = interval(1000)
3oba$.subscribe(subject)
4subject.subscribe(console.log)
5sleep(3000).then(() => subject.subscribe(console.log))

Subject 可以有多个上游

new Subject 得到的是一个有 next、error、compete 方法的对象,同时具有 subscribe 方法进行订阅,对于 observer 的那三个方法就类似发布者一样,依次通知所有的订阅者,所以对于上游的数据 subject 实例只是调用那三个方法依次通知订阅者,不论是那个上游

Subject 错误处理

Observable 内部类似这样处理 subscribe,忽略了一些其他的逻辑

1try {
2 // observer.next 那些
3} catch (err) {
4 // observer.error(err)
5}

这里 observer 就是 subject,subject 内部使用类似 this.observers.forEach((ob) => ob.next(data)) 处理数据

subject 就是一个转接头,上游的数据通过 subject 传给下游

多播操作符

multicast 最基本的操作符

1const subject = new Subject<number>()
2// const oba$ = multicast(subject)(interval(1000))
3const oba$ = interval(1000).pipe(multicast(subject)) as ConnectableObservable<number>
4oba$.subscribe(console.log)
5sleep(3000).then(() => oba$.subscribe(console.log))
6sleep(6000).then(() => oba$.connect())
7// after 6000 ms
8// 0
9// 0
10// 1
11// 1
12// 2
13// 2
14// ...

multicast 返回一个 ConnectableObservable,可以通过 connect 方法控制多播的时机

如果同步的数据流,通过 subject 实现多播,在 subject 订阅之前就订阅 subject,那就没有任何输出:

1const source = from([1, 2, 3])
2const subject = new Subject()
3const multicasted = source.subscribe(subject)
4
5subject.subscribe({
6 next: v => console.log(`observerA: ${v}`)
7})
8subject.subscribe({
9 next: v => console.log(`observerB: ${v}`)
10})

如果之后订阅 subject,则会正常输出:

1const source = from([1, 2, 3])
2const subject = new Subject()
3
4subject.subscribe({
5 next: v => console.log(`observerA: ${v}`)
6})
7subject.subscribe({
8 next: v => console.log(`observerB: ${v}`)
9})
10
11const multicasted = source.subscribe(subject)

此时就可以通过 connect 控制多播时机

但是此时如果我们退订,就需要在多个订阅者都退订后,在手动退订 subject(ConnectableObservable),这样就造成逻辑冗杂,RxJS 通过 refCount 解决这一问题,refCount 就像自动对订阅者计数一样

refCount makes the multicasted Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.

之前:

1const source = interval(500);
2const subject = new Subject();
3const multicasted = source.pipe(multicast(subject));
4let subscription1, subscription2, subscriptionConnect;
5
6subscription1 = multicasted.subscribe({
7 next: (v) => console.log(`observerA: ${v}`)
8});
9// We should call `connect()` here, because the first
10// subscriber to `multicasted` is interested in consuming values
11subscriptionConnect = multicasted.connect();
12
13setTimeout(() => {
14 subscription2 = multicasted.subscribe({
15 next: (v) => console.log(`observerB: ${v}`)
16 });
17}, 600);
18
19setTimeout(() => {
20 subscription1.unsubscribe();
21}, 1200);
22
23// We should unsubscribe the shared Observable execution here,
24// because `multicasted` would have no more subscribers after this
25setTimeout(() => {
26 subscription2.unsubscribe();
27 subscriptionConnect.unsubscribe(); // for the shared Observable execution
28}, 2000);

之后:

1const source = interval(500);
2const subject = new Subject();
3const refCounted = source.pipe(multicast(subject), refCount());
4let subscription1, subscription2;
5
6// This calls `connect()`, because
7// it is the first subscriber to `refCounted`
8console.log('observerA subscribed');
9subscription1 = refCounted.subscribe({
10 next: (v) => console.log(`observerA: ${v}`)
11});
12
13setTimeout(() => {
14 console.log('observerB subscribed');
15 subscription2 = refCounted.subscribe({
16 next: (v) => console.log(`observerB: ${v}`)
17 });
18}, 600);
19
20setTimeout(() => {
21 console.log('observerA unsubscribed');
22 subscription1.unsubscribe();
23}, 1200);
24
25// This is when the shared Observable execution will stop, because
26// `refCounted` would have no more subscribers after this
27setTimeout(() => {
28 console.log('observerB unsubscribed');
29 subscription2.unsubscribe();
30}, 2000);
31
32// Logs
33// observerA subscribed
34// observerA: 0
35// observerB subscribed
36// observerA: 1
37// observerB: 1
38// observerA unsubscribed
39// observerB: 2
40// observerB unsubscribed

multicast 参数也可以是一个 SubjectFactory 函数,与直接使用 Subject 实例的区别在于,使用 refCount 自动退订 subject 后,之后再有订阅就没有用,因为 Subject 实例退订后生命周期结束,而使用工厂函数时会重新调用该函数,产生一个新的 Subject 实例

1const coldSource$ = interval(1000).pipe(take(3))
2const tick$ = coldSource$.pipe(multicast(new Subject()), refCount())
3
4tick$.subscribe(value => console.log('observer 1: ' + value))
5
6setTimeout(() => {
7 tick$.subscribe(value => console.log('observer 2: ' + value))
8}, 1500)
9
10setTimeout(() => {
11 tick$.subscribe(value => console.log('observer 3: ' + value))
12}, 5000)
13// observer 1: 0
14// observer 1: 1
15// observer 2: 1
16// observer 1: 2
17// observer 2: 2
1const coldSource$ = interval(1000).pipe(take(3))
2const tick$ = coldSource$.pipe(multicast(() => new Subject()), refCount())
3
4tick$.subscribe(value => console.log('observer 1: ' + value))
5
6setTimeout(() => {
7 tick$.subscribe(value => console.log('observer 2: ' + value))
8}, 1500)
9
10setTimeout(() => {
11 tick$.subscribe(value => console.log('observer 3: ' + value))
12}, 5000)
13// observer 1: 0
14// observer 1: 1
15// observer 2: 1
16// observer 1: 2
17// observer 2: 2
18// observer 3: 0
19// observer 3: 1
20// observer 3: 2

selector 参数:只要指定了 selector 参数,就指定了 multicast 返回的 Observable 对象的生成方法

publish

1function publish(selector) {
2 if (selector) {
3 return this.multicast(() => new Subject(), selector)
4 } else {
5 return this.multicast(new Subject())
6 }
7}

share

1function share() {
2 return multicast(() => new Subject()).refCount()
3}

高级多播,增强的 Subject

  • publishLast AsyncSubject

  • publishReplay ReplaySubject

  • publishBehavior BehaviorSubject

publishLast AsyncSubject

1export function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
2 return (source: Observable<T>) => multicast(new AsyncSubject<T>())(source);
3}
4
5export class AsyncSubject<T> extends Subject<T> {
6 private value: T = null;
7 private hasNext: boolean = false;
8 private hasCompleted: boolean = false;
9
10 next(value: T): void {
11 if (!this.hasCompleted) {
12 this.value = value;
13 this.hasNext = true;
14 }
15 }
16
17 error(error: any): void {
18 if (!this.hasCompleted) {
19 super.error(error);
20 }
21 }
22
23 complete(): void {
24 this.hasCompleted = true;
25 if (this.hasNext) {
26 super.next(this.value);
27 }
28 super.complete();
29 }
30}

publishReplay ReplaySubject

replay 有回放的能力,只订阅一次数据源,pipe 只运行一次,把数据记录下来在吐出一遍,而 re-subscribe 是多次订阅,pipe 运行多次,把所有数据都重新产生一遍

1function publishReplay(
2 bufferSize = Number.POSITIVE_INFINITY,
3 windowTime = Number.POSITIVE_INFINITY,
4) {
5 return multicast(new ReplaySubject(bufferSize, windowTime));
6}
1const tick$ = interval(1000).pipe(
2 take(3),
3 tap(x => console.log('source ', x)),
4)
5const sharedTick$ = (tick$.pipe(publishReplay()) as ConnectableObservable<number>).refCount()
6sharedTick$.subscribe(value => console.log('observer 1: ' + value))
7setTimeout(() => {
8 sharedTick$.subscribe(value => console.log('observer 2: ' + value))
9}, 5000)
10// source 0
11// observer 1: 0
12// source 1
13// observer 1: 1
14// source 2
15// observer 1: 2
16// 同时打印出来下面的
17// observer 2: 0
18// observer 2: 1
19// observer 2: 2

tap 原来叫 do,tap 有窃听的意思,不能改变数据,但常用来触发副作用

publishBehavior BehaviorSubject

1export class BehaviorSubject<T> extends Subject<T> {
2 constructor(private _value: T) {
3 super();
4 }
5 // ...
6 next(value: T): void {
7 super.next(this._value = value);
8 }
9}

提供一个“默认数据”,当添加 Observer 的时候,即使上游还没有吐出数据 Observer 也会立即获得这个“默认数据”;而且,这个 “默认数据” 总是会被上游吐出的最新数据替代