Skip to content

AHABHGK

Koa2 源码分析

SourceCode2 min read

Table of Contents

Koa 由 express 同一个团队打造,目的是代替 express 成为下一代 Node.js 后端框架,Koa 由于十分简洁,其实更适合做一些框架的基础(egg.js、think.js),如今看来并不能代替 express,express 本身自带一些中间件支持,基于 express 编写的 nest 更是大而全,结合 TypeScript 写起来真的爽,虽然 Koa 用的人不如 express 多,但 Koa 确实有其进步的地方:支持 async/await 的异步中间件机制

本文由创建服务开始,讲述了 new Koa()app.listen Koa 所做的事,再由一条 http 请求详细介绍了 Koa 异步中间件机制,之后讲述对 ctx 的修改如何改动原生 req res,并带出一些 http 相关的知识,最后讲了 Koa 如何进行错误处理

🐣 创建服务

1.1 new Koa() 实例化

继承 EventEmitter,constructor 初始化参数

lib/application.js
1this.middleware = [];
2this.context = Object.create(context);
3this.request = Object.create(request);
4this.response = Object.create(response);

1.2 app.use(fn) 添加中间件

如何判断一个函数是 generator 函数:is-generator-function

lib/application.js
1use(fn) {
2 // 兼容 v1 Generator,判断 fn 是不是 generator,是则通过 koa-convert 用 co 进行处理
3 this.middleware.push(fn)
4 return this
5}

1.3 app.listen(8080) 监听端口

lib/application.js
1listen(...args) {
2 const server = http.createServer(this.callback());
3 return server.listen(...args);
4}

就是调用 http 创建服务的 listen,关键在于 createServer 的参数

1.3.1 this.callback() 返回什么

  1. 组合中间件

  2. 通过 EventEmitter 监听 error 事件,触发 error 时用 this.onerror 处理

  3. 返回 handleRequest,作为 createServer 的回调函数,其中创建 ctx 然后调用自己的 handleRequest 传入 ctx 和组合好的中间件来处理请求

lib/application.js
1callback() {
2 const fn = compose(this.middleware);
3
4 if (!this.listenerCount('error')) this.on('error', this.onerror);
5
6 const handleRequest = (req, res) => {
7 const ctx = this.createContext(req, res);
8 return this.handleRequest(ctx, fn);
9 };
10
11 return handleRequest;
12}
1.3.1.1 this.createContext(req, res)

创建 ctx,挂载 Koa 的 request 和 response,挂载原生的 req 和 res

lib/application.js
1createContext(req, res) {
2 const context = Object.create(this.context);
3 const request = context.request = Object.create(this.request);
4 const response = context.response = Object.create(this.response);
5 context.app = request.app = response.app = this;
6 context.req = request.req = response.req = req;
7 context.res = request.res = response.res = res;
8 request.ctx = response.ctx = context;
9 request.response = response;
10 response.request = request;
11 context.originalUrl = request.originalUrl = req.url;
12 context.state = {};
13 return context;
14}
1.3.1.2 this.handleRequest(ctx, fn)

这个可以说是第一个中间件,处理默认的 onerror 和最后用户响应的 respond

最后 fnMiddleware(ctx).then(handleResponse).catch(onerror) 可以看出组合的中间件返回一个 promise,之后调用 respond(ctx) 发送响应,这里 catch 是默认的错误处理

lib/application.js
1handleRequest(ctx, fnMiddleware) {
2 const res = ctx.res;
3 res.statusCode = 404;
4 const onerror = err => ctx.onerror(err);
5 const handleResponse = () => respond(ctx);
6 onFinished(res, onerror);
7 return fnMiddleware(ctx).then(handleResponse).catch(onerror);
8}

中间件中通过给 ctx.body 赋值,传到 respond 中,对于不存在的状态码的请求响应 res.end(),respond 对于 HEAD 请求返回 res.end(),对于字符串和 Buffer 的 body 响应 res.end(body),对于流的 body 响应 body.pipe(res),最后通过 JSON.stringify(body) 处理其他的形式,如果是 JSON 则 res.end(body) 返回 stringify 得 JSON,如果报错通过后面 catch 处理,同时 fnMiddleware 中其他报错也通过 catch 处理

lib/application.js
1function respond(ctx) {
2 // 判断是否同意 ctx.respond
3 // 204 205 304 忽略 body 的状态码
4 // 处理 HEAD 请求
5 // 处理 ctx.body == null
6
7 // responses
8 if (Buffer.isBuffer(body)) return res.end(body);
9 if ('string' == typeof body) return res.end(body);
10 if (body instanceof Stream) return body.pipe(res);
11
12 // body: json
13 body = JSON.stringify(body);
14 if (!res.headersSent) {
15 ctx.length = Buffer.byteLength(body);
16 }
17 res.end(body)
18}

🐥 一条 http 请求:Koa 核心 - 异步中间件机制

直接先看怎样组合中间件

koa-compose
1function compose (middleware) {
2 // 检查参数
3 if (!Array.isArray(middleware)) throw new TypeError('Middleware stack must be an array!')
4 for (const fn of middleware) {
5 if (typeof fn !== 'function') throw new TypeError('Middleware must be composed of functions!')
6 }
7
8 return function (context, next) {
9 // last called middleware #
10 let index = -1
11 return dispatch(0)
12 function dispatch (i) {
13 if (i <= index) return Promise.reject(new Error('next() called multiple times'))
14 index = i
15 let fn = middleware[i]
16 if (i === middleware.length) fn = next
17 if (!fn) return Promise.resolve()
18 try {
19 return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
20 } catch (err) {
21 return Promise.reject(err)
22 }
23 }
24 }
25}

返回一个组合后的中间件,参数也和普通的中间件一样,dispatch 返回一个 Promise.resolve,从 dispatch(0) 开始展开写得到的 fnMiddleware 就是这样的:

1// fnMiddleware
2function fnMiddleware(context, next) {
3 return Promise.resolve(middleware[0](context, function next() {
4 return Promise.resolve(middleware[1](context, function next() {
5 return Promise.resolve(middleware[2](context, function next() {
6 return Promise.resolve() // 这里 i === middleware.length 同时 fn 就是 fnMiddleware(ctx) 中传入的 undefined,所以直接 resolve
7 }))
8 }))
9 }))
10}

通过 fnMiddleware(ctx) 传入 context,各个 middleware 对 ctx 上的属性进行更改添加(ctx.body、ctx.type……)由于 ctx 对象传入 middlware context 参数的值是 ctx 的地址,所以之后的 middleware 中可以得到之前 middleware 中修改后的 ctx,内部调用 await next() 就是 resolve 下一个 middleware,这样就实现了 Koa 的洋葱模型机制

我们发现这其实和 JS 的 FP 中 compose 很像,我们用 reduce 实现试试:

1// compose
2const compose = (fns) =>
3 (context, next) => fns.reduceRight(
4 (acc, cur) => Promise.resolve(cur(context, () => acc)),
5 Promise.resolve('resolve'),
6 )
7
8const sleep = (time) => new Promise(resolve => setTimeout(() => resolve(time), time))
9
10const ctx = { body: 0 }
11
12const middlewares = [
13 async (ctx, next) => {
14 ctx.body = 1
15 console.log(ctx.body) // 1
16 await next().then(() => {
17 ctx.body = 6
18 console.log(ctx.body) // 6
19 })
20 },
21 async (ctx, next) => {
22 await sleep(1000)
23 ctx.body = 2
24 console.log(ctx.body) // 2
25 await next().then(() => {
26 ctx.body = 5
27 console.log(ctx.body) // 5
28 })
29 },
30 async (ctx, next) => {
31 await sleep(1000)
32 ctx.body = 3
33 console.log(ctx.body) // 3
34 await next().then((r) => {
35 console.log(r)
36 ctx.body = 4
37 console.log(ctx.body) // 4
38 })
39 },
40]
41
42compose(middlewares)(ctx)

这时发现只会停顿 1 秒,然后顺序也不对,而换成官方的 compose 就一切正常,停顿两秒同时顺序正确,这是因为 reduce 和 reduceRight 是同步的,只停顿一秒是因为两个 sleep 并行了(类似 Promise.all([asyncFn1, asyncFn2])),Koa 中间件的特殊与对处理请求的进步点就在于此,不同于 express 和 redux 的中间件机制,Koa 实现的是异步中间件

同时这样也可以通过 i 来检查一个 middleware 中是否多次调用 next 的作用,Koa 没有使用这种方式也是为了保证 i 可以检查多次调用 next

koa-compose
1function dispatch (i) {
2 if (i <= index) return Promise.reject(new Error('next() called multiple times'))
3 index = i // 这里修改 index
4 let fn = middleware[0]
5 if (i === middleware.length) fn = next
6 if (!fn) return Promise.resolve()
7 try {
8 return Promise.resolve((async (ctx, next) => {
9 // 假设现在是 dispatch(0),此时 index = 0,i = 0
10 await dispatch.bind(null, 0 + 1)() // 内部修改 index = 1
11 await dispatch.bind(null, 0 + 1)() // 内部 i <= index:1 <= 1 成立 reject 掉
12 })(context, dispatch.bind(null, 0 + 1)));
13 } catch (err) {
14 return Promise.reject(err)
15 }
16}

看完中间件机制再来看 http 请求从接受到响应就简单了,this.callback 返回的就是 http.createServer 的回调函数,所以 req res 从这里接收,之后 req res 进入 createContext 挂载到 ctx 对象上,之后把组合好的 fnMiddleware 和 ctx 传入 this.handleRequest,这里处理好 onerror 和 respond 之后开始把 ctx 传入 fnMiddleware,通过开发者编写的中间件对 req res 进行真正的处理,最后处理好后通过 .then(() => respond(ctx)) 作出响应

简化的迭代版 compose

1function compose(mws) {
2 return (context, next) => {
3 let next = () => Promise.resolve()
4 for (const mw of [...mws].reverse()) {
5 const nextFn = next
6 next = () => Promise.resolve(mw(context, nextFn))
7 }
8 return next()
9 }
10}

⛅️ request response 代理原生 req res

我们对 ctx 上处理一般是对 ctx.request 和 ctx.response 处理,但 request response 只是对原生 req res 通过 __defineGetter____defineSetter__ 做的代理,最终的修改还是对 req res 的修改,我们通过几处看看这层代理有什么作用

__defineGetter____defineSetter__ 并不是标准,不推荐使用

  1. request 的 get 方法,这里为了那请求头的字段,对 referer 和 referrer 做了兼容

Referer 的正确拼写是 Referrer,但是写入标准的时候,不知为何,没人发现少了一个字母 r。标准定案以后,只能将错就错,所有头信息的该字段都一律错误拼写成 Referer

lib/request.js
1get(field) {
2 const req = this.req;
3 switch (field = field.toLowerCase()) {
4 case 'referer':
5 case 'referrer':
6 return req.headers.referrer || req. headers.referer || '';
7 default:
8 return req.headers[field] || '';
9 }
10},
  1. get set 中处理 req res,是开发者更方便的拿到一些数据
lib/request.js
1get query() {
2 const str = this.querystring;
3 const c = this._querycache = this. _querycache || {};
4 return c[str] || (c[str] = qs.parse(str)) ;
5},
6
7set query(obj) {
8 this.querystring = qs.stringify(obj);
9},
lib/response.js
1set etag(val) {
2 if (!/^(W\/)?"/.test(val)) val = `"${val} "`;
3 this.set('ETag', val);
4},
5
6get etag() {
7 return this.get('ETag');
8},
  1. response 中 body 的处理,对于不同格式 body 的赋值,在 set 中对 type、length 等连同一起处理,方便开发者
lib/response.js
1get body() {
2 return this._body;
3},
4
5set body(val) {
6 const original = this._body;
7 this._body = val;
8
9 // no content
10 if (null == val) {
11 if (!statuses.empty[this.status]) this. status = 204;
12 this.remove('Content-Type');
13 this.remove('Content-Length');
14 this.remove('Transfer-Encoding');
15 return;
16 }
17
18 // set the status
19 if (!this._explicijstatus) this.status = 200;
20
21 // set the content-type only if not yet set
22 const setType = !this.has('Content-Type') ;
23
24 // string
25 if ('string' == typeof val) {
26 if (setType) this.type = /^\s*</.test (val) ? 'html' : 'text';
27 this.length = Buffer.byteLength(val);
28 return;
29 }
30
31 // buffer
32 if (Buffer.isBuffer(val)) {
33 if (setType) this.type = 'bin';
34 this.length = val.length;
35 return;
36 }
37
38 // stream
39 if ('function' == typeof val.pipe) {
40 onFinish(this.res, destroy.bind(null, val));
41 ensureErrorHandler(val, err => this. ctx.onerror(err));
42
43 // overwriting
44 if (null != original && original != val) this.remove('Content-Length');
45
46 if (setType) this.type = 'bin';
47 return;
48 }
49
50 // json
51 this.remove('Content-Length');
52 this.type = 'json';
53},
  1. response 中 redirect 的封装,context 中 cookie 的封装,提供一些封装好的方法
lib/response.js
1redirect(url, alt) {
2 // location
3 if ('back' == url) url = this.ctx.get ('Referrer') || alt || '/';
4 this.set('Location', encodeUrl(url));
5
6 // status
7 if (!statuses.redirect[this.status]) this.status = 302;
8
9 // html
10 if (this.ctx.accepjs('html')) {
11 url = escape(url);
12 this.type = 'text/html; charset=utf-8';
13 this.body = `Redirecting to <a href="$ {url}">${url}</a>.`;
14 return;
15 }
16
17 // text
18 this.type = 'text/plain; charset=utf-8';
19 this.body = `Redirecting to ${url}.`;
20},
lib/context.js
1get cookies() {
2 if (!this[COOKIES]) {
3 this[COOKIES] = new Cookies(this.req, this.res, {
4 keys: this.app.keys,
5 secure: this.request.secure
6 });
7 }
8 return this[COOKIES];
9},
10
11set cookies(_cookies) {
12 this[COOKIES] = _cookies;
13}
  1. 提供 toJSON 方便调试
lib/context.js
1toJSON() {
2 return {
3 request: this.request.toJSON(),
4 response: this.response.toJSON(),
5 app: this.app.toJSON(),
6 originalUrl: this.originalUrl,
7 req: '<original node req>',
8 res: '<original node res>',
9 socket: '<original node socket>'
10 };
11},

其他还有在 ctx 用 delegate 库对一些常用数据直接代理到 ctx 对象上(ctx.body === ctx.response.body)

3.1 引出的一些 http 协议知识点

// TODO: 引出的一些 http 协议知识点

👾 错误处理

官方有三种方式:

  1. 中间件中 try/catch

  2. 添加错误处理中间件(或使用默认的)

  3. 处理 error 事件(默认是打 log)

第一种没什么好说的,第二种中默认的就是 this.handleRequest 中的 onerror,直接使用 const onerror = err => ctx.onerror(err) 处理,ctx.onerrer 会用 statuses 库对 error.code 作出对应的响应信息,同时 ctx.onerror 还会触发(emit)error 事件(Application 继承 EventEmitter),app 默认的 on('error', this.onerror) 是对错误信息打 log

一般默认的就够用了,如果有其他需求可以修改 app.onerror 或添加错误中间件处理

1app.use(async (ctx, next) => {
2 try {
3 await next();
4 } catch (err) {
5 // will only respond with JSON
6 ctx.status = err.statusCode || err.status || 500;
7 ctx.body = {
8 message: err.message
9 };
10 }
11})

这里相当于在第一个中间件 catch 之后中间件的错误,在 respond 之前处理好错误然后通过 respond 响应,错误就不会被之后 .catch 抓住,以此覆盖默认的 onerror

🌊 One more thing --- 流

http 请求非常适合看作一个流,Koa 中 use 的中间件相当于一个管道,对 ctx 流的数据进行处理,处理完后 respond

结合 RxJS 的伪代码:

1const server = http
2 .createServer()
3 .listen(8080, () => console.log('Server is running on port 8080...'))
4
5const server$ = fromEvent<[IncomingMessage, ServerResponse]>(server, 'request')
6const ctx$ = server$.pipe(map(([req, res]) => ({ req, res })))
7
8ctx$.pipe(
9 map(routerMiddleware),
10 map(controllerMiddleware),
11 map(serverMiddleware),
12 map(errorHandlerMiddleware),
13).subscribe({
14 next: respond,
15 error: defaultErrorHandler,
16 complete: defaultCompleteHandler,
17})

当然非常不完整,只是一个想法的体现,其实已经有人实现了用 RxJS 的后端框架:marblejs,当然我们还是要结合实际进行选择

🥳 结语

使用 TypeScript 实现了一个简易版的 Koa,删减了很多 ctx 对 request response 的处理,只体现了核心思路,感兴趣可以看看:ts-koa-core

通过看 Koa 源码同时简单看了看相关依赖的库的源码,也算对以前不理解的地方有了更清晰的理解

以前对 cookie、etag 什么的不知道到底怎么写的,现在发现其实就是对 node 的 req res 的修改,当然 node 底层也做了很多,但也算在一定抽象层次上明白了后端其实是对 req res 的处理,同时作出 side effect,复杂时就需要处理复杂情况

还用中间件机制为什么 compose 与 FP 常写的 compose 很不一样,还有 i 对多次调用 next 的检查真的非常巧妙,真的非常佩服 TJ 大神