Koa2 源码分析
— SourceCode — 2 min read
Table of Contents
序
Koa 由 express 同一个团队打造,目的是代替 express 成为下一代 Node.js 后端框架,Koa 由于十分简洁,其实更适合做一些框架的基础(egg.js、think.js),如今看来并不能代替 express,express 本身自带一些中间件支持,基于 express 编写的 nest 更是大而全,结合 TypeScript 写起来真的爽,虽然 Koa 用的人不如 express 多,但 Koa 确实有其进步的地方:支持 async/await
的异步中间件机制
本文由创建服务开始,讲述了 new Koa()
到 app.listen
Koa 所做的事,再由一条 http 请求详细介绍了 Koa 异步中间件机制,之后讲述对 ctx 的修改如何改动原生 req res,并带出一些 http 相关的知识,最后讲了 Koa 如何进行错误处理
🐣 创建服务
1.1 new Koa() 实例化
继承 EventEmitter,constructor 初始化参数
1this.middleware = [];2this.context = Object.create(context);3this.request = Object.create(request);4this.response = Object.create(response);
1.2 app.use(fn) 添加中间件
如何判断一个函数是 generator 函数:is-generator-function
1use(fn) {2 // 兼容 v1 Generator,判断 fn 是不是 generator,是则通过 koa-convert 用 co 进行处理3 this.middleware.push(fn)4 return this5}
1.3 app.listen(8080) 监听端口
1listen(...args) {2 const server = http.createServer(this.callback());3 return server.listen(...args);4}
就是调用 http 创建服务的 listen,关键在于 createServer 的参数
1.3.1 this.callback() 返回什么
组合中间件
通过 EventEmitter 监听 error 事件,触发 error 时用 this.onerror 处理
返回 handleRequest,作为 createServer 的回调函数,其中创建 ctx 然后调用自己的 handleRequest 传入 ctx 和组合好的中间件来处理请求
1callback() {2 const fn = compose(this.middleware);34 if (!this.listenerCount('error')) this.on('error', this.onerror);56 const handleRequest = (req, res) => {7 const ctx = this.createContext(req, res);8 return this.handleRequest(ctx, fn);9 };1011 return handleRequest;12}
1.3.1.1 this.createContext(req, res)
创建 ctx,挂载 Koa 的 request 和 response,挂载原生的 req 和 res
1createContext(req, res) {2 const context = Object.create(this.context);3 const request = context.request = Object.create(this.request);4 const response = context.response = Object.create(this.response);5 context.app = request.app = response.app = this;6 context.req = request.req = response.req = req;7 context.res = request.res = response.res = res;8 request.ctx = response.ctx = context;9 request.response = response;10 response.request = request;11 context.originalUrl = request.originalUrl = req.url;12 context.state = {};13 return context;14}
1.3.1.2 this.handleRequest(ctx, fn)
这个可以说是第一个中间件,处理默认的 onerror 和最后用户响应的 respond
最后 fnMiddleware(ctx).then(handleResponse).catch(onerror)
可以看出组合的中间件返回一个 promise,之后调用 respond(ctx) 发送响应,这里 catch 是默认的错误处理
1handleRequest(ctx, fnMiddleware) {2 const res = ctx.res;3 res.statusCode = 404;4 const onerror = err => ctx.onerror(err);5 const handleResponse = () => respond(ctx);6 onFinished(res, onerror);7 return fnMiddleware(ctx).then(handleResponse).catch(onerror);8}
中间件中通过给 ctx.body 赋值,传到 respond 中,对于不存在的状态码的请求响应 res.end(),respond 对于 HEAD 请求返回 res.end(),对于字符串和 Buffer 的 body 响应 res.end(body),对于流的 body 响应 body.pipe(res),最后通过 JSON.stringify(body) 处理其他的形式,如果是 JSON 则 res.end(body) 返回 stringify 得 JSON,如果报错通过后面 catch 处理,同时 fnMiddleware 中其他报错也通过 catch 处理
1function respond(ctx) {2 // 判断是否同意 ctx.respond3 // 204 205 304 忽略 body 的状态码4 // 处理 HEAD 请求5 // 处理 ctx.body == null67 // responses8 if (Buffer.isBuffer(body)) return res.end(body);9 if ('string' == typeof body) return res.end(body);10 if (body instanceof Stream) return body.pipe(res);1112 // body: json13 body = JSON.stringify(body);14 if (!res.headersSent) {15 ctx.length = Buffer.byteLength(body);16 }17 res.end(body)18}
🐥 一条 http 请求:Koa 核心 - 异步中间件机制
直接先看怎样组合中间件
1function compose (middleware) {2 // 检查参数3 if (!Array.isArray(middleware)) throw new TypeError('Middleware stack must be an array!')4 for (const fn of middleware) {5 if (typeof fn !== 'function') throw new TypeError('Middleware must be composed of functions!')6 }78 return function (context, next) {9 // last called middleware #10 let index = -111 return dispatch(0)12 function dispatch (i) {13 if (i <= index) return Promise.reject(new Error('next() called multiple times'))14 index = i15 let fn = middleware[i]16 if (i === middleware.length) fn = next17 if (!fn) return Promise.resolve()18 try {19 return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));20 } catch (err) {21 return Promise.reject(err)22 }23 }24 }25}
返回一个组合后的中间件,参数也和普通的中间件一样,dispatch 返回一个 Promise.resolve,从 dispatch(0) 开始展开写得到的 fnMiddleware 就是这样的:
1// fnMiddleware2function fnMiddleware(context, next) {3 return Promise.resolve(middleware[0](context, function next() {4 return Promise.resolve(middleware[1](context, function next() {5 return Promise.resolve(middleware[2](context, function next() {6 return Promise.resolve() // 这里 i === middleware.length 同时 fn 就是 fnMiddleware(ctx) 中传入的 undefined,所以直接 resolve7 }))8 }))9 }))10}
通过 fnMiddleware(ctx) 传入 context,各个 middleware 对 ctx 上的属性进行更改添加(ctx.body、ctx.type……)由于 ctx 对象传入 middlware context 参数的值是 ctx 的地址,所以之后的 middleware 中可以得到之前 middleware 中修改后的 ctx,内部调用 await next()
就是 resolve 下一个 middleware,这样就实现了 Koa 的洋葱模型机制
我们发现这其实和 JS 的 FP 中 compose 很像,我们用 reduce 实现试试:
1// compose2const compose = (fns) =>3 (context, next) => fns.reduceRight(4 (acc, cur) => Promise.resolve(cur(context, () => acc)),5 Promise.resolve('resolve'),6 )78const sleep = (time) => new Promise(resolve => setTimeout(() => resolve(time), time))910const ctx = { body: 0 }1112const middlewares = [13 async (ctx, next) => {14 ctx.body = 115 console.log(ctx.body) // 116 await next().then(() => {17 ctx.body = 618 console.log(ctx.body) // 619 })20 },21 async (ctx, next) => {22 await sleep(1000)23 ctx.body = 224 console.log(ctx.body) // 225 await next().then(() => {26 ctx.body = 527 console.log(ctx.body) // 528 })29 },30 async (ctx, next) => {31 await sleep(1000)32 ctx.body = 333 console.log(ctx.body) // 334 await next().then((r) => {35 console.log(r)36 ctx.body = 437 console.log(ctx.body) // 438 })39 },40]4142compose(middlewares)(ctx)
这时发现只会停顿 1 秒,然后顺序也不对,而换成官方的 compose 就一切正常,停顿两秒同时顺序正确,这是因为 reduce 和 reduceRight 是同步的,只停顿一秒是因为两个 sleep 并行了(类似 Promise.all([asyncFn1, asyncFn2])
),Koa 中间件的特殊与对处理请求的进步点就在于此,不同于 express 和 redux 的中间件机制,Koa 实现的是异步中间件
同时这样也可以通过 i 来检查一个 middleware 中是否多次调用 next 的作用,Koa 没有使用这种方式也是为了保证 i 可以检查多次调用 next
1function dispatch (i) {2 if (i <= index) return Promise.reject(new Error('next() called multiple times'))3 index = i // 这里修改 index4 let fn = middleware[0]5 if (i === middleware.length) fn = next6 if (!fn) return Promise.resolve()7 try {8 return Promise.resolve((async (ctx, next) => {9 // 假设现在是 dispatch(0),此时 index = 0,i = 010 await dispatch.bind(null, 0 + 1)() // 内部修改 index = 111 await dispatch.bind(null, 0 + 1)() // 内部 i <= index:1 <= 1 成立 reject 掉12 })(context, dispatch.bind(null, 0 + 1)));13 } catch (err) {14 return Promise.reject(err)15 }16}
看完中间件机制再来看 http 请求从接受到响应就简单了,this.callback 返回的就是 http.createServer 的回调函数,所以 req res 从这里接收,之后 req res 进入 createContext 挂载到 ctx 对象上,之后把组合好的 fnMiddleware 和 ctx 传入 this.handleRequest,这里处理好 onerror 和 respond 之后开始把 ctx 传入 fnMiddleware,通过开发者编写的中间件对 req res 进行真正的处理,最后处理好后通过 .then(() => respond(ctx))
作出响应
简化的迭代版 compose
1function compose(mws) {2 return (context, next) => {3 let next = () => Promise.resolve()4 for (const mw of [...mws].reverse()) {5 const nextFn = next6 next = () => Promise.resolve(mw(context, nextFn))7 }8 return next()9 }10}
⛅️ request response 代理原生 req res
我们对 ctx 上处理一般是对 ctx.request 和 ctx.response 处理,但 request response 只是对原生 req res 通过 __defineGetter__ 和 __defineSetter__ 做的代理,最终的修改还是对 req res 的修改,我们通过几处看看这层代理有什么作用
__defineGetter__、__defineSetter__ 并不是标准,不推荐使用
- request 的 get 方法,这里为了那请求头的字段,对 referer 和 referrer 做了兼容
Referer 的正确拼写是 Referrer,但是写入标准的时候,不知为何,没人发现少了一个字母 r。标准定案以后,只能将错就错,所有头信息的该字段都一律错误拼写成 Referer
1get(field) {2 const req = this.req;3 switch (field = field.toLowerCase()) {4 case 'referer':5 case 'referrer':6 return req.headers.referrer || req. headers.referer || '';7 default:8 return req.headers[field] || '';9 }10},
- get set 中处理 req res,是开发者更方便的拿到一些数据
1get query() {2 const str = this.querystring;3 const c = this._querycache = this. _querycache || {};4 return c[str] || (c[str] = qs.parse(str)) ;5},67set query(obj) {8 this.querystring = qs.stringify(obj);9},
1set etag(val) {2 if (!/^(W\/)?"/.test(val)) val = `"${val} "`;3 this.set('ETag', val);4},56get etag() {7 return this.get('ETag');8},
- response 中 body 的处理,对于不同格式 body 的赋值,在 set 中对 type、length 等连同一起处理,方便开发者
1get body() {2 return this._body;3},45set body(val) {6 const original = this._body;7 this._body = val;89 // no content10 if (null == val) {11 if (!statuses.empty[this.status]) this. status = 204;12 this.remove('Content-Type');13 this.remove('Content-Length');14 this.remove('Transfer-Encoding');15 return;16 }1718 // set the status19 if (!this._explicijstatus) this.status = 200;2021 // set the content-type only if not yet set22 const setType = !this.has('Content-Type') ;2324 // string25 if ('string' == typeof val) {26 if (setType) this.type = /^\s*</.test (val) ? 'html' : 'text';27 this.length = Buffer.byteLength(val);28 return;29 }3031 // buffer32 if (Buffer.isBuffer(val)) {33 if (setType) this.type = 'bin';34 this.length = val.length;35 return;36 }3738 // stream39 if ('function' == typeof val.pipe) {40 onFinish(this.res, destroy.bind(null, val));41 ensureErrorHandler(val, err => this. ctx.onerror(err));4243 // overwriting44 if (null != original && original != val) this.remove('Content-Length');4546 if (setType) this.type = 'bin';47 return;48 }4950 // json51 this.remove('Content-Length');52 this.type = 'json';53},
- response 中 redirect 的封装,context 中 cookie 的封装,提供一些封装好的方法
1redirect(url, alt) {2 // location3 if ('back' == url) url = this.ctx.get ('Referrer') || alt || '/';4 this.set('Location', encodeUrl(url));56 // status7 if (!statuses.redirect[this.status]) this.status = 302;89 // html10 if (this.ctx.accepjs('html')) {11 url = escape(url);12 this.type = 'text/html; charset=utf-8';13 this.body = `Redirecting to <a href="$ {url}">${url}</a>.`;14 return;15 }1617 // text18 this.type = 'text/plain; charset=utf-8';19 this.body = `Redirecting to ${url}.`;20},
1get cookies() {2 if (!this[COOKIES]) {3 this[COOKIES] = new Cookies(this.req, this.res, {4 keys: this.app.keys,5 secure: this.request.secure6 });7 }8 return this[COOKIES];9},1011set cookies(_cookies) {12 this[COOKIES] = _cookies;13}
- 提供 toJSON 方便调试
1toJSON() {2 return {3 request: this.request.toJSON(),4 response: this.response.toJSON(),5 app: this.app.toJSON(),6 originalUrl: this.originalUrl,7 req: '<original node req>',8 res: '<original node res>',9 socket: '<original node socket>'10 };11},
其他还有在 ctx 用 delegate 库对一些常用数据直接代理到 ctx 对象上(ctx.body === ctx.response.body)
3.1 引出的一些 http 协议知识点
// TODO: 引出的一些 http 协议知识点
👾 错误处理
官方有三种方式:
中间件中
try/catch
添加错误处理中间件(或使用默认的)
处理 error 事件(默认是打 log)
第一种没什么好说的,第二种中默认的就是 this.handleRequest
中的 onerror,直接使用 const onerror = err => ctx.onerror(err)
处理,ctx.onerrer 会用 statuses 库对 error.code 作出对应的响应信息,同时 ctx.onerror 还会触发(emit)error 事件(Application 继承 EventEmitter),app 默认的 on('error', this.onerror)
是对错误信息打 log
一般默认的就够用了,如果有其他需求可以修改 app.onerror 或添加错误中间件处理
1app.use(async (ctx, next) => {2 try {3 await next();4 } catch (err) {5 // will only respond with JSON6 ctx.status = err.statusCode || err.status || 500;7 ctx.body = {8 message: err.message9 };10 }11})
这里相当于在第一个中间件 catch 之后中间件的错误,在 respond 之前处理好错误然后通过 respond 响应,错误就不会被之后 .catch
抓住,以此覆盖默认的 onerror
🌊 One more thing --- 流
http 请求非常适合看作一个流,Koa 中 use 的中间件相当于一个管道,对 ctx 流的数据进行处理,处理完后 respond
结合 RxJS 的伪代码:
1const server = http2 .createServer()3 .listen(8080, () => console.log('Server is running on port 8080...'))45const server$ = fromEvent<[IncomingMessage, ServerResponse]>(server, 'request')6const ctx$ = server$.pipe(map(([req, res]) => ({ req, res })))78ctx$.pipe(9 map(routerMiddleware),10 map(controllerMiddleware),11 map(serverMiddleware),12 map(errorHandlerMiddleware),13).subscribe({14 next: respond,15 error: defaultErrorHandler,16 complete: defaultCompleteHandler,17})
当然非常不完整,只是一个想法的体现,其实已经有人实现了用 RxJS 的后端框架:marblejs,当然我们还是要结合实际进行选择
🥳 结语
使用 TypeScript 实现了一个简易版的 Koa,删减了很多 ctx 对 request response 的处理,只体现了核心思路,感兴趣可以看看:ts-koa-core
通过看 Koa 源码同时简单看了看相关依赖的库的源码,也算对以前不理解的地方有了更清晰的理解
以前对 cookie、etag 什么的不知道到底怎么写的,现在发现其实就是对 node 的 req res 的修改,当然 node 底层也做了很多,但也算在一定抽象层次上明白了后端其实是对 req res 的处理,同时作出 side effect,复杂时就需要处理复杂情况
还用中间件机制为什么 compose 与 FP 常写的 compose 很不一样,还有 i 对多次调用 next 的检查真的非常巧妙,真的非常佩服 TJ 大神