关于dubbo:聊聊dubbo-go的failbackCluster

51次阅读

共计 6088 个字符,预计需要花费 16 分钟才能阅读完成。

本文主要研究一下 dubbo-go 的 failbackCluster

failbackCluster

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster.go

// failbackCluster is the cluster.Cluster implementation registered under the
// name "failback". It carries no state; its Join method builds the invoker
// that does the actual work.
type failbackCluster struct{}

// failback is the name under which this cluster is registered via
// extension.SetCluster.
const failback = "failback"

// init registers the failback cluster constructor with the extension
// registry under the name held by the failback constant.
func init() {
	extension.SetCluster(failback, NewFailbackCluster)
}

// NewFailbackCluster returns a new, stateless failback cluster as a
// cluster.Cluster.
func NewFailbackCluster() cluster.Cluster {
	return new(failbackCluster)
}

// Join wraps the given directory in a failback cluster invoker and returns it.
// The receiver is named c so that it does not shadow the cluster package.
func (c *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
	return newFailbackClusterInvoker(directory)
}
  • failbackCluster 的 Join 方法执行 newFailbackClusterInvoker

newFailbackClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

// failbackClusterInvoker is the invoker returned by failbackCluster.Join.
// Failed invocations are placed on taskList and retried in the background by
// the process goroutine.
type failbackClusterInvoker struct {
    baseClusterInvoker

    // once guards the lazy creation of taskList and the start of the
    // process goroutine; both happen on the first failed call in Invoke.
    once          sync.Once
    // ticker drives the retry loop; it is assigned inside process, so it is
    // nil until that goroutine has started.
    ticker        *time.Ticker
    // maxRetries is the retry limit checked in checkRetry, read from the
    // URL's retries parameter in newFailbackClusterInvoker.
    maxRetries    int64
    // failbackTasks is passed to queue.New and is also the queue length at
    // which Invoke stops enqueueing new retry tasks.
    failbackTasks int64
    // taskList holds the pending *retryTimerTask values; it is nil until the
    // first failed call.
    taskList      *queue.Queue
}

// newFailbackClusterInvoker builds a failbackClusterInvoker on top of the
// given directory and reads its two settings from the invoker URL:
//
//   - maxRetries from constant.RETRIES_KEY; a value that is not an integer or
//     is negative is logged and replaced by constant.DEFAULT_FAILBACK_TIMES_INT.
//   - failbackTasks from constant.FAIL_BACK_TASKS_KEY; a non-positive value is
//     replaced by constant.DEFAULT_FAILBACK_TASKS.
func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
	fb := &failbackClusterInvoker{
		baseClusterInvoker: newBaseClusterInvoker(directory),
	}

	// Retry limit.
	rawRetries := fb.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
	maxRetries, err := strconv.Atoi(rawRetries)
	if err != nil || maxRetries < 0 {
		logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")
		maxRetries = constant.DEFAULT_FAILBACK_TIMES_INT
	}
	fb.maxRetries = int64(maxRetries)

	// Retry queue size.
	taskLimit := fb.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
	if taskLimit <= 0 {
		taskLimit = constant.DEFAULT_FAILBACK_TASKS
	}
	fb.failbackTasks = taskLimit

	return fb
}
  • newFailbackClusterInvoker 方法创建 failbackClusterInvoker,并设置其 maxRetries、failbackTasks 属性

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

// Invoke picks one invoker through the configured load balancer and calls it
// once. If that call succeeds its result is returned unchanged. If it fails,
// the invocation is queued for background retry and an empty RPCResult is
// returned, so the caller never sees the error.
//
// An empty RPCResult is also returned when checkInvokers rejects the invoker
// list, and when the retry queue already holds failbackTasks entries (in
// which case the failed invocation is dropped rather than queued).
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
	candidates := invoker.directory.List(invocation)
	if err := invoker.checkInvokers(candidates, invocation); err != nil {
		logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
			invocation.MethodName(), invoker.GetUrl().Service(), err)
		return &protocol.RPCResult{}
	}

	url := candidates[0].GetUrl()
	methodName := invocation.MethodName()

	// Service-level load balance setting, overridden by a method-level one
	// when present.
	lbName := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
	if methodLB := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); methodLB != "" {
		lbName = methodLB
	}
	loadbalance := extension.GetLoadbalance(lbName)

	alreadyInvoked := make([]protocol.Invoker, 0, len(candidates))
	selected := invoker.doSelect(loadbalance, invocation, candidates, alreadyInvoked)

	result := selected.Invoke(ctx, invocation)
	if result.Error() == nil {
		return result
	}

	// First failure on this invoker: create the retry queue and start the
	// background retry loop.
	invoker.once.Do(func() {
		invoker.taskList = queue.New(invoker.failbackTasks)
		go invoker.process(ctx)
	})

	if pending := invoker.taskList.Len(); pending >= invoker.failbackTasks {
		logger.Warnf("tasklist is too full > %d.\n", pending)
		return &protocol.RPCResult{}
	}

	invoker.taskList.Put(newRetryTimerTask(loadbalance, invocation, candidates, selected))

	logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
		methodName, url.Service(), result.Error().Error())
	// The error is deliberately swallowed; the retry happens in background.
	return &protocol.RPCResult{}
}
  • Invoke 方法先通过 invoker.directory.List(invocation) 获取 invokers,之后通过 extension.GetLoadbalance(lb) 获取 loadbalance,然后通过 invoker.doSelect(loadbalance, invocation, invokers, invoked) 选择 invoker,之后执行其 Invoke 方法,如果出现异常则(仅首次,通过 once.Do)创建 invoker.taskList 并异步执行 invoker.process(ctx),之后通过 newRetryTimerTask 创建 timerTask,添加到 invoker.taskList

Destroy

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

// Destroy tears down the invoker: it destroys the embedded base invoker,
// stops the retry ticker if one has been created, and disposes the retry
// queue if one has been created.
//
// Both ticker and taskList are created lazily, on the first failed call in
// Invoke. An invoker that never saw a failure therefore has a nil taskList,
// and calling Dispose on it would dereference a nil *queue.Queue, so the
// queue gets the same nil guard as the ticker.
func (invoker *failbackClusterInvoker) Destroy() {
	invoker.baseClusterInvoker.Destroy()

	// stop ticker
	if invoker.ticker != nil {
		invoker.ticker.Stop()
	}

	// Dispose returns the items still queued; they are intentionally dropped.
	if invoker.taskList != nil {
		_ = invoker.taskList.Dispose()
	}
}
  • Destroy 方法执行 invoker.baseClusterInvoker.Destroy()、invoker.ticker.Stop()、invoker.taskList.Dispose()

process

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

// process is the background retry loop started by Invoke on the first failed
// call. Once per second it drains the head of taskList: every task whose last
// attempt is at least 5 seconds old is removed from the queue and retried in
// its own goroutine. Draining stops for the current tick at the first task
// that is still too fresh, when the queue is empty, or when Get fails; the
// loop returns for good when the queue reports queue.ErrDisposed.
//
// ctx is the context of the call that started this goroutine and is reused
// for every retry.
// NOTE(review): if that context is cancelled once the original call returns,
// retries may fail immediately; confirm with callers.
// NOTE(review): Destroy stops the ticker before disposing the queue, and a
// stopped ticker delivers no further ticks, so this loop may never reach the
// ErrDisposed check again; confirm whether the goroutine leaks.
func (invoker *failbackClusterInvoker) process(ctx context.Context) {
	invoker.ticker = time.NewTicker(time.Second)
	for range invoker.ticker.C {
		// check each timeout task and re-run
		for {
			head, err := invoker.taskList.Peek()
			if err == queue.ErrDisposed {
				return
			}
			if err == queue.ErrEmptyQueue {
				break
			}

			task := head.(*retryTimerTask)
			if time.Since(task.lastT).Seconds() < 5 {
				break
			}

			// Remove the task that was just peeked; this is expected to succeed.
			if _, err := invoker.taskList.Get(1); err != nil {
				logger.Warnf("get task found err: %v\n", err)
				break
			}

			go func(task *retryTimerTask) {
				// Pass the previously used invoker as "already invoked",
				// presumably so that doSelect prefers a different one.
				tried := []protocol.Invoker{task.lastInvoker}

				next := invoker.doSelect(task.loadbalance, task.invocation, task.invokers, tried)
				result := next.Invoke(ctx, task.invocation)
				if result.Error() != nil {
					task.lastInvoker = next
					invoker.checkRetry(task, result.Error())
				}
			}(task)
		}
	}
}
  • process 方法通过 time.NewTicker(time.Second * 1) 创建 invoker.ticker,之后从 invoker.taskList.Peek() 获取 retryTask(之后通过 Get 方法进行 poll),然后异步执行 invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) 选取 retryInvoker,然后执行 retryInvoker.Invoke(ctx, retryTask.invocation);如果执行出现异常,则通过 invoker.checkRetry(retryTask, result.Error()) 进行 check

checkRetry

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

// checkRetry records one more failed attempt for retryTask and decides its
// fate. It logs the failure, increments the task's retry counter and refreshes
// its last-attempt time. While the counter is still within maxRetries the task
// goes back on taskList; otherwise it is abandoned with an error log.
func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
	logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n",
		retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())

	retryTask.retries++
	retryTask.lastT = time.Now()

	if retryTask.retries <= invoker.maxRetries {
		invoker.taskList.Put(retryTask)
		return
	}

	logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n",
		retryTask.retries, retryTask.invocation)
}
  • checkRetry 方法会递增 retryTask.retries,然后判断是否超过 invoker.maxRetries,超过则记录 error 日志,不超过则再次将 retryTask 添加到 invoker.taskList

小结

newFailbackClusterInvoker 方法创建 failbackClusterInvoker,并设置其 maxRetries、failbackTasks 属性;其 Invoke 方法先通过 invoker.directory.List(invocation) 获取 invokers,之后通过 extension.GetLoadbalance(lb) 获取 loadbalance,然后通过 invoker.doSelect(loadbalance, invocation, invokers, invoked) 选择 invoker,之后执行其 Invoke 方法,如果出现异常则(仅首次,通过 once.Do)创建 invoker.taskList 并异步执行 invoker.process(ctx),之后通过 newRetryTimerTask 创建 timerTask,添加到 invoker.taskList

failbackCluster 忽略 result,针对失败的调用会加入队列重试 maxRetries 次,适合 fireAndForget 的通信模式

doc

  • failback_cluster

正文完
 0