乐趣区

关于javascript:更优雅地用-JS-进行-IPC-调用我写了-eventinvoke-库

背景

团队最近有一个 Node.js 全新的模块须要开发,波及多过程的治理和通信,简化模型能够了解为须要频繁从 master 过程调用 worker 过程的某些办法,简略设计实现了一个 event-invoke 的库,能够简略优雅进行调用。

Node.js 提供了 child_process 模块,在 master 过程通过 fork / spawn 等办法调用能够创立 worker 过程并获取其对象(简称 cp)。父子过程会建设 IPC 通道,在 master 过程中能够应用 cp.send() 给 worker 过程发送 IPC 音讯,而在 worker 过程中也能够通过 process.send() 给父过程发送 IPC 音讯,达到双工通信的目标。(过程治理波及更简单的工作,本文暂不波及)

最小实现

基于以上前提,借助 IPC 通道和过程对象,咱们能够通过事件驱动的形式实现过程间的通信,只须要简略的几行代码,就能实现根本调用逻辑,例如:

// master.js
const child_process = require('child_process');
const cp = child_process.fork('./worker.js');

function invoke() {cp.send({ name: 'methodA', args: [] });
  cp.on('message', (packet) => {console.log('result: %j', packet.payload);
  });
}

invoke();

// worker.js
const methodMap = {methodA() {}}

cp.on('message', async (packet) => {const { name, args} = packet;
  const result = await methodMap[name)(...args);
  process.send({name, payload: result});
});

仔细分析上述代码实现,直观感触 invoke 调用并不优雅,并且当调用量较大时,会创立很多的 message 监听器,并且要保障申请和响应是一一对应,须要做很多额定的设计。 心愿设计一个简略现实的形式,只需提供 invoke 办法,传入办法名和参数,返回一个 Promise,像调用本地办法那样进行 IPC 调用,而不必思考音讯通信的细节。

// 假想中的 IPC 调用
const res1 = await invoker.invoke('sleep', 1000);
console.log('sleep 1000ms:', res1);
const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
console.log('max(1, 2, 3):', res2);

流程设计

从调用的模型看,能够将角色形象为 Invoker 和 Callee,别离对应服务调用方和提供方,将音讯通信的细节能够封装在外部。parent_process 和 child_process 的通信桥梁是操作系统提供的 IPC 通道,单纯从 API 的视角看,能够简化为两个 Event 对象(主过程为 cp,子过程为 process)。Event 对象作为两头的双工通道两端,暂且命名为 InvokerChannel 和 CalleeChannel。

要害实体和流程如下:

  • Callee 中注册可被调用的所有办法,并保留在 functionMap
  • 用户调用 Invoker.invoke() 时:

    • 创立一个 promise 对象,返回给用户,同时将其保留在 promiseMap 中
    • 每次调用生成一个 id,保障调用和执行后果是一一对应的
    • 进行超时管制,超时的工作间接执行 reject 该 promise
  • Invoker 通过 Channel 把调用办法音讯发送给 Callee
  • Callee 解析收到的音讯,通过 name 执行对应办法,并将后果和实现状态(胜利 or 异样)通过 Channel 发送音讯给 Invoker
  • Invoker 解析音讯,通过 id+name 找到对应的 promise 对象,胜利则 resolve,失败则 reject

实际上,这个设计不仅实用 IPC 调用,在浏览器的场景下也能间接失去很好的利用,比如说跨 iframe 的调用能够包装 window.postMessage(),跨标签页调用能够应用 storage 事件,以及 Web worker 中可借助 worker.postMessage() 作为通信的桥梁。

疾速开始

基于以上设计,实现编码必然不在话下,趁着非工作工夫迅速实现开发和文档的工作,源代码:https://github.com/x-cold/event-invoke

装置依赖

npm i -S event-invoke

父子过程通信实例

示例代码:Example code

// parent.js
const cp = require('child_process');
const {Invoker} = require('event-invoke');

const invokerChannel = cp.fork('./child.js');

const invoker = new Invoker(invokerChannel);

async function main() {const res1 = await invoker.invoke('sleep', 1000);
  console.log('sleep 1000ms:', res1);
  const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
  console.log('max(1, 2, 3):', res2);
  invoker.destroy();}

main();
// child.js
const {Callee} = require('event-invoke');

const calleeChannel = process;

const callee = new Callee(calleeChannel);

// async method
callee.register(async function sleep(ms) {return new Promise((resolve) => {setTimeout(resolve, ms);
  });
});

// sync method
callee.register(function max(...args) {return Math.max(...args);
});

callee.listen();

自定义 Channel 实现 PM2 过程间调用

示例代码:Example code

// pm2.config.cjs
module.exports = {
  apps: [
    {
      script: 'invoker.js',
      name: 'invoker',
      exec_mode: 'fork',
    },
    {
      script: 'callee.js',
      name: 'callee',
      exec_mode: 'fork',
    }
  ],
};
// callee.js
import net from 'net';
import pm2 from 'pm2';
import {
  Callee,
  BaseCalleeChannel
} from 'event-invoke';

const messageType = 'event-invoke';
const messageTopic = 'some topic';

class CalleeChannel extends BaseCalleeChannel {constructor() {super();
    this._onProcessMessage = this.onProcessMessage.bind(this);
    process.on('message', this._onProcessMessage);
  }

  onProcessMessage(packet) {if (packet.type !== messageType) {return;}
    this.emit('message', packet.data);
  }

  send(data) {pm2.list((err, processes) => {if (err) {throw err;}
      const list = processes.filter(p => p.name === 'invoker');
      const pmId = list[0].pm2_env.pm_id;
      pm2.sendDataToProcessId({
        id: pmId,
        type: messageType,
        topic: messageTopic,
        data,
      }, function (err, res) {if (err) {throw err;}
      });
    });
  }

  destory() {process.off('message', this._onProcessMessage);
  }
}

const channel = new CalleeChannel();
const callee = new Callee(channel);

// async method
callee.register(async function sleep(ms) {return new Promise((resolve) => {setTimeout(resolve, ms);
  });
});

// sync method
callee.register(function max(...args) {return Math.max(...args);
});

callee.listen();

// keep your process alive
net.createServer().listen();
// invoker.js
import pm2 from 'pm2';
import {
  Invoker,
  BaseInvokerChannel
} from 'event-invoke';

const messageType = 'event-invoke';
const messageTopic = 'some topic';

class InvokerChannel extends BaseInvokerChannel {constructor() {super();
    this._onProcessMessage = this.onProcessMessage.bind(this);
    process.on('message', this._onProcessMessage);
  }

  onProcessMessage(packet) {if (packet.type !== messageType) {return;}
    this.emit('message', packet.data);
  }

  send(data) {pm2.list((err, processes) => {if (err) {throw err;}
      const list = processes.filter(p => p.name === 'callee');
      const pmId = list[0].pm2_env.pm_id;
      pm2.sendDataToProcessId({
        id: pmId,
        type: messageType,
        topic: messageTopic,
        data,
      }, function (err, res) {if (err) {throw err;}
      });
    });
  }

  connect() {this.connected = true;}

  disconnect() {this.connected = false;}

  destory() {process.off('message', this._onProcessMessage);
  }
}

const channel = new InvokerChannel();
channel.connect();

const invoker = new Invoker(channel);

setInterval(async () => {const res1 = await invoker.invoke('sleep', 1000);
  console.log('sleep 1000ms:', res1);
  const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
  console.log('max(1, 2, 3):', res2);
}, 5 * 1000);

下一步

目前 event-invoke 具备了优雅调用“IPC”调用的根本能力,代码覆盖率 100%,同时提供了绝对欠缺的类型形容。感兴趣的同学能够间接应用,有任何问题能够间接提 Issue。

另外一些后续仍要继续欠缺的局部:

  • 更丰盛的示例,笼罩跨 Iframe,跨标签页,Web worker 等应用场景
  • 提供开箱即用通用 Channel
  • 更敌对的异样解决
退出移动版