序
本文次要钻研一下 dubbo-go 的 DubboInvoker
Invoker
dubbo-go-v1.4.2/protocol/invoker.go
// Extension - Invoker
type Invoker interface {
common.Node
Invoke(context.Context, Invocation) Result
}
/////////////////////////////
// base invoker
/////////////////////////////
// BaseInvoker ...
type BaseInvoker struct {
url common.URL
available bool
destroyed bool
}
// NewBaseInvoker ...
func NewBaseInvoker(url common.URL) *BaseInvoker {
return &BaseInvoker{
url: url,
available: true,
destroyed: false,
}
}
// GetUrl ...
func (bi *BaseInvoker) GetUrl() common.URL {return bi.url}
// IsAvailable ...
func (bi *BaseInvoker) IsAvailable() bool {return bi.available}
// IsDestroyed ...
func (bi *BaseInvoker) IsDestroyed() bool {return bi.destroyed}
// Invoke ...
func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result {return &RPCResult{}
}
// Destroy ...
func (bi *BaseInvoker) Destroy() {logger.Infof("Destroy invoker: %s", bi.GetUrl().String())
bi.destroyed = true
bi.available = false
}
- Invoker 定义了 Invoke 办法;BaseInvoker 定义了 url、available、destroyed 属性;NewBaseInvoker 办法实例化了 BaseInvoker,其 available 为 true,destroyed 为 false;Destroy 办法设置 available 为 false,destroyed 为 true
DubboInvoker
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
var (
// ErrNoReply ...
ErrNoReply = perrors.New("request need @response")
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)
var (attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
)
// DubboInvoker ...
type DubboInvoker struct {
protocol.BaseInvoker
client *Client
quitOnce sync.Once
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}
- DubboInvoker 定义了 client、quitOnce、reqNum 属性
NewDubboInvoker
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// NewDubboInvoker ...
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
return &DubboInvoker{BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
reqNum: 0,
}
}
- NewDubboInvoker 办法实例化 DubboInvoker
Invoke
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// Invoke ...
func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
err error
result protocol.RPCResult
)
if di.reqNum < 0 {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)
inv := invocation.(*invocation_impl.RPCInvocation)
for _, k := range attachmentKey {if v := di.GetUrl().GetParam(k, ""); len(v) > 0 {inv.SetAttachments(k, v)
}
}
// put the ctx into attachment
di.appendCtx(ctx, inv)
url := di.GetUrl()
// async
async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))
if err != nil {logger.Errorf("ParseBool - error: %v", err)
async = false
}
response := NewResponse(inv.Reply(), nil)
if async {if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
} else {result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
}
} else {if inv.Reply() == nil {result.Err = ErrNoReply} else {result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
}
}
if result.Err == nil {result.Rest = inv.Reply()
result.Attrs = response.atta
}
logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
return &result
}
- Invoke 办法先通过 atomic.AddInt64 递增 reqNum,之后遍历 attachmentKey 设置到 invocation;之后读取 constant.ASYNC_KEY 属性,若 async 为 true,则执行 di.client.AsyncCall 或 di.client.CallOneway;若 async 为 false 则执行 di.client.Call;最初返回 result
Destroy
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// Destroy ...
func (di *DubboInvoker) Destroy() {di.quitOnce.Do(func() {
for {
if di.reqNum == 0 {
di.reqNum = -1
logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
}
})
}
- Destroy 办法在 di.reqNum 为 0 时执行 di.BaseInvoker.Destroy() 及 di.client.Close()
小结
Invoker 定义了 Invoke 办法;BaseInvoker 定义了 url、available、destroyed 属性;NewBaseInvoker 办法实例化了 BaseInvoker,其 available 为 true,destroyed 为 false;Destroy 办法设置 available 为 false,destroyed 为 true;DubboInvoker 的 Invoke 办法先通过 atomic.AddInt64 递增 reqNum,之后遍历 attachmentKey 设置到 invocation;之后读取 constant.ASYNC_KEY 属性,若 async 为 true,则执行 di.client.AsyncCall 或 di.client.CallOneway;若 async 为 false 则执行 di.client.Call;最初返回 result
doc
- dubbo_invoker