Skip to content

AHABHGK

玩转进程

  1. 同步
  2. 复制进程:占用资源大,不能共享状态
  3. 多线程:上下文切换
  4. 事件驱动

多进程

worker.js
1var http = require('http');
2
3http.createServer(function (req, res) {
4 res.writeHead(200, {'Content-Type': 'text/plain'});
5 res.end('Hello World\n');
6}).listen(Math.round((1 + Math.random()) * 1000), '127.0.0.1');
master.js
1var fork = require('child_process').fork;
2var cpus = require('os').cpus();
3
4for (var i = 0; i < cpus.length; i++) {
5 fork('./worker.js');
6}

ps aux | grep worker.js 查看进程数量

fork 进程

主从模式,主进程负责调度管理工作进程,工作进程负责业务

创建子进程

创建子进程

可执行文件是 js 时,首行必须有 #!/usr/bin/env node

进程间通信

创建子进程时,父子进程间会创建 IPC 通道,父子进程间通过 IPC 通道和 message 事件、send 方法通信

IPC(Inter-Process Communication)由 libuv 提供,libuv 根据平台进行封装

父进程会先创建 IPC 通道并监听它,然后创建子进程,并通过环境变量(NODE_CHANNEL_FD)告诉子进程这个 IPC 通道的文件描述符,子进程启动时,通过这个文件描述符连接 IPC 通道,从而实现进程通信

句柄传递

通过代理可以避免重复监听造成的冲突问题,还可以做负载均衡,但是每次会用到两个文件描述符,操作系统的文件描述符有限

为了解决上述问题,Node 引入进程间发送句柄的功能,句柄是一种可以用来标记资源的引用

master.js
1var child = require('child_process').fork('child.js');
2
3// Open up the server object and send the handle
4var server = require('net').createServer();
5
6server.on('connection', function (socket) {
7 socket.end('handled by parent\n');
8});
9server.listen(1337, function () {
10 child.send('server', server); // 第二个参数,发送句柄
11});
1process.on('message', function (m, server) {
2 if (m === 'server') {
3 server.on('connection', function (socket) {
4 socket.end('handled by child\n');
5 });
6 }
7});

子进程可以发送的句柄有 net.Socket、net.Server、net.Native、dgram.Socket、dgram.Native

send 会把 message = { cmd: 'NODE_HANDLE', type: 'net.Server', msg: message } 和句柄文件描述符经过 JSON.stringfy 发送到 IPC 中,接收后经过 parse 根据 message.type 和文件描述符一起还原出对应的对象

ipc

以发送的 TCP 服务器句柄为例,子进程收到消息后的还原过程代码如下:

1function(message,handle,emit){
2 var self = this;
3
4 var server = new net.Server();
5 server.listen(handler,function(){
6 emit(server);
7 });
8}

Node 进程间只有消息通信,不会真正的传递对象

进程间通信是利用内核管理一块内存,不同进程可以读写这块内容,进而可以互相通信

集群

进程事件

error, exit, close, disconnect

1// 子进程
2child.kill([signal]);
3// 当前进程
4process.kill(pid, [signal]);

signal 是 POSIX 标准中的,有不同含义,进程收到后应作出约定的行为

1process.on('SIGTERM', function() {
2 console.log('Got a SIGTERM, exiting...');
3 process.exit(1);
4});
5
6console.log('server running with PID: ', process.pid)
7process.kill(process.pid, 'SIGTERM')

自动重启

master.js
1var fork = require('child_process').fork;
2var cpus = require('os').cpus();
3var server = require('net').createServer();
4
5server.listen(1337);
6
7// 重启次数
8var limit = 10;
9// 时间单位
10var during = 60000;
11var restart = [];
12var isTooFrequently = function () {
13 // 记录重启时间
14 var time = Date.now();
15 var length = restart.push(time);
16 if (length > limit) {
17 restart = restart.slice(limit * -1);
18 }
19 return restart.length >= limit && restart[restart.length - 1] - restart[0] < during;
20};
21
22var workers = {};
23var createWorker = function () {
24 if (isTooFrequently()) {
25 // 触发 giveup 事件后,不再重启
26 process.emit('giveup', length, during);
27 return;
28 }
29 var worker = fork(__dirname + '/worker.js');
30 // 启动新的进程
31 worker.on('message', function (message) {
32 if (message.act === 'suicide') { // uncaughtException 时自杀重启
33 createWorker();
34 }
35 });
36 worker.on('exit', function () {
37 console.log('Worker ' + worker.pid + ' exited.');
38 delete workers[worker.pid];
39 });
40 // 句柄转发
41 worker.send('server', server);
42 workers[worker.pid] = worker;
43 console.log('Create worker. pid: ' + worker.pid);
44};
45
46for (var i = 0; i < cpus.length; i++) {
47 createWorker();
48}
49
50// 进程自己退出时让所有工作进程退出
51process.on('exit', function () {
52 for (var pid in workers) {
53 workers[pid].kill();
54 }
55});
1// worker.js
2var http = require('http');
3var server = http.createServer(function (req, res) {
4 res.writeHead(200, {'Content-Type': 'text/plain'});
5 res.end('handled by child, pid is ' + process.pid + '\n');
6});
7
8var worker;
9process.on('message', function (m, tcp) {
10 if (m === 'server') {
11 worker = tcp;
12 worker.on('connection', function (socket) {
13 server.emit('connection', socket);
14 });
15 }
16});
17process.on('uncaughtException', function (e) {
18 // 记录日志
19 logger.error(e)
20 // 自杀
21 process.send({act: 'suicide'});
22 // 停止接收所有新的连接
23 worker.close(function () {
24 // 所有已有连接断开后,退出进程
25 process.exit(1);
26 });
27 // 连接可能是长连接,等待断开可能需要较长时间,设置一个超时时间
28 setTimeout(() => process.exit(1), 5000)
29});

负载均衡

Round-Robin:每次选择第 i = (i + 1) % n 个进程

状态共享

进程间不能共享数据

  1. 第三方数据存储:redis

    redis

    不能即时同步状态,需要轮询

  2. 主动通知

    pub sub

    在 redis 和工作进程之间加一个 pub/sub

cluster

cluster 是 child_process 和 net 的组合应用,cluster 启动时,会在内部启动 TCP 服务器,在 cluster.fork 子进程时,将这个 TCP 服务器端 socket 的文件描述符发送给工作线程,如果进程是 fork 出来的,进程的环境变量里就有 NODE_UNIQUE_ID,如果工作进程中有 listen 监听端口调用,进程就能拿到文件描述符,通过 SO_REUSEADDR 端口重用,从而实现多个子进程共享端口,对于普通方式启动的进程,则不存在文件描述符共享传递的等事情