乐趣区

关于云原生-cloud-native:Dubbogo-源码笔记二客户端调用过程

作者 | 李志信

导读 :有了上一篇文章《Dubbo-go 源码笔记(一)Server 端开启服务过程》的铺垫,能够类比客户端启动于服务端的启动过程。其中最大的区别是服务端通过 zk 注册服务,公布本人的 ivkURL 并订阅事件开启监听;而客户应该是通过 zk 注册组件,拿到须要调用的 serviceURL,更新 invoker 并重写用户的 RPCService,从而实现对近程过程调用细节的封装。

配置文件和客户端源代码

1. client 配置文件

helloworld 提供的 demo:profiles/client.yaml。

registries :
  "demoZk":
    protocol: "zookeeper"
    timeout  : "3s"
    address: "127.0.0.1:2181"
    username: ""password:""
references:
  "UserProvider":
    # 能够指定多个 registry,应用逗号隔开; 不指定默认向所有注册核心注册
    registry: "demoZk"
    protocol : "dubbo"
    interface : "com.ikurento.user.UserProvider"
    cluster: "failover"
    methods :
    - name: "GetUser"
      retries: 3

可看到配置文件与之前探讨过的 Server 端十分相似,其 refrences 局部字段就是对以后服务要主调的服务的配置,其中具体阐明了调用协定、注册协定、接口 id、调用办法、集群策略等,这些配置都会在之后与注册组件交互、重写 ivk、调用的过程中应用到。

2. 客户端应用框架源码

user.go:

func init() {config.SetConsumerService(userProvider)
  hessian.RegisterPOJO(&User{})
}

main.go:

func main() {hessian.RegisterPOJO(&User{})
  config.Load()
  time.Sleep(3e9)
  println("\n\n\nstart to test dubbo")
  user := &User{}
  err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
  if err != nil {panic(err)
  }
  println("response result: %v\n", user)
  initSignal()}

在官网提供的 helloworld demo 的源码中,可看到与服务端相似,在 user.go 内注册了 rpc-service,以及须要 rpc 传输的构造体 user。

在 main 函数中,同样调用了 config.Load() 函数,之后就能够通过实现好的 rpc-service:userProvider 间接调用对应的性能函数,即可实现 rpc 调用。

能够猜到,从 hessian 注册构造、SetConsumerService,到调用函数 .GetUser() 期间,用户定义的 rpc-service 也就是 userProvider 对应的函数被重写,重写后的 GetUser 函数曾经蕴含实现了近程调用逻辑的 invoker。

接下来,就要通过浏览源码,看看 dubbo-go 是如何做到的。

实现近程过程调用

1. 加载配置文件

// file: config/config_loader.go :Load()

// Load Dubbo Init
func Load() {
  // init router
  initRouter()
  // init the global event dispatcher
  extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
  // start the metadata report if config set
  if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
  return
  }
  // reference config
  loadConsumerConfig()

在 main 函数中调用了 config.Load() 函数,进而调用了 loadConsumerConfig,相似于之前讲到的 server 端配置读入函数。

在 loadConsumerConfig 函数中,进行了三步操作:

// config/config_loader.go
func loadConsumerConfig() {
    // 1 init other consumer config
    conConfigType := consumerConfig.ConfigType
    for key, value := range extension.GetDefaultConfigReader() {}
    checkApplicationName(consumerConfig.ApplicationConfig)
    configCenterRefreshConsumer()
    checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
    
    // 2 refer-implement-reference
    for key, ref := range consumerConfig.References {
        if ref.Generic {genericService := NewGenericService(key)
            SetConsumerService(genericService)
        }
        rpcService := GetConsumerService(key)
        ref.id = key
        ref.Refer(rpcService)
        ref.Implement(rpcService)
    }
    // 3 wait for invoker is available, if wait over default 3s, then panic
    for {}}
  1. 查看配置文件并将配置写入内存
  2. 在 for 循环外部 ,顺次援用(refer)并且实例化(implement)每个被调 reference
  3. 期待三秒钟所有 invoker 就绪

其中重要的就是 for 循环外面的援用和实例化,两步操作,会在接下来展开讨论。

至此,配置曾经被写入了框架。

2. 获取近程 Service URL,实现可供调用的 invoker

上述的 ref.Refer 实现的就是这部分的操作。


图(一)

1)结构注册 url

和 server 端相似,存在注册 url 和服务 url,dubbo 习惯将服务 url 作为注册 url 的 sub。

// file: config/reference_config.go: Refer()
func (c *ReferenceConfig) Refer(_ interface{}) {//(一)配置 url 参数 (serviceUrl),将会作为 sub
  cfgURL := common.NewURLWithOptions(common.WithPath(c.id),
  common.WithProtocol(c.Protocol),
  common.WithParams(c.getUrlMap()),
  common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
  )
  ...
  //(二)注册地址能够通过 url 格局给定,也能够通过配置格局给定
  // 这一步的意义就是配置 -> 提取信息生成 URL
  if c.Url != "" {// 用户给定 url 信息,能够是点对点的地址,也能够是注册核心的地址
  // 1. user specified URL, could be peer-to-peer address, or register center's address.
  urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")
  for _, urlStr := range urlStrings {serviceUrl, err := common.NewURL(urlStr)
    ...
  }
  } else {// 配置读入注册核心的信息
  //  assemble SubURL from register center's configuration mode
  // 这是注册 url,protocol = registry, 蕴含了 zk 的用户名、明码、ip 等等
  c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)
  ...
  // set url to regUrls
  for _, regUrl := range c.urls {regUrl.SubURL = cfgURL// regUrl 的 subURl 存以后配置 url}
  }
  // 至此,无论通过什么模式,曾经拿到了全副的 regURL
  //(三)获取 registryProtocol 实例,调用其 Refer 办法,传入新构建好的 regURL
  if len(c.urls) == 1 {
  // 这一步拜访到 registry/protocol/protocol.go registryProtocol.Refer
  // 这里是 registry
  c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
  } else {
  // 如果有多个注册核心,即有多个 invoker, 则采取集群策略
  invokers := make([]protocol.Invoker, 0, len(c.urls))
  ...
  }

这个函数中,曾经解决完从 Register 配置到 RegisterURL 的转换, 即图(一)中局部:

接下来,曾经拿到的 url 将被传递给 RegistryProtocol,进一步 refer。

2)registryProtocol 获取到 zkRegistry 实例,进一步 Refer

// file: registry/protocol/protocol.go: Refer

// Refer provider service from registry center
// 拿到的是配置文件 registries 的 url,他可能生成一个 invoker = 指向目标 addr,以供客户端间接调用。func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
  var registryUrl = url
  // 这里拿到的是 referenceConfig,serviceUrl 外面蕴含了 Reference 的所有信息,蕴含 interfaceName、method 等等
  var serviceUrl = registryUrl.SubURL
  if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = "registry"
  protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "")
  registryUrl.Protocol = protocol// 替换成了具体的值,比方 "zookeeper"
  }
  // 接口对象
  var reg registry.Registry
  //(一)实例化接口对象,缓存策略
  if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
  // 缓存中不存在以后 registry,新建一个 reg
  reg = getRegistry(&registryUrl)
  // 缓存起来
  proto.registries.Store(registryUrl.Key(), reg)
  } else {reg = regI.(registry.Registry)
  }
  // 到这里,获取到了 reg 实例 zookeeper 的 registry
  //(二)依据 Register 的实例 zkRegistry 和传入的 regURL 新建一个 directory
  // 这一步存在简单的异步逻辑,从注册核心拿到了目标 service 的实在 addr,获取了 invoker 并放入 directory,// 这一步将在上面具体给出步骤
  // new registry directory for store service url from registry
  directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg)
  if err != nil {
  logger.Errorf("consumer service %v  create registry directory  error, error message is %s, and will return nil invoker!",
    serviceUrl.String(), err.Error())
  return nil
  }
  //(三)DoRegister 在 zk 上注册以后 client service
  err = reg.Register(*serviceUrl)
  if err != nil {
  logger.Errorf("consumer service %v register registry %v error, error message is %s",
    serviceUrl.String(), registryUrl.String(), err.Error())
  }
  //(四)new cluster invoker,将 directory 写入集群,取得具备集群策略的 invoker
  cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
  invoker := cluster.Join(directory)
  // invoker 保留
  proto.invokers = append(proto.invokers, invoker)
  return invoker
}

可具体浏览上述正文,这个函数实现了从 url 到 invoker 的全副过程:

(一)首先取得 Registry 对象,默认是之前实例化的 zkRegistry,和之前 server 获取 Registry 的解决很相似。

(二)通过结构一个新的 directory,异步拿到之前在 zk 上注册的 server 端信息,生成 invoker。

(三)在 zk 上注册以后 service。

(四)集群策略,取得最终 invoker。

这一步实现了图(一)中所有余下的绝大多数操作,接下来就须要具体地查看 directory 的结构过程。

3)结构 directory(蕴含较简单的异步操作)


图(二)

上述的 extension.GetDefaultRegistryDirectory(&registryUrl, reg) 函数,实质上调用了曾经注册好的 NewRegistryDirectory 函数:

// file: registry/directory/directory.go: NewRegistryDirectory()

// NewRegistryDirectory will create a new RegistryDirectory
// 这个函数作为 default 注册在 extension 下面
// url 为注册 url,reg 为 zookeeper registry
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
  if url.SubURL == nil {return nil, perrors.Errorf("url is invalid, suburl can not be nil")
  }
  dir := &RegistryDirectory{BaseDirectory:    directory.NewBaseDirectory(url),
  cacheInvokers:    []protocol.Invoker{},
  cacheInvokersMap: &sync.Map{},
  serviceType:      url.SubURL.Service(),
  registry:         registry,
  }
  dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
  go dir.subscribe(url.SubURL)
  return dir, nil
}

首先结构了一个注册 directory,开启协程调用其 subscribe 函数,传入 serviceURL。

这个 directory 目前蕴含了对应的 zkRegistry,以及传入的 URL,它的 cacheInvokers 局部是空的。

进入 dir.subscribe(url.SubURL) 这个异步函数:

/ file: registry/directory/directory.go: subscribe()

// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
  // 减少两个监听,dir.consumerConfigurationListener.addNotifyListener(dir)
  dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
  // subscribe 调用
  dir.registry.Subscribe(url, dir)
}

重点来了,它调用了 zkRegistry 的 Subscribe 办法, 与此同时将本人作为 ConfigListener 传入。

我认为这种传入 listener 的设计模式十分值得学习,而且很有 java 的滋味。
针对期待 zk 返回订阅信息这样的异步操作,须要传入一个 Listener,这个 Listener 须要实现 Notify 办法,进而在作为参数传入外部之后,能够被异步地调用 Notify,将外部触发的异步事件“传递进去”,再进一步解决加工。
层层的 Listener 事件链,能将传入的原始 serviceURL 通过 zkConn 发送给 zk 服务,获取到服务端注册好的 url 对应的二进制信息。
而 Notify 回调链,则将这串 byte[] 一步一步解析、加工;以事件的模式向外传递,最终落到 directory 上的时候,曾经是成型的 newInvokers 了。
具体细节不再以源码模式展现,可参照上图查阅源码。

至此曾经拿到了 server 端注册好的实在 invoker。

实现了图(一)中的局部:

4)结构带有集群策略的 clusterinvoker

通过上述操作,曾经拿到了 server 端 Invokers,放入了 directory 的 cacheinvokers 数组外面缓存。

后续的操作对应本文从 url 到 invoker 的过程的最初一步,由 directory 生成带有个性集群策略的 invoker。

//(四)new cluster invoker,将 directory 写入集群,取得具备集群策略的 invoker
  cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
  invoker := cluster.Join(directory)
123

Join 函数的实现就是如下函数:

// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker()

func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
  return &failoverClusterInvoker{baseClusterInvoker: newBaseClusterInvoker(directory),
  }
}
12345

dubbo-go 框架默认抉择 failover 策略,既然返回了一个 invoker,咱们查看一下 failoverClusterInvoker 的 Invoker 办法,看它是如何将集群策略封装到 Invoker 函数外部的:

// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()

// Invoker 函数
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
  ...
  // 调用 List 办法拿到 directory 缓存的所有 invokers
  invokers := invoker.directory.List(invocation)
  if err := invoker.checkInvokers(invokers, invocation); err != nil {// 查看是否能够实现调用
  return &protocol.RPCResult{Err: err}
  }
  // 获取来自用户方向传入的
  methodName := invocation.MethodName()
  retries := getRetries(invokers, methodName)
  loadBalance := getLoadBalance(invokers[0], invocation)
  for i := 0; i <= retries; i++ {
  // 重要!这里是集群策略的体现,失败后重试!//Reselect before retry to avoid a change of candidate `invokers`.
  //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
  if i > 0 {if err := invoker.checkWhetherDestroyed(); err != nil {return &protocol.RPCResult{Err: err}
    }
    invokers = invoker.directory.List(invocation)
    if err := invoker.checkInvokers(invokers, invocation); err != nil {return &protocol.RPCResult{Err: err}
    }
  }
  // 这里是负载平衡策略的体现!抉择特定 ivk 进行调用。ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
  if ivk == nil {continue}
  invoked = append(invoked, ivk)
  //DO INVOKE
  result = ivk.Invoke(ctx, invocation)
  if result.Error() != nil {providers = append(providers, ivk.GetUrl().Key())
    continue
  }
  return result
  }
  ...
}

看了很多 Invoke 函数的实现,所有相似的 Invoker 函数都蕴含两个方向:一个是用户方向的 invcation;一个是函数方向的底层 invokers。
而集群策略的 invoke 函数自身作为接线员,把 invocation 一步步解析,依据调用需要和集群策略,抉择特定的 invoker 来执行。
proxy 函数也是这样,一个是用户方向的 ins[] reflect.Type, 一个是函数方向的 invoker。
proxy 函数负责将 ins 转换为 invocation,调用对应 invoker 的 invoker 函数,实现连通。
而出于这样的设计,能够在一步步 Invoker 封装的过程中,每个 Invoker 只关怀本人负责操作的局部,从而使整个调用栈解耦。
妙啊!!!

至此,咱们了解了 failoverClusterInvoker 的 Invoke 函数实现,也正是和这个集群策略 Invoker 被返回,承受来自上方的调用。

已实现图(一)中的:

5)在 zookeeper 上注册以后 client

拿到 invokers 后,能够回到这个函数了:

  // file: config/refrence_config.go: Refer()
  
  if len(c.urls) == 1 {
  // 这一步拜访到 registry/protocol/protocol.go registryProtocol.Refer
  c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
  //(一)拿到了实在的 invokers
  } else {
  // 如果有多个注册核心,即有多个 invoker, 则采取集群策略
  invokers := make([]protocol.Invoker, 0, len(c.urls))
  ...
  cluster := extension.GetCluster(hitClu)
  // If 'zone-aware' policy select, the invoker wrap sequence would be:
  // ZoneAwareClusterInvoker(StaticDirectory) ->
  // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
  c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
  }
  //(二)create proxy,为函数配置代理
  if c.Async {callback := GetCallback(c.id)
  c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)
  } else {
  // 这里 c.invoker 曾经是目标 addr 了
  c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)
  }

咱们有了能够买通的 invokers,但还不能间接调用,因为 invoker 的入参是 invocation,而调用函数应用的是具体的参数列表,须要通过一层 proxy 来标准入参和出参。

接下来新建一个默认 proxy,搁置在 c.proxy 内,以供后续应用。

至此,实现了图(一)中最初的操作:

3. 将调用逻辑以代理函数的模式写入 rpc-service

下面实现了 config.Refer 操作,回到:

config/config_loader.go: loadConsumerConfig()

下一个重要的函数是 Implement,它的操作较为简单:旨在应用下面生成的 c.proxy 代理,链接用户本人定义的 rpcService 到 clusterInvoker 的信息传输。

函数较长,只选取了重要的局部:

// file: common/proxy/proxy.go: Implement()

// Implement
// proxy implement
// In consumer, RPCService like:
//    type XxxProvider struct {//    Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
//    }
// Implement 实现的过程,就是 proxy 依据函数名和返回值,通过调用 invoker 结构出领有近程调用逻辑的代理函数
// 将以后 rpc 所有可供调用的函数注册到 proxy.rpc 内
func (p *Proxy) Implement(v common.RPCService) {// makeDubboCallProxy 这是一个结构代理函数,这个函数的返回值是 func(in []reflect.Value) []reflect.Value 这样一个函数
  // 这个被返回的函数是申请实现的载体,由他来发动调用获取后果
  makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {return func(in []reflect.Value) []reflect.Value {
    // 依据 methodName 和 outs 的类型,结构这样一个函数,这个函数能将 in 输出的 value 转换为输入的 value
    // 这个函数具体的实现如下:...
    // 目前拿到了 methodName、所有入参的 interface 和 value,出参数 reply
    //(一)依据这些生成一个 rpcinvocation
    inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),
    invocation_impl.WithArguments(inIArr),
    invocation_impl.WithReply(reply.Interface()),
    invocation_impl.WithCallBack(p.callBack),
    invocation_impl.WithParameterValues(inVArr))
    for k, value := range p.attachments {inv.SetAttachments(k, value)
    }
    // add user setAttachment
    atm := invCtx.Value(constant.AttachmentKey) // 如果传入的 ctx 外面有 attachment,也要写入 inv
    if m, ok := atm.(map[string]string); ok {
    for k, value := range m {inv.SetAttachments(k, value)
    }
    }
    // 至此结构 inv 结束
    // ( 二)触发 Invoker 之前曾经将 cluster_invoker 放入 proxy,应用 Invoke 办法,通过 getty 近程过程调用
    result := p.invoke.Invoke(invCtx, inv)
    // 如果有 attachment,则退出
    if len(result.Attachments()) > 0 {invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())
    }
    ...
  }
  }
  numField := valueOfElem.NumField()
  for i := 0; i < numField; i++ {t := typeOf.Field(i)
  methodName := t.Tag.Get("dubbo")
  if methodName == "" {methodName = t.Name}
  f := valueOfElem.Field(i)
  if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // 针对于每个函数
    outNum := t.Type.NumOut()
    // 规定函数输入只能有 1 / 2 个
    if outNum != 1 && outNum != 2 {
    logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
      t.Name, t.Type.String(), outNum)
    continue
    }
    // The latest return type of the method must be error.
    // 规定最初一个返回值肯定是 error
    if returnType := t.Type.Out(outNum - 1); returnType != typError {logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name)
    continue
    }
    // 获取到所有的出参类型,放到数组里
    var funcOuts = make([]reflect.Type, outNum)
    for i := 0; i < outNum; i++ {funcOuts[i] = t.Type.Out(i)
    }
    // do method proxy here:
    //(三)调用 make 函数,传入函数名和返回值,取得能调用近程的 proxy,将这个 proxy 替换掉原来的函数地位
    f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
    logger.Debugf("set method [%s]", methodName)
  }
  }
  ...
}

正如之前所说,proxy 的作用是将用户定义的函数参数列表,转化为形象的 invocation 传入 Invoker,进行调用。

其中已表明有三处较为重要的中央:

  1. 在代理函数中实现由参数列表生成 Invocation 的逻辑
  2. 在代理函数实现调用 Invoker 的逻辑
  3. 将代理函数替换为原始 rpc-service 对应函数

至此,也就解决了一开始的问题:

  // file: client.go: main()
  
  config.Load()
  user := &User{}
  err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)

这里间接调用用户定义的 rpcService 的函数 GetUser,此处理论调用的是通过重写入的函数代理,所以就能实现近程调用了。

从 client 到 server 的 invoker 嵌套链 - 小结

在浏览 dubbo-go 源码的过程中,咱们可能发现一条清晰的 invoker-proxy 嵌套链,心愿可能通过图的模式来展示:

如果你有任何疑难,欢送钉钉扫码退出钉钉交换群:钉钉群号 23331795。

作者简介

李志信  (GitHubID LaurenceLiZhixin),中山大学软件工程业余在校学生,善于应用 Java/Go 语言,专一于云原生和微服务等技术方向。

“阿里巴巴云原生关注微服务、Serverless、容器、Service Mesh 等技术畛域、聚焦云原生风行技术趋势、云原生大规模的落地实际,做最懂云原生开发者的公众号。”

退出移动版