Skip to content

AHABHGK

异步编程

Promise 与事件发布订阅

发布订阅模式是低级接口,Promise 是高级接口

Ryan Dahl: Node JS - JSConf EU 2009 Ryan Dahl: 我对 Node.js 遗憾的十件事 - JSConf EU 2018

1const { EventEmitter } = require('events')
2
3const isEPromise = promise => promise instanceof EPromise
4
5const PENDING = Symbol('PENDING')
6const FULFILLED = Symbol('FULFILLED')
7const REJECTED = Symbol('REJECTED')
8
9class EPromise extends EventEmitter {
10 constructor(f) {
11 super()
12
13 this.state = PENDING
14 this.value = undefined
15 this.reason = undefined
16
17 const resolve = (value) => {
18 if (this.state === PENDING) {
19 this.state = FULFILLED
20 this.value = value
21 setTimeout(() => this.emit('resolve'), 0)
22 }
23 }
24 const reject = (reason) => {
25 if (this.state === PENDING) {
26 this.state = REJECTED
27 this.reason = reason
28 setTimeout(() => this.emit('reject'), 0)
29 }
30 }
31
32 try {
33 f(resolve, reject)
34 } catch (e) {
35 reject(e)
36 }
37 }
38
39 then(onFulfilled, onRejected) {
40 const p = new EPromise((resolve, reject) => {
41 if (this.state === PENDING) {
42 this.once('resolve', () => resolve(onFulfilled(this.value)))
43 this.once('reject', () => reject(onRejected(this.reason)))
44 } else if (this.state === FULFILLED) {
45 setTimeout(() => resolve(onFulfilled(this.value)), 0)
46 } else if (this.state === REJECTED) {
47 setTimeout(() => reject(onRejected(this.reason)), 0)
48 } else throw new Error('promise state error')
49 })
50 return p
51 }
52
53 catch(f) {
54 this.then(v => v, f)
55 }
56
57 static resolve(v) {
58 if (isEPromise(v)) return v
59 return new EPromise(resolve => resolve(v))
60 }
61
62 static reject(r) {
63 return new EPromise((_, reject) => reject(r))
64 }
65}

其他 Promise 的实现实际上内部也是在 resolve、reject 的时候进行“事件的通知”

Promise 将不可变的部分封装,可变的部分通过 onFulfilled、onRejected 交给开发者

Promise 中的多异步协作:all、race、allSettle……

流程控制

  • 尾触发与 next:中间件
  • async:串行、并行、依赖处理

并发控制

1for (let i = 0; i < 100; i++) {
2 asyncFn()
3}

如果并发量过大,下层服务可能会吃不消,可以通过一个队列来控制