对于近程调用
近程调用(简称 RPC)次要是指服务器或集群之间对处理过程的调用。通过近程调用能够买通不同零碎之间的数据与性能,或是抽离与建设公共的逻辑对立提供服务。近程调用的两端也称为近程调用的客户端与服务端,个别是多对多的关系,须要引入注册与发现机制进行治理,下图为最常见实际:
治理机制通常是就地取材的,能够基于 ZooKeeper 建设,本章不再开展。
gRPC
gPRC 是谷歌开源的一款跨语言高性能的 RPC 框架,底层应用 protobuf 进行数据交换,已在谷歌、奈非、思科等企业大规模利用。以下为 gRPC 的调用过程,客户端先进行 DNS 解析,再依据解析后果直连服务端或向负载平衡/治理服务获取服务端信息再连贯:
本章将基于上一章已实现的工程 host1-tech/nodejs-server-examples - 11-schedule 退出近程调用性能实现一个简略音讯的输出与输入,当初到工程根目录装置 grpc 相干模块:
$ yarn add @grpc/grpc-js @grpc/proto-loader # 本地装置 @grpc/grpc-js、@grpc/proto-loader# ...info Direct dependencies├─ @grpc/grpc-js@1.1.3└─ @grpc/proto-loader@0.5.5# ...
加上近程调用
通过 .proto
文件定义近程接口并写入基于该定义的 gRPC 客户端与服务端:
$ mkdir src/rpc # 新建 src/rpc 寄存近程调用逻辑$ mkdir src/rpc/echo # 新建 src/rpc/echo$ tree src -L 1 # 展现 src 目录内容构造src├── config├── controllers├── middlewares├── models├── moulds├── rpc├── schedules├── server.js├── services└── utils
// src/rpc/echo/def.protosyntax = "proto3";service Echo { rpc Get(EchoRequest) returns (EchoResponse) {}}message EchoRequest { string message = 1; }message EchoResponse { string message = 1; }
// src/rpc/echo/client.jsconst { resolve } = require('path');const { promisify } = require('util');const protoLoader = require('@grpc/proto-loader');const grpc = require('@grpc/grpc-js');const { rpc } = require('../../config');class EchoClient { grpcClient; async init() { const grpcObject = grpc.loadPackageDefinition( await protoLoader.load(resolve(__dirname, 'def.proto')) ); this.grpcClient = new grpcObject.Echo( `${rpc.domain}:${rpc.port}`, grpc.credentials.createInsecure() ); } get = async ({ s, logger }) => { const { grpcClient } = this; const { message } = await promisify( grpcClient.get.bind(grpcClient, { message: s }) )(); logger.info('Echo/Get Invoked'); return { message }; };}let client;module.exports = async () => { if (!client) { client = new EchoClient(); await client.init(); } return client;};
// src/rpc/echo/server.jsconst { resolve } = require('path');const { callbackify } = require('util');const protoLoader = require('@grpc/proto-loader');const grpc = require('@grpc/grpc-js');class EchoServer { grpcServer; async init() { const grpcObject = grpc.loadPackageDefinition( await protoLoader.load(resolve(__dirname, 'def.proto')) ); this.grpcServer.addService(grpcObject.Echo.service, this); } get = callbackify(async (call) => { const { message } = call.request; return { message }; });}let server;module.exports = async (grpcServer) => { if (!server) { server = new EchoServer(); Object.assign(server, { grpcServer }); await server.init(); } return server;};
// src/config/index.js// ...const config = { // 默认配置 default: { // ...+ + rpc: {+ domain: 'localhost',+ port: process.env.PORT_RPC || 9001,+ }, }, // ...};// ...
开启 gRPC 日志输入并初始化:
# .envLOG_LEVEL='debug'++GRPC_TRACE='all'+GRPC_VERBOSITY='DEBUG'
// src/utils/logger.js// ...+const GRPC_LOGGER_REGEXP = /^.+Z\s+\|\s+/;++function grpcLogger(logger, level = 'debug') {+ const verbosities = ['debug', 'info', 'error'];++ return {+ error(severity, message) {+ if (typeof severity != 'number') {+ message = severity;+ severity = 0;+ }++ if (typeof message != 'string') {+ message = String(message || '');+ }++ logger[verbosities[severity] || level](+ message.replace(GRPC_LOGGER_REGEXP, '')+ );+ },+ };+}+module.exports = logger;-Object.assign(module.exports, { logging });+Object.assign(module.exports, { logging, grpcLogger });
// src/rpc/index.jsconst { promisify } = require('util');const grpc = require('@grpc/grpc-js');const { rpc } = require('../config');const logger = require('../utils/logger');const echoClient = require('./echo/client');const echoServer = require('./echo/server');const { grpcLogger } = logger;module.exports = async function initRpc() { grpc.setLogger(grpcLogger(logger.child({ type: 'rpc' }), 'debug')); // init rpc servers const grpcServer = new grpc.Server(); await echoServer(grpcServer); await promisify(grpcServer.bindAsync.bind(grpcServer))( `0.0.0.0:${rpc.port}`, grpc.ServerCredentials.createInsecure() ); grpcServer.start(); // init rpc clients await echoClient();};
// src/server.jsconst express = require('express');const { resolve } = require('path');const { promisify } = require('util');const initMiddlewares = require('./middlewares');const initControllers = require('./controllers');const initSchedules = require('./schedules');+const initRpc = require('./rpc');const logger = require('./utils/logger');const server = express();const port = parseInt(process.env.PORT || '9000');const publicDir = resolve('public');const mouldsDir = resolve('src/moulds');async function bootstrap() {+ await initRpc(); server.use(await initMiddlewares()); server.use(express.static(publicDir)); server.use('/moulds', express.static(mouldsDir)); server.use(await initControllers()); server.use(errorHandler); await initSchedules(); await promisify(server.listen.bind(server, port))(); logger.info(`> Started on port ${port}`);}// ...
增加 gRPC 客户端 logger 与管制层入口:
// src/middlewares/trace.jsconst { v4: uuid } = require('uuid');const morgan = require('morgan');const onFinished = require('on-finished');const logger = require('../utils/logger');const { logging } = logger;module.exports = function traceMiddleware() { return [ morgan('common', { skip: () => true }), (req, res, next) => { req.uuid = uuid(); req.logger = logger.child({ uuid: req.uuid }); req.loggerSql = req.logger.child({ type: 'sql' }); req.logging = logging(req.loggerSql, 'info');+ req.loggerRpc = req.logger.child({ type: 'rpc' }); onFinished(res, () => { // ... }); next(); }, ];};
// src/controllers/echo.jsconst { Router } = require('express');const cc = require('../utils/cc');const rpcEchoClient = require('../rpc/echo/client');class EchoController { rpcEchoClient; async init() { this.rpcEchoClient = await rpcEchoClient(); const router = Router(); router.get('/', this.get); return router; } get = cc(async (req, res) => { const { s = '' } = req.query; const message = await this.rpcEchoClient.get({ s, logger: req.loggerRpc }); res.send({ success: true, message }); });}module.exports = async () => { const c = new EchoController(); return await c.init();};
// src/controllers/index.jsconst { Router } = require('express');const shopController = require('./shop');const chaosController = require('./chaos');const healthController = require('./health');const loginController = require('./login');const csrfController = require('./csrf');+const echoController = require('./echo');module.exports = async function initControllers() { const router = Router(); router.use('/api/shop', await shopController()); router.use('/api/chaos', await chaosController()); router.use('/api/health', await healthController()); router.use('/api/login', await loginController()); router.use('/api/csrf', await csrfController());+ router.use('/api/echo', await echoController()); return router;};
拜访 http://localhost:9000/api/echo?s=Hello%20RPC 即可看到成果:
同时在命令行可能看到充沛的 gRPC 日志:
# ...08:20:52.320Z DEBUG 12-rpc: dns_resolver | Resolver constructed for target dns:0.0.0.0:9001 (type=rpc)08:20:52.321Z DEBUG 12-rpc: dns_resolver | Resolution update requested for target dns:0.0.0.0:9001 (type=rpc)08:20:52.321Z DEBUG 12-rpc: dns_resolver | Returning IP address for target dns:0.0.0.0:9001 (type=rpc)08:20:52.322Z DEBUG 12-rpc: server | Attempting to bind 0.0.0.0:9001 (type=rpc)08:20:52.324Z DEBUG 12-rpc: server | Successfully bound 0.0.0.0:9001 (type=rpc)08:20:52.327Z DEBUG 12-rpc: resolving_load_balancer | dns:localhost:9001 IDLE -> IDLE (type=rpc)08:20:52.327Z DEBUG 12-rpc: connectivity_state | dns:localhost:9001 IDLE -> IDLE (type=rpc)08:20:52.327Z DEBUG 12-rpc: dns_resolver | Resolver constructed for target dns:localhost:9001 (type=rpc)# ...
本章源码
host1-tech/nodejs-server-examples - 12-rpc
更多浏览
从零搭建 Node.js 企业级 Web 服务器(零):动态服务
从零搭建 Node.js 企业级 Web 服务器(一):接口与分层
从零搭建 Node.js 企业级 Web 服务器(二):校验
从零搭建 Node.js 企业级 Web 服务器(三):中间件
从零搭建 Node.js 企业级 Web 服务器(四):异样解决
从零搭建 Node.js 企业级 Web 服务器(五):数据库拜访
从零搭建 Node.js 企业级 Web 服务器(六):会话
从零搭建 Node.js 企业级 Web 服务器(七):认证登录
从零搭建 Node.js 企业级 Web 服务器(八):网络安全
从零搭建 Node.js 企业级 Web 服务器(九):配置项
从零搭建 Node.js 企业级 Web 服务器(十):日志
从零搭建 Node.js 企业级 Web 服务器(十一):定时工作
从零搭建 Node.js 企业级 Web 服务器(十二):近程调用