Apache Dubbo 有injvm形式的通信,可能防止网络带来的提早,同时也不占用本地端口,对测试、本地验证而言,是一种比拟不便的RPC通信形式。

最近看到 containerd 的代码,发现它也有相似的需要。
但应用ip端口通信,有可能会有端口抵触;应用unix socket,可能会有门路抵触。
考查了下gRPC有没有和injvm相似的,基于内存的通信形式。起初发现pipe十分好用,所以记录了下。

Golang/gRPC对网络的形象

首先,咱们先看一下gRPC一次调用的架构图。当然,这个架构图目前只关注了网络形象散布。

咱们重点关注网络局部。

操作系统零碎形象

首先,在网络包之上,零碎形象进去了socket,代表一条虚构连贯,对于UDP,这个虚构连贯是不牢靠的,对于TCP,这个链接是尽力牢靠的。

对于网络编程而言,仅仅有连贯是不够的,还须要通知开发者如何创立、敞开连贯。
对于服务端,零碎提供了accept办法,用来接管连贯。
对于客户端,零碎提供了connect办法,用于和服务端建设连贯。

Golang形象

在Golang中,socket对等的概念叫net.Conn,代表了一条虚构连贯。

接下来,对于服务端,accept这个行为被包装成了net.Listener接口;对于客户端,Golang则基于connect提供了net.Dial办法。

type Listener interface {  // 接管来自客户端的网络连接  Accept() (Conn, error)  Close() error  Addr() Addr}

gRPC应用

那么gRPC是怎么应用Listener和Dial的呢?

对于gRPC服务端,Serve办法接管一个Listener,示意在这个Listener上提供服务。

对于gRPC客户端,网络实质上就是一个可能连贯到某个中央的货色就能够,所以只须要一个dialer func(context.Context, string) (net.Conn, error)函数就行了。

什么是pipe

在操作系统层面,pipe示意一个数据管道,而这个管道两端都在本程序中,能够很好的满足咱们的要求:基于内存的网络通信。

Golang也基于pipe提供了net.Pipe()函数创立了一个双向的、基于内存通信的管道,在能力上,可能很好的满足gRPC对底层通信的要求。

然而net.Pipe仅仅产生了两个net.Conn,即只产生两个网络连接,没有之前提到的Listner,也没有Dial办法。

于是联合Golang的channel,把net.Pipe包装成了Listner,也提供了Dial办法:

  1. Listener.Accept(),只须要监听一个channel,客户端连贯过去的时候,把连贯通过channel传递过去即可
  2. Dial办法,调用Pipe,将一端通过channel给服务端(作为服务端连贯),另一端作为客户端连贯

代码如下:

package mainimport (  "context"  "errors"  "net"  "sync"  "sync/atomic")var ErrPipeListenerClosed = errors.New(`pipe listener already closed`)type PipeListener struct {  ch    chan net.Conn  close chan struct{}  done  uint32  m     sync.Mutex}func ListenPipe() *PipeListener {  return &PipeListener{    ch:    make(chan net.Conn),    close: make(chan struct{}),  }}// Accept 期待客户端连贯func (l *PipeListener) Accept() (c net.Conn, e error) {  select {  case c = <-l.ch:  case <-l.close:    e = ErrPipeListenerClosed  }  return}// Close 敞开 listener.func (l *PipeListener) Close() (e error) {  if atomic.LoadUint32(&l.done) == 0 {    l.m.Lock()    defer l.m.Unlock()    if l.done == 0 {      defer atomic.StoreUint32(&l.done, 1)      close(l.close)      return    }  }  e = ErrPipeListenerClosed  return}// Addr 返回 listener 的地址func (l *PipeListener) Addr() net.Addr {  return pipeAddr(0)}func (l *PipeListener) Dial(network, addr string) (net.Conn, error) {  return l.DialContext(context.Background(), network, addr)}func (l *PipeListener) DialContext(ctx context.Context, network, addr string) (conn net.Conn, e error) {  // PipeListener是否曾经敞开  if atomic.LoadUint32(&l.done) != 0 {    e = ErrPipeListenerClosed    return  }  // 创立pipe  c0, c1 := net.Pipe()  // 期待连贯传递到服务端接管  select {  case <-ctx.Done():    e = ctx.Err()  case l.ch <- c0:    conn = c1  case <-l.close:    c0.Close()    c1.Close()    e = ErrPipeListenerClosed  }  return}type pipeAddr intfunc (pipeAddr) Network() string {  return `pipe`}func (pipeAddr) String() string {  return `pipe`}

如何用pipe作为gRPC的connection

有了下面的包装,咱们就能够基于此创立一个gRPC的服务器端和客户端,来进行基于内存的RPC通信了。

首先,咱们简略的创立一个服务,蕴含了四种调用形式:

syntax = "proto3";option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";option java_multiple_files = true;option java_package = "io.grpc.examples.helloworld";option java_outer_classname = "HelloWorldProto";package helloworld;// The greeting service definition.service Greeter {  // unary调用  rpc SayHello(HelloRequest) returns (HelloReply) {}  // 服务端流式调用  rpc SayHelloReplyStream(HelloRequest) returns (stream HelloReply);  // 客户端流式调用  rpc SayHelloRequestStream(stream HelloRequest) returns (HelloReply);  // 双向流式调用  rpc SayHelloBiStream(stream HelloRequest) returns (stream HelloReply);}// The request message containing the user's name.message HelloRequest {  string name = 1;}// The response message containing the greetingsmessage HelloReply {  string message = 1;}

而后生成相干的stub代码:

protoc --go_out=. --go_opt=paths=source_relative \  --go-grpc_out=. --go-grpc_opt=paths=source_relative \  helloworld/helloworld.proto

而后开始写服务端代码,根本逻辑就是实现一个demo版本的服务端就好:

package mainimport (  "context"  "log"  "github.com/robberphex/grpc-in-memory/helloworld"  pb "github.com/robberphex/grpc-in-memory/helloworld")// helloworld.GreeterServer 的实现type server struct {  // 为了前面代码兼容,必须聚合UnimplementedGreeterServer  // 这样当前在proto文件中新减少一个办法的时候,这段代码至多不会报错  pb.UnimplementedGreeterServer}// unary调用的服务端代码func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {  log.Printf("Received: %v", in.GetName())  return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil}// 客户端流式调用的服务端代码// 接管两个req,而后返回一个respfunc (s *server) SayHelloRequestStream(streamServer pb.Greeter_SayHelloRequestStreamServer) error {  req, err := streamServer.Recv()  if err != nil {    log.Printf("error receiving: %v", err)    return err  }  log.Printf("Received: %v", req.GetName())  req, err = streamServer.Recv()  if err != nil {    log.Printf("error receiving: %v", err)    return err  }  log.Printf("Received: %v", req.GetName())  streamServer.SendAndClose(&pb.HelloReply{Message: "Hello " + req.GetName()})  return nil}// 服务端流式调用的服务端代码// 接管一个req,而后发送两个respfunc (s *server) SayHelloReplyStream(req *pb.HelloRequest, streamServer pb.Greeter_SayHelloReplyStreamServer) error {  log.Printf("Received: %v", req.GetName())  err := streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})  if err != nil {    log.Printf("error Send: %+v", err)    return err  }  err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName() + "_dup"})  if err != nil {    log.Printf("error Send: %+v", err)    return err  }  return nil}// 双向流式调用的服务端代码func (s *server) SayHelloBiStream(streamServer helloworld.Greeter_SayHelloBiStreamServer) error {  req, err := streamServer.Recv()  if err != nil {    log.Printf("error receiving: %+v", err)    // 及时将谬误返回给客户端,下同    return err  }  log.Printf("Received: %v", req.GetName())  err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})  if err != nil {    log.Printf("error Send: %+v", err)    return err  }  // 来到这个函数后,streamServer会敞开,所以不举荐在独自的goroute发送音讯  return nil}// 新建一个服务端实现func NewServerImpl() *server {  return &server{}}

而后咱们创立一个基于pipe连贯的客户端来调用服务端。

蕴含如下几个步骤:

  1. 创立服务端实现
  2. 基于pipe创立listener,而后基于它创立gRPC server
  3. 基于pipe创立客户端连贯,而后创立gRPC client,调用服务

代码如下:

package mainimport (  "context"  "fmt"  "log"  "net"  pb "github.com/robberphex/grpc-in-memory/helloworld"  "google.golang.org/grpc")// 将一个服务实现转化为一个客户端func serverToClient(svc *server) pb.GreeterClient {  // 创立一个基于pipe的Listener  pipe := ListenPipe()  s := grpc.NewServer()  // 注册Greeter服务到gRPC  pb.RegisterGreeterServer(s, svc)  if err := s.Serve(pipe); err != nil {    log.Fatalf("failed to serve: %v", err)  }  // 客户端指定应用pipe作为网络连接  clientConn, err := grpc.Dial(`pipe`,    grpc.WithInsecure(),    grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) {      return pipe.DialContext(c, `pipe`, s)    }),  )  if err != nil {    log.Fatalf("did not connect: %v", err)  }  // 基于pipe连贯,创立gRPC客户端  c := pb.NewGreeterClient(clientConn)  return c}func main() {  svc := NewServerImpl()  c := serverToClient(svc)  ctx := context.Background()  // unary调用  for i := 0; i < 5; i++ {    r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world_unary_%d", i)})    if err != nil {      log.Fatalf("could not greet: %v", err)    }    log.Printf("Greeting: %s", r.GetMessage())  }  // 客户端流式调用  for i := 0; i < 5; i++ {    streamClient, err := c.SayHelloRequestStream(ctx)    if err != nil {      log.Fatalf("could not SayHelloRequestStream: %v", err)    }    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d", i)})    if err != nil {      log.Fatalf("could not Send: %v", err)    }    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d_dup", i)})    if err != nil {      log.Fatalf("could not Send: %v", err)    }    reply, err := streamClient.CloseAndRecv()    if err != nil {      log.Fatalf("could not Recv: %v", err)    }    log.Println(reply.GetMessage())  }  // 服务端流式调用  for i := 0; i < 5; i++ {    streamClient, err := c.SayHelloReplyStream(ctx, &pb.HelloRequest{Name: fmt.Sprintf("SayHelloReplyStream_%d", i)})    if err != nil {      log.Fatalf("could not SayHelloReplyStream: %v", err)    }    reply, err := streamClient.Recv()    if err != nil {      log.Fatalf("could not Recv: %v", err)    }    log.Println(reply.GetMessage())    reply, err = streamClient.Recv()    if err != nil {      log.Fatalf("could not Recv: %v", err)    }    log.Println(reply.GetMessage())  }  // 双向流式调用  for i := 0; i < 5; i++ {    streamClient, err := c.SayHelloBiStream(ctx)    if err != nil {      log.Fatalf("could not SayHelloStream: %v", err)    }    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("world_stream_%d", i)})    if err != nil {      log.Fatalf("could not Send: %v", err)    }    reply, err := streamClient.Recv()    if err != nil {      log.Fatalf("could not Recv: %v", err)    }    log.Println(reply.GetMessage())  }}

总结

当然,作为基于内存的RPC调用,还能够有更好的形式,比方间接将对象传递到服务端,间接通过本地调用形式来通信。
但这种形式毁坏了很多约定,比方对象地址、比方gRPC连贯参数不失效等等。

本文介绍的,基于Pipe的通信形式,除了网络层走了内存传递之外,其余都和失常RPC通信行为统一,比方同样经验了序列化、经验了HTTP/2的流控制等。当然,性能上比原生调用也会差一点,然而好在对于测试、验证场景,行为上的统一比拟重要些。

本文代码曾经托管到了GitHub https://github.com/robberphex...。


本文首发于 https://robberphex.com/grpc-i...。