序
本文主要研究一下dubbo-go的forkingCluster
forkingCluster
dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster.go
type forkingCluster struct{}const forking = "forking"func init() { extension.SetCluster(forking, NewForkingCluster)}// NewForkingCluster ...func NewForkingCluster() cluster.Cluster { return &forkingCluster{}}func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker { return newForkingClusterInvoker(directory)}
- forkingCluster的Join方法执行newForkingClusterInvoker
newForkingClusterInvoker
dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go
type forkingClusterInvoker struct { baseClusterInvoker}func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { return &forkingClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), }}
- newForkingClusterInvoker创建了forkingClusterInvoker
Invoke
dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go
// Invoke ...func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { err := invoker.checkWhetherDestroyed() if err != nil { return &protocol.RPCResult{Err: err} } invokers := invoker.directory.List(invocation) err = invoker.checkInvokers(invokers, invocation) if err != nil { return &protocol.RPCResult{Err: err} } var selected []protocol.Invoker forks := int(invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS)) timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT) if forks < 0 || forks > len(invokers) { selected = invokers } else { selected = make([]protocol.Invoker, 0) loadbalance := getLoadBalance(invokers[0], invocation) for i := 0; i < forks; i++ { ivk := invoker.doSelect(loadbalance, invocation, invokers, selected) if ivk != nil { selected = append(selected, ivk) } } } resultQ := queue.New(1) for _, ivk := range selected { go func(k protocol.Invoker) { result := k.Invoke(ctx, invocation) err := resultQ.Put(result) if err != nil { logger.Errorf("resultQ put failed with exception: %v.\n", err) } }(ivk) } rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts)) if err != nil { return &protocol.RPCResult{ Err: fmt.Errorf("failed to forking invoke provider %v, "+ "but no luck to perform the invocation. Last error is: %v", selected, err), } } if len(rsps) == 0 { return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)} } result, ok := rsps[0].(protocol.Result) if !ok { return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)} } return result}
- Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后从invoker.GetUrl()获取forks及timeouts参数;若forks超出有效范围则直接使用全部invokers,否则循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;之后遍历selected异步执行其Invoke方法,并将结果放到resultQ中;最后通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))拉取最先返回的结果返回
小结
forkingCluster的Join方法执行newForkingClusterInvoker;其Invoke方法循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;之后遍历selected异步执行其Invoke方法,并将结果放到resultQ中;最后通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))拉取最先返回的结果返回
doc
- forking_cluster