共计 8056 个字符,预计需要花费 21 分钟才能阅读完成。
序
本文主要研究一下 nacos-sdk-go 的 NamingClient
NamingClient
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
type NamingClient struct {
nacos_client.INacosClient
hostReactor HostReactor
serviceProxy NamingProxy
subCallback SubscribeCallback
beatReactor BeatReactor
indexMap cache.ConcurrentMap
}
- NamingClient 定义了 hostReactor、serviceProxy、subCallback、beatReactor、indexMap 属性
NewNamingClient
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
func NewNamingClient(nc nacos_client.INacosClient) (NamingClient, error) {naming := NamingClient{}
clientConfig, err :=
nc.GetClientConfig()
if err != nil {return naming, err}
serverConfig, err := nc.GetServerConfig()
if err != nil {return naming, err}
httpAgent, err := nc.GetHttpAgent()
if err != nil {return naming, err}
err = logger.InitLog(clientConfig.LogDir)
if err != nil {return naming, err}
naming.subCallback = NewSubscribeCallback()
naming.serviceProxy, err = NewNamingProxy(clientConfig, serverConfig, httpAgent)
if err != nil {return naming, err}
naming.hostReactor = NewHostReactor(naming.serviceProxy, clientConfig.CacheDir+string(os.PathSeparator)+"naming",
clientConfig.UpdateThreadNum, clientConfig.NotLoadCacheAtStart, naming.subCallback, clientConfig.UpdateCacheWhenEmpty)
naming.beatReactor = NewBeatReactor(naming.serviceProxy, clientConfig.BeatInterval)
naming.indexMap = cache.NewConcurrentMap()
return naming, nil
}
- NewNamingClient 方法创建 NamingClient,并设置其 subCallback、serviceProxy、hostReactor、beatReactor、indexMap 属性
RegisterInstance
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
// 注册服务实例
func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) {
if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
instance := model.Instance{
Ip: param.Ip,
Port: param.Port,
Metadata: param.Metadata,
ClusterName: param.ClusterName,
Healthy: param.Healthy,
Enable: param.Enable,
Weight: param.Weight,
Ephemeral: param.Ephemeral,
}
beatInfo := model.BeatInfo{
Ip: param.Ip,
Port: param.Port,
Metadata: param.Metadata,
ServiceName: utils.GetGroupName(param.ServiceName, param.GroupName),
Cluster: param.ClusterName,
Weight: param.Weight,
Period: utils.GetDurationWithDefault(param.Metadata, constant.HEART_BEAT_INTERVAL, time.Second*5),
}
_, err := sc.serviceProxy.RegisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)
if err != nil {return false, err}
if instance.Ephemeral {sc.beatReactor.AddBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
}
return true, nil
}
- RegisterInstance 方法根据 RegisterInstanceParam 构建 instance,然后通过 sc.serviceProxy.RegisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance) 去注册;如果 instance 是 Ephemeral 类型的,则执行 sc.beatReactor.AddBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
DeregisterInstance
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
// 注销服务实例
func (sc *NamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) {
if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
sc.beatReactor.RemoveBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port)
_, err := sc.serviceProxy.DeregisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port, param.Cluster, param.Ephemeral)
if err != nil {return false, err}
return true, nil
}
- DeregisterInstance 方法执行 sc.beatReactor.RemoveBeatInfo 及 sc.serviceProxy.DeregisterInstance
GetService
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
// 获取服务列表
func (sc *NamingClient) GetService(param vo.GetServiceParam) (model.Service, error) {
if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
return service, nil
}
- GetService 方法通过 sc.hostReactor.GetServiceInfo 来查询 service 信息
GetAllServicesInfo
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
func (sc *NamingClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam) ([]model.Service, error) {
if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
if param.NameSpace == "" {param.NameSpace = constant.DEFAULT_NAMESPACE_ID}
service := sc.hostReactor.GetAllServiceInfo(param.NameSpace, param.GroupName, strings.Join(param.Clusters, ","))
return service, nil
}
- GetAllServicesInfo 方法通过 sc.hostReactor.GetAllServiceInfo 来查询所有的 service 信息
SelectAllInstances
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
func (sc *NamingClient) SelectAllInstances(param vo.SelectAllInstancesParam) ([]model.Instance, error) {
if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
if service.Hosts == nil || len(service.Hosts) == 0 {return []model.Instance{}, errors.New("instance list is empty!")
}
return service.Hosts, nil
}
- SelectAllInstances 方法通过 sc.hostReactor.GetServiceInfo 获取 service 信息,然后返回 service.Hosts
SelectInstances
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
func (sc *NamingClient) SelectInstances(param vo.SelectInstancesParam) ([]model.Instance, error) {
if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
return sc.selectInstances(service, param.HealthyOnly)
}
func (sc *NamingClient) selectInstances(service model.Service, healthy bool) ([]model.Instance, error) {if service.Hosts == nil || len(service.Hosts) == 0 {return []model.Instance{}, errors.New("instance list is empty!")
}
hosts := service.Hosts
var result []model.Instance
for _, host := range hosts {
if host.Healthy == healthy && host.Enable && host.Weight > 0 {result = append(result, host)
}
}
return result, nil
}
- SelectInstances 方法通过 sc.hostReactor.GetServiceInfo 获取 service 信息,然后再通过 sc.selectInstances(service, param.HealthyOnly) 获取健康的 instance
SelectOneHealthyInstance
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
return sc.selectOneHealthyInstances(service)
}
func (sc *NamingClient) selectOneHealthyInstances(service model.Service) (*model.Instance, error) {if service.Hosts == nil || len(service.Hosts) == 0 {return nil, errors.New("instance list is empty!")
}
hosts := service.Hosts
var result []model.Instance
mw := 0
for _, host := range hosts {
if host.Healthy && host.Enable && host.Weight > 0 {cw := int(math.Ceil(host.Weight))
if cw > mw {mw = cw}
result = append(result, host)
}
}
if len(result) == 0 {return nil, errors.New("healthy instance list is empty!")
}
randomInstances := random(result, mw)
key := utils.GetServiceCacheKey(service.Name, service.Clusters)
i, indexOk := sc.indexMap.Get(key)
var index int
if !indexOk {index = rand.Intn(len(randomInstances))
} else {index = i.(int)
index += 1
if index >= len(randomInstances) {index = index % len(randomInstances)
}
}
sc.indexMap.Set(key, index)
return &randomInstances[index], nil
}
- SelectOneHealthyInstance 方法通过 sc.hostReactor.GetServiceInfo 获取 service 信息,再通过 sc.selectOneHealthyInstances(service) 选择一个健康的 instance
Subscribe
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
// 服务监听
func (sc *NamingClient) Subscribe(param *vo.SubscribeParam) error {
if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
serviceParam := vo.GetServiceParam{
ServiceName: param.ServiceName,
GroupName: param.GroupName,
Clusters: param.Clusters,
}
sc.subCallback.AddCallbackFuncs(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), ¶m.SubscribeCallback)
_, err := sc.GetService(serviceParam)
if err != nil {return err}
return nil
}
- Subscribe 方法通过 sc.subCallback.AddCallbackFuncs 来注册 callback
Unsubscribe
nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go
// 取消服务监听
func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) error {sc.subCallback.RemoveCallbackFuncs(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), ¶m.SubscribeCallback)
return nil
}
- Unsubscribe 方法则通过 sc.subCallback.RemoveCallbackFuncs 来取消 callback
小结
nacos-sdk-go 的 NamingClient 提供了 RegisterInstance、DeregisterInstance、GetService、GetAllServicesInfo、SelectAllInstances、SelectInstances、SelectOneHealthyInstance、Subscribe、Unsubscribe 方法
doc
- naming_client