本文主要研究一下dubbo-go的DubboInvoker

Invoker

dubbo-go-v1.4.2/protocol/invoker.go

// Extension - Invokertype Invoker interface {    common.Node    Invoke(context.Context, Invocation) Result}/////////////////////////////// base invoker/////////////////////////////// BaseInvoker ...type BaseInvoker struct {    url       common.URL    available bool    destroyed bool}// NewBaseInvoker ...func NewBaseInvoker(url common.URL) *BaseInvoker {    return &BaseInvoker{        url:       url,        available: true,        destroyed: false,    }}// GetUrl ...func (bi *BaseInvoker) GetUrl() common.URL {    return bi.url}// IsAvailable ...func (bi *BaseInvoker) IsAvailable() bool {    return bi.available}// IsDestroyed ...func (bi *BaseInvoker) IsDestroyed() bool {    return bi.destroyed}// Invoke ...func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result {    return &RPCResult{}}// Destroy ...func (bi *BaseInvoker) Destroy() {    logger.Infof("Destroy invoker: %s", bi.GetUrl().String())    bi.destroyed = true    bi.available = false}
  • Invoker定义了Invoke方法;BaseInvoker定义了url、available、destroyed属性;NewBaseInvoker方法实例化了BaseInvoker,其available为true,destroyed为false;Destroy方法设置available为false,destroyed为true

DubboInvoker

dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go

var (    // ErrNoReply ...    ErrNoReply          = perrors.New("request need @response")    ErrDestroyedInvoker = perrors.New("request Destroyed invoker"))var (    attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY})// DubboInvoker ...type DubboInvoker struct {    protocol.BaseInvoker    client   *Client    quitOnce sync.Once    // Used to record the number of requests. -1 represent this DubboInvoker is destroyed    reqNum int64}
  • DubboInvoker定义了client、quitOnce、reqNum属性

NewDubboInvoker

dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go

// NewDubboInvoker ...func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {    return &DubboInvoker{        BaseInvoker: *protocol.NewBaseInvoker(url),        client:      client,        reqNum:      0,    }}
  • NewDubboInvoker方法实例化DubboInvoker

Invoke

dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go

// Invoke ...func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {    var (        err    error        result protocol.RPCResult    )    if di.reqNum < 0 {        // Generally, the case will not happen, because the invoker has been removed        // from the invoker list before destroy,so no new request will enter the destroyed invoker        logger.Warnf("this dubboInvoker is destroyed")        result.Err = ErrDestroyedInvoker        return &result    }    atomic.AddInt64(&(di.reqNum), 1)    defer atomic.AddInt64(&(di.reqNum), -1)    inv := invocation.(*invocation_impl.RPCInvocation)    for _, k := range attachmentKey {        if v := di.GetUrl().GetParam(k, ""); len(v) > 0 {            inv.SetAttachments(k, v)        }    }    // put the ctx into attachment    di.appendCtx(ctx, inv)    url := di.GetUrl()    // async    async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))    if err != nil {        logger.Errorf("ParseBool - error: %v", err)        async = false    }    response := NewResponse(inv.Reply(), nil)    if async {        if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {            result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)        } else {            result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))        }    } else {        if inv.Reply() == nil {            result.Err = ErrNoReply        } else {            result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)        }    }    if result.Err == nil {        result.Rest = inv.Reply()        result.Attrs = response.atta    }    logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)    return &result}
  • Invoke方法先判断reqNum是否小于0(小于0表示该invoker已被销毁,此时直接返回ErrDestroyedInvoker),然后通过atomic原子操作递增reqNum(并在返回时递减);之后遍历attachmentKey,将url中对应的非空参数设置到invocation的attachments;之后读取constant.ASYNC_KEY属性,若async为true,则执行di.client.AsyncCall或di.client.CallOneway;若async为false则执行di.client.Call;最后返回result

Destroy

dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go

// Destroy ...func (di *DubboInvoker) Destroy() {    di.quitOnce.Do(func() {        for {            if di.reqNum == 0 {                di.reqNum = -1                logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())                di.BaseInvoker.Destroy()                if di.client != nil {                    di.client.Close()                    di.client = nil                }                break            }            logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())            time.Sleep(1 * time.Second)        }    })}
  • Destroy方法在di.reqNum为0时将reqNum置为-1,并执行di.BaseInvoker.Destroy()及di.client.Close();若仍有未结束的请求(reqNum不为0),则每隔1秒重试,直到请求全部结束

小结

Invoker定义了Invoke方法;BaseInvoker定义了url、available、destroyed属性;NewBaseInvoker方法实例化了BaseInvoker,其available为true,destroyed为false;Destroy方法设置available为false,destroyed为true;DubboInvoker的Invoke方法先判断invoker是否已被销毁(reqNum小于0),然后通过atomic原子操作递增reqNum,之后遍历attachmentKey设置到invocation;之后读取constant.ASYNC_KEY属性,若async为true,则执行di.client.AsyncCall或di.client.CallOneway;若async为false则执行di.client.Call;最后返回result

doc

  • dubbo_invoker