通过源码解析 Node.js 中 cluster 模块的主要功能实现

众所周知,Node.js 中的 JavaScript 代码执行在单线程中,非常脆弱,一旦出现了未捕获的异常,那么整个应用就会崩溃。这在许多场景下,尤其是 web 应用中,是无法忍受的。通常的解决方案,便是使用 Node.js 中自带的 cluster 模块,以 master-worker 模式启动多个应用实例。然而大家在享受 cluster 模块带来的福祉的同时,不少人也开始好奇:

为什么我的应用代码中明明有 app.listen(port); ,但 cluter 模块在多次 fork 这份代码时,却没有报端口已被占用? Master 是如何将接收的请求传递至 worker 中进行处理然后响应的?

让我们从 Node.js 项目的 lib/cluster.js 中的代码里,来一勘究竟。

问题一:

为了得到这个问题的解答,我们先从 worker 进程的初始化看起,master 进程在 fork 工作进程时,会为其附上环境变量 NODE_UNIQUE_ID,是一个从零开始的递增数:

// lib/cluster.js
// ...

function createWorkerProcess(id, env) {  
  // ...
  workerEnv.NODE_UNIQUE_ID = '' + id;  
  // ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    env: workerEnv,
    silent: cluster.settings.silent,
    execArgv: execArgv,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

随后 Node.js 在初始化时,会根据该环境变量,来判断该进程是否为 cluster 模块 fork 出的工作进程,若是,则执行 workerInit() 函数来初始化环境,否则执行 masterInit() 函数。

workerInit() 函数中,定义了 cluster._getServer 方法,这个方法在任何 net.Server 实例的 listen 方法中,会被调用:

// lib/net.js
// ...

function listen(self, address, port, addressType, backlog, fd, exclusive) {  
  exclusive = !!exclusive;  
  if (!cluster) cluster = require('cluster');  
  if (cluster.isMaster || exclusive) {
    self._listen2(address, port, addressType, backlog, fd);         
    return;
  }

  cluster._getServer(self, {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  }, cb);  function cb(err, handle) {    
    // ...
    self._handle = handle;
    self._listen2(address, port, addressType, backlog, fd);
  }
}

你可能已经猜到,问题一的答案,就在这个 cluster._getServer 函数的代码中。它主要干了两件事:

向 master 进程注册该 worker,若 master 进程是第一次接收到监听此端口/描述符下的 worker,则起一个内部 TCP 服务器,来承担监听该端口/描述符的职责,随后在 master 中记录下该 worker。 Hack 掉 worker 进程中的 net.Server 实例的 listen 方法里监听端口/描述符的部分,使其不再承担该职责。

对于第一件事,由于 master 在接收,传递请求给 worker 时,会符合一定的负载均衡规则(在非 Windows 平台下默认为轮询),这些逻辑被封装在 RoundRobinHandle 类中。故,初始化内部 TCP 服务器等操作也在此处:

// lib/cluster.js
// ...

function RoundRobinHandle(key, address, port, addressType, backlog, fd) {  
  // ...
  this.handles = [];  
  this.handle = null;  
  this.server = net.createServer(assert.fail); 

  if (fd >= 0) 
    this.server.listen({ fd: fd });  
  else if (port >= 0)    
    this.server.listen(port, address);  
  else
    this.server.listen(address);  // UNIX socket path.

  // ...
}

对于第二件事,由于 net.Server 实例的 listen 方法,最终会调用自身 _handle 属性下 listen 方法来完成监听动作,故在代码中修改之:

// lib/cluster.js
// ...

function rr(message, cb) {  
  // ...
  // 此处的listen函数不再做任何监听动作
  function listen(backlog) {    
    return 0;
  }
  function close() {    
    // ...
  }  
  function ref() {}  
  function unref() {}

  var handle = {
    close: close,
    listen: listen,
    ref: ref,
    unref: unref,
  };  

  // ...

  handles[key] = handle;
  cb(0, handle); // 传入这个 cb 中的 handle 将会被赋值给 net.Server 实例中的 _handle属性
}

// lib/net.js
// ...

function listen(self, address, port, addressType, backlog, fd, exclusive) {  
  // ...

  if (cluster.isMaster || exclusive) {
    self._listen2(address, port, addressType, backlog, fd); 
    return; // 仅在 worker 环境下改变
  }

  cluster._getServer(self, {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  }, cb);  

  function cb(err, handle) {   
    // ...
    self._handle = handle; 
    // ...
  }
}

至此,第一个问题便已豁然开朗了,总结下: 端口仅由 master 进程中的内部 TCP 服务器监听了一次。 不会出现端口被重复监听报错,是由于,worker 进程中,最后执行监听端口操作的方法,已被 cluster 模块主动 hack。

问题二:

解决了问题一,问题二的解决就明朗轻松许多了。通过问题一我们已得知,监听端口的是 master 进程中创建的内部 TCP 服务器,所以第二个问题的解决,着手点就是该内部 TCP 服务器接手连接时,执行的操作。Cluster 模块的做法是,监听该内部 TCP 服务器的 connection 事件,在监听器函数里,有负载均衡地挑选出一个 worker,向其发送 newconn 内部消息(消息体对象中包含 cmd: 'NODE_CLUSTER' 属性)以及一个客户端句柄(即 connection 事件处理函数的第二个参数),相关代码如下:

// lib/cluster.js
// ...

function RoundRobinHandle(key, address, port, addressType, backlog, fd) {  
  // ...
  this.server = net.createServer(assert.fail);  

  // ...
  var self = this;  this.server.once('listening', function() {      
    // ...
    self.handle.onconnection = self.distribute.bind(self);
  });
}

RoundRobinHandle.prototype.distribute = function(err, handle) {  this.handles.push(handle);  
  var worker = this.free.shift();  
  if (worker) this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function(worker) {  
  // ...
  var message = { act: 'newconn', key: this.key };  
  var self = this;
  sendHelper(worker.process, message, handle, function(reply) {    
    // ...
  });
};

Worker 进程在接收到了 newconn 内部消息后,根据传递过来的句柄,调用实际的业务逻辑处理并返回:

// lib/cluster.js
// ...

// 该方法会在 Node.js 初始化时由 src/node.js 调用

cluster._setupWorker = function() {  
  // ...
  process.on('internalMessage', internal(worker, onmessage));    
  // ...
  function onmessage(message, handle) {
    if (message.act === 'newconn')
    onconnection(message, handle);    
    // ...
  }
};

function onconnection(message, handle) {  
  // ...
  var accepted = server !== undefined;  
  // ...
  if (accepted) server.onconnection(0, handle);
}

至此,问题二也得到了解决,也总结一下:

所有请求先同一经过内部 TCP 服务器。 在内部 TCP 服务器的请求处理逻辑中,有负载均衡地挑选出一个 worker 进程,将其发送一个 newconn 内部消息,随消息发送客户端句柄。 Worker 进程接收到此内部消息,根据客户端句柄创建 net.Socket 实例,执行具体业务逻辑,返回。

最后:

Node.js 中的 cluster 模块除了上述提到的功能外,其实还提供了非常丰富的 API 供 master 和 worker 进程之前通信,对于不同的操作系统平台,也提供了不同的默认行为。本文仅挑选了一条功能线进行了分析阐述。如果大家有闲,非常推荐完整领略一下 cluster 模块的代码实现。

参考: