多播
多播就是让一个数据流的内容被多个 Observer 订阅
Subject
BehaviorSubject
ReplaySubject
AsyncSubject
数据流的多播
单播
多播
广播
广播会影响全局环境,筛选消息的责任在接收方,同时多个广播时容易混乱,所以 RxJS 不支持多播
Cold Observable 无法实现多播,每次订阅都会是彼此之间不同的 Observable 对象
1const oba$ = interval(1000)2oba$.subscribe(console.log)3sleep(3000).then(() => oba$.subscribe(console.log))
Hot 数据流
fromPromise、fromEvent、fromEventPattern 产生 Hot Observable,这些数据源都来自外部(DOM、Promise、EventPattern、WebSocket……),与 Observable 无关
Hot Observable 实现多播,但有时 Cold Observable 也需要实现多播,这时就需要 Subject,将 Cold Observable 变为 Hot Observable
Subject
为了保证 RxJS 的 Immutable 特性,Cold Observable 转换为 Hot Observable 时不能改变本身,而是产生一个新的对象,这个对象订阅 Cold Observable,同时被订阅,所以 Subject 需要有订阅和被订阅的能力(是一个 Observable 同时是一个 Observer,因此 next*(error|complete)?
)
1// 实现类似观察者模式,通过 new 出来的实例中的 next、error、complete 方法订阅 Observable2export declare class Subject<T> extends Observable<T> implements SubscriptionLike {3 // ...4 observers: Observer<T>[] = []56 next(value: T) {7 this.observers.forEach((ob) => ob.next(T))8 }9 // error complete 类似10}
实现多播
1const subject = new Subject<number>()23subject.subscribe({4 next: (v) => console.log(`observerA: ${v}`)5})6subject.subscribe({7 next: (v) => console.log(`observerB: ${v}`)8})910subject.next(1)11subject.next(2)1213// Logs:14// observerA: 115// observerB: 116// observerA: 217// observerB: 2
1const subject = new Subject<number>()2const oba$ = interval(1000)3oba$.subscribe(subject)4subject.subscribe(console.log)5sleep(3000).then(() => subject.subscribe(console.log))
Subject 可以有多个上游
new Subject 得到的是一个有 next、error、compete 方法的对象,同时具有 subscribe 方法进行订阅,对于 observer 的那三个方法就类似发布者一样,依次通知所有的订阅者,所以对于上游的数据 subject 实例只是调用那三个方法依次通知订阅者,不论是那个上游
Subject 错误处理
Observable 内部类似这样处理 subscribe,忽略了一些其他的逻辑
1try {2 // observer.next 那些3} catch (err) {4 // observer.error(err)5}
这里 observer 就是 subject,subject 内部使用类似 this.observers.forEach((ob) => ob.next(data))
处理数据
subject 就是一个转接头,上游的数据通过 subject 传给下游
多播操作符
multicast 最基本的操作符
1const subject = new Subject<number>()2// const oba$ = multicast(subject)(interval(1000))3const oba$ = interval(1000).pipe(multicast(subject)) as ConnectableObservable<number>4oba$.subscribe(console.log)5sleep(3000).then(() => oba$.subscribe(console.log))6sleep(6000).then(() => oba$.connect())7// after 6000 ms8// 09// 010// 111// 112// 213// 214// ...
multicast 返回一个 ConnectableObservable,可以通过 connect 方法控制多播的时机
如果同步的数据流,通过 subject 实现多播,在 subject 订阅之前就订阅 subject,那就没有任何输出:
1const source = from([1, 2, 3])2const subject = new Subject()3const multicasted = source.subscribe(subject)45subject.subscribe({6 next: v => console.log(`observerA: ${v}`)7})8subject.subscribe({9 next: v => console.log(`observerB: ${v}`)10})
如果之后订阅 subject,则会正常输出:
1const source = from([1, 2, 3])2const subject = new Subject()34subject.subscribe({5 next: v => console.log(`observerA: ${v}`)6})7subject.subscribe({8 next: v => console.log(`observerB: ${v}`)9})1011const multicasted = source.subscribe(subject)
此时就可以通过 connect 控制多播时机
但是此时如果我们退订,就需要在多个订阅者都退订后,在手动退订 subject(ConnectableObservable),这样就造成逻辑冗杂,RxJS 通过 refCount 解决这一问题,refCount 就像自动对订阅者计数一样
refCount makes the multicasted Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.
之前:
1const source = interval(500);2const subject = new Subject();3const multicasted = source.pipe(multicast(subject));4let subscription1, subscription2, subscriptionConnect;56subscription1 = multicasted.subscribe({7 next: (v) => console.log(`observerA: ${v}`)8});9// We should call `connect()` here, because the first10// subscriber to `multicasted` is interested in consuming values11subscriptionConnect = multicasted.connect();1213setTimeout(() => {14 subscription2 = multicasted.subscribe({15 next: (v) => console.log(`observerB: ${v}`)16 });17}, 600);1819setTimeout(() => {20 subscription1.unsubscribe();21}, 1200);2223// We should unsubscribe the shared Observable execution here,24// because `multicasted` would have no more subscribers after this25setTimeout(() => {26 subscription2.unsubscribe();27 subscriptionConnect.unsubscribe(); // for the shared Observable execution28}, 2000);
之后:
1const source = interval(500);2const subject = new Subject();3const refCounted = source.pipe(multicast(subject), refCount());4let subscription1, subscription2;56// This calls `connect()`, because7// it is the first subscriber to `refCounted`8console.log('observerA subscribed');9subscription1 = refCounted.subscribe({10 next: (v) => console.log(`observerA: ${v}`)11});1213setTimeout(() => {14 console.log('observerB subscribed');15 subscription2 = refCounted.subscribe({16 next: (v) => console.log(`observerB: ${v}`)17 });18}, 600);1920setTimeout(() => {21 console.log('observerA unsubscribed');22 subscription1.unsubscribe();23}, 1200);2425// This is when the shared Observable execution will stop, because26// `refCounted` would have no more subscribers after this27setTimeout(() => {28 console.log('observerB unsubscribed');29 subscription2.unsubscribe();30}, 2000);3132// Logs33// observerA subscribed34// observerA: 035// observerB subscribed36// observerA: 137// observerB: 138// observerA unsubscribed39// observerB: 240// observerB unsubscribed
multicast 参数也可以是一个 SubjectFactory 函数,与直接使用 Subject 实例的区别在于,使用 refCount 自动退订 subject 后,之后再有订阅就没有用,因为 Subject 实例退订后生命周期结束,而使用工厂函数时会重新调用该函数,产生一个新的 Subject 实例
1const coldSource$ = interval(1000).pipe(take(3))2const tick$ = coldSource$.pipe(multicast(new Subject()), refCount())34tick$.subscribe(value => console.log('observer 1: ' + value))56setTimeout(() => {7 tick$.subscribe(value => console.log('observer 2: ' + value))8}, 1500)910setTimeout(() => {11 tick$.subscribe(value => console.log('observer 3: ' + value))12}, 5000)13// observer 1: 014// observer 1: 115// observer 2: 116// observer 1: 217// observer 2: 2
1const coldSource$ = interval(1000).pipe(take(3))2const tick$ = coldSource$.pipe(multicast(() => new Subject()), refCount())34tick$.subscribe(value => console.log('observer 1: ' + value))56setTimeout(() => {7 tick$.subscribe(value => console.log('observer 2: ' + value))8}, 1500)910setTimeout(() => {11 tick$.subscribe(value => console.log('observer 3: ' + value))12}, 5000)13// observer 1: 014// observer 1: 115// observer 2: 116// observer 1: 217// observer 2: 218// observer 3: 019// observer 3: 120// observer 3: 2
selector 参数:只要指定了 selector 参数,就指定了 multicast 返回的 Observable 对象的生成方法
publish
1function publish(selector) {2 if (selector) {3 return this.multicast(() => new Subject(), selector)4 } else {5 return this.multicast(new Subject())6 }7}
share
1function share() {2 return multicast(() => new Subject()).refCount()3}
高级多播,增强的 Subject
publishLast AsyncSubject
publishReplay ReplaySubject
publishBehavior BehaviorSubject
publishLast AsyncSubject
1export function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>> {2 return (source: Observable<T>) => multicast(new AsyncSubject<T>())(source);3}45export class AsyncSubject<T> extends Subject<T> {6 private value: T = null;7 private hasNext: boolean = false;8 private hasCompleted: boolean = false;910 next(value: T): void {11 if (!this.hasCompleted) {12 this.value = value;13 this.hasNext = true;14 }15 }1617 error(error: any): void {18 if (!this.hasCompleted) {19 super.error(error);20 }21 }2223 complete(): void {24 this.hasCompleted = true;25 if (this.hasNext) {26 super.next(this.value);27 }28 super.complete();29 }30}
publishReplay ReplaySubject
replay 有回放的能力,只订阅一次数据源,pipe 只运行一次,把数据记录下来在吐出一遍,而 re-subscribe 是多次订阅,pipe 运行多次,把所有数据都重新产生一遍
1function publishReplay(2 bufferSize = Number.POSITIVE_INFINITY,3 windowTime = Number.POSITIVE_INFINITY,4) {5 return multicast(new ReplaySubject(bufferSize, windowTime));6}
1const tick$ = interval(1000).pipe(2 take(3),3 tap(x => console.log('source ', x)),4)5const sharedTick$ = (tick$.pipe(publishReplay()) as ConnectableObservable<number>).refCount()6sharedTick$.subscribe(value => console.log('observer 1: ' + value))7setTimeout(() => {8 sharedTick$.subscribe(value => console.log('observer 2: ' + value))9}, 5000)10// source 011// observer 1: 012// source 113// observer 1: 114// source 215// observer 1: 216// 同时打印出来下面的17// observer 2: 018// observer 2: 119// observer 2: 2
tap 原来叫 do,tap 有窃听的意思,不能改变数据,但常用来触发副作用
publishBehavior BehaviorSubject
1export class BehaviorSubject<T> extends Subject<T> {2 constructor(private _value: T) {3 super();4 }5 // ...6 next(value: T): void {7 super.next(this._value = value);8 }9}
提供一个“默认数据”,当添加 Observer 的时候,即使上游还没有吐出数据 Observer 也会立即获得这个“默认数据”;而且,这个 “默认数据” 总是会被上游吐出的最新数据替代