共计 7862 个字符,预计需要花费 20 分钟才能阅读完成。
Apache Dubbo 有 injvm 形式的通信,可能防止网络带来的提早,同时也不占用本地端口,对测试、本地验证而言,是一种比拟不便的 RPC 通信形式。
最近看到 containerd 的代码,发现它也有相似的需要。
但应用 ip 端口通信,有可能会有端口抵触;应用 unix socket,可能会有门路抵触。
考查了下 gRPC 有没有和 injvm 相似的,基于内存的通信形式。起初发现 pipe 十分好用,所以记录了下。
Golang/gRPC 对网络的形象
首先,咱们先看一下 gRPC 一次调用的架构图。当然,这个架构图目前只关注了网络形象散布。
咱们重点关注网络局部。
操作系统零碎形象
首先,在网络包之上,零碎形象进去了 socket,代表一条虚构连贯,对于 UDP,这个虚构连贯是不牢靠的,对于 TCP,这个链接是尽力牢靠的。
对于网络编程而言,仅仅有连贯是不够的,还须要通知开发者如何创立、敞开连贯。
对于服务端 ,零碎提供了accept
办法,用来接管连贯。
对于客户端 ,零碎提供了connect
办法,用于和服务端建设连贯。
Golang 形象
在 Golang 中,socket 对等的概念叫net.Conn
,代表了一条虚构连贯。
接下来,对于服务端,accept 这个行为被包装成了 net.Listener
接口;对于客户端,Golang 则基于 connect 提供了 net.Dial
办法。
type Listener interface { | |
// 接管来自客户端的网络连接 | |
Accept() (Conn, error) | |
Close() error | |
Addr() Addr} |
gRPC 应用
那么 gRPC 是怎么应用 Listener 和 Dial 的呢?
对于 gRPC 服务端,Serve
办法接管一个 Listener,示意在这个 Listener 上提供服务。
对于 gRPC 客户端,网络实质上就是一个可能连贯到某个中央的货色就能够,所以只须要一个 dialer func(context.Context, string) (net.Conn, error)
函数就行了。
什么是 pipe
在操作系统层面,pipe
示意一个数据管道,而这个管道两端都在本程序中,能够很好的满足咱们的要求:基于内存的网络通信。
Golang 也基于 pipe 提供了 net.Pipe()
函数创立了一个双向的、基于内存通信的管道,在能力上,可能很好的满足 gRPC 对底层通信的要求。
然而 net.Pipe
仅仅产生了两个net.Conn
,即只产生两个网络连接,没有之前提到的 Listner,也没有 Dial 办法。
于是联合 Golang 的 channel,把 net.Pipe
包装成了 Listner,也提供了 Dial 办法:
Listener.Accept()
,只须要监听一个 channel,客户端连贯过去的时候,把连贯通过 channel 传递过去即可Dial 办法
,调用 Pipe,将一端通过 channel 给服务端(作为服务端连贯),另一端作为客户端连贯
代码如下:
package main | |
import ( | |
"context" | |
"errors" | |
"net" | |
"sync" | |
"sync/atomic" | |
) | |
var ErrPipeListenerClosed = errors.New(`pipe listener already closed`) | |
type PipeListener struct { | |
ch chan net.Conn | |
close chan struct{} | |
done uint32 | |
m sync.Mutex | |
} | |
func ListenPipe() *PipeListener { | |
return &PipeListener{ch: make(chan net.Conn), | |
close: make(chan struct{}), | |
} | |
} | |
// Accept 期待客户端连贯 | |
func (l *PipeListener) Accept() (c net.Conn, e error) { | |
select { | |
case c = <-l.ch: | |
case <-l.close: | |
e = ErrPipeListenerClosed | |
} | |
return | |
} | |
// Close 敞开 listener. | |
func (l *PipeListener) Close() (e error) {if atomic.LoadUint32(&l.done) == 0 {l.m.Lock() | |
defer l.m.Unlock() | |
if l.done == 0 {defer atomic.StoreUint32(&l.done, 1) | |
close(l.close) | |
return | |
} | |
} | |
e = ErrPipeListenerClosed | |
return | |
} | |
// Addr 返回 listener 的地址 | |
func (l *PipeListener) Addr() net.Addr {return pipeAddr(0) | |
} | |
func (l *PipeListener) Dial(network, addr string) (net.Conn, error) {return l.DialContext(context.Background(), network, addr) | |
} | |
func (l *PipeListener) DialContext(ctx context.Context, network, addr string) (conn net.Conn, e error) { | |
// PipeListener 是否曾经敞开 | |
if atomic.LoadUint32(&l.done) != 0 { | |
e = ErrPipeListenerClosed | |
return | |
} | |
// 创立 pipe | |
c0, c1 := net.Pipe() | |
// 期待连贯传递到服务端接管 | |
select {case <-ctx.Done(): | |
e = ctx.Err() | |
case l.ch <- c0: | |
conn = c1 | |
case <-l.close: | |
c0.Close() | |
c1.Close() | |
e = ErrPipeListenerClosed | |
} | |
return | |
} | |
type pipeAddr int | |
func (pipeAddr) Network() string {return `pipe`} | |
func (pipeAddr) String() string {return `pipe`} |
如何用 pipe 作为 gRPC 的 connection
有了下面的包装,咱们就能够基于此创立一个 gRPC 的服务器端和客户端,来进行基于内存的 RPC 通信了。
首先,咱们简略的创立一个服务,蕴含了四种调用形式:
syntax = "proto3"; | |
option go_package = "google.golang.org/grpc/examples/helloworld/helloworld"; | |
option java_multiple_files = true; | |
option java_package = "io.grpc.examples.helloworld"; | |
option java_outer_classname = "HelloWorldProto"; | |
package helloworld; | |
// The greeting service definition. | |
service Greeter { | |
// unary 调用 | |
rpc SayHello(HelloRequest) returns (HelloReply) {} | |
// 服务端流式调用 | |
rpc SayHelloReplyStream(HelloRequest) returns (stream HelloReply); | |
// 客户端流式调用 | |
rpc SayHelloRequestStream(stream HelloRequest) returns (HelloReply); | |
// 双向流式调用 | |
rpc SayHelloBiStream(stream HelloRequest) returns (stream HelloReply); | |
} | |
// The request message containing the user's name. | |
message HelloRequest {string name = 1;} | |
// The response message containing the greetings | |
message HelloReply {string message = 1;} |
而后生成相干的 stub 代码:
protoc --go_out=. --go_opt=paths=source_relative \ | |
--go-grpc_out=. --go-grpc_opt=paths=source_relative \ | |
helloworld/helloworld.proto |
而后开始写服务端代码,根本逻辑就是实现一个 demo 版本的服务端就好:
package main | |
import ( | |
"context" | |
"log" | |
"github.com/robberphex/grpc-in-memory/helloworld" | |
pb "github.com/robberphex/grpc-in-memory/helloworld" | |
) | |
// helloworld.GreeterServer 的实现 | |
type server struct { | |
// 为了前面代码兼容,必须聚合 UnimplementedGreeterServer | |
// 这样当前在 proto 文件中新减少一个办法的时候,这段代码至多不会报错 | |
pb.UnimplementedGreeterServer | |
} | |
// unary 调用的服务端代码 | |
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {log.Printf("Received: %v", in.GetName()) | |
return &pb.HelloReply{Message: "Hello" + in.GetName()}, nil | |
} | |
// 客户端流式调用的服务端代码 | |
// 接管两个 req,而后返回一个 resp | |
func (s *server) SayHelloRequestStream(streamServer pb.Greeter_SayHelloRequestStreamServer) error {req, err := streamServer.Recv() | |
if err != nil {log.Printf("error receiving: %v", err) | |
return err | |
} | |
log.Printf("Received: %v", req.GetName()) | |
req, err = streamServer.Recv() | |
if err != nil {log.Printf("error receiving: %v", err) | |
return err | |
} | |
log.Printf("Received: %v", req.GetName()) | |
streamServer.SendAndClose(&pb.HelloReply{Message: "Hello" + req.GetName()}) | |
return nil | |
} | |
// 服务端流式调用的服务端代码 | |
// 接管一个 req,而后发送两个 resp | |
func (s *server) SayHelloReplyStream(req *pb.HelloRequest, streamServer pb.Greeter_SayHelloReplyStreamServer) error {log.Printf("Received: %v", req.GetName()) | |
err := streamServer.Send(&pb.HelloReply{Message: "Hello" + req.GetName()}) | |
if err != nil {log.Printf("error Send: %+v", err) | |
return err | |
} | |
err = streamServer.Send(&pb.HelloReply{Message: "Hello" + req.GetName() + "_dup"}) | |
if err != nil {log.Printf("error Send: %+v", err) | |
return err | |
} | |
return nil | |
} | |
// 双向流式调用的服务端代码 | |
func (s *server) SayHelloBiStream(streamServer helloworld.Greeter_SayHelloBiStreamServer) error {req, err := streamServer.Recv() | |
if err != nil {log.Printf("error receiving: %+v", err) | |
// 及时将谬误返回给客户端,下同 | |
return err | |
} | |
log.Printf("Received: %v", req.GetName()) | |
err = streamServer.Send(&pb.HelloReply{Message: "Hello" + req.GetName()}) | |
if err != nil {log.Printf("error Send: %+v", err) | |
return err | |
} | |
// 来到这个函数后,streamServer 会敞开,所以不举荐在独自的 goroute 发送音讯 | |
return nil | |
} | |
// 新建一个服务端实现 | |
func NewServerImpl() *server {return &server{} | |
} |
而后咱们创立一个基于 pipe 连贯的客户端来调用服务端。
蕴含如下几个步骤:
- 创立服务端实现
- 基于 pipe 创立 listener,而后基于它创立 gRPC server
- 基于 pipe 创立客户端连贯,而后创立 gRPC client,调用服务
代码如下:
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"net" | |
pb "github.com/robberphex/grpc-in-memory/helloworld" | |
"google.golang.org/grpc" | |
) | |
// 将一个服务实现转化为一个客户端 | |
func serverToClient(svc *server) pb.GreeterClient { | |
// 创立一个基于 pipe 的 Listener | |
pipe := ListenPipe() | |
s := grpc.NewServer() | |
// 注册 Greeter 服务到 gRPC | |
pb.RegisterGreeterServer(s, svc) | |
if err := s.Serve(pipe); err != nil {log.Fatalf("failed to serve: %v", err) | |
} | |
// 客户端指定应用 pipe 作为网络连接 | |
clientConn, err := grpc.Dial(`pipe`, | |
grpc.WithInsecure(), | |
grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) {return pipe.DialContext(c, `pipe`, s) | |
}), | |
) | |
if err != nil {log.Fatalf("did not connect: %v", err) | |
} | |
// 基于 pipe 连贯,创立 gRPC 客户端 | |
c := pb.NewGreeterClient(clientConn) | |
return c | |
} | |
func main() {svc := NewServerImpl() | |
c := serverToClient(svc) | |
ctx := context.Background() | |
// unary 调用 | |
for i := 0; i < 5; i++ {r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world_unary_%d", i)}) | |
if err != nil {log.Fatalf("could not greet: %v", err) | |
} | |
log.Printf("Greeting: %s", r.GetMessage()) | |
} | |
// 客户端流式调用 | |
for i := 0; i < 5; i++ {streamClient, err := c.SayHelloRequestStream(ctx) | |
if err != nil {log.Fatalf("could not SayHelloRequestStream: %v", err) | |
} | |
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d", i)}) | |
if err != nil {log.Fatalf("could not Send: %v", err) | |
} | |
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d_dup", i)}) | |
if err != nil {log.Fatalf("could not Send: %v", err) | |
} | |
reply, err := streamClient.CloseAndRecv() | |
if err != nil {log.Fatalf("could not Recv: %v", err) | |
} | |
log.Println(reply.GetMessage()) | |
} | |
// 服务端流式调用 | |
for i := 0; i < 5; i++ {streamClient, err := c.SayHelloReplyStream(ctx, &pb.HelloRequest{Name: fmt.Sprintf("SayHelloReplyStream_%d", i)}) | |
if err != nil {log.Fatalf("could not SayHelloReplyStream: %v", err) | |
} | |
reply, err := streamClient.Recv() | |
if err != nil {log.Fatalf("could not Recv: %v", err) | |
} | |
log.Println(reply.GetMessage()) | |
reply, err = streamClient.Recv() | |
if err != nil {log.Fatalf("could not Recv: %v", err) | |
} | |
log.Println(reply.GetMessage()) | |
} | |
// 双向流式调用 | |
for i := 0; i < 5; i++ {streamClient, err := c.SayHelloBiStream(ctx) | |
if err != nil {log.Fatalf("could not SayHelloStream: %v", err) | |
} | |
err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("world_stream_%d", i)}) | |
if err != nil {log.Fatalf("could not Send: %v", err) | |
} | |
reply, err := streamClient.Recv() | |
if err != nil {log.Fatalf("could not Recv: %v", err) | |
} | |
log.Println(reply.GetMessage()) | |
} | |
} |
总结
当然,作为基于内存的 RPC 调用,还能够有更好的形式,比方间接将对象传递到服务端,间接通过本地调用形式来通信。
但这种形式毁坏了很多约定,比方对象地址、比方 gRPC 连贯参数不失效等等。
本文介绍的,基于 Pipe 的通信形式,除了网络层走了内存传递之外,其余都和失常 RPC 通信行为统一,比方同样经验了序列化、经验了 HTTP/ 2 的流控制等。当然,性能上比原生调用也会差一点,然而好在对于测试、验证场景,行为上的统一比拟重要些。
本文代码曾经托管到了 GitHub https://github.com/robberphex…。
本文首发于 https://robberphex.com/grpc-i…。