tinyrpc 是一个高性能的基于 protocol buffer
的 rpc 框架。我的项目代码非常少,很适宜初学者进行 golang 的学习。
tinyrpc 性能
tinyrpc
基于 TCP 协定,反对各种压缩格局,基于 protocol buffer
的序列化协定。其 rpc 是基于 golang 原生的 net/rpc
开发而成。
tinyrpc 我的项目构造
tinyrpc
基于 net/rpc
开发而成,在此基础上集成了额定的能力。我的项目构造如图:
性能目录如下:
- codec 编码模块
- compressor 压缩模块
- header 申请 / 响应头模块
- protoc-gen-tinyrpc 代码生成插件
- serializer 序列化模块
tinyrpc 源码解读
客户端和服务端构建
客户端是以 net/rpc
的rpc.Client
为根底构建,在此基础上定义了 Option
以配置压缩形式和序列化形式:
type Option func(o *options)
type options struct {
compressType compressor.CompressType
serializer serializer.Serializer
}
在创立客户端的时候将配置好的压缩算法和序列化形式作为创立客户端的参数:
func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
options := options{
compressType: compressor.Raw,
serializer: serializer.Proto,
}
for _, option := range opts {option(&options)
}
return &Client{rpc.NewClientWithCodec(codec.NewClientCodec(conn, options.compressType, options.serializer))}
}
服务端是以 net/rpc
的rpc.Server
为根底构建,在此基础上扩大了 Server
的定义:
type Server struct {
*rpc.Server
serializer.Serializer
}
在创立客户端和开启服务时传入序列化形式:
func NewServer(opts ...Option) *Server {
options := options{serializer: serializer.Proto,}
for _, option := range opts {option(&options)
}
return &Server{&rpc.Server{}, options.serializer}
}
func (s *Server) Serve(lis net.Listener) {log.Printf("tinyrpc started on: %s", lis.Addr().String())
for {conn, err := lis.Accept()
if err != nil {continue}
go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
}
}
压缩算法 compressor
压缩算法的实现中首先是定义了压缩的接口:
type Compressor interface {Zip([]byte) ([]byte, error)
Unzip([]byte) ([]byte, error)
}
压缩的接口蕴含压缩和解压办法。
压缩算法应用的是 uint
类型,应用 iota
来初始化,并且应用 map 来进行所有压缩算法实现的治理:
type CompressType uint16
const (
Raw CompressType = iota
Gzip
Snappy
Zlib
)
// Compressors which supported by rpc
var Compressors = map[CompressType]Compressor{Raw: RawCompressor{},
Gzip: GzipCompressor{},
Snappy: SnappyCompressor{},
Zlib: ZlibCompressor{},}
序列化 serializer
序列化局部代码非常简单,提供了一个接口:
type Serializer interface {Marshal(message interface{}) ([]byte, error)
Unmarshal(data []byte, message interface{}) error
}
目前只有 ProtoSerializer
一个实现,ProtoSerializer
外部的实现是基于 "google.golang.org/protobuf/proto"
来实现的,并没有什么非凡的解决,因而就不破费笔墨详述了。
申请 / 响应头 header
tinyrpc
定义了本人的申请头和响应头:
// RequestHeader request header structure looks like:
// +--------------+----------------+----------+------------+----------+
// | CompressType | Method | ID | RequestLen | Checksum |
// +--------------+----------------+----------+------------+----------+
// | uint16 | uvarint+string | uvarint | uvarint | uint32 |
// +--------------+----------------+----------+------------+----------+
type RequestHeader struct {
sync.RWMutex
CompressType compressor.CompressType
Method string
ID uint64
RequestLen uint32
Checksum uint32
}
申请头由压缩类型,办法,id,申请长度和校验码组成。
// ResponseHeader request header structure looks like:
// +--------------+---------+----------------+-------------+----------+
// | CompressType | ID | Error | ResponseLen | Checksum |
// +--------------+---------+----------------+-------------+----------+
// | uint16 | uvarint | uvarint+string | uvarint | uint32 |
// +--------------+---------+----------------+-------------+----------+
type ResponseHeader struct {
sync.RWMutex
CompressType compressor.CompressType
ID uint64
Error string
ResponseLen uint32
Checksum uint32
}
响应头由压缩类型,id,错误信息,返回长度和校验码组成。
为了实现头的重用,tinyrpc
为头构建了缓存池:
var (
RequestPool sync.Pool
ResponsePool sync.Pool
)
func init() {RequestPool = sync.Pool{New: func() interface{} {return &RequestHeader{}
}}
ResponsePool = sync.Pool{New: func() interface{} {return &ResponseHeader{}
}}
}
在应用时 get 进去,生命周期完结后放回池子,并且在 put 之前须要进行重置:
h := header.RequestPool.Get().(*header.RequestHeader)
defer func() {h.ResetHeader()
header.RequestPool.Put(h)
}()
// ResetHeader reset request header
func (r *RequestHeader) ResetHeader() {r.Lock()
defer r.Unlock()
r.ID = 0
r.Checksum = 0
r.Method = ""
r.CompressType = 0
r.RequestLen = 0
}
// ResetHeader reset response header
func (r *ResponseHeader) ResetHeader() {r.Lock()
defer r.Unlock()
r.Error = ""
r.ID = 0
r.CompressType = 0
r.Checksum = 0
r.ResponseLen = 0
}
搞清楚了头的构造以及对象池的复用逻辑,那么具体的头的编码与解码就是很简略的拆装工作,就不在此一行一行解析了,大家有趣味能够自行去浏览。
编码 codec
因为 tinyrpc
是基于 net/rpc
开发,那么其 codec
模块天然也是依赖于 net/rpc
的ClientCodec
和 ServerCodec
接口来实现的。
客户端实现
客户端是基于 ClientCodec
实现的能力:
type ClientCodec interface {WriteRequest(*Request, any) error
ReadResponseHeader(*Response) error
ReadResponseBody(any) error
Close() error}
client
定义了一个 clientCodec
类型,并且实现了 ClientCodec
的接口办法:
type clientCodec struct {
r io.Reader
w io.Writer
c io.Closer
compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
serializer serializer.Serializer
response header.ResponseHeader // rpc response header
mutex sync.Mutex // protect pending map
pending map[uint64]string
}
WriteRequest
实现:
// WriteRequest Write the rpc request header and body to the io stream
func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {c.mutex.Lock()
c.pending[r.Seq] = r.ServiceMethod
c.mutex.Unlock()
if _, ok := compressor.Compressors[c.compressor]; !ok {return NotFoundCompressorError}
reqBody, err := c.serializer.Marshal(param)
if err != nil {return err}
compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
if err != nil {return err}
h := header.RequestPool.Get().(*header.RequestHeader)
defer func() {h.ResetHeader()
header.RequestPool.Put(h)
}()
h.ID = r.Seq
h.Method = r.ServiceMethod
h.RequestLen = uint32(len(compressedReqBody))
h.CompressType = compressor.CompressType(c.compressor)
h.Checksum = crc32.ChecksumIEEE(compressedReqBody)
if err := sendFrame(c.w, h.Marshal()); err != nil {return err}
if err := write(c.w, compressedReqBody); err != nil {return err}
c.w.(*bufio.Writer).Flush()
return nil
}
能够看到代码的实现还是比拟清晰的,次要分为几个步骤:
- 将数据进行序列化形成申请体
- 抉择相应的压缩算法进行压缩
- 从 Pool 中获取申请头实例将数据全副填入其中形成最初的申请头
- 别离通过 io 操作发送解决过的申请头和申请体
ReadResponseHeader
实现:
// ReadResponseHeader read the rpc response header from the io stream
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {c.response.ResetHeader()
data, err := recvFrame(c.r)
if err != nil {return err}
err = c.response.Unmarshal(data)
if err != nil {return err}
c.mutex.Lock()
r.Seq = c.response.ID
r.Error = c.response.Error
r.ServiceMethod = c.pending[r.Seq]
delete(c.pending, r.Seq)
c.mutex.Unlock()
return nil
}
此办法作用是读取返回的响应头,并解析成具体的构造体
ReadResponseBody
实现:
func (c *clientCodec) ReadResponseBody(param interface{}) error {
if param == nil {
if c.response.ResponseLen != 0 {if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {return err}
}
return nil
}
respBody := make([]byte, c.response.ResponseLen)
err := read(c.r, respBody)
if err != nil {return err}
if c.response.Checksum != 0 {if crc32.ChecksumIEEE(respBody) != c.response.Checksum {return UnexpectedChecksumError}
}
if c.response.GetCompressType() != c.compressor {return CompressorTypeMismatchError}
resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
if err != nil {return err}
return c.serializer.Unmarshal(resp, param)
}
此办法是用于读取返回的响应构造体,流程如下:
- 读取流获取响应体
- 依据响应头中的校验码来比对响应体是否残缺
- 依据压缩算法来解压具体的构造体
- 进行反序列化
服务端实现
服务端是基于 ServerCodec
实现的能力:
type ServerCodec interface {ReadRequestHeader(*Request) error
ReadRequestBody(any) error
WriteResponse(*Response, any) error
// Close can be called multiple times and must be idempotent.
Close() error}
和客户端相似,server
定义了一个 serverCodec
类型,并且实现了 ServerCodec
的接口办法:
type serverCodec struct {
r io.Reader
w io.Writer
c io.Closer
request header.RequestHeader
serializer serializer.Serializer
mutex sync.Mutex // protects seq, pending
seq uint64
pending map[uint64]*reqCtx
}
ReadRequestHeader
实现:
// ReadRequestHeader read the rpc request header from the io stream
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {s.request.ResetHeader()
data, err := recvFrame(s.r)
if err != nil {return err}
err = s.request.Unmarshal(data)
if err != nil {return err}
s.mutex.Lock()
s.seq++
s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
r.ServiceMethod = s.request.Method
r.Seq = s.seq
s.mutex.Unlock()
return nil
}
此办法用于读取申请头并解析成构造体
ReadRequestBody
实现:
// ReadRequestBody read the rpc request body from the io stream
func (s *serverCodec) ReadRequestBody(param interface{}) error {
if param == nil {
if s.request.RequestLen != 0 {if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {return err}
}
return nil
}
reqBody := make([]byte, s.request.RequestLen)
err := read(s.r, reqBody)
if err != nil {return err}
if s.request.Checksum != 0 {if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {return UnexpectedChecksumError}
}
if _, ok := compressor.
Compressors[s.request.GetCompressType()]; !ok {return NotFoundCompressorError}
req, err := compressor.
Compressors[s.request.GetCompressType()].Unzip(reqBody)
if err != nil {return err}
return s.serializer.Unmarshal(req, param)
}
此办法用于读取申请体,流程和读取响应体差不多,大抵如下:
- 读取流并解析成申请体
- 依据申请头中的校验码进行校验
- 依据压缩算法进行解压
- 反序列化
WriteResponse
实现:
// WriteResponse Write the rpc response header and body to the io stream
func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {s.mutex.Lock()
reqCtx, ok := s.pending[r.Seq]
if !ok {s.mutex.Unlock()
return InvalidSequenceError
}
delete(s.pending, r.Seq)
s.mutex.Unlock()
if r.Error != "" {param = nil}
if _, ok := compressor.
Compressors[reqCtx.compareType]; !ok {return NotFoundCompressorError}
var respBody []byte
var err error
if param != nil {respBody, err = s.serializer.Marshal(param)
if err != nil {return err}
}
compressedRespBody, err := compressor.
Compressors[reqCtx.compareType].Zip(respBody)
if err != nil {return err}
h := header.ResponsePool.Get().(*header.ResponseHeader)
defer func() {h.ResetHeader()
header.ResponsePool.Put(h)
}()
h.ID = reqCtx.requestID
h.Error = r.Error
h.ResponseLen = uint32(len(compressedRespBody))
h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
h.CompressType = reqCtx.compareType
if err = sendFrame(s.w, h.Marshal()); err != nil {return err}
if err = write(s.w, compressedRespBody); err != nil {return err}
s.w.(*bufio.Writer).Flush()
return nil
}
此办法用于写入响应体,大抵与写入申请体差不多,流程如下:
- 将响应体序列化
- 应用压缩算法将响应体进行压缩
- 应用 Pool 治理响应头
- 别离发送返回头和返回体
总结
tinyrpc
是基于 golang
原生的 net/rpc
包实现,在此基础上实现了压缩和序列化等能力扩大。整体来看 tinyrpc
的代码非常简单,比拟适宜刚接触 golang
的程序员来进行浏览学习,学习一些 golang
的根底的开发技巧和一些语言个性。