乐趣区

关于dubbo:聊聊dubbogo的forkingCluster

本文次要钻研一下 dubbo-go 的 forkingCluster

forkingCluster

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster.go

type forkingCluster struct{}

const forking = "forking"

func init() {extension.SetCluster(forking, NewForkingCluster)
}

// NewForkingCluster ...
func NewForkingCluster() cluster.Cluster {return &forkingCluster{}
}

func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {return newForkingClusterInvoker(directory)
}
  • forkingCluster 的 Join 办法执行 newForkingClusterInvoker

newForkingClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

type forkingClusterInvoker struct {baseClusterInvoker}

func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
    return &forkingClusterInvoker{baseClusterInvoker: newBaseClusterInvoker(directory),
    }
}
  • newForkingClusterInvoker 创立了 forkingClusterInvoker

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

// Invoke ...
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {err := invoker.checkWhetherDestroyed()
    if err != nil {return &protocol.RPCResult{Err: err}
    }

    invokers := invoker.directory.List(invocation)
    err = invoker.checkInvokers(invokers, invocation)
    if err != nil {return &protocol.RPCResult{Err: err}
    }

    var selected []protocol.Invoker
    forks := int(invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS))
    timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT)
    if forks < 0 || forks > len(invokers) {selected = invokers} else {selected = make([]protocol.Invoker, 0)
        loadbalance := getLoadBalance(invokers[0], invocation)
        for i := 0; i < forks; i++ {ivk := invoker.doSelect(loadbalance, invocation, invokers, selected)
            if ivk != nil {selected = append(selected, ivk)
            }
        }
    }

    resultQ := queue.New(1)
    for _, ivk := range selected {go func(k protocol.Invoker) {result := k.Invoke(ctx, invocation)
            err := resultQ.Put(result)
            if err != nil {logger.Errorf("resultQ put failed with exception: %v.\n", err)
            }
        }(ivk)
    }

    rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))
    if err != nil {
        return &protocol.RPCResult{
            Err: fmt.Errorf("failed to forking invoke provider %v,"+
                "but no luck to perform the invocation. Last error is: %v", selected, err),
        }
    }
    if len(rsps) == 0 {return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)}
    }

    result, ok := rsps[0].(protocol.Result)
    if !ok {return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}
    }

    return result
}
  • Invoke 办法先通过 invoker.directory.List(invocation) 获取 invokers,之后从 invoker.GetUrl() 获取 forks 及 timeouts 参数,而后循环 forks 次通过 invoker.doSelect(loadbalance, invocation, invokers, selected) 选出 selected 的 invokers;之后遍历 selected 异步执行其 Invoke 办法,并将后果放到 resultQ 中;最初通过 resultQ.Poll(1, time.Millisecond*time.Duration(timeouts)) 拉取最先返回的后果返回

小结

forkingCluster 的 Join 办法执行 newForkingClusterInvoker;其 Invoke 办法循环 forks 次通过 invoker.doSelect(loadbalance, invocation, invokers, selected) 选出 selected 的 invokers;之后遍历 selected 异步执行其 Invoke 办法,并将后果放到 resultQ 中;最初通过 resultQ.Poll(1, time.Millisecond*time.Duration(timeouts)) 拉取最先返回的后果返回

doc

  • forking_cluster
退出移动版