Egg source code analysis: egg-cluster #1

OnedayLiu opened this issue Oct 16, 2017 · 0 comments

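What is egg-cluster

To get the most out of multi-core CPUs and make full use of server resources, egg uses a multi-process model, which works around the fact that a single Node.js process runs on only one CPU core. egg-cluster is the base module egg uses for multi-process management; it sets up the underlying IPC channels and handles communication between the processes.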

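The egg multi-process model

[image: diagram of the egg multi-process model]

  • master: the main process
  • worker: a child process of master. Typically one worker is started per CPU on the server. Workers serve external traffic and handle business logic
  • agent: a child process of master. It mainly handles access to shared resources, such as file watching, and takes care of common chores on behalf of the workers: work that does not need to be repeated by every worker is done once by the agent, which then notifies the workers so they can carry on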

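master acts much like a daemon:

  • it starts, stops, and restarts the agent
  • it starts, stops, and reforks the worker processes, and restarts them in development mode
  • it relays messages between the agent and the workers
  • it relays messages among the workers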

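Startup order of the processes:

  • master starts and first launches the agent process
  • once the agent has initialized, it notifies master over the IPC channel
  • master starts as many worker processes as there are CPUs
  • once each worker has initialized, it notifies master over the IPC channel
  • when all processes have initialized, master tells the agent and every worker that the application has started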
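
The startup order can be imitated inside a single process. The following is only a rough sketch: `FakeMaster` and its log strings are hypothetical names made up for illustration, no real process is forked, and the IPC notifications are replaced by plain `EventEmitter` events.

```javascript
const { EventEmitter } = require('node:events');

// In-process stand-in for the master; no real processes are forked here.
class FakeMaster extends EventEmitter {
  constructor(workerCount) {
    super();
    this.workerCount = workerCount;
    this.started = 0;
    this.log = [];

    // Fork the workers only once, after the agent reports that it has started.
    this.once('agent-start', () => this.forkWorkers());
    this.on('app-start', () => this.onAppStart());
  }

  start() {
    this.log.push('master: start agent');
    // A real agent would report back over IPC with process.send().
    this.emit('agent-start');
  }

  forkWorkers() {
    for (let id = 1; id <= this.workerCount; id++) {
      this.log.push(`master: start worker ${id}`);
    }
    // Each worker reports in once it has initialized.
    for (let id = 1; id <= this.workerCount; id++) {
      this.emit('app-start');
    }
  }

  onAppStart() {
    this.started++;
    if (this.started === this.workerCount) {
      this.log.push('master: notify agent and workers (egg-ready)');
    }
  }
}

const master = new FakeMaster(2);
master.start();
console.log(master.log.join('\n'));
```

The `once('agent-start', ...)` registration is what keeps the workers from being forked a second time if the agent is restarted later.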

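Differences in how the processes are started:

As the figure above shows, master starts the agent and the workers in clearly different ways: the agent is started with child_process.fork, while the workers are started with cluster.fork. Why not start them all the same way? Because the work they do is different in nature. The agent is something like a secretary to the workers: it only handles lightweight services for them and does not serve HTTP traffic directly. The workers do serve HTTP traffic, so master starts them with cluster.fork. Thanks to the groundwork cluster does, these workers can all listen on the same port without conflicts, and incoming HTTP requests are distributed among them with a round-robin load-balancing strategy.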
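
To make the round-robin idea concrete, here is a conceptual model only. It is not how Node's cluster module is implemented internally, and `createRoundRobin` is a hypothetical helper: it simply hands each incoming request to the next worker id in turn.

```javascript
// Conceptual model only: hand each incoming request to the next worker in turn.
function createRoundRobin(workerIds) {
  let next = 0;
  return function pick() {
    const id = workerIds[next];
    // Wrap around to the first worker after the last one.
    next = (next + 1) % workerIds.length;
    return id;
  };
}

const pick = createRoundRobin([ 1, 2, 3 ]);
const assigned = [];
for (let request = 0; request < 7; request++) {
  assigned.push(pick());
}
console.log(assigned); // [ 1, 2, 3, 1, 2, 3, 1 ]
```

In Node itself, round-robin is the default cluster scheduling policy on every platform except Windows.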

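Inter-process communication:

Communication between master and the agent or a worker is real communication; communication between the agent and a worker, and among the workers, is virtual communication.

  • master extends the events module (EventEmitter), so it can listen for and emit events. The master process handles its own work through the publish/subscribe pattern, which is why there is little callback hell in the master source code
  • master is the parent process of the agent, so the two can talk to each other over an IPC channel
  • master is the parent process of each worker, so the two can talk to each other over an IPC channel
  • the agent and the workers are separate processes and cannot talk to each other directly, so messages have to be forwarded by master. egg-cluster wraps this forwarding in a utility class called messenger
  • the workers are likewise separate processes and cannot talk to each other directly; their messages are forwarded by master in the same way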
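
The forwarding role described above might look roughly like this. It is a simplified sketch, not the real Messenger class from egg-cluster: `SketchMessenger`, `fakeProcess`, and the action names `refresh-cache` and `cache-ready` are all made up, and the child processes are replaced by objects that just record what they are sent.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical, simplified router; the real Messenger in egg-cluster does more.
class SketchMessenger {
  constructor(master) {
    this.master = master;
  }

  send(msg) {
    switch (msg.to) {
      case 'master':
        // Master handles its own messages as plain events.
        this.master.emit(msg.action, msg.data);
        break;
      case 'agent':
        if (this.master.agent) this.master.agent.send(msg);
        break;
      case 'app':
        // Broadcast to every app worker.
        for (const worker of this.master.workers) worker.send(msg);
        break;
      default:
        throw new Error(`unknown target: ${msg.to}`);
    }
  }
}

// Stand-ins for child processes: they only record what they are sent.
function fakeProcess() {
  const inbox = [];
  return { inbox, send: msg => inbox.push(msg) };
}

const master = new EventEmitter();
master.agent = fakeProcess();
master.workers = [ fakeProcess(), fakeProcess() ];
const messenger = new SketchMessenger(master);

// A worker talks to the agent "virtually": master relays the message.
messenger.send({ action: 'refresh-cache', to: 'agent', from: 'app' });
// The agent reaches every worker the same way.
messenger.send({ action: 'cache-ready', to: 'app', from: 'agent' });

console.log(master.agent.inbox.length, master.workers.map(w => w.inbox.length));
```

Routing on a `to` field keeps the senders unaware of process handles; only master needs to hold references to the agent and the workers.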

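Process status notifications

  • After a worker starts, master monitors its state, so master knows when a worker exits or loses contact. When that happens, master removes the event listeners bound to that worker to prevent memory leaks, notifies the agent, and finally reforks the same number of workers to keep the service running. Forking and reforking workers both go through the cfork utility
  • After the agent starts, master monitors its state, so master knows when the agent exits or loses contact. When that happens, master removes the event listeners bound to that agent to prevent memory leaks, notifies the workers, and finally restarts the agent process to keep the service running
  • What happens to the workers if master exits or loses contact? Nothing to worry about: cluster already handles this case, and the child processes exit automatically after the parent process exits
  • If master exits or loses contact, does the agent exit the way the workers do? It does not. This is one of the differences between child_process.fork and cluster.fork: the agent process keeps running even though its parent is gone. It becomes an orphan process and is adopted by the init process, and as such orphans pile up the server gets slower and slower. So the agent must be explicitly told to exit together with master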
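
One way a forked child can avoid becoming an orphan is to watch for the `disconnect` event, which Node emits on `process` when the IPC channel to the parent closes. The sketch below is an assumption about how this could be wired up, not the code egg-cluster actually uses; `exitWhenParentGone` is a hypothetical helper, and a fake emitter stands in for `process` so nothing has to be forked.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper: run `onOrphaned` once the IPC channel to the parent closes.
// `proc` is the real `process` object in a forked child; a fake is used below
// so the sketch can run without forking anything.
function exitWhenParentGone(proc, onOrphaned) {
  proc.once('disconnect', onOrphaned);
}

// In a real agent this might be:
//   exitWhenParentGone(process, () => process.exit(0));
const fakeAgentProcess = new EventEmitter();
let exited = false;
exitWhenParentGone(fakeAgentProcess, () => { exited = true; });

fakeAgentProcess.emit('disconnect'); // simulate master going away
console.log('agent exited:', exited);
```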

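Development mode

In development mode, the agent watches the relevant files for changes and then notifies master to restart the workers.

In development mode the egg-development plugin is enabled. It watches the relevant files and, when a file changes, sends a reload-worker event to master.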

Talk is cheap. Show me the code

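Preparation

At the time of writing, the latest community release of egg is 1.6.0, and everything below is based on that version.

Before reading the source, it helps to understand what two modules do:

  • messenger: forwards IPC messages among master, agent, and worker
  • cfork: starts the workers, monitors their state, and reforks them

egg is started through an index.js entry file. The following code is all it takes to start it: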

const egg = require('egg');
egg.startCluster(options, () => {
  console.log('started');
});

The entry file is that simple, so what does egg do underneath? For example, what happens inside egg.startCluster?

exports.startCluster = require('egg-cluster').startCluster;

So egg.startCluster is simply an API exposed by the egg-cluster module.

// egg-cluster/index.js
const Master = require('./lib/master');
exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};

startCluster mainly does the following:

  • starts the master process
  • runs the callback once egg has started successfully, for example to perform some business-level initialization

Master (egg-cluster/lib/master.js)

// Master extends the events module, so it can listen for and emit events
class Master extends EventEmitter {}

Master#constructor

The constructor can be roughly divided into 5 parts:

constructor(options) {
  super();
  this.options = parseOptions(options);
  // create a Messenger instance
  this.messenger = new Messenger(this);
  // borrow the methods of the ready module
  ready.mixin(this);
  this.isProduction = isProduction();
  this.isDebug = isDebug();
  ...
  ...
  // set the log level according to the runtime environment (local, test, prod)
  this.logger = new ConsoleLogger({ level: process.env.EGG_MASTER_LOGGER_LEVEL || 'INFO' });
  ...
}
// once master has started, notify parent, the app workers, and the agent
this.ready(() => {
  this.isStarted = true;
  const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
  this.logger.info('[master] %s started on %s://127.0.0.1:%s (%sms)%s',
  frameworkPkg.name, this.options.https ? 'https' : 'http',
  this.options.port, Date.now() - startTime, stickyMsg);

  const action = 'egg-ready';
  this.messenger.send({ action, to: 'parent' });
  this.messenger.send({ action, to: 'app', data: this.options });
  this.messenger.send({ action, to: 'agent', data: this.options });
});
// listen for agent exit
this.on('agent-exit', this.onAgentExit.bind(this));
// listen for agent start
this.on('agent-start', this.onAgentStart.bind(this));
// listen for app worker exit
this.on('app-exit', this.onAppExit.bind(this));
// listen for app worker start
this.on('app-start', this.onAppStart.bind(this));
// in development, listen for app worker reload requests
this.on('reload-worker', this.onReload.bind(this));

// listen for agent start; note that this handler runs only once
this.once('agent-start', this.forkAppWorkers.bind(this));
// master listens for its own exit and handles the cleanup

// kill(2) Ctrl-C     listen for SIGINT
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\     listen for SIGQUIT
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default   listen for SIGTERM
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));

// listen for the exit event
process.once('exit', this.onExit.bind(this));
// check for port conflicts
detectPort((err, port) => {
  /* istanbul ignore if */
  if (err) {
    err.name = 'ClusterPortConflictError';
    err.message = '[master] try get free port error, ' + err.message;
    this.logger.error(err);
    process.exit(1);
    return;
  }
  this.options.clusterPort = port;
  this.forkAgentWorker(); // called when there is no port conflict
});
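
The constructor leans on `ready` in two ways: `this.ready(fn)` registers a callback, and `this.ready(true)` later marks master as ready and runs the callbacks. A stripped-down stand-in shows the idea. This is not the real ready module: `mixinReady` is a hypothetical name, callbacks run synchronously here, and the error case (`this.ready(err)`) is left out.

```javascript
// Simplified stand-in for the ready mixin; not the real module's code.
function mixinReady(target) {
  let isReady = false;
  const pending = [];

  target.ready = function(arg) {
    if (typeof arg === 'function') {
      // Register a callback; run it right away if we are already ready.
      if (isReady) arg();
      else pending.push(arg);
      return;
    }
    if (arg === true && !isReady) {
      // Flip the flag once and flush the queued callbacks in order.
      isReady = true;
      while (pending.length) pending.shift()();
    }
  };
}

const master = {};
mixinReady(master);

const calls = [];
master.ready(() => calls.push('egg-ready sent'));  // registered in the constructor
calls.push('workers listening');
master.ready(true);                                // all app workers reported in
master.ready(() => calls.push('late callback'));   // runs immediately

console.log(calls);
```

This is why the `this.ready(() => { ... })` block in the constructor does not run until `onAppStart` eventually calls `this.ready(true)`.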

Master#forkAgentWorker

The master process starts the agent process via child_process

forkAgentWorker() {
  this.agentStartTime = Date.now();
  const args = [ JSON.stringify(this.options) ];
  const opt = { execArgv: process.execArgv.concat([ '--debug-port=5856' ]) };
  
  // start the agent worker with child_process.fork; the agent becomes a child process of master
  const agentWorker = this.agentWorker = childprocess.fork(agentWorkerFile, args, opt);
  
  // record the agent's id
  agentWorker.id = ++this.agentWorkerIndex;
  
  this.log('[master] agent_worker#%s:%s start with clusterPort:%s',
  agentWorker.id, agentWorker.pid, this.options.clusterPort);

  // master listens for messages sent from the agent, tags their origin (msg.from = 'agent'),
  // and sends them on through messenger
  agentWorker.on('message', msg => {
    if (typeof msg === 'string') msg = { action: msg, data: msg };
    msg.from = 'agent';
    this.messenger.send(msg);
  });
  
  // master listens for agent errors and logs them with identifying details to ease troubleshooting
  agentWorker.on('error', err => {
    err.name = 'AgentWorkerError';
    err.id = agentWorker.id;
    err.pid = agentWorker.pid;
    this.logger.error(err);
  });
  
  // master listens for the agent's exit
  // and uses messenger to send an 'agent-exit' event to master,
  // telling it that the agent has exited
  agentWorker.once('exit', (code, signal) => {
    this.messenger.send({
      action: 'agent-exit',
      data: { code, signal },
      to: 'master',
      from: 'agent',
    });
  });
}

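At this point the agent worker has been started and master is listening to it. One question remains:

How does the agent tell master to move on to the next step once it has started successfully?

const agentWorker = this.agentWorker = childprocess.fork(agentWorkerFile, args, opt);

The agent worker is started with child_process.fork, which loads agent_worker.js. Here is an excerpt: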

// egg-cluster/lib/agent_worker.js

agent.ready(() => {
  agent.removeListener('error', startErrorHandler);
  process.send({ action: 'agent-start', to: 'master' });
});

After starting successfully, the agent calls process.send() to notify master; master receives the message and forwards it through messenger.

// Master#forkAgentWorker
agentWorker.on('message', msg => {
  if (typeof msg === 'string') msg = { action: msg, data: msg };
  msg.from = 'agent';
  this.messenger.send(msg);
});

Finally, master responds to the agent-start event.

// Master#constructor
...
...
this.on('agent-start', this.onAgentStart.bind(this));
...
this.once('agent-start', this.forkAppWorkers.bind(this));
...
...

Master#onAgentStart

What happens after the agent has started

onAgentStart() {
  // after the agent starts, send an 'egg-pids' event carrying the agent pid to the app workers
  this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
  // send an 'agent-start' event to the app workers
  this.messenger.send({ action: 'agent-start', to: 'app' });
  this.logger.info('[master] agent_worker#%s:%s started (%sms)',
  this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
}

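Note that the app workers have not started yet at this point, so the message is dropped. If the agent is restarted later, the app workers will receive it.

Master#forkAppWorkers

The master process starts the app worker processes via cluster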

forkAppWorkers() {
  this.appStartTime = Date.now();
  this.isAllAppWorkerStarted = false;
  this.startSuccessCount = 0;

  this.workers = new Map();

  const args = [ JSON.stringify(this.options) ];
  this.log('[master] start appWorker with args %j', args);
  
  // start the app worker processes in cluster mode
  cfork({
    exec: appWorkerFile,
    args,
    silent: false,
    count: this.options.workers,
    // no refork in development, to make troubleshooting easier
    refork: this.isProduction,
  });

  // master listens for messages from each app worker process
  cluster.on('fork', worker => {
    this.workers.set(worker.process.pid, worker);
    worker.on('message', msg => {
      if (typeof msg === 'string') msg = { action: msg, data: msg };
      msg.from = 'app';
      this.messenger.send(msg);
    });
    this.log('[master] app_worker#%s:%s start, state: %s, current workers: %j',
  worker.id, worker.process.pid, worker.state, Object.keys(cluster.workers));
  });
  
  // master listens for each app worker's disconnect event and logs it
  cluster.on('disconnect', worker => {
    this.logger.info('[master] app_worker#%s:%s disconnect, suicide: %s, state: %s, current workers: %j',
    worker.id, worker.process.pid, worker.exitedAfterDisconnect, worker.state, Object.keys(cluster.workers));
  });
  
  // master listens for each app worker's exit event and sends itself an 'app-exit' event, leaving the cleanup to its own handler
  cluster.on('exit', (worker, code, signal) => {
    this.messenger.send({
      action: 'app-exit',
      data: { workerPid: worker.process.pid, code, signal },
      to: 'master',
      from: 'app',
    });
  });
  
  // master listens for each app worker's listening event, which means the worker is ready to serve
  cluster.on('listening', (worker, address) => {
    this.messenger.send({
      action: 'app-start',
      data: { workerPid: worker.process.pid, address },
      to: 'master',
      from: 'app',
    });
  });
}

After an app worker starts, just as with the agent, an app-start event is sent to master through messenger, and master takes over from there.

// Master#constructor

...
...
this.on('app-start', this.onAppStart.bind(this));
...
...

Master#onAppStart

What happens after an app worker has started

onAppStart(data) {
  const worker = this.workers.get(data.workerPid);
  
  ...
  
  // after an app worker starts, notify the agent
  this.messenger.send({
   action: 'egg-pids',
   to: 'agent',
   data: getListeningWorker(this.workers),
  });
  
  ...
  
  // the app workers are ready
  if (this.options.sticky) {
   this.startMasterSocketServer(err => {
     if (err) return this.ready(err);
     this.ready(true);
   });
  } else {
   this.ready(true);
  }
}

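By now the agent and all the app workers are ready, so master can get ready too. It runs its ready callback and sends the egg-ready event to parent, app, and agent, telling them everything is ready and work can begin.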

this.ready(() => {
  ...
  const action = 'egg-ready';
  this.messenger.send({ action, to: 'parent' });
  this.messenger.send({ action, to: 'app', data: this.options });
  this.messenger.send({ action, to: 'agent', data: this.options });
});

Master#onAgentExit

Handling an agent exit

onAgentExit(data) {
  ...
  // tell every app worker that the agent has exited
  this.messenger.send({ action: 'egg-pids', to: 'app', data: [] });
  
  ...
  // log the error to ease troubleshooting
  const err = new Error(util.format('[master] agent_worker#%s:%s died (code: %s, signal: %s)',
    agentWorker.id, agentWorker.pid, data.code, data.signal));
  err.name = 'AgentWorkerDiedError';
  this.logger.error(err);
  
  // remove event listeners to prevent memory leaks
  agentWorker.removeAllListeners();
  
  ...
  // notify the parent process with 'agent-worker-died' and restart the agent process after 1s
  this.log('[master] try to start a new agent_worker after 1s ...');
  setTimeout(() => {
    this.logger.info('[master] new agent_worker starting...');
    this.forkAgentWorker();
  }, 1000);
  this.messenger.send({
    action: 'agent-worker-died',
    to: 'parent',
  });
}

Master#onAppExit

Handling an app worker exit

onAppExit(data) {
  ...
  // log the error to ease troubleshooting
  if (!worker.isDevReload) {
    const signal = data.code;
    const message = util.format(
   '[master] app_worker#%s:%s died (code: %s, signal: %s, suicide: %s, state: %s), current workers: %j',
   worker.id, worker.process.pid, worker.process.exitCode, signal,
   worker.exitedAfterDisconnect, worker.state,
    Object.keys(cluster.workers)
    );
    const err = new Error(message);
    err.name = 'AppWorkerDiedError';
    this.logger.error(err);
  }
  
  // remove event listeners to prevent memory leaks
  worker.removeAllListeners();
  this.workers.delete(data.workerPid);
  
  // send an 'egg-pids' event to the agent with the pids of the app workers that are still alive
  this.messenger.send({ action: 'egg-pids', to: 'agent', data: getListeningWorker(this.workers) });
  
  // send an 'app-worker-died' message to the parent process
  this.messenger.send({
    action: 'app-worker-died',
    to: 'parent',
  });
}
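
The excerpts call getListeningWorker without showing its body. One plausible shape for such a helper is sketched below. It is an assumption, not a quote from egg-cluster: it takes the pid-keyed Map that `this.workers` is in the code above and keeps only the workers whose `state` is 'listening'.

```javascript
// Hypothetical version of the helper; assumes `workers` is a Map of pid -> worker
// and that each worker exposes a `state` string, as the log calls above rely on.
function getListeningWorker(workers) {
  const pids = [];
  for (const [ pid, worker ] of workers) {
    if (worker.state === 'listening') pids.push(pid);
  }
  return pids;
}

// Plain objects stand in for cluster workers here.
const workers = new Map([
  [ 101, { state: 'listening' } ],
  [ 102, { state: 'online' } ],
  [ 103, { state: 'listening' } ],
]);

workers.delete(103); // what onAppExit does for the worker that died
console.log(getListeningWorker(workers)); // [ 101 ]
```

Sending the whole list each time, rather than a single added or removed pid, means the agent can simply replace its view of the live workers on every 'egg-pids' message.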

Master#onReload

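In development mode, file changes are watched and the app workers are restarted

  • In development mode the egg-development plugin is enabled. It watches the relevant files and, when a file changes, sends a reload-worker event to master
process.send({
  to: 'master',
  action: 'reload-worker',
});
  • master listens for the reload-worker event and runs the onReload method
this.on('reload-worker', this.onReload.bind(this));
  • onReload restarts the workers through the cluster-reload module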
onReload() {
  this.log('[master] reload workers...');
  for (const id in cluster.workers) {
    const worker = cluster.workers[id];
    worker.isDevReload = true;
  }
  require('cluster-reload')(this.options.workers);
}

Master#onExit

Handling after master exits; this method mainly writes the relevant logs

Master#onSignal and Master#close

During testing, master responds to each system signal it receives

// kill(2) Ctrl-C
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
  • kill every app worker process
  • kill the agent process
  • exit the master process
close() {
  this.closed = true;
  this.killAppWorkers();
  this.killAgentWorker();
  this.log('[master] close done, exiting with code:0');
  process.exit(0);
}
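
The signal wiring and the shutdown order can be tried out without killing anything. In this sketch `installSignalHandlers` is a hypothetical helper, a plain emitter stands in for `process`, and the guard against a second signal is an assumption of the sketch rather than something shown in the excerpts above.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical wiring; `proc` stands in for the global `process` object.
function installSignalHandlers(proc, close) {
  for (const signal of [ 'SIGINT', 'SIGQUIT', 'SIGTERM' ]) {
    proc.once(signal, () => close(signal));
  }
}

const steps = [];
let closed = false;
function close(signal) {
  if (closed) return;          // assumption: ignore a second signal while shutting down
  closed = true;
  steps.push(`received ${signal}`);
  steps.push('kill app workers');
  steps.push('kill agent worker');
  steps.push('exit master with code 0');
}

const fakeProcess = new EventEmitter();
installSignalHandlers(fakeProcess, close);
fakeProcess.emit('SIGTERM');
fakeProcess.emit('SIGINT');    // no effect: close() already ran

console.log(steps);
```

Killing the app workers before the agent mirrors the order in close() above.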