AHABHGK

Creating Data Streams

RxJS's creation operators cover almost every pattern for creating a data stream, so there is no need to reinvent the wheel.

Creating synchronous streams

  • Which values are produced

  • The order in which the values are produced

create

export class Observable<T> implements Subscribable<T> {
  // ...
  static create: Function = <T>(subscribe?: (subscriber: Subscriber<T>) => TeardownLogic) => {
    return new Observable<T>(subscribe);
  }
  // ...
}

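of: enumerate values

of produces a Cold Observable: every subscriber receives the full sequence from the start.

import { of } from 'rxjs'
const oba$ = of(1, 2, 3)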
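To make the "cold" behaviour concrete, here is a minimal sketch that subscribes to the same `of` stream twice. The array names `first` and `second` are only for illustration.

```typescript
import { of } from 'rxjs'

// Collect what each subscriber receives so the two runs can be compared.
const first: number[] = []
const second: number[] = []

const numbers$ = of(1, 2, 3)

numbers$.subscribe(value => first.push(value))
// The second subscription starts after the first one has already completed,
// yet it still gets every value: the sequence is replayed per subscriber.
numbers$.subscribe(value => second.push(value))

console.log(first)  // [1, 2, 3]
console.log(second) // [1, 2, 3]
```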

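range: a range of numbers

import { range } from 'rxjs'
const numbers = range(1, 10)

range always increments by 1, and its second argument is a count, not an upper bound: range(1.5, 3) emits 1.5, 2.5, 3.5.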
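The `range(start, count)` semantics can be checked with a short sketch. The `collect` helper is hypothetical and only gathers the synchronous emissions into an array.

```typescript
import { range } from 'rxjs'

// Hypothetical helper: subscribe and gather everything range emits synchronously.
const collect = (start: number, count: number): number[] => {
  const values: number[] = []
  range(start, count).subscribe(value => values.push(value))
  return values
}

const whole = collect(1, 10)       // ten values starting at 1
const fractional = collect(1.5, 3) // three values starting at 1.5
const fromFive = collect(5, 3)     // three values starting at 5, not "5 to 3"

console.log(whole)      // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
console.log(fractional) // [1.5, 2.5, 3.5]
console.log(fromFive)   // [5, 6, 7]
```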

generate: loop-style creation

import { generate } from 'rxjs'
generate(0, x => x < 10, x => x + 1)
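Because generate takes the same three parts as a `for` loop (initial state, condition, iterate step), it can express sequences that range cannot. The sketch below uses illustrative names (`evens`, `powersOfTwo`).

```typescript
import { generate } from 'rxjs'

const evens: number[] = []
const powersOfTwo: number[] = []

// Equivalent to: for (let x = 0; x < 10; x += 2) evens.push(x)
generate(0, (x: number) => x < 10, (x: number) => x + 2)
  .subscribe(x => evens.push(x))

// Equivalent to: for (let x = 1; x <= 100; x *= 2) powersOfTwo.push(x)
generate(1, (x: number) => x <= 100, (x: number) => x * 2)
  .subscribe(x => powersOfTwo.push(x))

console.log(evens)       // [0, 2, 4, 6, 8]
console.log(powersOfTwo) // [1, 2, 4, 8, 16, 32, 64]
```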

repeat: repeating data

import { Observable, Observer } from 'rxjs'
import { repeat } from 'rxjs/operators'

const oba$ = new Observable((ob: Observer<number>) => {
  console.log('on subscribe')
  setTimeout(() => ob.next(1), 1000)
  setTimeout(() => ob.next(2), 2000)
  setTimeout(() => ob.next(3), 3000)
  setTimeout(() => ob.complete(), 4000)

  return () => console.log('on unsubscribe')
})

oba$
  .pipe(repeat(2))
  .subscribe(console.log, null, () => console.log('on complete'))

// on subscribe
// 1
// 2
// 3
// on unsubscribe
// on subscribe
// 1
// 2
// 3
// on complete
// on unsubscribe

If we remove setTimeout(() => ob.complete(), 4000):

const oba$ = new Observable((ob: Observer<number>) => {
  console.log('on subscribe')
  setTimeout(() => ob.next(1), 1000)
  setTimeout(() => ob.next(2), 2000)
  setTimeout(() => ob.next(3), 3000)

  return () => console.log('on unsubscribe')
})

oba$
  .pipe(repeat(2))
  .subscribe(console.log, null, () => console.log('on complete'))

// on subscribe
// 1
// 2
// 3

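So repeat depends on when the upstream completes: it resubscribes only after the upstream calls complete, and if the upstream never completes, the repeat never happens.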
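The same dependency on completion shows up without any timers. The following sketch contrasts a source that completes synchronously with one that never calls complete. The names `log`, `subscriptions`, and `noComplete$` are illustrative.

```typescript
import { Observable, of } from 'rxjs'
import { repeat } from 'rxjs/operators'

const log: string[] = []

// of completes right after its last value, so repeat(2) resubscribes immediately.
of('a', 'b').pipe(repeat(2)).subscribe({
  next: value => log.push(value),
  complete: () => log.push('complete'),
})

// This source emits once and never completes, so repeat has nothing to react to.
let subscriptions = 0
const noComplete$ = new Observable<string>(subscriber => {
  subscriptions += 1
  subscriber.next('x')
})

const subscription = noComplete$.pipe(repeat(2)).subscribe(value => log.push(value))
subscription.unsubscribe()

console.log(log)           // ['a', 'b', 'a', 'b', 'complete', 'x']
console.log(subscriptions) // 1
```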

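empty, never, throwError

empty()

empty emits nothing and completes immediately.

never()

never emits nothing and never completes.

throwError(new Error('something went wrong'))

throwError emits nothing and errors immediately with the given error.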
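A small sketch can record which observer callbacks each of these three streams triggers. It uses the `EMPTY` and `NEVER` constants that RxJS exports alongside the `empty()` and `never()` functions; the `observe` helper and the error message are hypothetical.

```typescript
import { EMPTY, NEVER, Observable, throwError } from 'rxjs'

// Hypothetical helper: list the notifications a stream delivers synchronously.
const observe = (source$: Observable<unknown>): string[] => {
  const events: string[] = []
  const subscription = source$.subscribe({
    next: () => events.push('next'),
    error: (err: Error) => events.push(`error: ${err.message}`),
    complete: () => events.push('complete'),
  })
  // Clean up, which matters for NEVER because it would otherwise stay open.
  subscription.unsubscribe()
  return events
}

const emptyEvents = observe(EMPTY)
const neverEvents = observe(NEVER)
const errorEvents = observe(throwError(new Error('boom')))

console.log(emptyEvents) // ['complete']
console.log(neverEvents) // []
console.log(errorEvents) // ['error: boom']
```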

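Creating asynchronous streams

Compared with synchronous streams, asynchronous streams add one more concern: the time interval between values.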

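interval, timer: emitting on a schedule

Both start emitting from 0. timer's first argument is the delay before the first value (default 0), and its second argument is the period, the same as interval's argument.

timer(1000, 1000) // behaves like interval(1000)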
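One way to compare the two without waiting for real seconds is to run them on RxJS's `VirtualTimeScheduler`. This is a sketch under that assumption; the `record` helper and the 500 ms variant are illustrative additions.

```typescript
import { Observable, VirtualTimeScheduler, interval, timer } from 'rxjs'
import { take } from 'rxjs/operators'

// Virtual time lets the comparison run instantly instead of taking seconds.
const scheduler = new VirtualTimeScheduler()

// Hypothetical helper: record [virtual time in ms, value] for the first three emissions.
const record = (source$: Observable<number>): Array<[number, number]> => {
  const seen: Array<[number, number]> = []
  source$.pipe(take(3)).subscribe(value => seen.push([scheduler.now(), value]))
  return seen
}

const fromInterval = record(interval(1000, scheduler))
const fromTimer = record(timer(1000, 1000, scheduler))
// A shorter initial delay shifts every emission but keeps the 1000 ms period.
const delayedStart = record(timer(500, 1000, scheduler))

// Run all scheduled work in virtual time.
scheduler.flush()

console.log(fromInterval)
console.log(fromTimer)
console.log(delayedStart)
```

After the flush, `fromInterval` and `fromTimer` should hold the same time/value pairs, while `delayedStart` holds the same values at earlier times.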

from Observable-like to Observable

Observable-like: an Array, an array-like object, a string, a Promise, an iterable object, an Observable-like object

from('abc') // emits the same values as from(['a', 'b', 'c'])
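As a quick check of the inputs listed above, this sketch feeds a string, an array, and an iterable (a `Set`, chosen here only as an example) through `from` and collects the results.

```typescript
import { from } from 'rxjs'

const fromString: string[] = []
const fromArray: string[] = []
const fromSet: number[] = []

// A string is array-like, so it is emitted one character at a time.
from('abc').subscribe(ch => fromString.push(ch))
from(['a', 'b', 'c']).subscribe(ch => fromArray.push(ch))

// Any iterable works; the Set has already dropped the duplicate 2.
from(new Set([1, 2, 2, 3])).subscribe(n => fromSet.push(n))

console.log(fromString) // ['a', 'b', 'c']
console.log(fromArray)  // ['a', 'b', 'c']
console.log(fromSet)    // [1, 2, 3]
```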

fromPromise: from(promise)

import { from } from 'rxjs'

from(
  new Promise<number>((resolve, reject) => {
    setTimeout(() => {
      console.log('begin resolve')
      resolve(1)
      console.log('end resolve')
    }, 1000)
  })
    .then((res) => {
      console.log(res)
      return 2
    })
).subscribe(console.log)

// after 1 second:
// begin resolve
// end resolve
// 1 (logged inside then)
// 2 (the value delivered to the subscriber)

fromEvent

// fromEvent.d.ts
import { Observable } from '../Observable';
export interface NodeStyleEventEmitter {
  addListener: (eventName: string | symbol, handler: NodeEventHandler) => this;
  removeListener: (eventName: string | symbol, handler: NodeEventHandler) => this;
}
export declare type NodeEventHandler = (...args: any[]) => void;
export interface NodeCompatibleEventEmitter {
  addListener: (eventName: string, handler: NodeEventHandler) => void | {};
  removeListener: (eventName: string, handler: NodeEventHandler) => void | {};
}
export interface JQueryStyleEventEmitter {
  on: (eventName: string, handler: Function) => void;
  off: (eventName: string, handler: Function) => void;
}
export interface HasEventTargetAddRemove<E> {
  addEventListener(type: string, listener: ((evt: E) => void) | null, options?: boolean | AddEventListenerOptions): void;
  removeEventListener(type: string, listener?: ((evt: E) => void) | null, options?: EventListenerOptions | boolean): void;
}
export declare type EventTargetLike<T> = HasEventTargetAddRemove<T> | NodeStyleEventEmitter | NodeCompatibleEventEmitter | JQueryStyleEventEmitter;
export declare type FromEventTarget<T> = EventTargetLike<T> | ArrayLike<EventTargetLike<T>>;
export interface EventListenerOptions {
  capture?: boolean;
  passive?: boolean;
  once?: boolean;
}
export interface AddEventListenerOptions extends EventListenerOptions {
  once?: boolean;
  passive?: boolean;
}
export declare function fromEvent<T>(target: FromEventTarget<T>, eventName: string): Observable<T>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export declare function fromEvent<T>(target: FromEventTarget<T>, eventName: string, resultSelector: (...args: any[]) => T): Observable<T>;
export declare function fromEvent<T>(target: FromEventTarget<T>, eventName: string, options: EventListenerOptions): Observable<T>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export declare function fromEvent<T>(target: FromEventTarget<T>, eventName: string, options: EventListenerOptions, resultSelector: (...args: any[]) => T): Observable<T>;

// sleep is a small helper this example relies on
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

const eventTarget = new EventTarget()
// T is the type of the emitted value, which here is the Event object
fromEvent(eventTarget as FromEventTarget<Event>, 'open')
  .subscribe(console.log)
// Event {isTrusted: false, constructor: Object}
sleep(1000)
  .then(() => eventTarget.dispatchEvent(new Event('open')))

fromEvent produces a Hot Observable: events are produced independently of any subscription, and their production is entirely outside RxJS's control.
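The "hot" behaviour can be observed with a Node.js `EventEmitter`, one of the Node-style targets fromEvent accepts. In this sketch the event name `'data'` and the numeric payloads are made up for illustration.

```typescript
import { EventEmitter } from 'events'
import { fromEvent } from 'rxjs'

const emitter = new EventEmitter()
const received: number[] = []

// Emitted before anyone subscribes: no listener is attached yet, so this value is lost.
emitter.emit('data', 1)

const subscription = fromEvent<number>(emitter, 'data')
  .subscribe(value => received.push(value))

emitter.emit('data', 2)
emitter.emit('data', 3)

// Unsubscribing only removes the listener; the emitter keeps producing events.
subscription.unsubscribe()
emitter.emit('data', 4)

console.log(received) // [2, 3]
```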

fromEventPattern

fromEvent clearly cannot cover every event source, because some expose a different interface. fromEventPattern exists to adapt those other event interfaces.

import { fromEventPattern } from 'rxjs'

function addClickHandler(handler) {
  document.addEventListener('click', handler)
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler)
}

const clicks$ = fromEventPattern(
  addClickHandler,
  removeClickHandler,
)
clicks$.subscribe(x => console.log(x))
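The click example above wraps an API that fromEvent already supports. To show the adapter role more directly, here is a sketch with a hypothetical `PriceFeed` class whose `register`/`unregister` methods match none of the shapes fromEvent understands.

```typescript
import { fromEventPattern } from 'rxjs'

type Listener = (price: number) => void

// Hypothetical event source with its own, non-standard subscription methods.
class PriceFeed {
  private listeners: Listener[] = []

  register(listener: Listener): void {
    this.listeners.push(listener)
  }

  unregister(listener: Listener): void {
    this.listeners = this.listeners.filter(l => l !== listener)
  }

  publish(price: number): void {
    this.listeners.forEach(l => l(price))
  }

  count(): number {
    return this.listeners.length
  }
}

const feed = new PriceFeed()
const prices: number[] = []

// The first function attaches the handler, the second detaches the same handler.
const prices$ = fromEventPattern<number>(
  handler => feed.register(handler),
  handler => feed.unregister(handler),
)

const subscription = prices$.subscribe(price => prices.push(price))
feed.publish(10)
feed.publish(12)

// Unsubscribing calls the remove function, so later prices are not observed.
subscription.unsubscribe()
feed.publish(15)

console.log(prices)       // [10, 12]
console.log(feed.count()) // 0
```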

ajax

ajax:

import { of } from 'rxjs'
import { ajax } from 'rxjs/ajax'
import { catchError, tap } from 'rxjs/operators'

const users = ajax({
  url: 'https://httpbin.org/delay/2',
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'rxjs-custom-header': 'Rxjs',
  },
  body: {
    rxjs: 'Hello World!',
  },
}).pipe(
  // tap logs the response and passes it on; map would emit console.log's undefined instead
  tap(response => console.log('response: ', response)),
  catchError(error => {
    console.log('error: ', error)
    return of(error)
  })
)

fetch:

import { of } from 'rxjs'
import { fromFetch } from 'rxjs/fetch'
import { catchError, switchMap } from 'rxjs/operators'

const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe(
  switchMap(response => {
    if (response.ok) {
      // OK return data
      return response.json();
    } else {
      // Server is returning a status requiring the client to try something else.
      return of({ error: true, message: `Error ${response.status}` })
    }
  }),
  catchError(err => {
    // Network or other error, handle appropriately
    console.error(err)
    return of({ error: true, message: err.message })
  })
)

data$.subscribe({
  next: result => console.log(result),
  complete: () => console.log('done'),
})

repeatWhen

import { interval, of } from 'rxjs'
import { repeatWhen } from 'rxjs/operators'

// export declare function repeatWhen<T>(notifier: (notifications: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T>
const source = of(1, 2, 3)
// const documentClick$ = fromEvent(document, 'click')

source.pipe(
  // repeatWhen(() => documentClick$)
  repeatWhen(() => interval(1000)),
).subscribe(console.log)

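repeatWhen repeats based on when the upstream completes. If the upstream is not synchronous, however, its completion time is unknown, so a fixed interval cannot reliably decide when to resubscribe. In that case, use the notifier's parameter, notifications. It is itself an Observable, and it emits each time the upstream completes.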

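import { of } from 'rxjs'
import { delay, repeatWhen } from 'rxjs/operators'

const source = of(1, 2, 3)

source.pipe(
  // resubscribe 1 second after each completion of the upstream
  repeatWhen((ob) => ob.pipe(delay(1000))),
).subscribe(console.log)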

defer

Creates the Observable lazily, that is, only when it is subscribed.

const oba = defer(() => ajax(opts))
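The laziness can be seen without a network call. In this sketch a counter stands in for the request: the factory passed to `defer` runs once per subscription and not at all before the first one. The names `factoryCalls` and `lazy$` are illustrative.

```typescript
import { defer, of } from 'rxjs'

let factoryCalls = 0

// The factory is not invoked here; defer only stores it.
const lazy$ = defer(() => {
  factoryCalls += 1
  return of(`call #${factoryCalls}`)
})

const callsBeforeSubscribe = factoryCalls

const seen: string[] = []
// Each subscription runs the factory again and gets a fresh inner Observable.
lazy$.subscribe(value => seen.push(value))
lazy$.subscribe(value => seen.push(value))

console.log(callsBeforeSubscribe) // 0
console.log(seen)                 // ['call #1', 'call #2']
```

Applied to the ajax line above, this suggests that each subscriber would trigger its own request, created at subscription time.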