乐趣区

关于grpc:grpcnode-源码阅读笔记0

简略介绍 gRPC

贴一张挂在官网的图片:https://grpc.io/docs/what-is-…

能够了解 gRPC 是 RPC(近程过程调用)框架的一种实现,对于 RPC 的介绍因为并不是本次的主题,所以放个链接来帮忙大家了解:https://www.zhihu.com/questio…

我所了解 RPC 整个执行的过程就是 Client 调用办法 -> 序列化申请参数 -> 传输数据 -> 反序列化申请参数 -> Server 解决申请 -> 序列化返回数据 -> 传输数据 -> Client 接管到办法返回值:

其次要逻辑会集中在 数据的序列化 / 反序列化 以及 数据的传输上,而这两项 gRPC 别离选用了 Protocol Buffers 和 HTTP2 来作为默认选项。

gRPC 在 Node.js 的实现

gRPC 在 Node.js 的实现上一共有两个官网版本,一个是基于 c++ addon 的版本,另一个是纯 JS 实现的版本。

gRPC 在 Node.js 中相干的模块

除了上边提到的两个 gRPC 的实现,在 Node.js 中还存在一些其余的模块用来辅助应用 gRPC。

  • grpc-tools 这个是每个语言都会用的,用来依据 proto 文件生成对应,插件提供了 Node.js 语言的实现
  • proto-loader 用来动静加载 proto 文件,不须要应用 grpc_tools 提前生成代码(性能比上边的形式稍差)

这次笔记次要是针对 grpc-node 形式的实现,在 c++ addon 模块的实现下,并不是一个 gRPC 的残缺实现,做的事件更多的是一个连接的工作,通过 JS、c++ 两层封装将 c++ 版本的 gRPC 能力裸露进去供用户应用。

之所以抉择它是因为感觉逻辑会较 grpc-js 清晰一些,更适宜了解 gRPC 整体的运行逻辑。

在我的项目仓库中,两个目录下是咱们须要关注的:

  • src(JS 代码)
  • ext(c++ 代码)

ext 中的代码次要用于调用 c++ 版本 gRPC 的接口,并通过 NAN 提供 c++ addon 模块。
src 中的代码则是调用了 ext 编译后的模块,并进行一层利用上的封装。
而作为应用 gRPC 的用户就是援用的 src 下的文件了。

咱们先通过官网的 hello world 示例来阐明咱们是如何应用 gRPC 的,因为 gRPC 默认的数据序列化形式采纳的 protobuf,所以首先咱们须要有一个 proto 文件,而后通过 gRPC 提供的文件来生成对应的代码,生成进去的文件蕴含了 proto 中所定义的 service、method、message 等各种构造的定义,并可能让咱们用比拟相熟的形式去应用。

示例中的 proto 文件:

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}}

// The request message containing the user's name.
message HelloRequest {string name = 1;}

// The response message containing the greetings
message HelloReply {string message = 1;}

grpc_tools 是用来生成 proto 对应代码的,这个命令行工具提供了多种语言的生成版本。
在 Node 中,会生成两个文件,个别命名规定为 xxx_pb.jsxxx_grpc_pb.jsxxx_pb.js 是 proto 中各种 service、method 以及 message 的构造形容及如何应用的接口定义,而 xxx_grpc_pb.js 次要则是针对 xxx_pb.js 的一个整合,依照 proto 文件中定义的构造生成对应的代码,在用户应用的时候,应用前者多半用于结构音讯构造,应用后者则是办法的调用。

生成后的要害代码(XXX_grpc_pb.js):

const grpc = require('@grpc/grpc');
const helloworld_pb = require('./helloworld_pb.js');

function serialize_helloworld_HelloReply(arg) {if (!(arg instanceof helloworld_pb.HelloReply)) {throw new Error('Expected argument of type helloworld.HelloReply');
  }
  return Buffer.from(arg.serializeBinary());
}

function deserialize_helloworld_HelloReply(buffer_arg) {return helloworld_pb.HelloReply.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_helloworld_HelloRequest(arg) {if (!(arg instanceof helloworld_pb.HelloRequest)) {throw new Error('Expected argument of type helloworld.HelloRequest');
  }
  return Buffer.from(arg.serializeBinary());
}

function deserialize_helloworld_HelloRequest(buffer_arg) {return helloworld_pb.HelloRequest.deserializeBinary(new Uint8Array(buffer_arg));
}


// The greeting service definition.
const GreeterService = exports.GreeterService = {
  // Sends a greeting
sayHello: {
    path: '/helloworld.Greeter/SayHello',
    requestStream: false,
    responseStream: false,
    requestType: helloworld_pb.HelloRequest,
    responseType: helloworld_pb.HelloReply,
    requestSerialize: serialize_helloworld_HelloRequest,
    requestDeserialize: deserialize_helloworld_HelloRequest,
    responseSerialize: serialize_helloworld_HelloReply,
    responseDeserialize: deserialize_helloworld_HelloReply,
  },
};

exports.GreeterClient = grpc.makeGenericClientConstructor(GreeterService);

最终导出的 sayHello 就是咱们在 proto 文件中定义的 SayHello 办法,所以咱们在作为 Client 的时候应用,就是很简略的调用 sayHello 就行了:

const messages = require('./helloworld_pb');
const services = require('./helloworld_grpc_pb');
const grpc = require('grpc');

const client = new services.GreeterClient(
  target,
  grpc.credentials.createInsecure());

const request = new messages.HelloRequest();

request.setName('Niko');

client.sayHello(request, function(err, response) {console.log('Greeting:', response.getMessage());
});

其实实在写的代码也就上边的几行,实例化了一个 Client,实例化一个 Message 并构建数据,而后通过 client 调用对应的 method 传入 message,就实现了一个 gRPC 申请的发送。
在这个过程中,咱们间接可见的用到了 grpc-nodecredentials 以及 makeGenericClientConstructor,咱们就拿这两个作为入口,首先从 makeGenericClientConstructor 来说。

源码剖析

makeGenericClientConstructor

在翻看 index.js 文件中能够发现,makeGenericClientConstructor 其实是 client.makeClientConstructor 的一个别名,所以咱们须要去查看 src/client.js 中对应函数的定义,就像函数名一样,它是用来生成一个 Client 的构造函数的,这个构造函数就是咱们在上边示例中的 GreeterClient
源码所在位置:https://github.com/grpc/grpc-…

当对照着 xxx_grpc_pb.js 与源码来看时,会发现调用函数只传入了一个参数,而函数定义却存在三个参数,这个其实是历史起因导致的,咱们能够间接疏忽后边的两个参数。

精简后的源码:

exports.makeClientConstructor = function(methods) {function ServiceClient(address, credentials, options) {Client.call(this, address, credentials, options);
  }

  util.inherits(ServiceClient, Client);
  ServiceClient.prototype.$method_definitions = methods;
  ServiceClient.prototype.$method_names = {};

  Object.keys(methods).forEach(name => {const attrs = methods[name];
    if (name.indexOf('$') === 0) {throw new Error('Method names cannot start with $');
    }
    var method_type = common.getMethodType(attrs);
    var method_func = function() {return requester_funcs[method_type].apply(this,
        [attrs.path, attrs.requestSerialize, attrs.responseDeserialize]
        .concat([].slice.call(arguments))
      );
    };
    
    ServiceClient.prototype[name] = method_func;
    ServiceClient.prototype.$method_names[attrs.path] = name;
    // Associate all provided attributes with the method
    Object.assign(ServiceClient.prototype[name], attrs);
    if (attrs.originalName) {ServiceClient.prototype[attrs.originalName] =
        ServiceClient.prototype[name];
    }
  });

  ServiceClient.service = methods;

  return ServiceClient;
};

methods 参数就是咱们上边文件中生成的对象,包含服务地址、是否应用 stream、以及 申请 / 返回值 的类型及对应的序列化 / 反序列化 形式。

大抵的逻辑就是创立一个继承自 Client 的子类,而后遍历咱们整个 service 来看里边有多少个 method,并依据 method 不同的传输类型来辨别应用不同的函数进行数据的传输,最初以 method 为 key 放到 Client 子类的原型链上。

common.getMethodType 就是用来辨别 method 到底是什么类型的申请的,目前 gRPC 一共分了四种类型,双向 Stream、两个单向 Stream,以及 Unary 模式:

exports.getMethodType = function(method_definition) {if (method_definition.requestStream) {if (method_definition.responseStream) {return constants.methodTypes.BIDI_STREAMING;} else {return constants.methodTypes.CLIENT_STREAMING;}
  } else {if (method_definition.responseStream) {return constants.methodTypes.SERVER_STREAMING;} else {return constants.methodTypes.UNARY;}
  }
};

在最初几行有一处判断 originalName 是否存在的操作,这个是在 proto-loader 中存在的一个逻辑,将 methodName 转换成纯小写放了进去,单纯看正文的话,这并不是一个长期的解决方案:https://github.com/grpc/grpc-…

P.S. proto-loader 是 JS 里边一种动静加载 proto 文件的形式,性能比通过 grpc_tools 预生成代码的形式要低一些。

所有的申请形式,都被放在了一个叫做 requester_funcs 的对象中,源码中的定义是这样的:

var requester_funcs = {[methodTypes.UNARY]: Client.prototype.makeUnaryRequest,
  [methodTypes.CLIENT_STREAMING]: Client.prototype.makeClientStreamRequest,
  [methodTypes.SERVER_STREAMING]: Client.prototype.makeServerStreamRequest,
  [methodTypes.BIDI_STREAMING]: Client.prototype.makeBidiStreamRequest
};

从这里就能够看出,其实是和咱们 getMethodType 所对应的四种解决形式。

最终,将继承自 Client 的子类返回,实现了整个函数的执行。

Client

首先咱们须要看看继承的 Client 构造函数到底做了什么事件。
抛开参数类型的查看,首先是针对拦截器的解决,咱们能够通过两种形式来实现拦截器,一个是提供拦截器的具体函数,这个在所有 method 触发时都会执行,还有一个能够通过传入 interceptor_provider 来实现动静的生成拦截器,函数会在初始化 Client 的时候触发,并要求返回一个新的 interceptor 对象用于执行拦截器的逻辑。

interceptor 的用法

// interceptors 用法
const interceptor = function(options, nextCall) {console.log('trigger')
  return new InterceptingCall(nextCall(options));
}
const client = new services.GreeterClient(
  target,
  grpc.credentials.createInsecure(),
  {interceptors: [interceptor]
  }
);

// interceptor_providers 用法
const interceptor = function(options, nextCall) {console.log('trigger')
  return new InterceptingCall(nextCall(options));
}

const interceptorProvider = (methodDefinition) => {console.log('call interceptorProvider', methodDefinition)
  return interceptor
}

const client = new services.GreeterClient(
  target,
  grpc.credentials.createInsecure(),
  {interceptor_providers: [interceptorProvider]
  }
);

P.S. 须要留神的是,如果传入 interceptor_providers,则会在两个中央触发调用,一个是实例化 Client 的时候,还有一个是在 method 实在调用的时候,每次调用都会触发,所以如果要复用 interceptor,最好在函数之外构建出函数体

然而这样的拦截器其实是没有太多意义的,咱们不可能针对 metadatamessage 来做本人的批改,如果咱们察看 InterceptingCall 的具体函数签名,会发现它反对两个参数的传入。

function InterceptingCall(next_call, requester) {
  this.next_call = next_call;
  this.requester = requester;
}

上边示例只介绍了第一个参数,这个参数预期承受一个对象,对象会提供多个办法,咱们能够通过 console.log(nextCall(options).constructor.prototype) 来查看都有哪些,例如 sendMessagestart 之类的。
而察看这些函数的实现,会发现他们都调用了一个 _callNext

InterceptingCall.prototype.sendMessage = function(message) {this._callNext('sendMessage', [message]);
};

InterceptingCall.prototype.halfClose = function() {this._callNext('halfClose');
};

InterceptingCall.prototype.cancel = function() {this._callNext('cancel');
};

InterceptingCall.prototype._callNext = function(method_name, args, next) {var args_array = args || [];
  var next_call = next ? next : this._getNextCall(method_name);
  if (this.requester && this.requester[method_name]) {
    // Avoid using expensive `apply` calls
    var num_args = args_array.length;
    switch (num_args) {
      case 0:
        return this.requester[method_name](next_call);
      case 1:
        return this.requester[method_name](args_array[0], next_call);
      case 2:
        return this.requester[method_name](args_array[0], args_array[1],
                                           next_call);
    }
  } else {if (next_call === emptyNext) {throw new Error('Interceptor call chain terminated unexpectedly');
    }
    return next_call(args_array[0], args_array[1]);
  }
};

_callNext 办法中,咱们就能够找到 requester 参数到底是有什么用了,如果 requester 也有实现对应的 method_name,那么就会先执行 requester 的办法,随后将 next_call 对应的办法作为调用 requester 办法的最初一个参数传入。
在 grpc-node 中,拦截器的执行程序与传入程序无关,是一个队列,先传入的拦截器先执行,如果传入了第二个参数,则先执行第二个参数对应的办法,后执行第一个参数对应的办法。

所以如果咱们想做一些额定的事件,比如说针对 metadata 增加一个咱们想要的字段,那么就能够这么来写拦截器:

var interceptor = function(options, nextCall) {return new InterceptingCall(nextCall(options), {start: function(metadata, listener, next) {
      next(metadata, {onReceiveMetadata: function (metadata, next) {metadata.set('xxx', 'xxx')
          next(metadata);
        },
      });
     },
  });
};

略微非凡的中央是,start函数的 next 参数被调用时传入的第二个参数并不是一个 InterceptingCall 的实例,而是一个 InterceptingListener 的实例,两者都有 _callNext 的实现,只不过所提供的办法不齐全一样罢了。

Channel 的创立

接下来的代码逻辑次要是用于创立 Channel,能够通过传递不同的参数来笼罩 Channel,也能够用默认的 Channel,这个 Channel 对应的 gRPC 中其实就是做数据传输的那一个模块,能够了解为 HTTP2 最终是在这里应用的。
个别很少会去笼罩默认的 Channel,所以咱们间接去看 grpc-node 里边的 Channel 是如何实现的。

Channel 是 c++ 代码实现的,代码的地位:https://github.com/grpc/grpc-…

如果有同学尝试过混用 grpc-nodegrpc-js,那么你肯定有看到过这个报错:Channel's second argument (credentials) must be a ChannelCredentials
起因就在于 Channel 实例化过程中会进行查看咱们创立 Channel 传入的 credential 是否是继承自 grpc 中的 ChannelCredentials 类。
grpc-nodegrpc-js 用的是两个不同的类,所以混用的话可能会呈现这个问题。

而后就是依据传入的 credential 的不同来判断是否要应用加密,而个别罕用的 grpc.credentials.createInsecure() 其实就是不走加密的意思了,咱们能够在 https://github.com/grpc/grpc-… 和 https://github.com/grpc/grpc-… 来看到对应的逻辑。

后边就是调用 c++ 版本的 grpc 来构建对应的 Channel 了,如果有老铁看过 c++ 版本是如何创立 grpc Client 的,那么这些代码就比拟相熟了:https://github.com/grpc/grpc/…
grpc-node 中也是调用的同样的 API 来创立的。

makeUnaryRequest

Client 被创立进去后,咱们会调用 Client 上的办法(也就是发申请了),这时候就会触发到上边提到的 requester_funcs 其中的一个,咱们先从最简略的 Unary 来说,这种 Client/Server 都是 Unary 申请形式时会触发的函数。
咱们通过上边 method_func 中调用形式能够确定传递了什么参数进去,有几个固定的参数 path、request 序列化形式,以及 response 的反序列化形式。
后边的参数就是由调用时传入的动静参数了,这些能够在 makeUnaryRequest 函数定义中看到,别离是 argument(也就是 request body)、metadata(能够了解为 header,一些元数据)、options 是一个可选的参数(自定义的拦截器是放在这里的),能够用于笼罩 method 的一些形容信息,以及最初的 callback 就是咱们接管到 response 后应该做的操作了。

整个函数的实现,按长度来说,有一半都是在解决参数,而剩下的局部则做了两件事,一个是实例化了 ClientUnaryCall 对象,另一个则是解决拦截器相干的逻辑,并启动拦截器来发送整个申请。
makeUnaryRequest 函数中波及到拦截器的局部有这么几块 resolveInterceptorProvidersgetLastListenergetInterceptingCall

ClientUnaryCall

先来看 ClientUnaryCall 做了什么事件,在源码中有这样的一个代码块,是应用该对象的场景:

function ClientUnaryCall(call) {EventEmitter.call(this);
  this.call = call;
}

var callProperties = {
  argument: argument,
  metadata: metadata,
  call: new ClientUnaryCall(),
  channel: this.$channel,
  methodDefinition: method_definition,
  callOptions: options,
  callback: callback
};

// 以及后续与拦截器产生了一些关联
var emitter = callProperties.call;
// 这行代码很诡异,看起来是能够在实例化的时候传入的,却抉择了在这里笼罩属性值
emitter.call = intercepting_call;

var last_listener = client_interceptors.getLastListener(
  methodDefinition,
  emitter,
  callProperties.callback
);

对于 ClientUnaryCall 的定义也非常简单,其实是一个继承自 EventEmitter 的子类,减少了一个 call 属性的定义,以及两个办法封装调用了 call 属性对应的一些办法。

强烈狐疑 这部分代码是前期有过调整,因为 ClientUnaryCall 构造函数的实现中是能够承受一个参数作为 call 属性的赋值的,然而在代码利用中抉择了后续笼罩 call 属性,而非间接在实例化的时候传入进去

resolveInterceptorProviders

resolveInterceptorProviders 是用来解决用户传入的拦截器的,这个函数在 Client 的整个生命周期会有两处调用,一个是在上边 Client 实例化的过程中会触发一次,再有就是每次 method 被调用之前,会从新触发该函数。
resolveInterceptorProviders 的逻辑很简略,就是遍历咱们传入的 interceptor_provider 并将对应 method 的信息形容传入并执行,失去 provider 返回的 interceptor 用作拦截器。
Client 实例化过程中是会遍历所有的 method 来执行,而在具体的 method 触发时则只触发以后 method 相干的 provider 逻辑。

getLastListener

getLastListener 依照正文中的形容,是为了取得一个最初会触发的监听者,源码大抵是这样的:
https://github.com/grpc/grpc-…

var listenerGenerators = {[methodTypes.UNARY]: _getUnaryListener,
  [methodTypes.CLIENT_STREAMING]: _getClientStreamingListener,
  [methodTypes.SERVER_STREAMING]: _getServerStreamingListener,
  [methodTypes.BIDI_STREAMING]: _getBidiStreamingListener
};

function getLastListener(method_definition, emitter, callback) {if (emitter instanceof Function) {
    callback = emitter;
    callback = function() {};
  }
  if (!(callback instanceof Function)) {callback = function() {};}
  if (!((emitter instanceof EventEmitter) &&
       (callback instanceof Function))) {throw new Error('Argument mismatch in getLastListener');
  }
  var method_type = common.getMethodType(method_definition);
  var generator = listenerGenerators[method_type];
  return generator(method_definition, emitter, callback);
}

同样也应用了一个枚举来辨别不同的办法类型来调用不同的函数来生成对应的 listener。

比方这里用到的 getUnaryListener,是这样的一个逻辑:

function _getUnaryListener(method_definition, emitter, callback) {
  var resultMessage;
  return {onReceiveMetadata: function (metadata) {emitter.emit('metadata', metadata);
    },
    onReceiveMessage: function (message) {resultMessage = message;},
    onReceiveStatus: function (status) {if (status.code !== constants.status.OK) {var error = common.createStatusError(status);
        callback(error);
      } else {callback(null, resultMessage);
      }
      emitter.emit('status', status);
    }
  };
}

代码也算比拟清晰,在不同的阶段会触发不同的事件,而后再真正返回后果当前,触发 callback 来告知用户申请响应。
也就是咱们在示例中调用 sayHello 时传入的 callback 被调用的中央了。

getInterceptingCall

getInterceptingCall 函数的调用会返回一个实例,通过操作该实例咱们能够管制申请的开始、数据的发送以及申请的完结。
咱们上边 getLastListener 返回的对象触发的机会也是会在这里能够找到的。

从源码上来看会波及到这么几个函数:

var interceptorGenerators = {[methodTypes.UNARY]: _getUnaryInterceptor,
  [methodTypes.CLIENT_STREAMING]: _getClientStreamingInterceptor,
  [methodTypes.SERVER_STREAMING]: _getServerStreamingInterceptor,
  [methodTypes.BIDI_STREAMING]: _getBidiStreamingInterceptor
};

function getInterceptingCall(method_definition, options,
                             interceptors, channel, responder) {
  var last_interceptor = _getLastInterceptor(method_definition, channel,
                                            responder);
  var all_interceptors = interceptors.concat(last_interceptor);
  return _buildChain(all_interceptors, options);
}

function _getLastInterceptor(method_definition, channel, responder) {var callback = (responder instanceof Function) ? responder : function() {};
  var emitter = (responder instanceof EventEmitter) ? responder :
                                                      new EventEmitter();
  var method_type = common.getMethodType(method_definition);
  var generator = interceptorGenerators[method_type];
  return generator(method_definition, channel, emitter, callback);
}

function _buildChain(interceptors, options) {var next = function(interceptors) {if (interceptors.length === 0) {return function (options) {};}
    var head_interceptor = interceptors[0];
    var rest_interceptors = interceptors.slice(1);
    return function (options) {return head_interceptor(options, next(rest_interceptors));
    };
  };
  var chain = next(interceptors)(options);
  return new InterceptingCall(chain);
}

_getUnaryInterceptor 因为篇幅较长,间接贴 GitHub 链接了:https://github.com/grpc/grpc-…

大抵的逻辑就是咱们通过 method_definitionchannel 等参数来获取到一个 interceptor,并将其拼接到原有的 interceptor 后边,作为最初执行的拦截器,_buildChain 函数比较简单,就是实现了一个链式调用的函数,用来按程序执行拦截器。

对于 interceptor 如何应用能够看咱们介绍 interceptor 用法时写的 demo

次要的逻辑实际上在 _getUnaryInterceptor 中,咱们会创立一个性能全面的 interceptor,函数会返回一个匿名函数,就是咱们在上边代码中看到的调用 generator 的中央了,而在匿名函数的结尾部门,咱们就调用了 getCall 来获取一个 call 对象,这个 call 对象就是咱们与 gRPC 服务器之间的通道了,申请最终是由 call 对象负责发送的。

getCall 中实际上调用了 channel 对象的 createCall 办法,这部分的逻辑也是在 c++ 中做的了,蕴含数据的发送之类的逻辑。

这是咱们回到 makeUnaryRequest 函数,再看函数完结的中央调用的那三个办法,第一个 start,将咱们的 metadata(能够了解为 header)发送了过来,而后将实在的信息发送了过来,最初调用敞开办法。

咱们能够在 _getUnaryInterceptor 中的 startsendMessage 以及 halfClose 函数中都有调用 _startBatchIfReady 函数,而这个办法实际上就是调用的 channel 上的 startBatch 办法,再依据调用链查找,最终会看到解决逻辑在这里:https://github.com/grpc/grpc/…
opType 与 代码中 switch-case 中的对应关系在这里:https://github.com/grpc/grpc-…

首先在 start 里边次要是发送了 metadata,并且尝试承受服务端返回过去的 metadata,并在回调中触发咱们传入的 listeneronReceiveMetadata 办法。
而后查看 response 的状态是否正确,并触发 listeneronReceiveStatus 办法。

接下来是调用 sendMessage 办法,在这里咱们将音讯体进行序列化,并发送,在回调中就会去调用咱们传入的 callback。

最初在 halfClose 办法中其实就是发送一个指令来设置申请的完结。

整个的流程细化当前大略是这个样子的:

小结

上边整体的记录就是对于 Client 这一侧是如何实现的了。
次要波及到 Client 的构建、发送申请时做的事件、拦截器的作用。
而更深刻的一些逻辑其实是在 c++ 版本的 gRPC 库里所实现,所以本次笔记并没有过多的波及。

文章波及到的局部示例代码仓库地址:https://github.com/Jiasm/grpc…

退出移动版