Components
server
The gRPC server side needs three components to begin with:
- service: produces responses for clients;
- register: on every startup, registers the server's own address with the registry;
- unregister: on every shutdown, removes the server's registration from the registry;
register
```go
func Register(client *etcd3.Client, target, key, val string, stopSignal chan os.Signal) error {
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		for {
			select {
			case <-ticker.C:
				ttl := 10
				resp, err := client.Grant(context.Background(), int64(ttl))
				if err != nil {
					log.Println("grant lease error ", err)
					continue // without a valid lease there is nothing to put
				}
				_, err = client.Get(context.Background(), key)
				if err != nil {
					if err == rpctypes.ErrKeyNotFound {
						if _, err = client.Put(context.Background(), key, val, etcd3.WithLease(resp.ID)); err != nil {
							log.Printf("put %+v in etcd error:%+v", val, err)
						}
					} else {
						log.Printf("get from etcd error:%+v", err)
					}
				} else {
					// key already registered: refresh it under the new lease
					if _, err = client.Put(context.Background(), key, val, etcd3.WithLease(resp.ID)); err != nil {
						log.Printf("put %+v in etcd error:%+v", val, err)
					}
				}
				select {
				case <-stopSignal:
					return
				default:
				}
			}
		}
	}()
	return nil
}
```
Main logic:
First the etcd client fetches this service instance's key from etcd. If the key is absent, it puts the key/value pair into etcd, where the key is "service name/address of this instance" (for example, something like greeter/127.0.0.1:50051) and the value is the instance's address; if the key is present, it has already been registered and only needs to be kept alive. Note that Register spawns an infinite-loop goroutine that keeps re-checking the current instance's registration, and the registration carries a TTL, so that if the instance exits unexpectedly without cleaning up its own registration, clients will not keep sending requests to it.
unregister
```go
func Unregister(client *etcd3.Client, key string) error {
	var err error
	// note: use "=" rather than ":=" here, otherwise the inner err shadows
	// the outer one and the function always returns nil
	if _, err = client.Delete(context.Background(), key); err != nil {
		log.Printf("grpclb: unregister '%s' failed: %s", key, err.Error())
	} else {
		log.Printf("grpclb: unregister '%s' ok.", key)
	}
	return err
}
```
Main logic: unregistration is called when the service exits normally; it cleans the instance's own registration out of etcd so that it stops receiving client requests afterwards.
service
```go
func RunGRPCServer(grpcPort int16) {
	svcKey := fmt.Sprintf("%+v/%+v/127.0.0.1:%+v", lb.PREFIX, SERVICE_NAME, *port)
	svcVal := fmt.Sprintf("127.0.0.1:%+v", *port)
	// start a grpc server; the service implementation is bound below
	// via api.RegisterGreeterServer
	grpcServer := grpc.NewServer()
	client, err := etcd3.New(etcd3.Config{
		Endpoints: strings.Split(ETCD_ADDR, ","),
	})
	if err != nil {
		log.Fatalf("create etcd3 client failed:%+v", err)
	}
	gs := &GreeterService{client}
	ch := make(chan os.Signal, 1)
	// note: SIGKILL cannot actually be trapped; it is listed for completeness only
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		s := <-ch
		log.Printf("receive stop signal:%+v", s)
		lb.Unregister(gs.etcdCli, svcKey)
		os.Exit(1)
	}()
	lb.Register(gs.etcdCli, ETCD_ADDR, svcKey, svcVal, ch)
	// gs must implement every method of the Greeter service before it can be registered
	api.RegisterGreeterServer(grpcServer, gs)
	// listen on the port
	listen, e := net.Listen("tcp", fmt.Sprintf(":%+v", *port))
	if e != nil {
		log.Fatal(e)
	}
	log.Printf("serve gRPC server: 127.0.0.1:%+v", grpcPort)
	if err := grpcServer.Serve(listen); err != nil {
		log.Printf("failed to serve: %v", err)
		return
	}
}
```
Main logic: start the instance, register it with the registry, and spawn a goroutine that listens for exit signals; on exit, it calls the unregister function to remove this instance's registration.
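The GreeterService implementation itself is not shown in this post; a minimal sketch of what it presumably looks like, given how it is constructed and used above (the api.HelloReply type and its Reply field are inferred from the client code below, so treat the exact generated names as assumptions):

```go
// GreeterService wraps the etcd client (used by Register/Unregister above)
// and implements the generated Greeter service interface.
type GreeterService struct {
	etcdCli *etcd3.Client
}

// SayHello answers each request. The reply type and its Reply field are
// inferred from the client code and may differ from the real generated code.
func (s *GreeterService) SayHello(ctx context.Context, req *api.HelloRequest) (*api.HelloReply, error) {
	return &api.HelloReply{Reply: "hello " + req.Name}, nil
}
```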
client
client
```go
func main() {
	r := lb.NewResolver(GREETER_SERVICE) // r must implement naming.Resolver's Resolve method
	b := grpc.RoundRobin(r)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, ETCD_ADDR, grpc.WithInsecure(), grpc.WithBalancer(b))
	time.Sleep(5 * time.Second)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := api.NewGreeterClient(conn)
	name := "world"
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	ticker := time.NewTicker(1 * time.Second)
	for {
		select {
		case <-ticker.C:
			log.Println("start to request...")
			r, err := c.SayHello(context.Background(), &api.HelloRequest{Name: name})
			if err != nil {
				log.Fatalf("call say hello fail: %v", err)
			}
			log.Println(r.Reply)
		}
	}
}
```
For the client to get load balancing, the dial options must include grpc.WithBalancer(b). Two components are involved: r := lb.NewResolver(GREETER_SERVICE) and b := grpc.RoundRobin(r). The resolver needs the name of the service to be accessed and a place to look up the addresses registered under that name; it is then passed in as the argument that initializes the balancer.
resolver
```go
type resolver struct {
	serviceName string
}

func NewResolver(serviceName string) *resolver {
	return &resolver{
		serviceName: serviceName,
	}
}

func (r *resolver) Resolve(target string) (naming.Watcher, error) {
	if r.serviceName == "" {
		log.Println("no service name provided")
		return nil, fmt.Errorf("no service name provided")
	}
	client, err := etcd3.New(etcd3.Config{
		Endpoints: strings.Split(target, ","),
	})
	if err != nil {
		return nil, fmt.Errorf("grpclb: create etcd3 client failed: %s", err.Error())
	}
	return &watcher{client, false, r.serviceName}, nil
}
```
Here resolver implements the naming.Resolver interface, which has only one method: Resolve.
```go
type Resolver interface {
	// Resolve creates a Watcher for target.
	Resolve(target string) (Watcher, error)
}
```
Given the target supplied by the client, Resolve returns a watcher. The watcher satisfies the naming.Watcher interface, which has two methods: Next and Close.
```go
type Watcher interface {
	// Next blocks until an update or error happens. It may return one or more
	// updates. The first call should get the full set of the results. It should
	// return an error if and only if Watcher cannot recover.
	Next() ([]*Update, error)
	// Close closes the Watcher.
	Close()
}
```
The watcher's Next method returns a slice of naming.Update values describing the changes the watcher has observed:
```go
type Update struct {
	// Op indicates the operation of the update.
	Op Operation
	// Addr is the updated address. It is empty string if there is no address update.
	Addr string
	// Metadata is the updated metadata. It is nil if there is no metadata update.
	// Metadata is not required for a custom naming implementation.
	Metadata interface{}
}
```
Based on these updates, the correct addresses are picked for the client to use.
How this interface gets called can be seen in the grpc balancer source:
```go
func (rr *roundRobin) Start(target string, config BalancerConfig) error {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	if rr.done {
		return ErrClientConnClosing
	}
	if rr.r == nil {
		// If there is no name resolver installed, it is not needed to
		// do name resolution. In this case, target is added into rr.addrs
		// as the only address available and rr.addrCh stays nil.
		rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
		return nil
	}
	w, err := rr.r.Resolve(target)
	if err != nil {
		return err
	}
	rr.w = w
	rr.addrCh = make(chan []Address, 1)
	go func() {
		for {
			if err := rr.watchAddrUpdates(); err != nil {
				return
			}
		}
	}()
	return nil
}
```
As the source shows, the balancer first calls the resolver's Resolve method to obtain a watcher, then spawns an infinite-loop goroutine that keeps calling watchAddrUpdates, which in turn blocks on the watcher's Next, to pick up changes.
This makes the intent of the gRPC interface design clear: the resolver's target exists to be handed to the watcher, and the watcher uses it to resolve the concrete service addresses.
Now look at the actual implementation of watchAddrUpdates in the source:
```go
func (rr *roundRobin) watchAddrUpdates() error {
	updates, err := rr.w.Next()
	if err != nil {
		grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err)
		return err
	}
	rr.mu.Lock()
	defer rr.mu.Unlock()
	for _, update := range updates {
		addr := Address{
			Addr:     update.Addr,
			Metadata: update.Metadata,
		}
		switch update.Op {
		case naming.Add:
			var exist bool
			for _, v := range rr.addrs {
				if addr == v.addr {
					exist = true
					grpclog.Infoln("grpc: The name resolver wanted to add an existing address: ", addr)
					break
				}
			}
			if exist {
				continue
			}
			rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
		case naming.Delete:
			for i, v := range rr.addrs {
				if addr == v.addr {
					copy(rr.addrs[i:], rr.addrs[i+1:])
					rr.addrs = rr.addrs[:len(rr.addrs)-1]
					break
				}
			}
		default:
			grpclog.Errorln("Unknown update.Op ", update.Op)
		}
	}
	// Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
	open := make([]Address, len(rr.addrs))
	for i, v := range rr.addrs {
		open[i] = v.addr
	}
	if rr.done {
		return ErrClientConnClosing
	}
	select {
	case <-rr.addrCh:
	default:
	}
	rr.addrCh <- open
	return nil
}
```
As the source shows, each call to the watcher's Next method returns a slice of updates, which is then iterated over. For an Add event, the address is compared against the locally cached address list one by one; if it already exists in the cache nothing happens, otherwise it is appended. For a Delete event, the address is removed from the cached list. Finally, a copy of the address list is written onto the addrCh channel, which, per the comment in the source, is how the rest of the gRPC internals gets notified of the new address set.
So all we need to do is implement the watcher's Next method and return the slice of update events.
watcher
```go
func (w *watcher) Next() ([]*naming.Update, error) {
	key := fmt.Sprintf("%s/%s", PREFIX, w.serviceName)
	baseCtx := context.TODO()
	log.Printf("%+v, call next method", w)
	// On the first call, return the full address set from etcd.
	if !w.init {
		ctx, cancel := context.WithTimeout(baseCtx, 3*time.Second)
		defer cancel()
		resp, err := w.cli.Get(ctx, key, etcd3.WithPrefix())
		//log.Printf("get etcd resp:%+v", resp)
		if err != nil {
			log.Println("get from etcd error ", err)
		} else {
			w.init = true
			addrs := getAddrFromResp(resp)
			if len(addrs) != 0 {
				updates := make([]*naming.Update, 0, len(addrs))
				for _, addr := range addrs {
					updates = append(updates, &naming.Update{
						Op:   naming.Add,
						Addr: addr,
					})
				}
				return updates, nil
			}
		}
	}
	// Subsequent calls block on a watch of the prefix and translate etcd
	// events into naming updates. WithPrevKV is needed because a DELETE
	// event carries an empty value; the address lives in the previous kv.
	rch := w.cli.Watch(context.Background(), key, etcd3.WithPrefix(), etcd3.WithPrevKV())
	for wresp := range rch {
		events := wresp.Events
		log.Printf("get etcd events:%+v", events)
		updates := make([]*naming.Update, 0, len(events))
		for _, ev := range events {
			switch ev.Type {
			case mvccpb.PUT:
				updates = append(updates, &naming.Update{
					Op:   naming.Add,
					Addr: string(ev.Kv.Value),
				})
			case mvccpb.DELETE:
				addr := ""
				if ev.PrevKv != nil {
					addr = string(ev.PrevKv.Value)
				}
				updates = append(updates, &naming.Update{
					Op:   naming.Delete,
					Addr: addr,
				})
			}
		}
		return updates, nil
	}
	return nil, nil
}
```
When the watcher starts for the first time, we fetch all addresses under the service-name prefix directly from etcd, wrap each one in an update, and return the slice.
On subsequent calls we watch the service-name prefix key and read change events from its channel: a PUT event means a new service instance has joined, so it is appended to the update slice with type Add; a DELETE event is appended with type Delete (note that a DELETE event from etcd carries an empty value, which is why the code above watches with WithPrevKV and recovers the address from the previous key-value pair). The watcher struct itself is sketched below.
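For reference, the watcher struct never appears in full in the post; from the positional literal &watcher{client, false, r.serviceName} in Resolve and the fields used in Next, it presumably looks like:

```go
// Field names inferred from how Resolve constructs the struct and Next uses it.
type watcher struct {
	cli         *etcd3.Client // etcd client used for Get and Watch
	init        bool          // true once the initial full address list has been returned
	serviceName string        // service name, looked up under PREFIX
}
```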
Extracting the addresses from the etcd response is implemented as follows:
```go
func getAddrFromResp(resp *etcd3.GetResponse) []string {
	results := resp.Kvs
	addrs := make([]string, 0, len(results))
	for _, r := range results {
		if string(r.Value) != "" {
			addrs = append(addrs, string(r.Value))
		}
	}
	return addrs
}
```
Demo
At this point we have implemented every interface gRPC load balancing needs. Start it up and you can see requests being distributed evenly across the instances, and newly joined servers quickly start receiving requests as well. For example:
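A typical run might look like this (hypothetical file layout and ports; the server is assumed to take a -port flag, matching the *port flag referenced in the code above):

```
go run server/main.go -port 50051 &
go run server/main.go -port 50052 &
go run client/main.go world
```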
Summary
Shortcomings:
- This experiment uses grpc v1.26.0; the grpc.RoundRobin API has already been deprecated in newer grpc releases:
```go
// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
// the name resolution updates and updates the addresses available correspondingly.
//
// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release.
func RoundRobin(r naming.Resolver) Balancer {
	return &roundRobin{r: r}
}
```
When I find time I may redo this with the new balancer/roundrobin package.
- If Register gets a hit on the service key, it does not need another Put; it could simply call the etcd v3 KeepAlive API to renew the key's lease, as sketched after this list.
- The watcher returns (and abandons the watch channel) as soon as it reads one batch of events, then re-creates the watch on the next call, which risks losing events in between. A better design would run a background goroutine that keeps feeding events into an internal channel from which the main goroutine reads, avoiding the repeated setup and teardown of the etcd watch channel.
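A rough sketch of the lease-plus-KeepAlive registration suggested in the second point (my assumption of how it could look, not code from this post; only standard clientv3 calls are used):

```go
// RegisterWithKeepAlive registers key/val once under a lease and lets etcd
// renew the lease in the background. If the process dies, the key expires
// by itself after ttl seconds, so no periodic re-put is needed.
func RegisterWithKeepAlive(client *etcd3.Client, key, val string, ttl int64) error {
	resp, err := client.Grant(context.Background(), ttl)
	if err != nil {
		return err
	}
	if _, err = client.Put(context.Background(), key, val, etcd3.WithLease(resp.ID)); err != nil {
		return err
	}
	ch, err := client.KeepAlive(context.Background(), resp.ID)
	if err != nil {
		return err
	}
	go func() {
		// drain keepalive responses so the channel does not fill up
		for range ch {
		}
	}()
	return nil
}
```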
References
https://segmentfault.com/a/11...