乐趣区

聊聊nacossdkgo的NamingClient

本文主要研究一下 nacos-sdk-go 的 NamingClient

NamingClient

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

type NamingClient struct {
    nacos_client.INacosClient
    hostReactor  HostReactor
    serviceProxy NamingProxy
    subCallback  SubscribeCallback
    beatReactor  BeatReactor
    indexMap     cache.ConcurrentMap
}
  • NamingClient 定义了 hostReactor、serviceProxy、subCallback、beatReactor、indexMap 属性

NewNamingClient

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func NewNamingClient(nc nacos_client.INacosClient) (NamingClient, error) {naming := NamingClient{}
    clientConfig, err :=
        nc.GetClientConfig()
    if err != nil {return naming, err}
    serverConfig, err := nc.GetServerConfig()
    if err != nil {return naming, err}
    httpAgent, err := nc.GetHttpAgent()
    if err != nil {return naming, err}
    err = logger.InitLog(clientConfig.LogDir)
    if err != nil {return naming, err}
    naming.subCallback = NewSubscribeCallback()
    naming.serviceProxy, err = NewNamingProxy(clientConfig, serverConfig, httpAgent)
    if err != nil {return naming, err}
    naming.hostReactor = NewHostReactor(naming.serviceProxy, clientConfig.CacheDir+string(os.PathSeparator)+"naming",
        clientConfig.UpdateThreadNum, clientConfig.NotLoadCacheAtStart, naming.subCallback, clientConfig.UpdateCacheWhenEmpty)
    naming.beatReactor = NewBeatReactor(naming.serviceProxy, clientConfig.BeatInterval)
    naming.indexMap = cache.NewConcurrentMap()

    return naming, nil
}
  • NewNamingClient 方法创建 NamingClient,并设置其 subCallback、serviceProxy、hostReactor、beatReactor、indexMap 属性

RegisterInstance

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 注册服务实例
func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) {
    if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
    instance := model.Instance{
        Ip:          param.Ip,
        Port:        param.Port,
        Metadata:    param.Metadata,
        ClusterName: param.ClusterName,
        Healthy:     param.Healthy,
        Enable:      param.Enable,
        Weight:      param.Weight,
        Ephemeral:   param.Ephemeral,
    }
    beatInfo := model.BeatInfo{
        Ip:          param.Ip,
        Port:        param.Port,
        Metadata:    param.Metadata,
        ServiceName: utils.GetGroupName(param.ServiceName, param.GroupName),
        Cluster:     param.ClusterName,
        Weight:      param.Weight,
        Period:      utils.GetDurationWithDefault(param.Metadata, constant.HEART_BEAT_INTERVAL, time.Second*5),
    }
    _, err := sc.serviceProxy.RegisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)
    if err != nil {return false, err}
    if instance.Ephemeral {sc.beatReactor.AddBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
    }
    return true, nil

}
  • RegisterInstance 方法根据 RegisterInstanceParam 构建 instance,然后通过 sc.serviceProxy.RegisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance) 去注册;如果 instance 是 Ephemeral 类型的,则执行 sc.beatReactor.AddBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), beatInfo)

DeregisterInstance

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 注销服务实例
func (sc *NamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) {
    if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
    sc.beatReactor.RemoveBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port)

    _, err := sc.serviceProxy.DeregisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port, param.Cluster, param.Ephemeral)
    if err != nil {return false, err}
    return true, nil
}
  • DeregisterInstance 方法执行 sc.beatReactor.RemoveBeatInfo 及 sc.serviceProxy.DeregisterInstance

GetService

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 获取服务列表
func (sc *NamingClient) GetService(param vo.GetServiceParam) (model.Service, error) {
    if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
    service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
    return service, nil
}
  • GetService 方法通过 sc.hostReactor.GetServiceInfo 来查询 service 信息

GetAllServicesInfo

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func (sc *NamingClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam) ([]model.Service, error) {
    if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
    if param.NameSpace == "" {param.NameSpace = constant.DEFAULT_NAMESPACE_ID}
    service := sc.hostReactor.GetAllServiceInfo(param.NameSpace, param.GroupName, strings.Join(param.Clusters, ","))
    return service, nil
}
  • GetAllServicesInfo 方法通过 sc.hostReactor.GetAllServiceInfo 来查询所有的 service 信息

SelectAllInstances

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func (sc *NamingClient) SelectAllInstances(param vo.SelectAllInstancesParam) ([]model.Instance, error) {
    if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
    service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
    if service.Hosts == nil || len(service.Hosts) == 0 {return []model.Instance{}, errors.New("instance list is empty!")
    }
    return service.Hosts, nil
}
  • SelectAllInstances 方法通过 sc.hostReactor.GetServiceInfo 获取 service 信息,然后返回 service.Hosts

SelectInstances

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func (sc *NamingClient) SelectInstances(param vo.SelectInstancesParam) ([]model.Instance, error) {
    if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
    service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
    return sc.selectInstances(service, param.HealthyOnly)
}

func (sc *NamingClient) selectInstances(service model.Service, healthy bool) ([]model.Instance, error) {if service.Hosts == nil || len(service.Hosts) == 0 {return []model.Instance{}, errors.New("instance list is empty!")
    }
    hosts := service.Hosts
    var result []model.Instance
    for _, host := range hosts {
        if host.Healthy == healthy && host.Enable && host.Weight > 0 {result = append(result, host)
        }
    }
    return result, nil
}
  • SelectInstances 方法通过 sc.hostReactor.GetServiceInfo 获取 service 信息,然后再通过 sc.selectInstances(service, param.HealthyOnly) 获取健康的 instance

SelectOneHealthyInstance

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
    if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
    service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
    return sc.selectOneHealthyInstances(service)
}

func (sc *NamingClient) selectOneHealthyInstances(service model.Service) (*model.Instance, error) {if service.Hosts == nil || len(service.Hosts) == 0 {return nil, errors.New("instance list is empty!")
    }
    hosts := service.Hosts
    var result []model.Instance
    mw := 0
    for _, host := range hosts {
        if host.Healthy && host.Enable && host.Weight > 0 {cw := int(math.Ceil(host.Weight))
            if cw > mw {mw = cw}
            result = append(result, host)
        }
    }
    if len(result) == 0 {return nil, errors.New("healthy instance list is empty!")
    }

    randomInstances := random(result, mw)
    key := utils.GetServiceCacheKey(service.Name, service.Clusters)
    i, indexOk := sc.indexMap.Get(key)
    var index int

    if !indexOk {index = rand.Intn(len(randomInstances))
    } else {index = i.(int)
        index += 1
        if index >= len(randomInstances) {index = index % len(randomInstances)
        }
    }

    sc.indexMap.Set(key, index)
    return &randomInstances[index], nil
}
  • SelectOneHealthyInstance 方法通过 sc.hostReactor.GetServiceInfo 获取 service 信息,再通过 sc.selectOneHealthyInstances(service) 选择一个健康的 instance

Subscribe

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 服务监听
func (sc *NamingClient) Subscribe(param *vo.SubscribeParam) error {
    if param.GroupName == "" {param.GroupName = constant.DEFAULT_GROUP}
    serviceParam := vo.GetServiceParam{
        ServiceName: param.ServiceName,
        GroupName:   param.GroupName,
        Clusters:    param.Clusters,
    }

    sc.subCallback.AddCallbackFuncs(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)
    _, err := sc.GetService(serviceParam)
    if err != nil {return err}
    return nil
}
  • Subscribe 方法通过 sc.subCallback.AddCallbackFuncs 来注册 callback

Unsubscribe

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 取消服务监听
func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) error {sc.subCallback.RemoveCallbackFuncs(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)
    return nil
}
  • Unsubscribe 方法则通过 sc.subCallback.RemoveCallbackFuncs 来取消 callback

小结

nacos-sdk-go 的 NamingClient 提供了 RegisterInstance、DeregisterInstance、GetService、GetAllServicesInfo、SelectAllInstances、SelectInstances、SelectOneHealthyInstance、Subscribe、Unsubscribe 方法

doc

  • naming_client
退出移动版