关于dubbo:聊聊dubbogo的forkingCluster

本文次要钻研一下dubbo-go的forkingCluster

forkingCluster

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster.go

type forkingCluster struct{}

const forking = "forking"

func init() {
    extension.SetCluster(forking, NewForkingCluster)
}

// NewForkingCluster ...
func NewForkingCluster() cluster.Cluster {
    return &forkingCluster{}
}

func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {
    return newForkingClusterInvoker(directory)
}
  • forkingCluster的Join办法执行newForkingClusterInvoker

newForkingClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

type forkingClusterInvoker struct {
    baseClusterInvoker
}

func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
    return &forkingClusterInvoker{
        baseClusterInvoker: newBaseClusterInvoker(directory),
    }
}
  • newForkingClusterInvoker创立了forkingClusterInvoker

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

// Invoke ...
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
    err := invoker.checkWhetherDestroyed()
    if err != nil {
        return &protocol.RPCResult{Err: err}
    }

    invokers := invoker.directory.List(invocation)
    err = invoker.checkInvokers(invokers, invocation)
    if err != nil {
        return &protocol.RPCResult{Err: err}
    }

    var selected []protocol.Invoker
    forks := int(invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS))
    timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT)
    if forks < 0 || forks > len(invokers) {
        selected = invokers
    } else {
        selected = make([]protocol.Invoker, 0)
        loadbalance := getLoadBalance(invokers[0], invocation)
        for i := 0; i < forks; i++ {
            ivk := invoker.doSelect(loadbalance, invocation, invokers, selected)
            if ivk != nil {
                selected = append(selected, ivk)
            }
        }
    }

    resultQ := queue.New(1)
    for _, ivk := range selected {
        go func(k protocol.Invoker) {
            result := k.Invoke(ctx, invocation)
            err := resultQ.Put(result)
            if err != nil {
                logger.Errorf("resultQ put failed with exception: %v.\n", err)
            }
        }(ivk)
    }

    rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))
    if err != nil {
        return &protocol.RPCResult{
            Err: fmt.Errorf("failed to forking invoke provider %v, "+
                "but no luck to perform the invocation. Last error is: %v", selected, err),
        }
    }
    if len(rsps) == 0 {
        return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)}
    }

    result, ok := rsps[0].(protocol.Result)
    if !ok {
        return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}
    }

    return result
}
  • Invoke办法先通过invoker.directory.List(invocation)获取invokers,之后从invoker.GetUrl()获取forks及timeouts参数,而后循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;之后遍历selected异步执行其Invoke办法,并将后果放到resultQ中;最初通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))拉取最先返回的后果返回

小结

forkingCluster的Join办法执行newForkingClusterInvoker;其Invoke办法循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;之后遍历selected异步执行其Invoke办法,并将后果放到resultQ中;最初通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))拉取最先返回的后果返回

doc

  • forking_cluster

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据