本文次要钻研一下dubbo-go的failfastCluster

failfastCluster

dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster.go

type failfastCluster struct{}const failfast = "failfast"func init() {    extension.SetCluster(failfast, NewFailFastCluster)}// NewFailFastCluster ...func NewFailFastCluster() cluster.Cluster {    return &failfastCluster{}}func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {    return newFailFastClusterInvoker(directory)}
  • failfastCluster的Join办法执行newFailFastClusterInvoker(directory)

newFailFastClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster_invoker.go

type failfastClusterInvoker struct {    baseClusterInvoker}func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {    return &failfastClusterInvoker{        baseClusterInvoker: newBaseClusterInvoker(directory),    }}
  • newFailFastClusterInvoker办法创立了failfastClusterInvoker

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/failfast_cluster_invoker.go

func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {    invokers := invoker.directory.List(invocation)    err := invoker.checkInvokers(invokers, invocation)    if err != nil {        return &protocol.RPCResult{Err: err}    }    loadbalance := getLoadBalance(invokers[0], invocation)    err = invoker.checkWhetherDestroyed()    if err != nil {        return &protocol.RPCResult{Err: err}    }    ivk := invoker.doSelect(loadbalance, invocation, invokers, nil)    return ivk.Invoke(ctx, invocation)}
  • Invoke办法先通过invoker.directory.List(invocation)获取invokers,之后通过getLoadBalance(invokers[0], invocation)获取loadbalance,再通过invoker.doSelect(loadbalance, invocation, invokers, nil)抉择ivk,最初执行ivk.Invoke(ctx, invocation)

getLoadBalance

dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go

func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {    url := invoker.GetUrl()    methodName := invocation.MethodName()    //Get the service loadbalance config    lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)    //Get the service method loadbalance config if have    if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); len(v) > 0 {        lb = v    }    return extension.GetLoadbalance(lb)}
  • getLoadBalance办法通过url获取constant.LOADBALANCE_KEY,而后通过extension.GetLoadbalance(lb)获取cluster.LoadBalance

doSelect

dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go

func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {    var selectedInvoker protocol.Invoker    url := invokers[0].GetUrl()    sticky := url.GetParamBool(constant.STICKY_KEY, false)    //Get the service method sticky config if have    sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky)    if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) {        invoker.stickyInvoker = nil    }    if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {        if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() {            return invoker.stickyInvoker        }    }    selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)    if sticky {        invoker.stickyInvoker = selectedInvoker    }    return selectedInvoker}
  • doSelect办法先判断是否sticky,是的话且invoker.stickyInvoker不为nil且available,则返回stickyInvoker,否则通过invoker.doSelectInvoker(lb, invocation, invokers, invoked)获取selectedInvoker

doSelectInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/base_cluster_invoker.go

func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {    if len(invokers) == 1 {        return invokers[0]    }    selectedInvoker := lb.Select(invokers, invocation)    //judge to if the selectedInvoker is invoked    if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {        // do reselect        var reslectInvokers []protocol.Invoker        for _, invoker := range invokers {            if !invoker.IsAvailable() {                continue            }            if !isInvoked(invoker, invoked) {                reslectInvokers = append(reslectInvokers, invoker)            }        }        if len(reslectInvokers) > 0 {            selectedInvoker = lb.Select(reslectInvokers, invocation)        } else {            return nil        }    }    return selectedInvoker}
  • doSelectInvoker办法通过lb.Select(invokers, invocation)抉择selectedInvoker,如果selectedInvoker曾经invoked或者非available,则遍历invokers从新结构reslectInvokers,再通过lb.Select(reslectInvokers, invocation)抉择selectedInvoker

小结

failfastCluster的Join办法执行newFailFastClusterInvoker(directory);failfastClusterInvoker的Invoke办法先通过invoker.directory.List(invocation)获取invokers,之后通过getLoadBalance(invokers[0], invocation)获取loadbalance,再通过invoker.doSelect(loadbalance, invocation, invokers, nil)抉择ivk,最初执行ivk.Invoke(ctx, invocation)

doc

  • failfast_cluster