对于近程调用
近程调用(简称 RPC)次要是指服务器或集群之间对处理过程的调用。通过近程调用能够买通不同零碎之间的数据与性能,或是抽离与建设公共的逻辑对立提供服务。近程调用的两端也称为近程调用的客户端与服务端,个别是多对多的关系,须要引入注册与发现机制进行治理,下图为最常见实际:
治理机制通常是就地取材的,能够基于 ZooKeeper 建设,本章不再开展。
gRPC
gPRC 是谷歌开源的一款跨语言高性能的 RPC 框架,底层应用 protobuf 进行数据交换,已在谷歌、奈非、思科等企业大规模利用。以下为 gRPC 的调用过程,客户端先进行 DNS 解析,再依据解析后果直连服务端或向负载平衡 / 治理服务获取服务端信息再连贯:
本章将基于上一章已实现的工程 host1-tech/nodejs-server-examples – 11-schedule 退出近程调用性能实现一个简略音讯的输出与输入,当初到工程根目录装置 grpc 相干模块:
$ yarn add @grpc/grpc-js @grpc/proto-loader # 本地装置 @grpc/grpc-js、@grpc/proto-loader
# ...
info Direct dependencies
├─ @grpc/grpc-js@1.1.3
└─ @grpc/proto-loader@0.5.5
# ...
加上近程调用
通过 .proto
文件定义近程接口并写入基于该定义的 gRPC 客户端与服务端:
$ mkdir src/rpc # 新建 src/rpc 寄存近程调用逻辑
$ mkdir src/rpc/echo # 新建 src/rpc/echo
$ tree src -L 1 # 展现 src 目录内容构造
src
├── config
├── controllers
├── middlewares
├── models
├── moulds
├── rpc
├── schedules
├── server.js
├── services
└── utils
// src/rpc/echo/def.proto
syntax = "proto3";
service Echo {rpc Get(EchoRequest) returns (EchoResponse) {}}
message EchoRequest {string message = 1;}
message EchoResponse {string message = 1;}
// src/rpc/echo/client.js
const {resolve} = require('path');
const {promisify} = require('util');
const protoLoader = require('@grpc/proto-loader');
const grpc = require('@grpc/grpc-js');
const {rpc} = require('../../config');
class EchoClient {
grpcClient;
async init() {
const grpcObject = grpc.loadPackageDefinition(await protoLoader.load(resolve(__dirname, 'def.proto'))
);
this.grpcClient = new grpcObject.Echo(`${rpc.domain}:${rpc.port}`,
grpc.credentials.createInsecure());
}
get = async ({s, logger}) => {const { grpcClient} = this;
const {message} = await promisify(grpcClient.get.bind(grpcClient, { message: s})
)();
logger.info('Echo/Get Invoked');
return {message};
};
}
let client;
module.exports = async () => {if (!client) {client = new EchoClient();
await client.init();}
return client;
};
// src/rpc/echo/server.js
const {resolve} = require('path');
const {callbackify} = require('util');
const protoLoader = require('@grpc/proto-loader');
const grpc = require('@grpc/grpc-js');
class EchoServer {
grpcServer;
async init() {
const grpcObject = grpc.loadPackageDefinition(await protoLoader.load(resolve(__dirname, 'def.proto'))
);
this.grpcServer.addService(grpcObject.Echo.service, this);
}
get = callbackify(async (call) => {const { message} = call.request;
return {message};
});
}
let server;
module.exports = async (grpcServer) => {if (!server) {server = new EchoServer();
Object.assign(server, { grpcServer});
await server.init();}
return server;
};
// src/config/index.js
// ...
const config = {
// 默认配置
default: {
// ...
+
+ rpc: {
+ domain: 'localhost',
+ port: process.env.PORT_RPC || 9001,
+ },
},
// ...
};
// ...
开启 gRPC 日志输入并初始化:
# .env
LOG_LEVEL='debug'
+
+GRPC_TRACE='all'
+GRPC_VERBOSITY='DEBUG'
// src/utils/logger.js
// ...
+const GRPC_LOGGER_REGEXP = /^.+Z\s+\|\s+/;
+
+function grpcLogger(logger, level = 'debug') {+ const verbosities = ['debug', 'info', 'error'];
+
+ return {+ error(severity, message) {+ if (typeof severity != 'number') {
+ message = severity;
+ severity = 0;
+ }
+
+ if (typeof message != 'string') {+ message = String(message || '');
+ }
+
+ logger[verbosities[severity] || level](+ message.replace(GRPC_LOGGER_REGEXP, '')
+ );
+ },
+ };
+}
+
module.exports = logger;
-Object.assign(module.exports, { logging});
+Object.assign(module.exports, { logging, grpcLogger});
// src/rpc/index.js
const {promisify} = require('util');
const grpc = require('@grpc/grpc-js');
const {rpc} = require('../config');
const logger = require('../utils/logger');
const echoClient = require('./echo/client');
const echoServer = require('./echo/server');
const {grpcLogger} = logger;
module.exports = async function initRpc() {grpc.setLogger(grpcLogger(logger.child({ type: 'rpc'}), 'debug'));
// init rpc servers
const grpcServer = new grpc.Server();
await echoServer(grpcServer);
await promisify(grpcServer.bindAsync.bind(grpcServer))(`0.0.0.0:${rpc.port}`,
grpc.ServerCredentials.createInsecure());
grpcServer.start();
// init rpc clients
await echoClient();};
// src/server.js
const express = require('express');
const {resolve} = require('path');
const {promisify} = require('util');
const initMiddlewares = require('./middlewares');
const initControllers = require('./controllers');
const initSchedules = require('./schedules');
+const initRpc = require('./rpc');
const logger = require('./utils/logger');
const server = express();
const port = parseInt(process.env.PORT || '9000');
const publicDir = resolve('public');
const mouldsDir = resolve('src/moulds');
async function bootstrap() {+ await initRpc();
server.use(await initMiddlewares());
server.use(express.static(publicDir));
server.use('/moulds', express.static(mouldsDir));
server.use(await initControllers());
server.use(errorHandler);
await initSchedules();
await promisify(server.listen.bind(server, port))();
logger.info(`> Started on port ${port}`);
}
// ...
增加 gRPC 客户端 logger 与管制层入口:
// src/middlewares/trace.js
const {v4: uuid} = require('uuid');
const morgan = require('morgan');
const onFinished = require('on-finished');
const logger = require('../utils/logger');
const {logging} = logger;
module.exports = function traceMiddleware() {
return [morgan('common', { skip: () => true }),
(req, res, next) => {req.uuid = uuid();
req.logger = logger.child({uuid: req.uuid});
req.loggerSql = req.logger.child({type: 'sql'});
req.logging = logging(req.loggerSql, 'info');
+ req.loggerRpc = req.logger.child({type: 'rpc'});
onFinished(res, () => {// ...});
next();},
];
};
// src/controllers/echo.js
const {Router} = require('express');
const cc = require('../utils/cc');
const rpcEchoClient = require('../rpc/echo/client');
class EchoController {
rpcEchoClient;
async init() {this.rpcEchoClient = await rpcEchoClient();
const router = Router();
router.get('/', this.get);
return router;
}
get = cc(async (req, res) => {const { s = ''} = req.query;
const message = await this.rpcEchoClient.get({s, logger: req.loggerRpc});
res.send({success: true, message});
});
}
module.exports = async () => {const c = new EchoController();
return await c.init();};
// src/controllers/index.js
const {Router} = require('express');
const shopController = require('./shop');
const chaosController = require('./chaos');
const healthController = require('./health');
const loginController = require('./login');
const csrfController = require('./csrf');
+const echoController = require('./echo');
module.exports = async function initControllers() {const router = Router();
router.use('/api/shop', await shopController());
router.use('/api/chaos', await chaosController());
router.use('/api/health', await healthController());
router.use('/api/login', await loginController());
router.use('/api/csrf', await csrfController());
+ router.use('/api/echo', await echoController());
return router;
};
拜访 http://localhost:9000/api/echo?s=Hello%20RPC 即可看到成果:
同时在命令行可能看到充沛的 gRPC 日志:
# ...
08:20:52.320Z DEBUG 12-rpc: dns_resolver | Resolver constructed for target dns:0.0.0.0:9001 (type=rpc)
08:20:52.321Z DEBUG 12-rpc: dns_resolver | Resolution update requested for target dns:0.0.0.0:9001 (type=rpc)
08:20:52.321Z DEBUG 12-rpc: dns_resolver | Returning IP address for target dns:0.0.0.0:9001 (type=rpc)
08:20:52.322Z DEBUG 12-rpc: server | Attempting to bind 0.0.0.0:9001 (type=rpc)
08:20:52.324Z DEBUG 12-rpc: server | Successfully bound 0.0.0.0:9001 (type=rpc)
08:20:52.327Z DEBUG 12-rpc: resolving_load_balancer | dns:localhost:9001 IDLE -> IDLE (type=rpc)
08:20:52.327Z DEBUG 12-rpc: connectivity_state | dns:localhost:9001 IDLE -> IDLE (type=rpc)
08:20:52.327Z DEBUG 12-rpc: dns_resolver | Resolver constructed for target dns:localhost:9001 (type=rpc)
# ...
本章源码
host1-tech/nodejs-server-examples – 12-rpc
更多浏览
从零搭建 Node.js 企业级 Web 服务器(零):动态服务
从零搭建 Node.js 企业级 Web 服务器(一):接口与分层
从零搭建 Node.js 企业级 Web 服务器(二):校验
从零搭建 Node.js 企业级 Web 服务器(三):中间件
从零搭建 Node.js 企业级 Web 服务器(四):异样解决
从零搭建 Node.js 企业级 Web 服务器(五):数据库拜访
从零搭建 Node.js 企业级 Web 服务器(六):会话
从零搭建 Node.js 企业级 Web 服务器(七):认证登录
从零搭建 Node.js 企业级 Web 服务器(八):网络安全
从零搭建 Node.js 企业级 Web 服务器(九):配置项
从零搭建 Node.js 企业级 Web 服务器(十):日志
从零搭建 Node.js 企业级 Web 服务器(十一):定时工作
从零搭建 Node.js 企业级 Web 服务器(十二):近程调用