共计 8490 个字符,预计需要花费 22 分钟才能阅读完成。
组件
server
grpc 的服务端首先需要三部分的组件:
- service 为客户端提供响应;
- register 是服务端每次启动时将自己的地址注册给注册中心;
- unregister 是服务端每次终止时从注册中心清除掉自己的注册信息;
register
// Register starts a background goroutine that, once per second, (re)writes
// this instance's registration key in etcd bound to a fresh short-lived
// lease, so the entry expires automatically if the instance dies without
// unregistering. The goroutine exits when a value arrives on stopSignal.
//
// NOTE(review): the target parameter is unused; kept for call compatibility.
func Register(client *etcd3.Client, target, key, val string, stopSignal chan os.Signal) error {
	go func() {
		const ttl = 10 // lease TTL in seconds; must exceed the refresh interval
		ticker := time.NewTicker(1 * time.Second)
		// Fix: the original never stopped the ticker.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				resp, err := client.Grant(context.Background(), int64(ttl))
				if err != nil {
					log.Println("grant lease error", err)
					// Fix: the original fell through and dereferenced a nil resp.
					continue
				}
				// Whether the key is missing or already present, (re)write it
				// under the fresh lease; only a genuine Get failure is skipped.
				if _, err = client.Get(context.Background(), key); err != nil && err != rpctypes.ErrKeyNotFound {
					log.Printf("get from etcd error:%+v", err)
				} else if _, err = client.Put(context.Background(), key, val, etcd3.WithLease(resp.ID)); err != nil {
					log.Printf("put %+v in etcd error:%+v", val, err)
				}
			case <-stopSignal:
				// Fix: react to the stop signal immediately instead of only
				// after the next tick. The lease then expires on its own.
				return
			}
		}
	}()
	return nil
}
主要逻辑说明:
首先 etcd client 去 etcd 获取该 service 实例的 key,若获取不到,则往 etcd 中存入该实例的 key、val,key 为“服务名 / 本次启动的地址”,val 为本次启动的地址;若获取得到,说明该 key 已经被存过了,只需要 keepalive 即可;注意,register 是一个无限循环的协程,会不断检查当前实例的注册情况,并且注册信息是有时效性的,防止该实例意外退出后没有清理掉自己的注册信息,导致客户端仍把请求发送到该实例上。
unregister
// Unregister deletes this instance's registration key from etcd. It is
// called on graceful shutdown so clients stop routing requests here.
// It returns the error from the Delete call, if any.
func Unregister(client *etcd3.Client, key string) error {
	// Fix: the original used ":=" inside the if, shadowing the outer err,
	// so the function always returned nil even when Delete failed.
	_, err := client.Delete(context.Background(), key)
	if err != nil {
		log.Printf("grpclb: unregister'%s'failed: %s", key, err.Error())
		return err
	}
	log.Printf("grpclb: unregister'%s'ok.", key)
	return nil
}
主要逻辑:注销注册信息是在服务正常退出时被调用,将自身的注册信息从 etcd 中清理掉,防止之后继续接收客户端的请求。
service
// RungGRPCServer boots the gRPC greeter service, registers its address in
// etcd for client-side load balancing, and blocks serving requests until
// the process receives a termination signal (which triggers unregister).
//
// NOTE(review): it listens on the package-level *port but logs grpcPort —
// confirm the two are meant to be the same value.
func RungGRPCServer(grpcPort int16) {
	// Registration key is "<prefix>/<service>/<addr>"; value is the address.
	svcKey := fmt.Sprintf("%+v/%+v/127.0.0.1:%+v", lb.PREFIX, SERVICE_NAME, *port)
	svcVal := fmt.Sprintf("127.0.0.1:%+v", *port)

	grpcServer := grpc.NewServer()

	client, err := etcd3.New(etcd3.Config{
		Endpoints: strings.Split(ETCD_ADDR, ","),
	})
	if err != nil {
		log.Fatalf("creat etcd3 client failed:%+v", err)
	}
	gs := &GreeterService{client}

	// Trap termination signals so we can unregister before exiting.
	// Fix: SIGKILL cannot be caught by a process, so it is not listed.
	// NOTE(review): ch is consumed both here and inside lb.Register; only
	// one receiver gets each signal — consider separate channels.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		s := <-ch
		log.Printf("receive stop signal:%+v", s)
		lb.Unregister(gs.etcdCli, svcKey)
		os.Exit(1)
	}()

	// Fix: the original discarded Register's error return.
	if err := lb.Register(gs.etcdCli, ETCD_ADDR, svcKey, svcVal, ch); err != nil {
		log.Fatalf("register service failed:%+v", err)
	}

	// GreeterService must implement every Greeter method to be registered.
	api.RegisterGreeterServer(grpcServer, gs)

	listen, e := net.Listen("tcp", fmt.Sprintf(":%+v", *port))
	if e != nil {
		log.Fatal(e)
	}
	log.Printf("serve gRPC server: 127.0.0.1:%+v", grpcPort)
	if err := grpcServer.Serve(listen); err != nil {
		log.Printf("failed to serve: %v", err)
		return
	}
}
主要逻辑:启动实例,注册到注册中心,并且起一个协程用来监听退出信号,若退出则调用注销函数注销本次启动实例的注册信息;
client
client
// main dials the greeter service through a round-robin balancer whose
// resolver discovers instance addresses from etcd, then issues one
// SayHello request per second forever.
func main() {
	// The resolver implements naming.Resolver; RoundRobin uses it to watch
	// the address set and spread calls across the instances.
	r := lb.NewResolver(GREETER_SERVICE)
	b := grpc.RoundRobin(r)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, ETCD_ADDR, grpc.WithInsecure(), grpc.WithBalancer(b))
	// Fix: check the dial error before sleeping, not after.
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// Give the balancer a moment to receive the initial address list.
	time.Sleep(5 * time.Second)

	c := api.NewGreeterClient(conn)
	name := "world"
	if len(os.Args) > 1 {
		name = os.Args[1]
	}

	// Idiom fix: a single-case select inside for is just a channel receive;
	// also stop the ticker on exit and avoid shadowing the resolver's r.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		log.Println("start to request...")
		resp, err := c.SayHello(context.Background(), &api.HelloRequest{Name: name})
		if err != nil {
			log.Fatalf("call say hello fail: %v", err)
		}
		log.Println(resp.Reply)
	}
}
客户端想要实现负载均衡的功能,需要在 dial 选项中加上 grpc.WithBalancer(b)
,可以看到有两个组件:r := lb.NewResolver(GREETER_SERVICE)
,b := grpc.RoundRobin(r)
。resolver 需要知道想要访问服务的服务名以及从哪里去寻找服务名对应的地址;最后会将其作为参数用来初始化 balancer。
resolver
// resolver implements naming.Resolver for one named service: it records
// which service's addresses should be looked up.
type resolver struct {
	serviceName string // name of the service to resolve
}

// NewResolver builds a resolver for the given service name.
func NewResolver(serviceName string) *resolver {
	r := new(resolver)
	r.serviceName = serviceName
	return r
}
// Resolve implements naming.Resolver. target is a comma-separated list of
// etcd endpoints; on success it returns a watcher that tracks address
// updates for this resolver's service name.
func (r *resolver) Resolve(target string) (naming.Watcher, error) {
	if r.serviceName == "" {
		log.Println("no service name provided")
		return nil, fmt.Errorf("no service name provided")
	}
	cfg := etcd3.Config{
		Endpoints: strings.Split(target, ","),
	}
	client, err := etcd3.New(cfg)
	if err != nil {
		return nil, fmt.Errorf("grpclb: create etcd3 client failed: %s", err.Error())
	}
	w := &watcher{client, false, r.serviceName}
	return w, nil
}
用 resolver 实现了 naming.Resolver 接口,这个接口只有一个方法:Resolve。
// naming.Resolver (quoted from the grpc library): the single entry point
// the balancer uses to turn a target string into an address Watcher.
type Resolver interface {
// Resolve creates a Watcher for target.
Resolve(target string) (Watcher, error)
}
依据 client 给定的 target 返回一个 watcher。watcher 也是一个 naming.Watcher 接口,这个接口有两个办法:Next 和 Close。
// naming.Watcher (quoted from the grpc library): a blocking stream of
// address-set updates consumed by the balancer.
type Watcher interface {
// Next blocks until an update or error happens. It may return one or more
// updates. The first call should get the full set of the results. It should
// return an error if and only if Watcher cannot recover.
Next() ([]*Update, error)
// Close closes the Watcher.
Close()}
watcher 的 Next 办法将返回一个 naming.Update 数组,这个数组标识每次 watcher 监听到的变动:
// naming.Update (quoted from the grpc library): one address change event
// (an Add or Delete of a single address).
type Update struct {
// Op indicates the operation of the update.
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
Metadata interface{}}
根据变动选取正确的地址供客户端使用。
可以从 grpc.balancer 的源码中看到调用这个接口的方式:
// Start (quoted from the grpc roundRobin balancer source): resolves the
// target through the installed naming.Resolver and spawns a goroutine that
// keeps the address list in sync; with no resolver installed, target itself
// becomes the only address.
func (rr *roundRobin) Start(target string, config BalancerConfig) error {rr.mu.Lock()
defer rr.mu.Unlock()
if rr.done {return ErrClientConnClosing}
if rr.r == nil {
// If there is no name resolver installed, it is not needed to
// do name resolution. In this case, target is added into rr.addrs
// as the only address available and rr.addrCh stays nil.
rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
return nil
}
w, err := rr.r.Resolve(target)
if err != nil {return err}
rr.w = w
rr.addrCh = make(chan []Address, 1)
// Loop forever pulling updates from the watcher until it reports an error.
go func() {
for {if err := rr.watchAddrUpdates(); err != nil {return}
}
}()
return nil
}
从源码可以看出,首先调用了 resolver 的 Resolve 接口,返回了一个 watcher,之后在一个无限循环的协程中不断调用 watcher 的 watchAddrUpdates 方法监听变化。
所以可以看出 grpc 接口设计的意图:resolver 的 target 就是用来提供给 watcher,watcher 使用这个 target 去寻址到具体的服务地址。
再看下源码中 watchAddrUpdates 的具体实现:
// watchAddrUpdates (quoted from the grpc roundRobin balancer source):
// blocks on the watcher's Next, applies each Add/Delete update to the
// cached address list, then publishes a copy of the full list on
// rr.addrCh so gRPC internals pick up the new address set.
func (rr *roundRobin) watchAddrUpdates() error {updates, err := rr.w.Next()
if err != nil {grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err)
return err
}
rr.mu.Lock()
defer rr.mu.Unlock()
for _, update := range updates {
addr := Address{
Addr: update.Addr,
Metadata: update.Metadata,
}
switch update.Op {
case naming.Add:
// Ignore an Add for an address already in the cache.
var exist bool
for _, v := range rr.addrs {
if addr == v.addr {
exist = true
grpclog.Infoln("grpc: The name resolver wanted to add an existing address:", addr)
break
}
}
if exist {continue}
rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
case naming.Delete:
// Remove the first matching address, shifting the tail left.
for i, v := range rr.addrs {
if addr == v.addr {copy(rr.addrs[i:], rr.addrs[i+1:])
rr.addrs = rr.addrs[:len(rr.addrs)-1]
break
}
}
default:
grpclog.Errorln("Unknown update.Op", update.Op)
}
}
// Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
open := make([]Address, len(rr.addrs))
for i, v := range rr.addrs {open[i] = v.addr
}
if rr.done {return ErrClientConnClosing}
// Drain any stale, unconsumed snapshot so the send below cannot block.
select {
case <-rr.addrCh:
default:
}
rr.addrCh <- open
return nil
}
从源码可以看出每次调用 watcher 的 Next 方法,返回了一个 update 数组,然后遍历这个数组每一项,若是 Add 类型的事件,得到地址,将其与自身缓存的地址列表逐一比对,若已存在于缓存中则不做任何操作,否则加入到缓存的地址列表中;若是 Delete 类型的事件,则从缓存地址列表中删除掉该地址。最后将地址列表的副本放入 addrCh 管道中,用于通知 gRPC 内部地址列表已更新。
因此我们只需要实现 watcher 的 Next 方法,返回 update 事件数组即可。
watcher
// Next implements naming.Watcher. On the first successful call it returns
// the full current address set read from etcd under "<PREFIX>/<serviceName>";
// on later calls it blocks on an etcd watch of that prefix and translates
// the first batch of PUT/DELETE events into naming Add/Delete updates.
//
// NOTE(review): each call opens a fresh watch channel and returns after the
// first batch, so events arriving between calls can be lost — a long-lived
// watch goroutine feeding an internal channel would be safer.
func (w *watcher) Next() ([]*naming.Update, error) {
	key := fmt.Sprintf("%s/%s", PREFIX, w.serviceName)
	baseCtx := context.TODO()
	log.Printf("%+v, call next method", w)
	if !w.init {
		ctx, cancel := context.WithTimeout(baseCtx, 3*time.Second)
		defer cancel()
		resp, err := w.cli.Get(ctx, key, etcd3.WithPrefix())
		if err != nil {
			log.Println("get from etcd error", err)
		} else {
			w.init = true
			addrs := getAddrFromResp(resp)
			if len(addrs) != 0 {
				updates := make([]*naming.Update, 0, len(addrs))
				for _, addr := range addrs {
					updates = append(updates, &naming.Update{
						Op:   naming.Add,
						Addr: addr,
					})
				}
				return updates, nil
			}
		}
	}
	rch := w.cli.Watch(context.Background(), key, etcd3.WithPrefix())
	for wresp := range rch {
		events := wresp.Events
		log.Printf("get etcd events:%+v", events)
		updates := make([]*naming.Update, 0, len(events))
		for _, ev := range events {
			switch ev.Type {
			case mvccpb.PUT:
				updates = append(updates, &naming.Update{
					Op:   naming.Add,
					Addr: string(ev.Kv.Value),
				})
			case mvccpb.DELETE:
				// Fix: a DELETE event carries an empty Kv.Value, so the
				// original produced Delete updates with an empty address.
				// The server registers keys as "<prefix>/<service>/<addr>",
				// so recover the address from the key suffix instead.
				k := string(ev.Kv.Key)
				addr := k
				if len(k) > len(key)+1 {
					addr = k[len(key)+1:]
				}
				updates = append(updates, &naming.Update{
					Op:   naming.Delete,
					Addr: addr,
				})
			}
		}
		return updates, nil
	}
	return nil, nil
}
第一次 watcher 启动时,我们会直接从 etcd 中取服务名前缀下的所有地址,并将每一项都放入 update 数组中返回。
之后则直接监听服务名前缀 key,从其管道中获取变化事件:若为 PUT 事件,说明有新服务实例加入,将其放入 update 数组,事件类型为 Add;若监听到 DELETE 事件,也将其放入 update 数组,事件类型为 Delete。
从 etcd 响应中抽取地址的实现如下:
// getAddrFromResp collects the non-empty values (instance addresses)
// from an etcd range response.
func getAddrFromResp(resp *etcd3.GetResponse) []string {
	kvs := resp.Kvs
	addrs := make([]string, 0, len(kvs))
	for _, kv := range kvs {
		v := string(kv.Value)
		if v == "" {
			continue
		}
		addrs = append(addrs, v)
	}
	return addrs
}
演示
至此,我们已经实现了 grpc 负载均衡所需要的所有接口,启动之后可以发现请求被均匀地分配到了各个实例上,并且新加入的 server 也能很快接收到请求。
总结
不足之处:
- 本次实验所用的 grpc 版本是 v1.26.0,grpc.RoundRobin 接口在新版本的 grpc 中已经被弃用了:
// (Quoted from the grpc library source.)
// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
// the name resolution updates and updates the addresses available correspondingly.
//
// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release.
func RoundRobin(r naming.Resolver) Balancer {return &roundRobin{r: r}
}
之后有空再用新版 balancer/roundrobin 实现一版
- register 接口若 get 到服务名前缀,可以不使用 put,直接调用 etcdV3 的 keepalive 接口为 key 续期即可。
- watcher 监听 etcd 管道,每次得到数据后直接返回,关闭了管道,有丢失数据的风险;此处可以起一个后台协程,每次将数据送入管道,主协程从管道取数据,避免对 etcd resp 管道的频繁关闭操作。
参考文章
https://segmentfault.com/a/11…