入门
可以把一个“数据流”对象理解为一条河流,数据就是这条河流中流淌的水,代表“流”的变量标示符,都是用 $
符号结尾
比如一个按钮,我们需要记录用户按下去的时间,显示到页面上,同时发送给服务器,一般我们可以这样写:
1const btn = $('#btn')2let startTime: number = 03btn!.addEventListener('mousedown', () => {4 startTime = +new Date()5})6btn!.addEventListener('mouseup', () => {7 if (startTime) {8 const holdTime = +new Date() - startTime9 // ajax10 // render11 startTime = 012 }13})
两个函数交叉访问一个变量 startTime,易出错
如果我们用 RxJS 的方式,可以把 mouseDown 和 mouseUp 看成通过 fromEvent 创造的两个流,holdTime mouseDown,一个用来 render,一个用来 ajax
RxJS 引用了两个重要的编程思想:
函数式
响应式
函数式编程
声明式
纯函数
函数的执行过程完全由输入参数决定,不会受除参数之外的任何数据影响(引用透明)
函数不会修改任何外部状态,比如修改全局变量或传入的参数对象(无副作用)
数据不可变性
1const arrayPush = (arr, newValue) => arr.push(newValue) // 不纯,arr.push 改变原数组2const arrayPush = (arr, newValue) => [...arr, newValue]
纯函数易于单元测试,不纯的需要 mock,修改代码同时也要修改测试中的 mock
FRP
数据流抽象了很多现实问题(DOM、WS、AJAX……)
擅长处理异步操作(对数据“推送”,产生数据时推送数据,无需关心数据是同步还是异步产生的)
把复杂问题分解成简单问题的组合
Observable / Observer
Observable:可被观察者
Observer:观察者
Observable 实现了观察者模式和迭代器模式,是这两个模式的组合
观察者模式
观察者模式要解决的问题,就是在一个持续产生事件的系统中,如何分割功能,让不同模块只需要处理一部分逻辑,这种分而治之的思想是基本的系统设计概念
观察者模式将逻辑分为 Publisher 和 Observer,Publisher 负责发布事件,Observer 负责处理事件
1import { of } from 'rxjs'23const source$ = of(1, 2, 3)4source$.subscribe(console.log) // console.log 作为观察者,只管处理
RxJS 中,Observable 对象作为 Publisher,subscribe 的参数作为 Observer
迭代器模式
Iterator 指能够遍历一个数据集合的对象(树、数组、链表……)Iterator 的作用就是通过一个通用的接口,是开发者不必关心数据集合的具体实现方式
1function* gen() {2 yield 13 yield 24 return 35}67const iter = gen()89console.log(iter.next()) // { value: 1, done: false }10console.log(iter.next()) // { value: 2, done: false }11console.log(iter.next()) // { value: 3, done: true }12console.log(iter.next()) // { value: undefined, done: true }1314const iter2 = gen()15for (const v of iter2) console.log(v) // "downlevelIteration": true
“拉”式的迭代器实现,RxJS 中是“推”式的迭代器实现,其迭代器的使用者,并不用从 Observable 上“拉”数据,而是只要 subscribe 上 Observable 对象之后,自然就能接收到消息的推送
拉和推是从数据消费者的角度描述,比如:AJAX 是网页(数据消费者)从服务器拉取数据,而通过 WebSocket 服务器可以推送给网页数据
创造 Observable
Observable = Publisher + Iterator
1const onSubscribe = (subscriber: Subscriber<number>) => {2 subscriber.next(1)3 subscriber.next(2)4 subscriber.next(3)5}67const source$ = new Observable(onSubscribe)89const theObserver = {10 next: console.log,11}1213source$.subscribe(theObserver) // subscribe 事件发生,调用 onSubscribe
类型上 Observer 是 interface,Subscriber 是对 Observer 的实现 class
跨越时间的 Observable
Observable 用于发布事件,Observable 来做更合适
1const source$ = new Observable(subscriber => {2 let number = 1;3 const timer = setInterval(() => {4 if (number >= 3) clearInterval(timer)5 else subscriber.next(number++)6 }, 1000)7})
永无止境的 Observale
如果没有 clearInterval,数据流就不会终止,因为每次只吐出一个数据,然后被 subscriber 消化掉,所以内存不会增加。如果把所有的数据放到一个数组中,数组所占内存大小就会随数据的增加而增加
大部分数据流会终止,上面例子中吐出 1 2 3 后就应该终止,但 Observable 如果只是停止吐出数据,只不过不再点用 next 推送,并不能给予 subscriber 一个终止信号,subscriber 依然准备着接收数据
Observable 的完结和错误处理
1const source$ = new Observable(subscriber => {2 let number = 13 const timer = setInterval(() => {4 if (number >= 3) {5 clearInterval(timer)6 subscriber.complete() // 发出完结事件7 }8 else subscriber.next(number++)9 }, 1000)10})1112source$.subscribe(13 console.log,14 console.error, // 处理错误事件15 () => console.log('done'), // 处理完结事件16)
next*(error|complete)?
退订 Observable
1const sleep = (time: number) => new Promise<never>(resolve => setTimeout(resolve, time))23const observable = new Observable(function subscribe(subscriber) {4 const intervalId = setInterval(() => {5 console.log('interval...')6 subscriber.next('hi')7 }, 1000)8})910const subscription = observable.subscribe(x => console.log(x))1112await sleep(3500)13subscription.unsubscribe() // 退订之后仍然打印 interval...
1const observable = new Observable(function subscribe(subscriber) {2 const intervalId = setInterval(() => {3 console.log('interval...')4 subscriber.next('hi')5 }, 1000)67 return () => {8 clearInterval(intervalId)9 }10})1112const subscription = observable.subscribe(x => console.log(x))1314await sleep(3500)15subscription.unsubscribe() // 退订之后不再打印 interval...,return cleanUp 用来清除副作用
Hot Observable 和 Cold Observable
对于多个 Subscriber,比如第一个订阅 n 秒之后第二个在订阅,第二个是否接收错过 n 秒的数据
对于不接收错过的 n 秒数据,叫做 Hot Observable,对于从头开始接收的,叫做 Cold Observable
operators
Pipeable operators:传入 Observable 返回新的 Observable
Creation Operators:可以传入一些预定义行为调用函数来创建新的 Observable,或通过加入其他 Observable
1import { of } from 'rxjs'2import { map } from 'rxjs/operators'34map((x: number) => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`))
弹珠图