With service registration and discovery in place, a consumer can call a service without knowing the provider's actual physical address, or even how many providers are available; a provider only needs to register itself with the registry to serve traffic, and it does not need to know which services are calling it.

RPC Configuration

Etcd:
  Hosts:
    - 127.0.0.1:2379
  Key: user.rpc
  • This post walks through the etcd-related parts of the go-zero source; the example code comes from https://github.com/zeromicro/go-zero-demo/tree/master/mall
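For reference, the Etcd block above deserializes into go-zero's discov.EtcdConf, which both the server-side zrpc.RpcServerConf and the client-side zrpc.RpcClientConf carry. A minimal sketch (not taken from the demo repo) of the same values expressed in code:

package main

import (
    "fmt"

    "github.com/zeromicro/go-zero/core/discov"
)

func main() {
    // The same values as in the YAML above: Hosts are the etcd endpoints,
    // Key is the prefix the service registers itself under.
    etcd := discov.EtcdConf{
        Hosts: []string{"127.0.0.1:2379"},
        Key:   "user.rpc",
    }
    fmt.Printf("registry: %v, key: %s\n", etcd.Hosts, etcd.Key)
}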

Callee - Service Registration

  • The source of mall/user/rpc/user.go is as follows
package main

import (
    "flag"
    "fmt"

    "go-zero-demo-rpc/mall/user/rpc/internal/config"
    "go-zero-demo-rpc/mall/user/rpc/internal/server"
    "go-zero-demo-rpc/mall/user/rpc/internal/svc"
    "go-zero-demo-rpc/mall/user/rpc/types/user"

    "github.com/zeromicro/go-zero/core/conf"
    "github.com/zeromicro/go-zero/core/service"
    "github.com/zeromicro/go-zero/zrpc"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)

var configFile = flag.String("f", "etc/user.yaml", "the config file")

func main() {
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)
    ctx := svc.NewServiceContext(c)
    svr := server.NewUserServer(ctx)

    s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
        user.RegisterUserServer(grpcServer, svr)

        if c.Mode == service.DevMode || c.Mode == service.TestMode {
            reflection.Register(grpcServer)
        }
    })
    defer s.Stop()

    fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
    s.Start()
}
  • MustNewServer internally calls the NewServer method; here we focus on how NewServer instantiates internal.Server through internal.NewRpcPubServer
if c.HasEtcd() {
    server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, serverOptions...)
    if err != nil {
        return nil, err
    }
}
  • registerEtcd inside internal.NewRpcPubServer calls the Publisher.KeepAlive method
// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
    // obtain the etcd connection here
    cli, err := internal.GetRegistry().GetConn(p.endpoints)
    if err != nil {
        return err
    }

    p.lease, err = p.register(cli)
    if err != nil {
        return err
    }

    proc.AddWrapUpListener(func() {
        p.Stop()
    })

    return p.keepAliveAsync(cli)
}
  • p.register is where the service registers itself with the registry
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
    // create a new lease
    resp, err := client.Grant(client.Ctx(), TimeToLive)
    if err != nil {
        return clientv3.NoLease, err
    }

    // the lease ID from the response
    lease := resp.ID

    // build the key that is actually stored
    if p.id > 0 {
        p.fullKey = makeEtcdKey(p.key, p.id)
    } else {
        p.fullKey = makeEtcdKey(p.key, int64(lease))
    }
    // p.value is the service's own address, obtained earlier by figureOutListenOn
    _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

    return lease, err
}
  • After registration, keepAliveAsync starts a goroutine that keeps the lease, and thus the service entry, alive
  • If the service crashes unexpectedly, the lease is no longer renewed and etcd deletes the key once it expires
  • The registered service then shows up in etcd as illustrated below
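To make the registration concrete, here is a small, self-contained sketch of the same mechanism written directly against the etcd client: grant a lease, write the key/value under it, and keep it alive. The key layout (user.rpc/<lease id> -> listen address) mirrors what the Publisher produces; the port 8080 and the 10-second TTL are illustrative, not taken from the demo.

package main

import (
    "context"
    "fmt"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    // Grant a lease, as Publisher.register does with TimeToLive.
    lease, err := cli.Grant(context.Background(), 10)
    if err != nil {
        panic(err)
    }

    // Store "<key>/<lease id>" -> "<listen address>" bound to the lease.
    fullKey := fmt.Sprintf("user.rpc/%d", lease.ID)
    if _, err := cli.Put(context.Background(), fullKey, "127.0.0.1:8080", clientv3.WithLease(lease.ID)); err != nil {
        panic(err)
    }

    // Keep the lease alive in the background, as keepAliveAsync does; if the
    // process dies, renewals stop and etcd removes the key when the lease expires.
    ch, err := cli.KeepAlive(context.Background(), lease.ID)
    if err != nil {
        panic(err)
    }
    go func() {
        for range ch {
            // drain keep-alive responses
        }
    }()

    fmt.Println("registered", fullKey)
    select {} // block forever so the registration stays alive
}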

Caller - Service Discovery

  • The source of order/api/order.go is as follows
package main

import (
    "flag"
    "fmt"

    "go-zero-demo-rpc/order/api/internal/config"
    "go-zero-demo-rpc/order/api/internal/handler"
    "go-zero-demo-rpc/order/api/internal/svc"

    "github.com/zeromicro/go-zero/core/conf"
    "github.com/zeromicro/go-zero/rest"
)

var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)

    server := rest.MustNewServer(c.RestConf)
    defer server.Stop()

    ctx := svc.NewServiceContext(c)
    handler.RegisterHandlers(server, ctx)

    fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
    server.Start()
}
  • svc.NewServiceContext in turn calls zrpc.MustNewClient, whose main implementation lives in zrpc.NewClient

func NewServiceContext(c config.Config) *ServiceContext {
    return &ServiceContext{
        Config:  c,
        UserRpc: user.NewUser(zrpc.MustNewClient(c.UserRpc)),
    }
}
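For context, c.UserRpc here is a zrpc.RpcClientConf loaded from etc/order.yaml. A hedged sketch of building the same client purely in code, assuming a local etcd with the user.rpc key already registered (the values mirror the configuration shown at the top of the post):

package main

import (
    "github.com/zeromicro/go-zero/core/discov"
    "github.com/zeromicro/go-zero/zrpc"
)

func main() {
    // Equivalent to the UserRpc section of the YAML config: discover providers
    // of the "user.rpc" key through the etcd at 127.0.0.1:2379.
    cli := zrpc.MustNewClient(zrpc.RpcClientConf{
        Etcd: discov.EtcdConf{
            Hosts: []string{"127.0.0.1:2379"},
            Key:   "user.rpc",
        },
    })
    // cli.Conn() exposes the underlying *grpc.ClientConn that the generated
    // user.NewUser(...) wrapper uses to issue RPCs.
    _ = cli.Conn()
}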
  • In the end it calls internal.NewClient to instantiate the rpc client
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
    var opts []ClientOption
    if c.HasCredential() {
        opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
            App:   c.App,
            Token: c.Token,
        })))
    }
    if c.NonBlock {
        opts = append(opts, WithNonBlock())
    }
    if c.Timeout > 0 {
        opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
    }
    opts = append(opts, options...)

    target, err := c.BuildTarget()
    if err != nil {
        return nil, err
    }

    client, err := internal.NewClient(target, opts...)
    if err != nil {
        return nil, err
    }

    return &RpcClient{
        client: client,
    }, nil
}
  • The file zrpc/internal/client.go contains an init function; this is where service discovery actually hooks in, because it registers the resolvers
func init() {
    resolver.Register()
}
  • The implementation of resolver.Register:
package resolver

import (
    "github.com/zeromicro/go-zero/zrpc/resolver/internal"
)

// Register registers schemes defined zrpc.
// Keep it in a separated package to let third party register manually.
func Register() {
    internal.RegisterResolver()
}
  • That leads back to internal.RegisterResolver in the internal package; here we focus on etcdResolverBuilder
func RegisterResolver() {
    resolver.Register(&directResolverBuilder)
    resolver.Register(&discovResolverBuilder)
    resolver.Register(&etcdResolverBuilder)
    resolver.Register(&k8sResolverBuilder)
}
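To make the resolver mechanism concrete, here is a minimal builder in the same style, registered with gRPC exactly the way RegisterResolver registers go-zero's builders. All names and addresses below are illustrative and not part of go-zero:

package main

import (
    "google.golang.org/grpc/resolver"
)

// exampleBuilder plays the role of discovBuilder: its Build pushes a list of
// backend addresses to gRPC via cc.UpdateState.
type exampleBuilder struct{}

type exampleResolver struct{ cc resolver.ClientConn }

func (exampleBuilder) Scheme() string { return "example" }

func (exampleBuilder) Build(_ resolver.Target, cc resolver.ClientConn,
    _ resolver.BuildOptions) (resolver.Resolver, error) {
    // In go-zero these addresses come from the etcd subscriber; here they are hard-coded.
    if err := cc.UpdateState(resolver.State{Addresses: []resolver.Address{
        {Addr: "127.0.0.1:8080"},
        {Addr: "127.0.0.1:8081"},
    }}); err != nil {
        return nil, err
    }
    return &exampleResolver{cc: cc}, nil
}

func (*exampleResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                {}

func init() {
    // The same call RegisterResolver makes for the direct/discov/etcd/k8s builders.
    resolver.Register(exampleBuilder{})
}

func main() {}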
  • etcdBuilder embeds the discovBuilder struct.

    • The call chain that reaches Build:
    • Creating the client: internal.NewClient -> client.dial -> grpc.DialContext
    • Because the target for etcd is generated by resolver.BuildDiscovTarget, it looks like this: discov://127.0.0.1:2379/user.rpc (a runnable check is sketched after this list)
    • Resolving the target to a resolver: ClientConn.parseTargetAndFindResolver -> grpc.parseTarget -> ClientConn.getResolver
    • Then grpc.newCCResolverWrapper calls resolver.Builder.Build to perform the actual discovery
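The target format is easy to check; a minimal sketch, assuming the same etcd config as above and noting that the exact scheme string can vary across go-zero versions:

package main

import (
    "fmt"

    "github.com/zeromicro/go-zero/core/discov"
    "github.com/zeromicro/go-zero/zrpc"
)

func main() {
    c := zrpc.RpcClientConf{
        Etcd: discov.EtcdConf{
            Hosts: []string{"127.0.0.1:2379"},
            Key:   "user.rpc",
        },
    }
    target, err := c.BuildTarget()
    if err != nil {
        panic(err)
    }
    // Expected to print something like: discov://127.0.0.1:2379/user.rpc
    fmt.Println(target)
}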
  • Let's take a closer look at the discovBuilder.Build method
type etcdBuilder struct {
    discovBuilder
}

type discovBuilder struct{}

func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
    resolver.Resolver, error) {
    hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {
        return r == EndpointSepChar
    })
    sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
    if err != nil {
        return nil, err
    }

    update := func() {
        var addrs []resolver.Address
        for _, val := range subset(sub.Values(), subsetSize) {
            addrs = append(addrs, resolver.Address{
                Addr: val,
            })
        }
        if err := cc.UpdateState(resolver.State{
            Addresses: addrs,
        }); err != nil {
            logx.Error(err)
        }
    }
    sub.AddListener(update)
    update()

    return &nopResolver{cc: cc}, nil
}

func (b *discovBuilder) Scheme() string {
    return DiscovScheme
}
  • discov.NewSubscriber calls internal.GetRegistry().Monitor, which ultimately reaches the monitor method shown below to watch the key

    • cluster.getClient obtains the etcd connection
    • cluster.load performs the initial load of the data
    • cluster.watch watches for changes to the keys under the etcd prefix
func (c *cluster) monitor(key string, l UpdateListener) error {
    c.lock.Lock()
    c.listeners[key] = append(c.listeners[key], l)
    c.lock.Unlock()

    cli, err := c.getClient()
    if err != nil {
        return err
    }

    c.load(cli, key)
    c.watchGroup.Run(func() {
        c.watch(cli, key)
    })

    return nil
}
  • cluster.load simply fetches, by prefix, all the addresses registered for the user.rpc service; the sketch below captures the idea
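The following sketch shows what the initial load plus the subsequent watch boil down to when written directly against the etcd client: a prefix Get to pick up all currently registered addresses, then a prefix Watch to be notified when providers come and go. The prefix is the one used throughout this post; the real go-zero code additionally handles retries, revisions, and listener fan-out.

package main

import (
    "context"
    "fmt"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    // Initial load, as cluster.load does: fetch every "user.rpc/..." key.
    resp, err := cli.Get(context.Background(), "user.rpc/", clientv3.WithPrefix())
    if err != nil {
        panic(err)
    }
    for _, kv := range resp.Kvs {
        fmt.Printf("existing provider: %s -> %s\n", kv.Key, kv.Value)
    }

    // Continuous watch, as cluster.watch does: react to providers registering
    // (PUT) or disappearing when their lease expires (DELETE).
    for wresp := range cli.Watch(context.Background(), "user.rpc/", clientv3.WithPrefix()) {
        for _, ev := range wresp.Events {
            fmt.Printf("%s: %s -> %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}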

Q

  • Why not use Redis as the registry? (After all, it only needs to store the callees' addresses with an expiry, which Redis can also do.) After searching for quite a while I found this take:

    A quick rundown of a few reasons why Redis cannot replace etcd in a microservice setup:

    1. Redis has no notion of revisions. Historical versions of the data matter a lot in large-scale microservice deployments, for state rollback, troubleshooting, and even assigning blame.

    2. Registration and discovery on Redis can currently only be built on PUB/SUB, and those two commands simply cannot meet production requirements; you can search around or read the source for the details.

    3. With etcd 2.x the official docs recommended issuing an extra get after a watch fired, because the watched data could lag; 3.x no longer needs this. It is doubtful whether Redis PUB/SUB can reach that kind of low latency.

    4. The microservice architectures the OP has seen all expose etcd directly to clients and servers. Given etcd's performance, how many direct client/server connections can it really sustain? A better approach is to put a protective layer in front of etcd, even though that costs some performance.

    5. Redis and etcd use different clustering designs: etcd uses the Raft protocol, one leader with multiple followers, writes go only through the leader, and it uses boltdb as the underlying k/v store, persisting directly to disk.

    6. Redis persistence relies on AOF and RDB, and both can lose some amount of data on a crash.

  • Quoted from https://www.v2ex.com/t/520367

Original post: https://www.shiguopeng.cn/posts/2022061518/