本文主要研究一下dubbo-go的forkingCluster

forkingCluster

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster.go

type forkingCluster struct{}const forking = "forking"func init() {    extension.SetCluster(forking, NewForkingCluster)}// NewForkingCluster ...func NewForkingCluster() cluster.Cluster {    return &forkingCluster{}}func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {    return newForkingClusterInvoker(directory)}
  • forkingCluster的Join方法执行newForkingClusterInvoker

newForkingClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

type forkingClusterInvoker struct {    baseClusterInvoker}func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {    return &forkingClusterInvoker{        baseClusterInvoker: newBaseClusterInvoker(directory),    }}
  • newForkingClusterInvoker创建了forkingClusterInvoker

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

// Invoke ...func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {    err := invoker.checkWhetherDestroyed()    if err != nil {        return &protocol.RPCResult{Err: err}    }    invokers := invoker.directory.List(invocation)    err = invoker.checkInvokers(invokers, invocation)    if err != nil {        return &protocol.RPCResult{Err: err}    }    var selected []protocol.Invoker    forks := int(invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS))    timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT)    if forks < 0 || forks > len(invokers) {        selected = invokers    } else {        selected = make([]protocol.Invoker, 0)        loadbalance := getLoadBalance(invokers[0], invocation)        for i := 0; i < forks; i++ {            ivk := invoker.doSelect(loadbalance, invocation, invokers, selected)            if ivk != nil {                selected = append(selected, ivk)            }        }    }    resultQ := queue.New(1)    for _, ivk := range selected {        go func(k protocol.Invoker) {            result := k.Invoke(ctx, invocation)            err := resultQ.Put(result)            if err != nil {                logger.Errorf("resultQ put failed with exception: %v.\n", err)            }        }(ivk)    }    rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))    if err != nil {        return &protocol.RPCResult{            Err: fmt.Errorf("failed to forking invoke provider %v, "+                "but no luck to perform the invocation. Last error is: %v", selected, err),        }    }    if len(rsps) == 0 {        return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)}    }    result, ok := rsps[0].(protocol.Result)    if !ok {        return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}    }    return result}
  • Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后从invoker.GetUrl()获取forks及timeouts参数,然后循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;之后遍历selected异步执行其Invoke方法,并将结果放到resultQ中;最后通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))拉取最先返回的结果返回

小结

forkingCluster的Join方法执行newForkingClusterInvoker;其Invoke方法循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;之后遍历selected异步执行其Invoke方法,并将结果放到resultQ中;最后通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))拉取最先返回的结果返回

doc

  • forking_cluster