介绍
上面介绍 jupiter-0.2.7 版本中 grpc 通过 etcd 实现服务发现与注册。
服务发现与注册的实现解析
服务注册
服务注册的流程图:
etcd的服务注册代码模块在 jupiter/pkg/registry/etcdv3
中。
上面让咱们来看看理论的代码
// Registry register/unregister service// registry impl should control rpc timeouttype Registry interface { RegisterService(context.Context, *server.ServiceInfo) error UnregisterService(context.Context, *server.ServiceInfo) error ListServices(context.Context, string, string) ([]*server.ServiceInfo, error) WatchServices(context.Context, string, string) (chan Endpoints, error) io.Closer}
在 pkg/registry/registry.go
中定义了注册服务对象的接口。不同的服务只有实现了这些接口,jupiter 就能应用。
首先咱们来看看注册办法
// RegisterService register service to registryfunc (reg *etcdv3Registry) RegisterService(ctx context.Context, info *server.ServiceInfo) error { err := reg.registerBiz(ctx, info) ...}// 业务信息注册func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.ServiceInfo) error { ... // 提交信息到 etcd _, err := reg.client.Put(readCtx, key, val, opOptions...) ...}
这里次要的局部是 reg.client.Put()
将服务信息提交到 etcd 中。其中的租约机制我会在之后独自写一篇文章介绍。这里次要还是关注如何注册。
源码中还有个 registerMetric()
办法,这个办法的目标是将服务信息在提交到etcd的 prometheus 前缀目录下,用于服务监控,用的也是 client.Put() 办法。这里具体就不展现代码了,感兴趣的同学能够去源码库中查看。
服务退出
// 删除服务func (reg *etcdv3Registry) unregister(ctx context.Context, key string) error { ... // 删除服务信息 _, err := reg.client.Delete(ctx, key) ...}
这里通过 client.Delete()
办法将服务信息从 etcd 中删除掉。
获取服务列表
// ListServices list service registered in registry with name `name`func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) { // 服务信息key的前缀 target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme) // 获取相干前缀的所有信息 getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix()) ...}
通过 client.Get()
办法获取到雷同前缀的服务信息。
服务信息变动监控
// WatchServices watch service change event, then return address listfunc (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) { prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name) // 通过etcd客户端创立一个监控通道 watch, err := reg.client.WatchPrefix(context.Background(), prefix) if err != nil { return nil, err } ... xgo.Go(func() { // 一直接管etcd发送过去的变动事件 for event := range watch.C() { switch event.Type { case mvccpb.PUT: updateAddrList(al, prefix, scheme, event.Kv) case mvccpb.DELETE: deleteAddrList(al, prefix, scheme, event.Kv) } out := al.DeepCopy() fmt.Printf("al => %p\n", al.Nodes) fmt.Printf("snapshot => %p\n", out.Nodes) select { // 将更新后的服务信息发送进来,接管方是 resolver case addresses <- *out: default: xlog.Warnf("invalid") } } }) // 返回一个地址通道,用于传递 return addresses, nil}
WatchServices()
办法次要是监控信息的变动事件,并将变动后的服务信息从新返回给 resolver。具体思路是通过 etcdClient.Watch()
办法创立一个监控通道,而后放入一个 goroutine来一直接管 etcd 推送过去的事件,保护本地的服务信息,并通过 resolver 最终返回到 grpclb 负载均衡器进行服务地址信息的更新。
服务发现
服务发现流程图:
grpc 的 resolver 模块定义了两个接口
// Builder creates a resolver that will be used to watch name resolution updates.type Builder interface { Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) Scheme() string}// Resolver watches for the updates on the specified target.// Updates include address updates and service config updates.type Resolver interface { ResolveNow(ResolveNowOptions) Close()}
首先咱们来看看 Builder 接口的具体实现
type baseBuilder struct { name string reg registry.Registry}// Build ...func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc") if err != nil { return nil, err } var stop = make(chan struct{}) xgo.Go(func() { for { select { case endpoint := <-endpoints: var state = resolver.State{ Addresses: make([]resolver.Address, 0), ... } for _, node := range endpoint.Nodes { ... state.Addresses = append(state.Addresses, address) } cc.UpdateState(state) case <-stop: return } } }) return &baseResolver{ stop: stop, }, nil}
这里Build 办法次要是通过 Registry 模块取得监控服务通道,而后将更新的服务信息再更新到 grpcClient 中去,保障 grpcClient 的负载均衡器的服务地址永远都是最新的。
如何将Builder的具体实现注册到 grpc 中
import "google.golang.org/grpc/resolver"// Register ...func Register(name string, reg registry.Registry) { resolver.Register(&baseBuilder{ name: name, reg: reg, })}
将 Registry模块注入到 Builder 对象中,而后注入到 grpc 的 resolver 模块中去。这样 grpcClient 在理论运行中就会调用 etcd 的服务发现性能了。
grpc 如何应用服务与发现的源码解析
这里在介绍一下jupiter框架在理论我的项目中如何应用服务发现与注册。
服务注册
func (app *Application) startServers() error { var eg errgroup.Group // start multi servers for _, s := range app.servers { s := s eg.Go(func() (err error) { _ = app.registerer.RegisterService(context.TODO(), s.Info()) defer app.registerer.UnregisterService(context.TODO(), s.Info()) ... }) } return eg.Wait()}eng := engine.NewEngine()eng.SetRegistry(compound_registry.New( etcdv3_registry.StdConfig("default").Build(),))
在框架的 Application 模块中曾经实现了服务的主动注册与删除。个别应用框架时不须要再调用。我的项目应用中只须要在创立 Application 对象时,将注册核心信息注入即可。
服务发现
// 服务发现须要初始化,拿到etcd中服务的信息func (eng *Engine) initResolver() error { resolver.Register("etcd", etcdv3.StdConfig("default").Build()) return nil}
服务发现也是类型的将注册核心信息注入即可。
**