近期比拟火的开源我的项目go-zero是一个集成了各种工程实际的蕴含了Web和RPC协定的功能完善的微服务框架,明天咱们就一起来剖析一下其中的RPC局部zRPC。
zRPC底层依赖gRPC,内置了服务注册、负载平衡、拦截器等模块,其中还包含自适应降载,自适应熔断,限流等微服务治理计划,是一个简略易用的可间接用于生产的企业级RPC框架。
zRPC初探
zRPC反对直连和基于etcd服务发现两种形式,咱们以基于etcd做服务发现为例演示zRPC的根本应用:
配置
创立hello.yaml配置文件,配置如下:
Name: hello.rpc // 服务名ListenOn: 127.0.0.1:9090 // 服务监听地址Etcd: Hosts: - 127.0.0.1:2379 // etcd服务地址 Key: hello.rpc // 服务注册key
创立proto文件
创立hello.proto文件,并生成对应的go代码
syntax = "proto3";package pb;service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {}}message HelloRequest { string name = 1;}message HelloReply { string message = 1;}
生成go代码
protoc --go_out=plugins=grpc:. hello.proto
Server端
package mainimport ( "context" "flag" "log" "example/zrpc/pb" "github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/zrpc" "google.golang.org/grpc")type Config struct { zrpc.RpcServerConf}var cfgFile = flag.String("f", "./hello.yaml", "cfg file")func main() { flag.Parse() var cfg Config conf.MustLoad(*cfgFile, &cfg) srv, err := zrpc.NewServer(cfg.RpcServerConf, func(s *grpc.Server) { pb.RegisterGreeterServer(s, &Hello{}) }) if err != nil { log.Fatal(err) } srv.Start()}type Hello struct{}func (h *Hello) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { return &pb.HelloReply{Message: "hello " + in.Name}, nil}
Client端
package mainimport ( "context" "log" "example/zrpc/pb" "github.com/tal-tech/go-zero/core/discov" "github.com/tal-tech/go-zero/zrpc")func main() { client := zrpc.MustNewClient(zrpc.RpcClientConf{ Etcd: discov.EtcdConf{ Hosts: []string{"127.0.0.1:2379"}, Key: "hello.rpc", }, }) conn := client.Conn() hello := pb.NewGreeterClient(conn) reply, err := hello.SayHello(context.Background(), &pb.HelloRequest{Name: "go-zero"}) if err != nil { log.Fatal(err) } log.Println(reply.Message)}
启动服务,查看服务是否注册:
ETCDCTL_API=3 etcdctl get hello.rpc --prefix
显示服务曾经注册:
hello.rpc/7587849401504590084127.0.0.1:9090
运行客户端即可看到输入:
hello go-zero
这个例子演示了zRPC的根本应用,能够看到通过zRPC构建RPC服务非常简单,只须要很少的几行代码,接下来咱们持续进行摸索
zRPC原理剖析
下图展现zRPC的架构图和次要组成部分
zRPC次要有以下几个模块组成:
- discov: 服务发现模块,基于etcd实现服务发现性能
- resolver: 服务注册模块,实现了gRPC的resolver.Builder接口并注册到gRPC
- interceptor: 拦截器,对申请和响应进行拦挡解决
- balancer: 负载平衡模块,实现了p2c负载平衡算法,并注册到gRPC
- client: zRPC客户端,负责发动申请
- server: zRPC服务端,负责解决申请
这里介绍了zRPC的次要组成模块和每个模块的次要性能,其中resolver和balancer模块实现了gRPC凋谢的接口,实现了自定义的resolver和balancer,拦截器模块是整个zRPC的性能重点,自适应降载、自适应熔断、prometheus服务指标收集等性能都在这里实现
Interceptor模块
gRPC提供了拦截器性能,次要是对申请前后进行额定解决的拦挡操作,其中拦截器蕴含客户端拦截器和服务端拦截器,又分为一元(Unary)拦截器和流(Stream)拦截器,这里咱们次要解说一元拦截器,流拦截器同理。
客户端拦截器定义如下:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
其中method为办法名,req,reply别离为申请和响应参数,cc为客户端连贯对象,invoker参数是真正执行rpc办法的handler其实在拦截器中被调用执行
服务端拦截器定义如下:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中req为申请参数,info中蕴含了申请办法属性,handler为对server端办法的包装,也是在拦截器中被调用执行
zRPC中内置了丰盛的拦截器,其中包含自适应降载、自适应熔断、权限验证、prometheus指标收集等等,因为拦截器较多,篇幅无限没法所有的拦截器给大家一一解析,这里咱们次要剖析两个,自适应熔断和prometheus服务监控指标收集:
内置拦截器剖析
自适应熔断(breaker)
当客户端向服务端发动申请,客户端会记录服务端返回的谬误,当谬误达到肯定的比例,客户端会自行的进行熔断解决,抛弃掉肯定比例的申请以爱护上游依赖,且能够主动复原。zRPC中自适应熔断遵循《Google SRE》中过载爱护策略,算法如下:
<img src="https://static.gocn.vip/photo/2020/877bfdd2-5df2-4e6c-b61e-cacc3f0e46c4.png?x-oss-process=image/resize,w_600" alt="overload" style="zoom:120%;" />
requests: 总申请数量
accepts: 失常申请数量
K: 倍值 (Google SRE推荐值为2)
能够通过批改K的值来批改熔断产生的激进水平,升高K的值会使得自适应熔断算法更加激进,减少K的值则自适应熔断算法变得不再那么激进
熔断拦截器定义如下:
func BreakerInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // target + 办法名 breakerName := path.Join(cc.Target(), method) return breaker.DoWithAcceptable(breakerName, func() error { // 真正执行调用 return invoker(ctx, method, req, reply, cc, opts...) }, codes.Acceptable)}
accept办法实现了Google SRE过载爱护算法,判断否进行熔断
func (b *googleBreaker) accept() error { // accepts为失常申请数,total为总申请数 accepts, total := b.history() weightedAccepts := b.k * float64(accepts) // 算法实现 dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1)) if dropRatio <= 0 { return nil } // 是否超过比例 if b.proba.TrueOnProba(dropRatio) { return ErrServiceUnavailable } return nil}
doReq办法首先判断是否熔断,满足条件间接返回error(circuit breaker is open),不满足条件则对申请数进行累加
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error { if err := b.accept(); err != nil { if fallback != nil { return fallback(err) } else { return err } } defer func() { if e := recover(); e != nil { b.markFailure() panic(e) } }() // 此处执行RPC申请 err := req() // 失常申请total和accepts都会加1 if acceptable(err) { b.markSuccess() } else { // 申请失败只有total会加1 b.markFailure() } return err}
prometheus指标收集
服务监控是理解服务以后运行状态以及变化趋势的重要伎俩,监控依赖于服务指标的收集,通过prometheus进行监控指标的收集是业界支流计划,zRPC中也采纳了prometheus来进行指标的收集
prometheus拦截器定义如下:
这个拦截器次要是对服务的监控指标进行收集,这里次要是对RPC办法的耗时和调用谬误进行收集,这里次要应用了Prometheus的Histogram和Counter数据类型
func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) ( interface{}, error) { // 执行前记录一个工夫 startTime := timex.Now() resp, err := handler(ctx, req) // 执行后通过Since算出执行该调用的耗时 metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod) // 办法对应的错误码 metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err)))) return resp, err }}
增加自定义拦截器
除了内置了丰盛的拦截器之外,zRPC同时反对增加自定义拦截器
Client端通过AddInterceptor办法增加一元拦截器:
func (rc *RpcClient) AddInterceptor(interceptor grpc.UnaryClientInterceptor) { rc.client.AddInterceptor(interceptor)}
Server端通过AddUnaryInterceptors办法增加一元拦截器:
func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) { rs.server.AddUnaryInterceptors(interceptors...)}
resolver模块
zRPC服务注册架构图:
zRPC中自定义了resolver模块,用来实现服务的注册性能。zRPC底层依赖gRPC,在gRPC中要想自定义resolver须要实现resolver.Builder接口:
type Builder interface { Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) Scheme() string}
其中Build办法返回Resolver,Resolver定义如下:
type Resolver interface { ResolveNow(ResolveNowOptions) Close()}
在zRPC中定义了两种resolver,direct和discov,这里咱们次要剖析基于etcd做服务发现的discov,自定义的resolver须要通过gRPC提供了Register办法进行注册代码如下:
func RegisterResolver() { resolver.Register(&dirBuilder) resolver.Register(&disBuilder)}
当咱们启动咱们的zRPC Server的时候,调用Start办法,会像etcd中注册对应的服务地址:
func (ags keepAliveServer) Start(fn RegisterFn) error { // 注册服务地址 if err := ags.registerEtcd(); err != nil { return err } // 启动服务 return ags.Server.Start(fn)}
当咱们启动zRPC客户端的时候,在gRPC外部会调用咱们自定义resolver的Build办法,zRPC通过在Build办法内调用执行了resolver.ClientConn的UpdateState办法,该办法会把服务地址注册到gRPC客户端外部:
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) ( resolver.Resolver, error) { hosts := strings.FieldsFunc(target.Authority, func(r rune) bool { return r == EndpointSepChar }) // 服务发现 sub, err := discov.NewSubscriber(hosts, target.Endpoint) if err != nil { return nil, err } update := func() { var addrs []resolver.Address for _, val := range subset(sub.Values(), subsetSize) { addrs = append(addrs, resolver.Address{ Addr: val, }) } // 向gRPC注册服务地址 cc.UpdateState(resolver.State{ Addresses: addrs, }) } // 监听 sub.AddListener(update) update() // 返回自定义的resolver.Resolver return &nopResolver{cc: cc}, nil}
在discov中,通过调用load办法从etcd中获取指定服务的所有地址:
func (c *cluster) load(cli EtcdClient, key string) { var resp *clientv3.GetResponse for { var err error ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout) // 从etcd中获取指定服务的所有地址 resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix()) cancel() if err == nil { break } logx.Error(err) time.Sleep(coolDownInterval) } var kvs []KV c.lock.Lock() for _, ev := range resp.Kvs { kvs = append(kvs, KV{ Key: string(ev.Key), Val: string(ev.Value), }) } c.lock.Unlock() c.handleChanges(key, kvs)}
并通过watch监听服务地址的变动:
func (c *cluster) watch(cli EtcdClient, key string) { rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix()) for { select { case wresp, ok := <-rch: if !ok { logx.Error("etcd monitor chan has been closed") return } if wresp.Canceled { logx.Error("etcd monitor chan has been canceled") return } if wresp.Err() != nil { logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err())) return } // 监听变动告诉更新 c.handleWatchEvents(key, wresp.Events) case <-c.done: return } }}
这部分次要介绍了zRPC中是如何自定义的resolver,以及基于etcd的服务发现原理,通过这部分的介绍大家能够理解到zRPC外部服务注册发现的原理,源代码比拟多只是粗略的从整个流程上进行了剖析,如果大家对zRPC的源码比拟感兴趣能够自行进行学习
balancer模块
负载平衡原理图:
<img src="https://static.gocn.vip/photo/2020/a53d8ed1-116d-49b5-9672-d7d466c43e9c.png?x-oss-process=image/resize,w_800" alt="balancer" style="zoom:45%;" />
防止过载是负载平衡策略的一个重要指标,好的负载平衡算法能很好的均衡服务端资源。罕用的负载平衡算法有轮训、随机、Hash、加权轮训等。但为了应答各种简单的场景,简略的负载平衡算法往往体现的不够好,比方轮训算法当服务响应工夫变长就很容易导致负载不再均衡, 因而zRPC中自定义了默认负载平衡算法P2C(Power of Two Choices),和resolver相似,要想自定义balancer也须要实现gRPC定义的balancer.Builder接口,因为和resolver相似这里不再带大家一起剖析如何自定义balancer,感兴趣的敌人能够查看gRPC相干的文档来进行学习
留神,zRPC是在客户端进行负载平衡,常见的还有通过nginx两头代理的形式
zRPC框架中默认的负载平衡算法为P2C,该算法的次要思维是:
- 从可用节点列表中做两次随机抉择操作,失去节点A、B
- 比拟A、B两个节点,选出负载最低的节点作为被选中的节点
伪代码如下:
<img src="https://static.gocn.vip/photo/2020/23203454-2529-423b-aaef-bdfce28cfab3.png?x-oss-process=image/resize,w_400" alt="random_pseudo" style="zoom:80%;" />
![]()
次要算法逻辑在Pick办法中实现:
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) ( conn balancer.SubConn, done func(balancer.DoneInfo), err error) { p.lock.Lock() defer p.lock.Unlock() var chosen *subConn switch len(p.conns) { case 0: return nil, nil, balancer.ErrNoSubConnAvailable case 1: chosen = p.choose(p.conns[0], nil) case 2: chosen = p.choose(p.conns[0], p.conns[1]) default: var node1, node2 *subConn for i := 0; i < pickTimes; i++ { // 随机数 a := p.r.Intn(len(p.conns)) b := p.r.Intn(len(p.conns) - 1) if b >= a { b++ } // 随机获取所有节点中的两个节点 node1 = p.conns[a] node2 = p.conns[b] // 效验节点是否衰弱 if node1.healthy() && node2.healthy() { break } } // 抉择其中一个节点 chosen = p.choose(node1, node2) } atomic.AddInt64(&chosen.inflight, 1) atomic.AddInt64(&chosen.requests, 1) return chosen.conn, p.buildDoneFunc(chosen), nil}
choose办法对随机抉择进去的节点进行负载比拟从而最终确定抉择哪个节点
func (p *p2cPicker) choose(c1, c2 *subConn) *subConn { start := int64(timex.Now()) if c2 == nil { atomic.StoreInt64(&c1.pick, start) return c1 } if c1.load() > c2.load() { c1, c2 = c2, c1 } pick := atomic.LoadInt64(&c2.pick) if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) { return c2 } else { atomic.StoreInt64(&c1.pick, start) return c1 }}
下面次要介绍了zRPC默认负载平衡算法的设计思维和代码实现,那自定义的balancer是如何注册到gRPC的呢,resolver提供了Register办法来进行注册,同样balancer也提供了Register办法来进行注册:
func init() { balancer.Register(newBuilder())}func newBuilder() balancer.Builder { return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))}
注册balancer之后gRPC怎么晓得应用哪个balancer呢?这里咱们须要应用配置项进行配置,在NewClient的时候通过grpc.WithBalancerName办法进行配置:
func NewClient(target string, opts ...ClientOption) (*client, error) { var cli client opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name))) if err := cli.dial(target, opts...); err != nil { return nil, err } return &cli, nil}
这部分次要介绍了zRPC中内中的负载平衡算法的实现原理以及具体的实现形式,之后介绍了zRPC是如何注册自定义的balancer以及如何抉择自定义的balancer,通过这部分大家应该对负载平衡有了更进一步的意识
总结
首先,介绍了zRPC的根本应用办法,能够看到zRPC应用非常简单,只须要多数几行代码就能够构建高性能和自带服务治理能力的RPC服务,当然这里没有八面玲珑的介绍zRPC的根本应用,大家能够查看相干文档进行学习
接着,介绍了zRPC的几个重要组成模块以及其实现原理,并剖析了局部源码。拦截器模块是整个zRPC的重点,其中内置了丰盛的性能,像熔断、监控、降载等等也是构建高可用微服务必不可少的。resolver和balancer模块自定义了gRPC的resolver和balancer,通过该局部能够理解到整个服务注册与发现的原理以及如何构建本人的服务发现零碎,同时自定义负载平衡算法也变得不再神秘
最初,zRPC是一个经验过各种工程实际的RPC框架,不论是想要用于生产还是学习其中的设计模式都是一个不可多得的开源我的项目。心愿通过这篇文章的介绍大家可能进一步理解zRPC
我的项目地址
https://github.com/tal-tech/go-zero
好将来技术