乐趣区

关于dubbo:聊聊dubbogo的DubboPackage

本文只有钻研一下 dubbo-go 的 DubboPackage

DubboPackage

dubbo-go-v1.4.2/protocol/dubbo/codec.go

//CallType call type
type CallType int32

const (
    // CT_UNKNOWN unknown call type
    CT_UNKNOWN CallType = 0
    // CT_OneWay call one way
    CT_OneWay CallType = 1
    // CT_TwoWay call in request/response
    CT_TwoWay CallType = 2
)

// SequenceType ...
type SequenceType int64

// DubboPackage ...
type DubboPackage struct {
    Header  hessian.DubboHeader
    Service hessian.Service
    Body    interface{}
    Err     error
}
  • DubboPackage 定义了 Header、Service、Body、Err 属性

Marshal

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// Marshal ...
func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {codec := hessian.NewHessianCodec(nil)

    pkg, err := codec.Write(p.Service, p.Header, p.Body)
    if err != nil {return nil, perrors.WithStack(err)
    }

    return bytes.NewBuffer(pkg), nil
}
  • Marshal 办法通过 hessian.NewHessianCodec(nil) 创立 codec,之后执行 bytes.NewBuffer(pkg)

Unmarshal

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// Unmarshal ...
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
    // fix issue https://github.com/apache/dubbo-go/issues/380
    bufLen := buf.Len()
    if bufLen < hessian.HEADER_LENGTH {return perrors.WithStack(hessian.ErrHeaderNotEnough)
    }

    codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))

    // read header
    err := codec.ReadHeader(&p.Header)
    if err != nil {return perrors.WithStack(err)
    }

    if len(opts) != 0 { // for client
        client, ok := opts[0].(*Client)
        if !ok {return perrors.Errorf("opts[0] is not of type *Client")
        }

        if p.Header.Type&hessian.PackageRequest != 0x00 {
            // size of this array must be '7'
            // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
            p.Body = make([]interface{}, 7)
        } else {pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
            if !ok {return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
            }
            p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply}
        }
    }

    // read body
    err = codec.ReadBody(p.Body)
    return perrors.WithStack(err)
}
  • Unmarshal 办法先通过 hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen)) 创立 codec,而后执行 codec.ReadHeader(&p.Header) 读取 header,最初通过 codec.ReadBody(p.Body) 读取 body

PendingResponse

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// PendingResponse ...
type PendingResponse struct {
    seq       uint64
    err       error
    start     time.Time
    readStart time.Time
    callback  common.AsyncCallback
    response  *Response
    done      chan struct{}}

// NewPendingResponse ...
func NewPendingResponse() *PendingResponse {
    return &PendingResponse{start:    time.Now(),
        response: &Response{},
        done:     make(chan struct{}),
    }
}
  • NewPendingResponse 实例化了 PendingResponse

GetCallResponse

dubbo-go-v1.4.2/protocol/dubbo/codec.go

// GetCallResponse ...
func (r PendingResponse) GetCallResponse() common.CallbackResponse {
    return AsyncCallbackResponse{
        Cause:     r.err,
        Start:     r.start,
        ReadStart: r.readStart,
        Reply:     r.response,
    }
}
  • GetCallResponse 办法实例化 AsyncCallbackResponse

小结

DubboPackage 定义了 Header、Service、Body、Err 属性;codec.go 提供了 Marshal、Unmarshal 办法用于读写 DubboPackage

doc

  • codec
退出移动版