序
本文次要钻研一下dubbo-go的DubboInvoker
Invoker
dubbo-go-v1.4.2/protocol/invoker.go
// Extension - Invokertype Invoker interface { common.Node Invoke(context.Context, Invocation) Result}/////////////////////////////// base invoker/////////////////////////////// BaseInvoker ...type BaseInvoker struct { url common.URL available bool destroyed bool}// NewBaseInvoker ...func NewBaseInvoker(url common.URL) *BaseInvoker { return &BaseInvoker{ url: url, available: true, destroyed: false, }}// GetUrl ...func (bi *BaseInvoker) GetUrl() common.URL { return bi.url}// IsAvailable ...func (bi *BaseInvoker) IsAvailable() bool { return bi.available}// IsDestroyed ...func (bi *BaseInvoker) IsDestroyed() bool { return bi.destroyed}// Invoke ...func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result { return &RPCResult{}}// Destroy ...func (bi *BaseInvoker) Destroy() { logger.Infof("Destroy invoker: %s", bi.GetUrl().String()) bi.destroyed = true bi.available = false}
- Invoker定义了Invoke办法;BaseInvoker定义了url、available、destroyed属性;NewBaseInvoker办法实例化了BaseInvoker,其available为true,destroyed为false;Destroy办法设置available为false,destroyed为true
DubboInvoker
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
var ( // ErrNoReply ... ErrNoReply = perrors.New("request need @response") ErrDestroyedInvoker = perrors.New("request Destroyed invoker"))var ( attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY})// DubboInvoker ...type DubboInvoker struct { protocol.BaseInvoker client *Client quitOnce sync.Once // Used to record the number of requests. -1 represent this DubboInvoker is destroyed reqNum int64}
- DubboInvoker定义了client、quitOnce、reqNum属性
NewDubboInvoker
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// NewDubboInvoker ...func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, reqNum: 0, }}
- NewDubboInvoker办法实例化DubboInvoker
Invoke
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// Invoke ...func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( err error result protocol.RPCResult ) if di.reqNum < 0 { // Generally, the case will not happen, because the invoker has been removed // from the invoker list before destroy,so no new request will enter the destroyed invoker logger.Warnf("this dubboInvoker is destroyed") result.Err = ErrDestroyedInvoker return &result } atomic.AddInt64(&(di.reqNum), 1) defer atomic.AddInt64(&(di.reqNum), -1) inv := invocation.(*invocation_impl.RPCInvocation) for _, k := range attachmentKey { if v := di.GetUrl().GetParam(k, ""); len(v) > 0 { inv.SetAttachments(k, v) } } // put the ctx into attachment di.appendCtx(ctx, inv) url := di.GetUrl() // async async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false")) if err != nil { logger.Errorf("ParseBool - error: %v", err) async = false } response := NewResponse(inv.Reply(), nil) if async { if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) } } else { if inv.Reply() == nil { result.Err = ErrNoReply } else { result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response) } } if result.Err == nil { result.Rest = inv.Reply() result.Attrs = response.atta } logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest) return &result}
- Invoke办法先通过atomic.AddInt64递增reqNum,之后遍历attachmentKey设置到invocation;之后读取constant.ASYNC_KEY属性,若async为true,则执行di.client.AsyncCall或di.client.CallOneway;若async为false则执行di.client.Call;最初返回result
Destroy
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// Destroy ...func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { for { if di.reqNum == 0 { di.reqNum = -1 logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key()) di.BaseInvoker.Destroy() if di.client != nil { di.client.Close() di.client = nil } break } logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key()) time.Sleep(1 * time.Second) } })}
- Destroy办法在di.reqNum为0时执行di.BaseInvoker.Destroy()及di.client.Close()
小结
Invoker定义了Invoke办法;BaseInvoker定义了url、available、destroyed属性;NewBaseInvoker办法实例化了BaseInvoker,其available为true,destroyed为false;Destroy办法设置available为false,destroyed为true;DubboInvoker的Invoke办法先通过atomic.AddInt64递增reqNum,之后遍历attachmentKey设置到invocation;之后读取constant.ASYNC_KEY属性,若async为true,则执行di.client.AsyncCall或di.client.CallOneway;若async为false则执行di.client.Call;最初返回result
doc
- dubbo_invoker