序
本文次要钻研一下 dubbo-go 的 forkingCluster
forkingCluster
dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster.go
type forkingCluster struct{}
const forking = "forking"
func init() {extension.SetCluster(forking, NewForkingCluster)
}
// NewForkingCluster ...
func NewForkingCluster() cluster.Cluster {return &forkingCluster{}
}
func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {return newForkingClusterInvoker(directory)
}
- forkingCluster 的 Join 办法执行 newForkingClusterInvoker
newForkingClusterInvoker
dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go
type forkingClusterInvoker struct {baseClusterInvoker}
func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &forkingClusterInvoker{baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
- newForkingClusterInvoker 创立了 forkingClusterInvoker
Invoke
dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go
// Invoke ...
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {err := invoker.checkWhetherDestroyed()
if err != nil {return &protocol.RPCResult{Err: err}
}
invokers := invoker.directory.List(invocation)
err = invoker.checkInvokers(invokers, invocation)
if err != nil {return &protocol.RPCResult{Err: err}
}
var selected []protocol.Invoker
forks := int(invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS))
timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT)
if forks < 0 || forks > len(invokers) {selected = invokers} else {selected = make([]protocol.Invoker, 0)
loadbalance := getLoadBalance(invokers[0], invocation)
for i := 0; i < forks; i++ {ivk := invoker.doSelect(loadbalance, invocation, invokers, selected)
if ivk != nil {selected = append(selected, ivk)
}
}
}
resultQ := queue.New(1)
for _, ivk := range selected {go func(k protocol.Invoker) {result := k.Invoke(ctx, invocation)
err := resultQ.Put(result)
if err != nil {logger.Errorf("resultQ put failed with exception: %v.\n", err)
}
}(ivk)
}
rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))
if err != nil {
return &protocol.RPCResult{
Err: fmt.Errorf("failed to forking invoke provider %v,"+
"but no luck to perform the invocation. Last error is: %v", selected, err),
}
}
if len(rsps) == 0 {return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)}
}
result, ok := rsps[0].(protocol.Result)
if !ok {return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}
}
return result
}
- Invoke 办法先通过 invoker.directory.List(invocation) 获取 invokers,之后从 invoker.GetUrl() 获取 forks 及 timeouts 参数,而后循环 forks 次通过 invoker.doSelect(loadbalance, invocation, invokers, selected) 选出 selected 的 invokers;之后遍历 selected 异步执行其 Invoke 办法,并将后果放到 resultQ 中;最初通过 resultQ.Poll(1, time.Millisecond*time.Duration(timeouts)) 拉取最先返回的后果返回
小结
forkingCluster 的 Join 办法执行 newForkingClusterInvoker;其 Invoke 办法循环 forks 次通过 invoker.doSelect(loadbalance, invocation, invokers, selected) 选出 selected 的 invokers;之后遍历 selected 异步执行其 Invoke 办法,并将后果放到 resultQ 中;最初通过 resultQ.Poll(1, time.Millisecond*time.Duration(timeouts)) 拉取最先返回的后果返回
doc
- forking_cluster