玩转进程
- 同步
- 复制进程:占用资源大,不能共享状态
- 多线程:上下文切换
- 事件驱动
多进程
1var http = require('http');23http.createServer(function (req, res) {4 res.writeHead(200, {'Content-Type': 'text/plain'});5 res.end('Hello World\n');6}).listen(Math.round((1 + Math.random()) * 1000), '127.0.0.1');
1var fork = require('child_process').fork;2var cpus = require('os').cpus();34for (var i = 0; i < cpus.length; i++) {5 fork('./worker.js');6}
ps aux | grep worker.js
查看进程数量
主从模式,主进程负责调度管理工作进程,工作进程负责业务
创建子进程
可执行文件是 js 时,首行必须有 #!/usr/bin/env node
进程间通信
创建子进程时,父子进程间会创建 IPC 通道,父子进程间通过 IPC 通道和 message 事件、send 方法通信
IPC(Inter-Process Communication)由 libuv 提供,libuv 根据平台进行封装
父进程会先创建 IPC 通道并监听它,然后创建子进程,并通过环境变量(NODE_CHANNEL_FD)告诉子进程这个 IPC 通道的文件描述符,子进程启动时,通过这个文件描述符连接 IPC 通道,从而实现进程通信
句柄传递
通过代理可以避免重复监听造成的冲突问题,还可以做负载均衡,但是每次会用到两个文件描述符,操作系统的文件描述符有限
为了解决上述问题,Node 引入进程间发送句柄的功能,句柄是一种可以用来标记资源的引用
1var child = require('child_process').fork('child.js');23// Open up the server object and send the handle4var server = require('net').createServer();56server.on('connection', function (socket) {7 socket.end('handled by parent\n');8});9server.listen(1337, function () {10 child.send('server', server); // 第二个参数,发送句柄11});
1process.on('message', function (m, server) {2 if (m === 'server') {3 server.on('connection', function (socket) {4 socket.end('handled by child\n');5 });6 }7});
子进程可以发送的句柄有 net.Socket、net.Server、net.Native、dgram.Socket、dgram.Native
send 会把 message = { cmd: 'NODE_HANDLE', type: 'net.Server', msg: message }
和句柄文件描述符经过 JSON.stringfy 发送到 IPC 中,接收后经过 parse 根据 message.type 和文件描述符一起还原出对应的对象
以发送的 TCP 服务器句柄为例,子进程收到消息后的还原过程代码如下:
1function(message,handle,emit){2 var self = this;34 var server = new net.Server();5 server.listen(handler,function(){6 emit(server);7 });8}
Node 进程间只有消息通信,不会真正的传递对象
进程间通信是利用内核管理一块内存,不同进程可以读写这块内容,进而可以互相通信
集群
进程事件
error, exit, close, disconnect
1// 子进程2child.kill([signal]);3// 当前进程4process.kill(pid, [signal]);
signal 是 POSIX 标准中的,有不同含义,进程收到后应作出约定的行为
1process.on('SIGTERM', function() {2 console.log('Got a SIGTERM, exiting...');3 process.exit(1);4});56console.log('server running with PID: ', process.pid)7process.kill(process.pid, 'SIGTERM')
自动重启
1var fork = require('child_process').fork;2var cpus = require('os').cpus();3var server = require('net').createServer();45server.listen(1337);67// 重启次数8var limit = 10;9// 时间单位10var during = 60000;11var restart = [];12var isTooFrequently = function () {13 // 记录重启时间14 var time = Date.now();15 var length = restart.push(time);16 if (length > limit) {17 restart = restart.slice(limit * -1);18 }19 return restart.length >= limit && restart[restart.length - 1] - restart[0] < during;20};2122var workers = {};23var createWorker = function () {24 if (isTooFrequently()) {25 // 触发 giveup 事件后,不再重启26 process.emit('giveup', length, during);27 return;28 }29 var worker = fork(__dirname + '/worker.js');30 // 启动新的进程31 worker.on('message', function (message) {32 if (message.act === 'suicide') { // uncaughtException 时自杀重启33 createWorker();34 }35 });36 worker.on('exit', function () {37 console.log('Worker ' + worker.pid + ' exited.');38 delete workers[worker.pid];39 });40 // 句柄转发41 worker.send('server', server);42 workers[worker.pid] = worker;43 console.log('Create worker. pid: ' + worker.pid);44};4546for (var i = 0; i < cpus.length; i++) {47 createWorker();48}4950// 进程自己退出时让所有工作进程退出51process.on('exit', function () {52 for (var pid in workers) {53 workers[pid].kill();54 }55});
1// worker.js2var http = require('http');3var server = http.createServer(function (req, res) {4 res.writeHead(200, {'Content-Type': 'text/plain'});5 res.end('handled by child, pid is ' + process.pid + '\n');6});78var worker;9process.on('message', function (m, tcp) {10 if (m === 'server') {11 worker = tcp;12 worker.on('connection', function (socket) {13 server.emit('connection', socket);14 });15 }16});17process.on('uncaughtException', function (e) {18 // 记录日志19 logger.error(e)20 // 自杀21 process.send({act: 'suicide'});22 // 停止接收所有新的连接23 worker.close(function () {24 // 所有已有连接断开后,退出进程25 process.exit(1);26 });27 // 连接可能是长连接,等待断开可能需要较长时间,设置一个超时时间28 setTimeout(() => process.exit(1), 5000)29});
负载均衡
Round-Robin:每次选择第 i = (i + 1) % n 个进程
状态共享
进程间不能共享数据
第三方数据存储:redis
不能即时同步状态,需要轮询
主动通知
在 redis 和工作进程之间加一个 pub/sub
cluster
cluster 是 child_process 和 net 的组合应用,cluster 启动时,会在内部启动 TCP 服务器,在 cluster.fork 子进程时,将这个 TCP 服务器端 socket 的文件描述符发送给工作线程,如果进程是 fork 出来的,进程的环境变量里就有 NODE_UNIQUE_ID,如果工作进程中有 listen 监听端口调用,进程就能拿到文件描述符,通过 SO_REUSEADDR 端口重用,从而实现多个子进程共享端口,对于普通方式启动的进程,则不存在文件描述符共享传递的等事情