发现的问题
在微服务开发中,gRPC 的利用相对少不了,个别状况下,外部微服务交互,通常是应用 RPC 进行通信,如果是内部通信的话,会提供 https 接口文档
对于 gRPC 的根本应用能够查看文章 gRPC介绍
对于 gRPC ,咱们须要根本晓得如下的一些知识点:
gRPC 的根本四种模式的利用场景
- 申请响应模式
- 客户端数据流模式
- 服务端数据流模式
- 双向流模式
<!---->
- Proto 文件的定义和应用
<!---->
gRPC 拦截器的利用 , 根本的能够查看这篇 gRPC 拦截器
- 实际上有客户端拦截器 和 服务端拦截器,具体具体的能够自行学习
<!---->
- gRPC 的设计原理细节
<!---->
- Go-Kit 的应用
当然明天并不是要聊 gRPC 的利用或者原理,而是想聊咱们在开发过程中很容易遇到的问题:
- 未复用 gRPC 客户端连贯,影响性能
最近审查各个服务代码中,发现整个部门应用 gRPC 客户端申请服务端接口的时候,都是会新建一个连贯,而后调用服务端接口,应用结束之后就 close 掉, 例如这样
这会有什么问题呢?
失常简略的应用不会有啥问题,但如果是面临高并发的状况,性能问题很容易就会呈现,例如咱们在做性能测试的时候,就会发现,打一会性能测试,客户端申请服务端的时候就会报错:
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused
理论去查看问题的时候,很显著,这是 gRPC 的连接数被打满了,很多连贯都还未齐全开释
那这个时候,简略思考一下,咱们是没有必要对于每一次客户端申请服务端接口的时候,都新建设一次连贯,并且调用结束之后就马上敞开连贯
咱们晓得,gRPC 的通信实质上也是 TCP 的连贯,那么一次连贯就须要三次握手,和四次挥手,每一次建设连贯和开释连贯的时候,都须要走这么一个过程,如果咱们频繁的建设和开释连贯,这对于资源和性能其实都是一个大大的节约
咱们还晓得 gRPC 是一个高性能、开源和领有对立规定的 RPC框架,面向对象的 http/2 通信协议,可能能节俭空间和 IO 密集度的开销 ,然而咱们并没有很好的将他使用起来,gRPC 服务端的连贯治理不必咱们操心,然而咱们对于 gRPC 客户端的间断十分有必要关怀,咱们要想方法复用客户端的连贯
gRPC 连接池
复用连贯,咱们能够应用连接池的形式
对于这种复用资源,咱们其实也接触了不少,例如复用线程 worker 的线程池,go 中的协程池 ..
简略来说,连接池 ,就是提前创立好肯定数量的 tcp 连贯句柄放在池子中,咱们须要和内部通信的时候,就去池子中取一个连贯来用,用完了之后,咱们就放回去
连接池解决了什么问题
很显著,连接池解决了上述咱们频繁创立连贯和开释连贯带来的资源和性能上的损耗,咱们节俭了这部分开销后,天然就进步了咱们的性能
可是咱们再次思考一下,如果这个连贯池子就是只能寄存固定的连贯,那么咱们业务扩张的时候,岂不是光期待池子外面有闲暇连贯就会消耗大量的工夫呢?
或者是池子过大,咱们须要的连接数较少,那么开拓那么多连贯岂不是一种节约?
那么咱们在设计或者是利用连接池的时候,就须要思考如下几个方面了:
- 连接池是否反对扩缩容
<!---->
- 闲暇的连贯是否反对超时自行敞开,是否反对保活
<!---->
- 池子满的时候,解决的策略是什么样的
其实对于连接池的设计和库网上都很多,咱们能够找一个案例来看看如何来应用连接池,以及它是如何来进行上述几个方面的编码落地的
如何去应用连接池
先来看看客户端如何应用连接池
客户端应用 pool
client/main.go
package mainimport ( "context" "flag" "fmt" "log" "time" "mypoolclient/pool" "mypoolclient/pb")var addr = flag.String("addr", "127.0.0.1:8888", "the address to connect to")func main() { flag.Parse() p, err := pool.New(*addr, pool.DefaultOptions) if err != nil { log.Fatalf("failed to new pool: %v", err) } defer p.Close() conn, err := p.Get() if err != nil { log.Fatalf("failed to get conn: %v", err) } defer conn.Close() client := pb.NewTestsvrClient(conn.Value()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() res, err := client.Say(ctx, &pb.TestReq{Message: []byte("hi")}) if err != nil { log.Fatalf("unexpected error from Say: %v", err) } fmt.Println("rpc response:", res)}
此处的客户端,咱们很显著能够看进去,以前咱们应用客户端去调用服务端接口的时候,总会不盲目的 Dial 一下建设连贯
咱们应用连接池的话,就能够间接从池子外面拿一个连贯进去间接应用即可
服务端
server/client.go
package mainimport ( "context" "flag" "fmt" "log" "net" "google.golang.org/grpc" "mypoolserver/pb")var port = flag.Int("port", 8888, "port number")// server implements EchoServer.type server struct{}func (s *server) Say(context.Context, *pb.TestReq) (*pb.TestRsp, error) { fmt.Println("call Say ... ") return &pb.TestRsp{Message: []byte("hello world")}, nil}func main() { flag.Parse() listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterTestsvrServer(s, &server{}) fmt.Println("start server ...") if err := s.Serve(listen); err != nil { log.Fatalf("failed to serve: %v", err) } fmt.Println("over server ...")}
连接池的具体实现形式
连接池的具体实现形式,参考了 github https://github.com/shimingyah/pool
具体的实现,都放在上述目录的 pool 上面了 , 也能够拜访地址 : https://github.com/qingconglaixueit/mypoolapp
pool 包中蕴含了 3 个文件,作用如下:
.
├── conn.go
-- 对于 grpc 连贯的构造定义和办法实现
├── options.go
-- 拦截器的常量定义,以及 Dial 建设连贯的简略封装, 这个文件可要可不要,看本人的需要
└── pool.go
-- 具体 pool 的接口定义和实现
间接来看 pool.go 中的接口定义
type Pool interface { Get() (Conn, error) Close() error Status() string}
- Get()
获取一个新的连贯 , 当敞开连贯的时候,会将该连贯放入到池子中
- Close()
敞开连接池,天然连贯池子中的连贯也不再可用
对于 pool 构造的定义 ,conn 构造的定义倡议,将上述 github 地址上的源码下载下来进行浏览,上面次要是分享对于
- 连贯池子的创立,扩缩容,开释
<!---->
- 具体 TCP 连贯的创立和开释
创立连接池
func New(address string, option Options) (Pool, error) { if address == "" { return nil, errors.New("invalid address settings") } if option.Dial == nil { return nil, errors.New("invalid dial settings") } if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive { return nil, errors.New("invalid maximum settings") } if option.MaxConcurrentStreams <= 0 { return nil, errors.New("invalid maximun settings") } p := &pool{ index: 0, current: int32(option.MaxIdle), ref: 0, opt: option, conns: make([]*conn, option.MaxActive), address: address, closed: 0, } for i := 0; i < p.opt.MaxIdle; i++ { c, err := p.opt.Dial(address) if err != nil { p.Close() return nil, fmt.Errorf("dial is not able to fill the pool: %s", err) } p.conns[i] = p.wrapConn(c, false) } log.Printf("new pool success: %v\n", p.Status()) return p, nil}
对于 pool 的接口,能够看成是这样的
对于创立连接池,除了校验根本的参数以外,咱们晓得池子其实是一个 TCP 连贯的切片,长度为 option.MaxActive 即最大的沉闷连接数
p.conns[i] = p.wrapConn(c, false)
示意咱们初始化一个连贯,并放到连接池中,且初始化的 once 参数置为 false,示意该连贯默认保留在池子中,不被销毁
换句话说,当咱们须要实在销毁连接池中的连贯的时候,就将该链接的 once 参数置为 false 即可,实际上也无需咱们应用这去做这一步
实际上 对于每一个连贯的建设也是在 New 外面实现的,只有有 1 个连贯未建设胜利,那么咱们的连接池就算是建设失败,咱们会调用 p.Close() 将之前建设好的连贯全副开释掉
// 敞开连接池func (p *pool) Close() error { atomic.StoreInt32(&p.closed, 1) atomic.StoreUint32(&p.index, 0) atomic.StoreInt32(&p.current, 0) atomic.StoreInt32(&p.ref, 0) p.deleteFrom(0) log.Printf("close pool success: %v\n", p.Status()) return nil}// 革除从 指定地位开始到 MaxActive 之间的连贯func (p *pool) deleteFrom(begin int) { for i := begin; i < p.opt.MaxActive; i++ { p.reset(i) }}// 革除具体的连贯func (p *pool) reset(index int) { conn := p.conns[index] if conn == nil { return } conn.reset() p.conns[index] = nil}
这里咱们能够看到,当须要从池子中革除具体的连贯的时候,最终从连贯池子中取出对应地位上的连贯 ,conn := p.conns[index], conn.reset()
,实际上是给以后这个连贯进行参数赋值
func (c *conn) reset() error { cc := c.cc c.cc = nil c.once = false if cc != nil { return cc.Close() } return nil}func (c *conn) Close() error { c.pool.decrRef() if c.once { return c.reset() } return nil}
最终调用 Close() 将指定的连贯革除掉,这些动作都是连接池主动给咱们做了,无需咱们使用者去放心
咱们应用连接池通过 pool.Get() 拿到具体的连贯句柄 conn 之后,咱们应用 conn.Close() 敞开连贯,实际上也是会走到上述的 Close() 实现的地位,然而咱们并未指定当然也没有权限显示的指定将 once 置位为 false ,因而对于调用者来说,是敞开了连贯,对于连接池来说,实际上是将连贯偿还到连接池中
对于连贯池子的缩容和扩容是在 pool.Get() 中实现的
func (p *pool) Get() (Conn, error) { // the first selected from the created connections nextRef := p.incrRef() p.RLock() current := atomic.LoadInt32(&p.current) p.RUnlock() if current == 0 { return nil, ErrClosed } if nextRef <= current*int32(p.opt.MaxConcurrentStreams) { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } // the number connection of pool is reach to max active if current == int32(p.opt.MaxActive) { // the second if reuse is true, select from pool's connections if p.opt.Reuse { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } // the third create one-time connection c, err := p.opt.Dial(p.address) return p.wrapConn(c, true), err } // the fourth create new connections given back to pool p.Lock() current = atomic.LoadInt32(&p.current) if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) { // 2 times the incremental or the remain incremental increment := current if current+increment > int32(p.opt.MaxActive) { increment = int32(p.opt.MaxActive) - current } var i int32 var err error for i = 0; i < increment; i++ { c, er := p.opt.Dial(p.address) if er != nil { err = er break } p.reset(int(current + i)) p.conns[current+i] = p.wrapConn(c, false) } current += i log.Printf("grow pool: %d ---> %d, increment: %d, maxActive: %d\n", p.current, current, increment, p.opt.MaxActive) atomic.StoreInt32(&p.current, current) if err != nil { p.Unlock() return nil, err } } p.Unlock() next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil}
从 Get 的实现中,咱们能够晓得 Get 的逻辑如下
- 先减少连贯的援用计数,如果在设定 current*int32(p.opt.MaxConcurrentStreams) 范畴内,那么间接取连贯进行应用即可
<!---->
- 若以后的连接数达到了最大沉闷的连接数,那么就看咱们新建池子的时候传递的 option 中的 reuse 参数是否是 true,若是复用,则随机取出连接池中的任意连贯提供应用,如果不复用,则新建一个连贯
<!---->
- 其余的状况,就须要咱们进行 2 倍或者 1 倍的数量对连接池进行扩容了
实际上,上述的库中,并没有提供咱们缩容的算法,如果真的有这方面的需要的话
也能够在 Get 的实现上进行缩容,具体的缩容策略能够依据理论状况来定,例如当援用计数 nextRef 只有以后沉闷连接数的 20% 的时候(这只是一个例子),就能够思考缩容了
感激浏览,欢送交换,点个赞,关注一波 再走吧