共计 6218 个字符,预计需要花费 16 分钟才能阅读完成。
有了服务注册和发现机制,消费者不须要晓得具体服务提供者的实在物理地址就能够进行调用,也毋庸晓得具体有多少个服务者可用;而服务提供者只须要注册到注册核心,就能够对外提供服务,在对外服务时不须要晓得具体是哪些服务调用了本人。
RPC 配置
Etcd:
Hosts:
- 127.0.0.1:2379
Key: user.rpc
- 这里剖析
go-zero
的etcd
局部源码, 源码援用 https://github.com/zeromicro/go-zero-demo/tree/master/mall
被调方 - 服务注册
mall/user/rpc/user.go
源码如下
package main
import (
"flag"
"fmt"
"go-zero-demo-rpc/mall/user/rpc/internal/config"
"go-zero-demo-rpc/mall/user/rpc/internal/server"
"go-zero-demo-rpc/mall/user/rpc/internal/svc"
"go-zero-demo-rpc/mall/user/rpc/types/user"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
var configFile = flag.String("f", "etc/user.yaml", "the config file")
func main() {flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
svr := server.NewUserServer(ctx)
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {user.RegisterUserServer(grpcServer, svr)
if c.Mode == service.DevMode || c.Mode == service.TestMode {reflection.Register(grpcServer)
}
})
defer s.Stop()
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start()}
MustNewServer
外部实现调用了NewServer
办法, 这里咱们关注NewServer
通过internal.NewRpcPubServer
办法实例化了internal.Server
if c.HasEtcd() {server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, serverOptions...)
if err != nil {return nil, err}
}
internal.NewRpcPubServer
中的registerEtcd
会调用Publisher.KeepAlive
办法
// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
// 这里获取 etcd 的连贯
cli, err := internal.GetRegistry().GetConn(p.endpoints)
if err != nil {return err}
p.lease, err = p.register(cli)
if err != nil {return err}
proc.AddWrapUpListener(func() {p.Stop()
})
return p.keepAliveAsync(cli)
}
p.register
这里把本人注册到服务中
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
// 这里新建一个租约
resp, err := client.Grant(client.Ctx(), TimeToLive)
if err != nil {return clientv3.NoLease, err}
// 失去租约的 ID
lease := resp.ID
// 这里拼接出理论存储的 key
if p.id > 0 {p.fullKey = makeEtcdKey(p.key, p.id)
} else {p.fullKey = makeEtcdKey(p.key, int64(lease))
}
// p.value 是后面的 figureOutListenOn 办法获取到本人的地址
_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
return lease, err
}
- 注册完之后,
keepAliveAsync
开了一个协程保活这个服务 - 当这个服务意外宕机时, 就不会再向
etcd
保活,etcd
就会删除这个key
- 注册好的服务如图
调用方 - 服务发现
order/api/order.go
源码如下
package main
import (
"flag"
"fmt"
"go-zero-demo-rpc/order/api/internal/config"
"go-zero-demo-rpc/order/api/internal/handler"
"go-zero-demo-rpc/order/api/internal/svc"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/rest"
)
var configFile = flag.String("f", "etc/order.yaml", "the config file")
func main() {flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
server := rest.MustNewServer(c.RestConf)
defer server.Stop()
ctx := svc.NewServiceContext(c)
handler.RegisterHandlers(server, ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()}
-
在
svc.NewServiceContext
办法外部又调用了zrpc.MustNewClient
,zrpc.MustNewClient
次要实现在zrpc.NewClient
func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, UserRpc: user.NewUser(zrpc.MustNewClient(c.UserRpc)), } }
- 最初理论调用了
internal.NewClient
去实例化rpc client
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {var opts []ClientOption
if c.HasCredential() {
opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
App: c.App,
Token: c.Token,
})))
}
if c.NonBlock {opts = append(opts, WithNonBlock())
}
if c.Timeout > 0 {opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
}
opts = append(opts, options...)
target, err := c.BuildTarget()
if err != nil {return nil, err}
client, err := internal.NewClient(target, opts...)
if err != nil {return nil, err}
return &RpcClient{client: client,}, nil
}
- 在
zrpc/internal/client.go
文件里, 蕴含一个init
办法, 这里就是理论发现服务的中央, 在这里注册服务发现者
func init() {resolver.Register()
}
resolver.Register
办法实现
package resolver
import ("github.com/zeromicro/go-zero/zrpc/resolver/internal")
// Register registers schemes defined zrpc.
// Keep it in a separated package to let third party register manually.
func Register() {internal.RegisterResolver()
}
- 最初又回到
interval
包的internal.RegisterResolver
办法, 这里咱们关注etcdResolverBuilder
func RegisterResolver() {resolver.Register(&directResolverBuilder)
resolver.Register(&discovResolverBuilder)
resolver.Register(&etcdResolverBuilder)
resolver.Register(&k8sResolverBuilder)
}
-
etcdBuilder
的内嵌了discovBuilder
构造体,Build
办法调用过程:- 实例化服务端:
internal.NewClient
->client.dial
->grpc.DialContext
- 因为
etcd
是resolver.BuildDiscovTarget
生成的taget
所以是相似这样子的:discov://127.0.0.1:2379/user.rpc
- 解析服务发现:
ClientConn.parseTargetAndFindResolver
->grpc.parseTarget
->ClientConn.getResolver
- 而后在
grpc.newCCResolverWrapper
调用resolver.Builder.Build
办法去发现服务
- 咱们着重关注
discovBuilder.Build
办法
type etcdBuilder struct {discovBuilder}
type discovBuilder struct{}
func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {return r == EndpointSepChar})
sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
if err != nil {return nil, err}
update := func() {var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{Addr: val,})
}
if err := cc.UpdateState(resolver.State{Addresses: addrs,}); err != nil {logx.Error(err)
}
}
sub.AddListener(update)
update()
return &nopResolver{cc: cc}, nil
}
func (b *discovBuilder) Scheme() string {return DiscovScheme}
-
discov.NewSubscriber
办法调用internal.GetRegistry().Monitor
最初调用Registry.monitor
办法进行监督cluster.getClient
拿到etcd
连贯cluster.load
作为第一次载入数据cluster.watch
去watch
监听etcd
前缀key
的改变
func (c *cluster) monitor(key string, l UpdateListener) error {c.lock.Lock()
c.listeners[key] = append(c.listeners[key], l)
c.lock.Unlock()
cli, err := c.getClient()
if err != nil {return err}
c.load(cli, key)
c.watchGroup.Run(func() {c.watch(cli, key)
})
return nil
}
- 如下图是
cluster.load
的实现, 就是依据前缀拿到user.prc
服务注册的所有地址
Q
-
为什么不必
Redis
做注册核心 (反正只是把被调方的地址存储, 过期Redis
也能胜任), 找了很久找到这个说法简略从以下几个方面说一下瑞迪斯为啥在微服务中不能取代 etcd:
1、redis 没有版本的概念,历史版本数据在大规模微服务中十分有必要,对于状态回滚和故障排查,甚至定锅都很重要
2、redis 的注册和发现目前只能通过 pub 和 sub 来实现,这两个命令齐全不能满足生产环境的要求,具体起因能够 gg 或看源码实现
3、etcd 在 2.+ 版本时,watch 到数据官网文档均倡议再 get 一次,因为会存在数据提早,3.+ 版本不再须要,可想 redis 的 pub 和 sub 是否达到此种低提早的要求
4、楼主看到的微服务架构应该都是将 etcd 间接裸露给 client 和 server 的,etcd 的性能摆在那,可能接受多少的 c/s 直连呢,更好的做法应该是对 etcd 做一层爱护,当然这种做法会损失一些性能
5、redis 和 etcd 的集群实现计划是不统一的,etcd 采纳的是 raft 协定,一主多从,只能写主,底层采纳 boltdb 作为 k/v 存储,间接落盘
6、redis 的长久化计划有 aof 和 rdb,这两种计划在宕机的时候都或多或少的会失落数据
- 援用自 https://www.v2ex.com/t/520367
原文链接 https://www.shiguopeng.cn/posts/2022061518/