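Creating Data Streams
The creation operators that RxJS provides cover almost every pattern for creating a data stream, so there is no need to reinvent the wheel.
Creating synchronous streams
Which values to produce
The order in which the values are produced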
create
```ts
export class Observable<T> implements Subscribable<T> {
  // ...
  static create: Function = <T>(subscribe?: (subscriber: Subscriber<T>) => TeardownLogic) => {
    return new Observable<T>(subscribe);
  }
  // ...
}
```
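As the source above shows, create only forwards its argument to the Observable constructor, so the constructor can be called directly. The following is a minimal sketch under that assumption; the names `numbers$`, `received`, and `teardownCalls` are illustrative and not part of RxJS.

```typescript
import { Observable } from 'rxjs';

const received: number[] = [];
let teardownCalls = 0;

// Emit 1, 2, 3 synchronously, then complete.
const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  // Teardown logic: runs once the subscription is closed.
  return () => {
    teardownCalls++;
  };
});

numbers$.subscribe({
  next: value => received.push(value),
  complete: () => console.log('complete', received),
});
```

Because the producer is synchronous, all three values have been delivered by the time `subscribe` returns.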
of: enumerating values
of produces a Cold Observable: every subscriber receives the full sequence from the beginning.
```ts
const oba$ = of(1, 2, 3)
```
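What "cold" means for of can be checked by subscribing twice. This is a small sketch; the array names are illustrative.

```typescript
import { of } from 'rxjs';

const source$ = of(1, 2, 3);
const first: number[] = [];
const second: number[] = [];

// Each subscription replays the whole sequence from the start.
source$.subscribe(value => first.push(value));
source$.subscribe(value => second.push(value));

console.log(first);  // [1, 2, 3]
console.log(second); // [1, 2, 3]
```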
range: specifying a range
```ts
const numbers = range(1, 10)
```
range always increments by 1, and its second argument is a count rather than an end value: range(1.5, 3) emits 1.5, 2.5, 3.5.
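The sketch below checks the range(1.5, 3) case and shows one possible way to get a different step size, by mapping over the emitted values. The `map` workaround is a suggestion, not something range offers itself.

```typescript
import { range } from 'rxjs';
import { map } from 'rxjs/operators';

const fractional: number[] = [];
// The second argument is a count: three values starting at 1.5.
range(1.5, 3).subscribe(value => fractional.push(value));
console.log(fractional); // [1.5, 2.5, 3.5]

const evens: number[] = [];
// For a step other than 1, transform each value: 0, 2, 4, 6, 8.
range(0, 5)
  .pipe(map(i => i * 2))
  .subscribe(value => evens.push(value));
console.log(evens); // [0, 2, 4, 6, 8]
```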
generate: creating values with a loop
```ts
generate(0, x => x < 10, x => x + 1)
```
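generate mirrors a for loop: an initial state, a continue condition, and an iteration step. Unlike range, the step is arbitrary. A short sketch with illustrative variable names:

```typescript
import { generate } from 'rxjs';

const stepTwo: number[] = [];
// Equivalent to: for (let x = 0; x < 10; x = x + 2) emit x
generate(0, x => x < 10, x => x + 2)
  .subscribe(value => stepTwo.push(value));
console.log(stepTwo); // [0, 2, 4, 6, 8]

const powers: number[] = [];
// Equivalent to: for (let x = 1; x <= 16; x = x * 2) emit x
generate(1, x => x <= 16, x => x * 2)
  .subscribe(value => powers.push(value));
console.log(powers); // [1, 2, 4, 8, 16]
```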
repeat: repeating the data
```ts
import { Observable, Observer } from 'rxjs'
import { repeat } from 'rxjs/operators'

const oba$ = new Observable((ob: Observer<number>) => {
  console.log('on subscribe')
  setTimeout(() => ob.next(1), 1000)
  setTimeout(() => ob.next(2), 2000)
  setTimeout(() => ob.next(3), 3000)
  setTimeout(() => ob.complete(), 4000)

  return () => console.log('on unsubscribe')
})

oba$
  .pipe(repeat(2))
  .subscribe(console.log, null, () => console.log('on complete'))

// on subscribe
// 1
// 2
// 3
// on unsubscribe
// on subscribe
// 1
// 2
// 3
// on complete
// on unsubscribe
```
If we remove setTimeout(() => ob.complete(), 4000), the upstream never completes:
```ts
const oba$ = new Observable((ob: Observer<number>) => {
  console.log('on subscribe')
  setTimeout(() => ob.next(1), 1000)
  setTimeout(() => ob.next(2), 2000)
  setTimeout(() => ob.next(3), 3000)

  return () => console.log('on unsubscribe')
})

oba$
  .pipe(repeat(2))
  .subscribe(console.log, null, () => console.log('on complete'))

// on subscribe
// 1
// 2
// 3
```
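So repeat depends on when the upstream completes: it re-subscribes only after the upstream sends its complete notification, and if the upstream never completes, it never repeats.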
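The same dependency can be observed without timers. In this sketch (the `subscriptions` counter and the other names are illustrative), a synchronous upstream that completes is subscribed to twice by repeat(2).

```typescript
import { Observable } from 'rxjs';
import { repeat } from 'rxjs/operators';

let subscriptions = 0;
const values: number[] = [];

const source$ = new Observable<number>(subscriber => {
  subscriptions++;
  subscriber.next(1);
  subscriber.next(2);
  // Without this complete(), repeat would have nothing to react to.
  subscriber.complete();
});

source$.pipe(repeat(2)).subscribe(value => values.push(value));

console.log(values);        // [1, 2, 1, 2]
console.log(subscriptions); // 2
```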
empty, never, throwError
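```ts
// Completes immediately without emitting (RxJS 6 also exports the constant EMPTY)
empty()
// Never emits and never completes (RxJS 6 also exports the constant NEVER)
never()
// Errors immediately; throwError needs the error to deliver
throwError(new Error('something went wrong'))
```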
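The difference between the three is easiest to see by logging which notifications each one delivers. This sketch uses the `EMPTY` and `NEVER` constants instead of the function forms; `notificationsOf` is a hypothetical helper written for this example.

```typescript
import { EMPTY, NEVER, Observable, throwError } from 'rxjs';

// Record which notifications an Observable delivers synchronously.
function notificationsOf(source$: Observable<never>): string[] {
  const log: string[] = [];
  const subscription = source$.subscribe({
    next: () => log.push('next'),
    error: (err: Error) => log.push(`error: ${err.message}`),
    complete: () => log.push('complete'),
  });
  subscription.unsubscribe(); // release NEVER, which would otherwise stay open
  return log;
}

const emptyLog = notificationsOf(EMPTY);
const neverLog = notificationsOf(NEVER);
const errorLog = notificationsOf(throwError(new Error('boom')));

console.log(emptyLog); // ['complete']
console.log(neverLog); // []
console.log(errorLog); // ['error: boom']
```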
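Creating asynchronous streams
Compared with synchronous streams, you also have to care about the time interval between values.
interval, timer: producing values on a schedule
Both emit an increasing sequence that starts at 0. timer's first argument is the delay before the first value (default 0), and its second argument is the period, as in interval. If the period is omitted, timer emits a single 0 and completes.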
```ts
// These two behave the same: 0, 1, 2, ... one value per second, starting after 1 s
timer(1000, 1000)
interval(1000)
```
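The equivalence can be checked without waiting for real seconds. This sketch passes a `VirtualTimeScheduler` to both functions, which is a choice made for this example so that it runs instantly and deterministically; by default they run on real time.

```typescript
import { VirtualTimeScheduler, interval, timer } from 'rxjs';
import { take } from 'rxjs/operators';

// Virtual clock: scheduled work runs when flush() is called, not in real time.
const scheduler = new VirtualTimeScheduler();
const fromTimer: Array<[number, number]> = [];    // [virtual time in ms, value]
const fromInterval: Array<[number, number]> = [];

timer(1000, 1000, scheduler)
  .pipe(take(3))
  .subscribe(value => fromTimer.push([scheduler.now(), value]));

interval(1000, scheduler)
  .pipe(take(3))
  .subscribe(value => fromInterval.push([scheduler.now(), value]));

scheduler.flush();

console.log(fromTimer);    // [[1000, 0], [2000, 1], [3000, 2]]
console.log(fromInterval); // [[1000, 0], [2000, 1], [3000, 2]]
```

`take(3)` matters here: without it both streams are infinite and `flush()` would never finish.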
from: converting an Observable-like value into an Observable
Observable-like: an Array, an array-like object, a string, a Promise, an iterable object, an Observable-like object
```ts
// Both emit 'a', 'b', 'c'
from('abc')
from(['a', 'b', 'c'])
```
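A quick sketch of from applied to three of the input kinds listed above: a string, an array, and an array-like object. The variable names are illustrative.

```typescript
import { from } from 'rxjs';

const fromString: string[] = [];
from('abc').subscribe(ch => fromString.push(ch));

const fromArray: string[] = [];
from(['a', 'b', 'c']).subscribe(ch => fromArray.push(ch));

// Anything with a length and numeric keys counts as array-like.
const arrayLike: ArrayLike<string> = { length: 2, 0: 'x', 1: 'y' };
const fromArrayLike: string[] = [];
from(arrayLike).subscribe(ch => fromArrayLike.push(ch));

console.log(fromString);    // ['a', 'b', 'c']
console.log(fromArray);     // ['a', 'b', 'c']
console.log(fromArrayLike); // ['x', 'y']
```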
fromPromise: use from(promise)
```ts
from(
  new Promise<number>((resolve, reject) => {
    setTimeout(() => {
      console.log('begin resolve')
      resolve(1)
      console.log('end resolve')
    }, 1000)
  })
  .then((res) => {
    console.log(res)
    return 2
  })
).subscribe(console.log)
```
fromEvent
```ts
// fromEvent.d.ts
import { Observable } from '../Observable';
export interface NodeStyleEventEmitter {
  addListener: (eventName: string | symbol, handler: NodeEventHandler) => this;
  removeListener: (eventName: string | symbol, handler: NodeEventHandler) => this;
}
export declare type NodeEventHandler = (...args: any[]) => void;
export interface NodeCompatibleEventEmitter {
  addListener: (eventName: string, handler: NodeEventHandler) => void | {};
  removeListener: (eventName: string, handler: NodeEventHandler) => void | {};
}
export interface JQueryStyleEventEmitter {
  on: (eventName: string, handler: Function) => void;
  off: (eventName: string, handler: Function) => void;
}
export interface HasEventTargetAddRemove<E> {
  addEventListener(type: string, listener: ((evt: E) => void) | null, options?: boolean | AddEventListenerOptions): void;
  removeEventListener(type: string, listener?: ((evt: E) => void) | null, options?: EventListenerOptions | boolean): void;
}
export declare type EventTargetLike<T> = HasEventTargetAddRemove<T> | NodeStyleEventEmitter | NodeCompatibleEventEmitter | JQueryStyleEventEmitter;
export declare type FromEventTarget<T> = EventTargetLike<T> | ArrayLike<EventTargetLike<T>>;
export interface EventListenerOptions {
  capture?: boolean;
  passive?: boolean;
  once?: boolean;
}
export interface AddEventListenerOptions extends EventListenerOptions {
  once?: boolean;
  passive?: boolean;
}
export declare function fromEvent<T>(target: FromEventTarget<T>, eventName: string): Observable<T>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export declare function fromEvent<T>(target: FromEventTarget<T>, eventName: string, resultSelector: (...args: any[]) => T): Observable<T>;
export declare function fromEvent<T>(target: FromEventTarget<T>, eventName: string, options: EventListenerOptions): Observable<T>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export declare function fromEvent<T>(target: FromEventTarget<T>, eventName: string, options: EventListenerOptions, resultSelector: (...args: any[]) => T): Observable<T>;
```
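```ts
// Helper assumed by this example: resolves after `ms` milliseconds
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))

const eventTarget = new EventTarget()
// The type parameter is the type of the emitted value, i.e. the Event object
fromEvent(eventTarget as FromEventTarget<Event>, 'open')
  .subscribe(console.log)
  // Event {isTrusted: false, constructor: Object}
sleep(1000)
  .then(() => eventTarget.dispatchEvent(new Event('open')))
```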
fromEvent produces a Hot Observable: values are produced independently of any subscription, and their production is entirely outside RxJS's control.
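To make the "hot" behavior concrete, the sketch below emits events before, during, and after a subscription. `TinyEmitter` is a hypothetical stand-in written for this example; it only implements the `on`/`off` pair that the `JQueryStyleEventEmitter` shape requires.

```typescript
import { fromEvent } from 'rxjs';

// Hypothetical minimal emitter with the on/off pair that fromEvent accepts.
class TinyEmitter {
  private handlers: Function[] = [];
  on(_eventName: string, handler: Function): void {
    this.handlers.push(handler);
  }
  off(_eventName: string, handler: Function): void {
    this.handlers = this.handlers.filter(h => h !== handler);
  }
  emit(payload: string): void {
    this.handlers.slice().forEach(h => h(payload));
  }
}

const emitter = new TinyEmitter();
const seen: string[] = [];

emitter.emit('before subscribe'); // nobody is listening yet: lost
const subscription = fromEvent<string>(emitter, 'message')
  .subscribe(message => seen.push(message));
emitter.emit('first');
emitter.emit('second');
subscription.unsubscribe();
emitter.emit('after unsubscribe'); // handler was removed: not seen

console.log(seen); // ['first', 'second']
```

The emitter keeps producing whether or not anyone is subscribed; a subscriber only sees what happens while it is attached.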
fromEventPattern
fromEvent obviously cannot cover every event source, because some expose a different interface. fromEventPattern exists to adapt those other event interfaces.
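```ts
import { fromEventPattern } from 'rxjs'

// Called on subscribe: attach the handler to the event source
function addClickHandler(handler: (event: Event) => void) {
  document.addEventListener('click', handler)
}

// Called on unsubscribe: detach the same handler
function removeClickHandler(handler: (event: Event) => void) {
  document.removeEventListener('click', handler)
}

const clicks$ = fromEventPattern(
  addClickHandler,
  removeClickHandler,
)
clicks$.subscribe(x => console.log(x))
```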
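A case fromEvent cannot handle is an API that identifies listeners by a token rather than by the handler itself. The sketch below invents such an API (`register`, `unregister`, and `publish` are hypothetical) and relies on fromEventPattern passing the return value of the add function as the second argument of the remove function.

```typescript
import { fromEventPattern } from 'rxjs';

type Listener = (value: string) => void;

// Hypothetical token-based API: register() returns a token, unregister() takes it.
const listeners: { [token: number]: Listener } = {};
let nextToken = 1;

function register(listener: Listener): number {
  const token = nextToken++;
  listeners[token] = listener;
  return token;
}
function unregister(token: number): void {
  delete listeners[token];
}
function publish(value: string): void {
  Object.keys(listeners).forEach(key => listeners[Number(key)](value));
}

const messages$ = fromEventPattern<string>(
  handler => register(handler),
  // The second argument is whatever the add function returned: here, the token.
  (_handler, token) => unregister(token),
);

const got: string[] = [];
const sub = messages$.subscribe(value => got.push(value));
publish('a');
publish('b');
sub.unsubscribe();
publish('c'); // no listener is registered any more

const remaining = Object.keys(listeners).length;
console.log(got, remaining); // ['a', 'b'] 0
```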
ajax
ajax:
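```ts
import { of } from 'rxjs'
import { ajax } from 'rxjs/ajax'
import { catchError, tap } from 'rxjs/operators'

const users = ajax({
  url: 'https://httpbin.org/delay/2',
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'rxjs-custom-header': 'Rxjs',
  },
  body: {
    rxjs: 'Hello World!',
  },
}).pipe(
  // tap logs the response and passes it through; map would replace it with undefined
  tap(response => console.log('response: ', response)),
  catchError(error => {
    console.log('error: ', error)
    return of(error)
  })
)
```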
fetch:
```ts
import { of } from 'rxjs'
import { fromFetch } from 'rxjs/fetch'
import { catchError, switchMap } from 'rxjs/operators'

const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe(
  switchMap(response => {
    if (response.ok) {
      // OK return data
      return response.json();
    } else {
      // Server is returning a status requiring the client to try something else.
      return of({ error: true, message: `Error ${response.status}` })
    }
  }),
  catchError(err => {
    // Network or other error, handle appropriately
    console.error(err)
    return of({ error: true, message: err.message })
  })
)

data$.subscribe({
  next: result => console.log(result),
  complete: () => console.log('done'),
})
```
repeatWhen
```ts
import { interval, of } from 'rxjs'
import { repeatWhen } from 'rxjs/operators'

// export declare function repeatWhen<T>(notifier: (notifications: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T>
const source = of(1, 2, 3)
// const documentClick$ = fromEvent(document, 'click')

source.pipe(
  // repeatWhen(() => documentClick$)
  repeatWhen(() => interval(1000)),
).subscribe(console.log)
```
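repeatWhen re-subscribes to the upstream each time the Observable returned by notifier emits. If the upstream is not synchronous, however, its completion time is unknown, so it is hard to pick an interval that re-subscribes at the right moment. In that case, use the notifier's parameter, notifications: it is also an Observable, and it emits each time the upstream completes.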
```ts
const source = of(1, 2, 3)

source.pipe(
  repeatWhen((ob) => ob.pipe(delay(1000))),
).subscribe(console.log)
```
defer
Creates the Observable lazily, that is, only when it is subscribed.
```ts
// opts: the ajax request configuration, assumed to be defined elsewhere
const oba = defer(() => ajax(opts))
```
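The laziness can be observed by counting how often the factory runs. In this sketch, `of` stands in for the ajax call so that the example runs anywhere; the counter and array names are illustrative.

```typescript
import { defer, of } from 'rxjs';

let factoryCalls = 0;

// The factory runs once per subscription, not when defer() is called.
const lazy$ = defer(() => {
  factoryCalls++;
  return of(factoryCalls);
});

const callsBeforeSubscribe = factoryCalls;

const results: number[] = [];
lazy$.subscribe(value => results.push(value));
lazy$.subscribe(value => results.push(value));

console.log(callsBeforeSubscribe); // 0
console.log(results);              // [1, 2]
```

Wrapped this way, an expensive operation such as a request is only started when someone subscribes, and each subscriber triggers its own.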