对于近程调用

近程调用(简称 RPC)次要是指服务器或集群之间对处理过程的调用。通过近程调用能够买通不同零碎之间的数据与性能,或是抽离与建设公共的逻辑对立提供服务。近程调用的两端也称为近程调用的客户端与服务端,个别是多对多的关系,须要引入注册与发现机制进行治理,下图为最常见实际:

治理机制通常是就地取材的,能够基于 ZooKeeper 建设,本章不再开展。

gRPC

gPRC 是谷歌开源的一款跨语言高性能的 RPC 框架,底层应用 protobuf 进行数据交换,已在谷歌、奈非、思科等企业大规模利用。以下为 gRPC 的调用过程,客户端先进行 DNS 解析,再依据解析后果直连服务端或向负载平衡/治理服务获取服务端信息再连贯:

本章将基于上一章已实现的工程 host1-tech/nodejs-server-examples - 11-schedule 退出近程调用性能实现一个简略音讯的输出与输入,当初到工程根目录装置 grpc 相干模块:

$ yarn add @grpc/grpc-js @grpc/proto-loader # 本地装置 @grpc/grpc-js、@grpc/proto-loader# ...info Direct dependencies├─ @grpc/grpc-js@1.1.3└─ @grpc/proto-loader@0.5.5# ...

加上近程调用

通过 .proto 文件定义近程接口并写入基于该定义的 gRPC 客户端与服务端:

$ mkdir src/rpc       # 新建 src/rpc 寄存近程调用逻辑$ mkdir src/rpc/echo  # 新建 src/rpc/echo$ tree src -L 1       # 展现 src 目录内容构造src├── config├── controllers├── middlewares├── models├── moulds├── rpc├── schedules├── server.js├── services└── utils
// src/rpc/echo/def.protosyntax = "proto3";service Echo {  rpc Get(EchoRequest) returns (EchoResponse) {}}message EchoRequest { string message = 1; }message EchoResponse { string message = 1; }
// src/rpc/echo/client.jsconst { resolve } = require('path');const { promisify } = require('util');const protoLoader = require('@grpc/proto-loader');const grpc = require('@grpc/grpc-js');const { rpc } = require('../../config');class EchoClient {  grpcClient;  async init() {    const grpcObject = grpc.loadPackageDefinition(      await protoLoader.load(resolve(__dirname, 'def.proto'))    );    this.grpcClient = new grpcObject.Echo(      `${rpc.domain}:${rpc.port}`,      grpc.credentials.createInsecure()    );  }  get = async ({ s, logger }) => {    const { grpcClient } = this;    const { message } = await promisify(      grpcClient.get.bind(grpcClient, { message: s })    )();    logger.info('Echo/Get Invoked');    return { message };  };}let client;module.exports = async () => {  if (!client) {    client = new EchoClient();    await client.init();  }  return client;};
// src/rpc/echo/server.jsconst { resolve } = require('path');const { callbackify } = require('util');const protoLoader = require('@grpc/proto-loader');const grpc = require('@grpc/grpc-js');class EchoServer {  grpcServer;  async init() {    const grpcObject = grpc.loadPackageDefinition(      await protoLoader.load(resolve(__dirname, 'def.proto'))    );    this.grpcServer.addService(grpcObject.Echo.service, this);  }  get = callbackify(async (call) => {    const { message } = call.request;    return { message };  });}let server;module.exports = async (grpcServer) => {  if (!server) {    server = new EchoServer();    Object.assign(server, { grpcServer });    await server.init();  }  return server;};
// src/config/index.js// ...const config = {  // 默认配置  default: {    // ...+    +    rpc: {+      domain: 'localhost',+      port: process.env.PORT_RPC || 9001,+    },  },  // ...};// ...

开启 gRPC 日志输入并初始化:

# .envLOG_LEVEL='debug'++GRPC_TRACE='all'+GRPC_VERBOSITY='DEBUG'
// src/utils/logger.js// ...+const GRPC_LOGGER_REGEXP = /^.+Z\s+\|\s+/;++function grpcLogger(logger, level = 'debug') {+  const verbosities = ['debug', 'info', 'error'];++  return {+    error(severity, message) {+      if (typeof severity != 'number') {+        message = severity;+        severity = 0;+      }++      if (typeof message != 'string') {+        message = String(message || '');+      }++      logger[verbosities[severity] || level](+        message.replace(GRPC_LOGGER_REGEXP, '')+      );+    },+  };+}+module.exports = logger;-Object.assign(module.exports, { logging });+Object.assign(module.exports, { logging, grpcLogger });
// src/rpc/index.jsconst { promisify } = require('util');const grpc = require('@grpc/grpc-js');const { rpc } = require('../config');const logger = require('../utils/logger');const echoClient = require('./echo/client');const echoServer = require('./echo/server');const { grpcLogger } = logger;module.exports = async function initRpc() {  grpc.setLogger(grpcLogger(logger.child({ type: 'rpc' }), 'debug'));  // init rpc servers  const grpcServer = new grpc.Server();  await echoServer(grpcServer);  await promisify(grpcServer.bindAsync.bind(grpcServer))(    `0.0.0.0:${rpc.port}`,    grpc.ServerCredentials.createInsecure()  );  grpcServer.start();  // init rpc clients  await echoClient();};
// src/server.jsconst express = require('express');const { resolve } = require('path');const { promisify } = require('util');const initMiddlewares = require('./middlewares');const initControllers = require('./controllers');const initSchedules = require('./schedules');+const initRpc = require('./rpc');const logger = require('./utils/logger');const server = express();const port = parseInt(process.env.PORT || '9000');const publicDir = resolve('public');const mouldsDir = resolve('src/moulds');async function bootstrap() {+  await initRpc();  server.use(await initMiddlewares());  server.use(express.static(publicDir));  server.use('/moulds', express.static(mouldsDir));  server.use(await initControllers());  server.use(errorHandler);  await initSchedules();  await promisify(server.listen.bind(server, port))();  logger.info(`> Started on port ${port}`);}// ...

增加 gRPC 客户端 logger 与管制层入口:

// src/middlewares/trace.jsconst { v4: uuid } = require('uuid');const morgan = require('morgan');const onFinished = require('on-finished');const logger = require('../utils/logger');const { logging } = logger;module.exports = function traceMiddleware() {  return [    morgan('common', { skip: () => true }),    (req, res, next) => {      req.uuid = uuid();      req.logger = logger.child({ uuid: req.uuid });      req.loggerSql = req.logger.child({ type: 'sql' });      req.logging = logging(req.loggerSql, 'info');+      req.loggerRpc = req.logger.child({ type: 'rpc' });      onFinished(res, () => {        // ...      });      next();    },  ];};
// src/controllers/echo.jsconst { Router } = require('express');const cc = require('../utils/cc');const rpcEchoClient = require('../rpc/echo/client');class EchoController {  rpcEchoClient;  async init() {    this.rpcEchoClient = await rpcEchoClient();    const router = Router();    router.get('/', this.get);    return router;  }  get = cc(async (req, res) => {    const { s = '' } = req.query;    const message = await this.rpcEchoClient.get({ s, logger: req.loggerRpc });    res.send({ success: true, message });  });}module.exports = async () => {  const c = new EchoController();  return await c.init();};
// src/controllers/index.jsconst { Router } = require('express');const shopController = require('./shop');const chaosController = require('./chaos');const healthController = require('./health');const loginController = require('./login');const csrfController = require('./csrf');+const echoController = require('./echo');module.exports = async function initControllers() {  const router = Router();  router.use('/api/shop', await shopController());  router.use('/api/chaos', await chaosController());  router.use('/api/health', await healthController());  router.use('/api/login', await loginController());  router.use('/api/csrf', await csrfController());+  router.use('/api/echo', await echoController());  return router;};

拜访 http://localhost:9000/api/echo?s=Hello%20RPC 即可看到成果:

同时在命令行可能看到充沛的 gRPC 日志:

# ...08:20:52.320Z DEBUG 12-rpc: dns_resolver | Resolver constructed for target dns:0.0.0.0:9001 (type=rpc)08:20:52.321Z DEBUG 12-rpc: dns_resolver | Resolution update requested for target dns:0.0.0.0:9001 (type=rpc)08:20:52.321Z DEBUG 12-rpc: dns_resolver | Returning IP address for target dns:0.0.0.0:9001 (type=rpc)08:20:52.322Z DEBUG 12-rpc: server | Attempting to bind 0.0.0.0:9001 (type=rpc)08:20:52.324Z DEBUG 12-rpc: server | Successfully bound 0.0.0.0:9001 (type=rpc)08:20:52.327Z DEBUG 12-rpc: resolving_load_balancer | dns:localhost:9001 IDLE -> IDLE (type=rpc)08:20:52.327Z DEBUG 12-rpc: connectivity_state | dns:localhost:9001 IDLE -> IDLE (type=rpc)08:20:52.327Z DEBUG 12-rpc: dns_resolver | Resolver constructed for target dns:localhost:9001 (type=rpc)# ...

本章源码

host1-tech/nodejs-server-examples - 12-rpc

更多浏览

从零搭建 Node.js 企业级 Web 服务器(零):动态服务
从零搭建 Node.js 企业级 Web 服务器(一):接口与分层
从零搭建 Node.js 企业级 Web 服务器(二):校验
从零搭建 Node.js 企业级 Web 服务器(三):中间件
从零搭建 Node.js 企业级 Web 服务器(四):异样解决
从零搭建 Node.js 企业级 Web 服务器(五):数据库拜访
从零搭建 Node.js 企业级 Web 服务器(六):会话
从零搭建 Node.js 企业级 Web 服务器(七):认证登录
从零搭建 Node.js 企业级 Web 服务器(八):网络安全
从零搭建 Node.js 企业级 Web 服务器(九):配置项
从零搭建 Node.js 企业级 Web 服务器(十):日志
从零搭建 Node.js 企业级 Web 服务器(十一):定时工作
从零搭建 Node.js 企业级 Web 服务器(十二):近程调用