序
本文主要研究一下dubbo-go的failbackCluster
failbackCluster
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster.go
type failbackCluster struct{}const failback = "failback"func init() { extension.SetCluster(failback, NewFailbackCluster)}// NewFailbackCluster ...func NewFailbackCluster() cluster.Cluster { return &failbackCluster{}}func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker { return newFailbackClusterInvoker(directory)}
- failbackCluster的Join方法执行newFailbackClusterInvoker,创建对应的invoker
newFailbackClusterInvoker
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
type failbackClusterInvoker struct { baseClusterInvoker once sync.Once ticker *time.Ticker maxRetries int64 failbackTasks int64 taskList *queue.Queue}func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { invoker := &failbackClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) retries, err := strconv.Atoi(retriesConfig) if err != nil || retries < 0 { logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.") retries = constant.DEFAULT_FAILBACK_TIMES_INT } failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS) if failbackTasksConfig <= 0 { failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS } invoker.maxRetries = int64(retries) invoker.failbackTasks = failbackTasksConfig return invoker}
- newFailbackClusterInvoker方法创建failbackClusterInvoker,并设置其maxRetries、failbackTasks属性
Invoke
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) if err != nil { logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", invocation.MethodName(), invoker.GetUrl().Service(), err) return &protocol.RPCResult{} } url := invokers[0].GetUrl() methodName := invocation.MethodName() //Get the service loadbalance config lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) //Get the service method loadbalance config if have if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { lb = v } loadbalance := extension.GetLoadbalance(lb) invoked := make([]protocol.Invoker, 0, len(invokers)) var result protocol.Result ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) //DO INVOKE result = ivk.Invoke(ctx, invocation) if result.Error() != nil { invoker.once.Do(func() { invoker.taskList = queue.New(invoker.failbackTasks) go invoker.process(ctx) }) taskLen := invoker.taskList.Len() if taskLen >= invoker.failbackTasks { logger.Warnf("tasklist is too full > %d.\n", taskLen) return &protocol.RPCResult{} } timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk) invoker.taskList.Put(timerTask) logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", methodName, url.Service(), result.Error().Error()) // ignore return &protocol.RPCResult{} } return result}
- Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过extension.GetLoadbalance(lb)获取loadbalance,然后通过invoker.doSelect(loadbalance, invocation, invokers, invoked)选择invoker,之后执行其Invoke方法;如果调用出错,则在首次出错时(借助invoker.once)初始化invoker.taskList并异步执行invoker.process(ctx);若taskList已满则直接返回空结果,否则通过newRetryTimerTask创建timerTask并添加到invoker.taskList,最后返回空的RPCResult
Destroy
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) Destroy() { invoker.baseClusterInvoker.Destroy() // stop ticker if invoker.ticker != nil { invoker.ticker.Stop() } _ = invoker.taskList.Dispose()}
- Destroy方法执行invoker.baseClusterInvoker.Destroy()、invoker.ticker.Stop()、invoker.taskList.Dispose()
process
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) process(ctx context.Context) { invoker.ticker = time.NewTicker(time.Second * 1) for range invoker.ticker.C { // check each timeout task and re-run for { value, err := invoker.taskList.Peek() if err == queue.ErrDisposed { return } if err == queue.ErrEmptyQueue { break } retryTask := value.(*retryTimerTask) if time.Since(retryTask.lastT).Seconds() < 5 { break } // ignore return. the get must success. _, err = invoker.taskList.Get(1) if err != nil { logger.Warnf("get task found err: %v\n", err) break } go func(retryTask *retryTimerTask) { invoked := make([]protocol.Invoker, 0) invoked = append(invoked, retryTask.lastInvoker) retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) var result protocol.Result result = retryInvoker.Invoke(ctx, retryTask.invocation) if result.Error() != nil { retryTask.lastInvoker = retryInvoker invoker.checkRetry(retryTask, result.Error()) } }(retryTask) } }}
- process方法通过time.NewTicker(time.Second * 1)创建invoker.ticker。每次tick时通过invoker.taskList.Peek()查看队首的retryTask;若距上次尝试不足5秒,则等待下一次tick,否则通过Get方法将其从队列中取出。然后异步执行invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)选取retryInvoker,并执行retryInvoker.Invoke(ctx, retryTask.invocation);如果执行出错,则通过invoker.checkRetry(retryTask, result.Error())进行检查
checkRetry
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) { logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n", retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error()) retryTask.retries++ retryTask.lastT = time.Now() if retryTask.retries > invoker.maxRetries { logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n", retryTask.retries, retryTask.invocation) } else { invoker.taskList.Put(retryTask) }}
- checkRetry方法会递增retryTask.retries并更新lastT,然后判断是否超过invoker.maxRetries:超过则记录error日志并放弃该任务,否则再次将retryTask添加到invoker.taskList
小结
newFailbackClusterInvoker方法创建failbackClusterInvoker,并设置其maxRetries、failbackTasks属性;其Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过extension.GetLoadbalance(lb)获取loadbalance,然后通过invoker.doSelect(loadbalance, invocation, invokers, invoked)选择invoker,之后执行其Invoke方法;如果调用出错,则(仅首次)初始化invoker.taskList并异步执行invoker.process(ctx),之后通过newRetryTimerTask创建timerTask,添加到invoker.taskList
failbackCluster会忽略失败调用的result,失败的调用会被加入队列并在后台重试maxRetries次,适合fireAndForget的通信模式
doc
- failback_cluster