本文主要研究一下dubbo-go的kubernetesRegistry

kubernetesRegistry

dubbo-go-v1.4.2/registry/kubernetes/registry.go

// processID and localIP start empty and are filled in by init below.
var (
	processID = ""
	localIP   = ""
)

const (
	// Name is the key under which init registers this registry factory.
	Name = "kubernetes"
	// ConnDelay and MaxFailTimes are not referenced anywhere in this excerpt.
	// NOTE(review): presumably a reconnect delay and a retry limit — confirm
	// the units and the consumer before relying on them.
	ConnDelay    = 3
	MaxFailTimes = 15
)

// init stores the current process ID (formatted as a decimal string) and the
// value returned by gxnet.GetLocalIP, then registers newKubernetesRegistry
// with the extension package under Name.
func init() {
	processID = fmt.Sprintf("%d", os.Getpid())
	// NOTE(review): the error from GetLocalIP is discarded, so a failure is
	// invisible here; confirm what localIP holds in that case.
	localIP, _ = gxnet.GetLocalIP()
	extension.SetRegistry(Name, newKubernetesRegistry)
}

// kubernetesRegistry embeds registry.BaseRegistry and adds the kubernetes
// client together with the listeners created by InitListeners.
type kubernetesRegistry struct {
	registry.BaseRegistry
	// cltLock is held by DoSubscribe while it reads client.
	cltLock sync.RWMutex
	client  *kubernetes.Client
	// listenerLock is held by DoSubscribe while it reads configListener and
	// while it lazily creates listener.
	listenerLock   sync.Mutex
	listener       *kubernetes.EventListener
	dataListener   *dataListener
	configListener *configurationListener
}
  • kubernetesRegistry定义了cltLock、client、listenerLock、listener、dataListener、configListener属性

newKubernetesRegistry

dubbo-go-v1.4.2/registry/kubernetes/registry.go

// newKubernetesRegistry creates a kubernetesRegistry for url.
//
// It initializes the embedded base registry, checks the client with
// kubernetes.ValidateClient, starts HandleClientRestart in its own goroutine,
// and then creates the listeners via InitListeners. If validation fails, the
// error is returned through perrors.WithStack and no registry is returned.
func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
	// Note carried over from the original author: kubernetes uses the
	// in-cluster config.
	r := &kubernetesRegistry{}
	r.InitBaseRegistry(url, r)

	if err := kubernetes.ValidateClient(r); err != nil {
		return nil, perrors.WithStack(err)
	}

	// The wait-group counter is incremented before the goroutine is started.
	// NOTE(review): presumably HandleClientRestart calls Done on exit — confirm.
	r.WaitGroup().Add(1)
	go r.HandleClientRestart()
	r.InitListeners()

	logger.Debugf("the kubernetes registry started")
	return r, nil
}
  • newKubernetesRegistry方法实例化kubernetesRegistry,然后执行InitBaseRegistry;通过kubernetes.ValidateClient校验client,失败则返回error;之后启动协程执行HandleClientRestart,最后执行InitListeners

InitListeners

dubbo-go-v1.4.2/registry/kubernetes/registry.go

// InitListeners populates the registry's three listener fields: an event
// listener built from r.client, a configuration listener built from the
// registry itself, and a data listener built from that configuration listener.
func (r *kubernetesRegistry) InitListeners() {
	r.listener = kubernetes.NewEventListener(r.client)
	// configListener must be assigned first: the data listener below is
	// constructed from it.
	r.configListener = NewConfigurationListener(r)
	r.dataListener = NewRegistryDataListener(r.configListener)
}
  • InitListeners方法执行kubernetes.NewEventListener、NewConfigurationListener、NewRegistryDataListener

DoRegister

dubbo-go-v1.4.2/registry/kubernetes/registry.go

// DoRegister joins root and node with path.Join and passes the result, along
// with an empty string as the second argument, to r.client.Create. The error
// from Create is returned unchanged.
func (r *kubernetesRegistry) DoRegister(root string, node string) error {
	return r.client.Create(path.Join(root, node), "")
}
  • DoRegister执行r.client.Create(path.Join(root, node), "")

DoSubscribe

dubbo-go-v1.4.2/registry/kubernetes/registry.go

// DoSubscribe registers svc with the data listener and starts one
// ListenServiceEvent goroutine per category of svc.
//
// The configuration listener is captured under listenerLock at entry and is
// the value returned on success. If r.listener is nil, the method creates it
// with kubernetes.NewEventListener, or returns a "kubernetes client broken"
// error when r.client is nil.
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
	var (
		configListener *configurationListener
	)

	r.listenerLock.Lock()
	configListener = r.configListener
	r.listenerLock.Unlock()
	// NOTE(review): this outer check reads r.listener without holding
	// listenerLock, while the assignment below happens under that lock.
	if r.listener == nil {
		r.cltLock.Lock()
		client := r.client
		r.cltLock.Unlock()
		if client == nil {
			return nil, perrors.New("kubernetes client broken")
		}

		r.listenerLock.Lock()
		if r.listener == nil {
			// Re-check under the lock before creating the listener.
			// NOTE(review): the listener is built from r.client, re-read here
			// without cltLock, rather than from the client snapshot taken
			// above, which is only used for the nil check.
			r.listener = kubernetes.NewEventListener(r.client)
		}
		r.listenerLock.Unlock()
	}

	// Register svc with the data listener.
	r.dataListener.AddInterestedURL(svc)
	// The category parameter is split on "," and one goroutine is started for
	// each element.
	// NOTE(review): v is concatenated into the format string itself, so a '%'
	// inside a category value would be parsed as a formatting verb.
	// NOTE(review): nothing in this excerpt shows how these goroutines stop.
	for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
		go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)
	}

	return configListener, nil
}
  • DoSubscribe方法在r.listener为nil时则通过kubernetes.NewEventListener(r.client)创建listener;之后执行r.dataListener.AddInterestedURL(svc);最后遍历category,执行r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)

小结

kubernetesRegistry定义了cltLock、client、listenerLock、listener、dataListener、configListener属性;InitListeners方法执行kubernetes.NewEventListener、NewConfigurationListener、NewRegistryDataListener

doc

  • registry