本文次要钻研一下dubbo-go的failbackCluster

failbackCluster

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster.go

type failbackCluster struct{}const failback = "failback"func init() {    extension.SetCluster(failback, NewFailbackCluster)}// NewFailbackCluster ...func NewFailbackCluster() cluster.Cluster {    return &failbackCluster{}}func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {    return newFailbackClusterInvoker(directory)}
  • failbackCluster的join办法执行newFailbackClusterInvoker

newFailbackClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

type failbackClusterInvoker struct {    baseClusterInvoker    once          sync.Once    ticker        *time.Ticker    maxRetries    int64    failbackTasks int64    taskList      *queue.Queue}func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {    invoker := &failbackClusterInvoker{        baseClusterInvoker: newBaseClusterInvoker(directory),    }    retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)    retries, err := strconv.Atoi(retriesConfig)    if err != nil || retries < 0 {        logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")        retries = constant.DEFAULT_FAILBACK_TIMES_INT    }    failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)    if failbackTasksConfig <= 0 {        failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS    }    invoker.maxRetries = int64(retries)    invoker.failbackTasks = failbackTasksConfig    return invoker}
  • newFailbackClusterInvoker办法创立failbackClusterInvoker,并设置其maxRetries、failbackTasks属性

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {    invokers := invoker.directory.List(invocation)    err := invoker.checkInvokers(invokers, invocation)    if err != nil {        logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",            invocation.MethodName(), invoker.GetUrl().Service(), err)        return &protocol.RPCResult{}    }    url := invokers[0].GetUrl()    methodName := invocation.MethodName()    //Get the service loadbalance config    lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)    //Get the service method loadbalance config if have    if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {        lb = v    }    loadbalance := extension.GetLoadbalance(lb)    invoked := make([]protocol.Invoker, 0, len(invokers))    var result protocol.Result    ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)    //DO INVOKE    result = ivk.Invoke(ctx, invocation)    if result.Error() != nil {        invoker.once.Do(func() {            invoker.taskList = queue.New(invoker.failbackTasks)            go invoker.process(ctx)        })        taskLen := invoker.taskList.Len()        if taskLen >= invoker.failbackTasks {            logger.Warnf("tasklist is too full > %d.\n", taskLen)            return &protocol.RPCResult{}        }        timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)        invoker.taskList.Put(timerTask)        logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",            methodName, url.Service(), result.Error().Error())        // ignore        return &protocol.RPCResult{}    }    return result}
  • Invoke办法先通过invoker.directory.List(invocation)获取invokers,之后通过extension.GetLoadbalance(lb)获取loadbalance,而后通过invoker.doSelect(loadbalance, invocation, invokers, invoked)抉择invoker,之后执行其Invoke办法,如果出现异常则设置invoker.taskList,异步执行invoker.process(ctx),之后通过newRetryTimerTask创立timerTask,增加到invoker.taskList

Destroy

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

func (invoker *failbackClusterInvoker) Destroy() {    invoker.baseClusterInvoker.Destroy()    // stop ticker    if invoker.ticker != nil {        invoker.ticker.Stop()    }    _ = invoker.taskList.Dispose()}
  • Destroy办法执行invoker.baseClusterInvoker.Destroy()、invoker.ticker.Stop()、invoker.taskList.Dispose()

process

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

func (invoker *failbackClusterInvoker) process(ctx context.Context) {    invoker.ticker = time.NewTicker(time.Second * 1)    for range invoker.ticker.C {        // check each timeout task and re-run        for {            value, err := invoker.taskList.Peek()            if err == queue.ErrDisposed {                return            }            if err == queue.ErrEmptyQueue {                break            }            retryTask := value.(*retryTimerTask)            if time.Since(retryTask.lastT).Seconds() < 5 {                break            }            // ignore return. the get must success.            _, err = invoker.taskList.Get(1)            if err != nil {                logger.Warnf("get task found err: %v\n", err)                break            }            go func(retryTask *retryTimerTask) {                invoked := make([]protocol.Invoker, 0)                invoked = append(invoked, retryTask.lastInvoker)                retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)                var result protocol.Result                result = retryInvoker.Invoke(ctx, retryTask.invocation)                if result.Error() != nil {                    retryTask.lastInvoker = retryInvoker                    invoker.checkRetry(retryTask, result.Error())                }            }(retryTask)        }    }}
  • process办法通过time.NewTicker(time.Second * 1)创立invoker.ticker,之后从invoker.taskList.Peek()获取retryTask(之后Get办法进行poll),而后异步执行invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)选取retryInvoker,而后执行retryInvoker.Invoke(ctx, retryTask.invocation);如果执行出现异常,则通过invoker.checkRetry(retryTask, result.Error())进行check

checkRetry

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {    logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n",        retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())    retryTask.retries++    retryTask.lastT = time.Now()    if retryTask.retries > invoker.maxRetries {        logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n",            retryTask.retries, retryTask.invocation)    } else {        invoker.taskList.Put(retryTask)    }}
  • checkRetry办法会递增retryTask.retries,而后判断是否超过invoker.maxRetries,超过则记录error日志,不超过则再次将retryTask增加到invoker.taskList

小结

newFailbackClusterInvoker办法创立failbackClusterInvoker,并设置其maxRetries、failbackTasks属性;其Invoke办法先通过invoker.directory.List(invocation)获取invokers,之后通过extension.GetLoadbalance(lb)获取loadbalance,而后通过invoker.doSelect(loadbalance, invocation, invokers, invoked)抉择invoker,之后执行其Invoke办法,如果出现异常则设置invoker.taskList,异步执行invoker.process(ctx),之后通过newRetryTimerTask创立timerTask,增加到invoker.taskList

failbackCluster疏忽result,针对失败的会退出队列重试maxRetries次,适宜fireAndForget的通信模式

doc

  • failback_cluster