简介: Java 的世界里,大家宽泛应用的一个高性能网络通信框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里,getty 也是一个相似 netty 的高性能网络通信库。getty 最后由 dubbogo 我的项目负责人于雨开发,作为底层通信库在 dubbo-go 中应用。随着 dubbo-go 募捐给 apache 基金会,在社区小伙伴的共同努力下,getty 也最终进入到 apache 这个小家庭,并改名 dubbo-getty 。

作者 | 刘晓敏 于雨

一、简介

Java 的世界里,大家宽泛应用的一个高性能网络通信框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里,getty 也是一个相似 netty 的高性能网络通信库。getty 最后由 dubbogo 我的项目负责人于雨开发,作为底层通信库在 dubbo-go 中应用。随着 dubbo-go 募捐给 apache 基金会,在社区小伙伴的共同努力下,getty 也最终进入到 apache 这个小家庭,并改名 dubbo-getty 。

18 年的时候,我在公司里实际微服务,过后遇到最大的问题就是分布式事务问题。同年,阿里在社区开源他们的分布式事务解决方案,我也很快关注到这个我的项目,起初还叫 fescar,起初更名 seata。因为我对开源技术很感兴趣,加了很多社区群,过后也很关注 dubbo-go 这个我的项目,在外面默默潜水。随着对 seata 的理解,逐步萌发了做一个 go 版本的分布式事务框架的想法。

要做一个 golang 版的分布式事务框架,首要的一个问题就是如何实现 RPC 通信。dubbo-go 就是很好的一个例子摆在眼前,遂开始钻研 dubbo-go 的底层 getty。

二、如何基于 getty 实现 RPC 通信

getty 框架的整体模型图如下:

上面联合相干代码,详述 seata-golang 的 RPC 通信过程。

1. 建设连贯

实现 RPC 通信,首先要建设网络连接吧,咱们从 client.go 开始看起。

func (c *client) connect() {    var (        err error        ss  Session    )    for {        _// 建设一个 session 连贯_        ss = c.dial()        if ss == nil {            _// client has been closed_            break        }        err = c.newSession(ss)        if err == nil {            _// 收发报文_            ss.(*session).run()            _// 此处省略局部代码_                  break        }        _// don't distinguish between tcp connection and websocket connection. Because_        _// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()_        ss.Conn().Close()    }}

connect() 办法通过 dial() 办法失去了一个 session 连贯,进入 dial() 办法:

func (c *client) dial() Session {    switch c.endPointType {    case TCP_CLIENT:        return c.dialTCP()    case UDP_CLIENT:        return c.dialUDP()    case WS_CLIENT:        return c.dialWS()    case WSS_CLIENT:        return c.dialWSS()    }    return nil}

咱们关注的是 TCP 连贯,所以持续进入 c.dialTCP() 办法:

func (c *client) dialTCP() Session {    var (        err  error        conn net.Conn    )    for {        if c.IsClosed() {            return nil        }        if c.sslEnabled {            if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {                d := &net.Dialer{Timeout: connectTimeout}                _// 建设加密连贯_                conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)            }        } else {            _// 建设 tcp 连贯_            conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)        }        if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {            conn.Close()            err = errSelfConnect        }        if err == nil {            _// 返回一个 TCPSession_            return newTCPSession(conn, c)        }        log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))        <-wheel.After(connectInterval)    }}

至此,咱们晓得了 getty 如何建设 TCP 连贯,并返回 TCPSession。

2. 收发报文

那它是怎么收发报文的呢,咱们回到 connection 办法接着往下看,有这样一行 ss.(*session).run(),在这行代码之后代码都是很简略的操作,咱们猜想这行代码运行的逻辑外面肯定蕴含收发报文的逻辑,接着进入 run() 办法:

func (s *session) run() {    _// 省略局部代码_      go s.handleLoop()    go s.handlePackage()}

这里起了两个 goroutine,handleLoophandlePackage,看字面意思合乎咱们的猜测,进入 handleLoop() 办法:

func (s *session) handleLoop() {    _// 省略局部代码_      for {        _// A select blocks until one of its cases is ready to run._        _// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready._        select {        _// 省略局部代码_              case outPkg, ok = <-s.wQ:            _// 省略局部代码_            iovec = iovec[:0]            for idx := 0; idx < maxIovecNum; idx++ {        _// 通过 s.writer 将 interface{} 类型的 outPkg 编码成二进制的比特_                pkgBytes, err = s.writer.Write(s, outPkg)                _// 省略局部代码_                        iovec = append(iovec, pkgBytes)                _//省略局部代码_            }            _// 将这些二进制比特发送进来_            err = s.WriteBytesArray(iovec[:]...)            if err != nil {                log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",                    s.sessionToken(), len(iovec), perrors.WithStack(err))                s.stop()                _// break LOOP_                flag = false            }        case <-wheel.After(s.period):            if flag {                if wsFlag {                    err := wsConn.writePing()                    if err != nil {                        log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))                    }                }                _// 定时执行的逻辑,心跳等_                s.listener.OnCron(s)            }        }    }}

通过下面的代码,咱们不难发现,handleLoop() 办法解决的是发送报文的逻辑,RPC 须要发送的音讯首先由 s.writer 编码成二进制比特,而后通过建设的 TCP 连贯发送进来。这个 s.writer 对应的 Writer 接口是 RPC 框架必须要实现的一个接口。

持续看 handlePackage() 办法:

func (s *session) handlePackage() {    _// 省略局部代码_    if _, ok := s.Connection.(*gettyTCPConn); ok {        if s.reader == nil {            errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)            log.Error(errStr)            panic(errStr)        }        err = s.handleTCPPackage()    } else if _, ok := s.Connection.(*gettyWSConn); ok {        err = s.handleWSPackage()    } else if _, ok := s.Connection.(*gettyUDPConn); ok {        err = s.handleUDPPackage()    } else {        panic(fmt.Sprintf("unknown type session{%#v}", s))    }}

进入 handleTCPPackage() 办法:

func (s *session) handleTCPPackage() error {    _// 省略局部代码_    conn = s.Connection.(*gettyTCPConn)    for {        _// 省略局部代码_        bufLen = 0        for {            _// for clause for the network timeout condition check_            _// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))_            _// 从 TCP 连贯中收到报文_            bufLen, err = conn.recv(buf)            _// 省略局部代码_                  break        }        _// 省略局部代码_            _// 将收到的报文二进制比特写入 pkgBuf_        pktBuf.Write(buf[:bufLen])        for {            if pktBuf.Len() <= 0 {                break            }            _// 通过 s.reader 将收到的报文解码成 RPC 音讯_            pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())            _// 省略局部代码_      s.UpdateActive()            _// 将收到的音讯放入 TaskQueue 供 RPC 生产端生产_            s.addTask(pkg)            pktBuf.Next(pkgLen)            _// continue to handle case 5_        }        if exit {            break        }    }    return perrors.WithStack(err)}

从下面的代码逻辑咱们剖析出,RPC 生产端须要将从 TCP 连贯收到的二进制比特报文解码成 RPC 能生产的音讯,这个工作由 s.reader 实现,所以,咱们要构建 RPC 通信层也须要实现 s.reader 对应的 Reader 接口。

3. 底层解决网络报文的逻辑如何与业务逻辑解耦

咱们都晓得,netty 通过 boss 线程和 worker 线程实现了底层网络逻辑和业务逻辑的解耦。那么,getty 是如何实现的呢?

handlePackage() 办法最初,咱们看到,收到的音讯被放入了 s.addTask(pkg) 这个办法,接着往下剖析:

func (s *session) addTask(pkg interface{}) {    f := func() {        s.listener.OnMessage(s, pkg)        s.incReadPkgNum()    }    if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {        taskPool.AddTaskAlways(f)        return    }    f()}

pkg 参数传递到了一个匿名办法,这个办法最终放入了 taskPool。这个办法很要害,在我起初写 seata-golang 代码的时候,就遇到了一个坑,这个坑前面剖析。

接着咱们看一下 taskPool 的定义:

_// NewTaskPoolSimple build a simple task pool_func NewTaskPoolSimple(size int) GenericTaskPool {    if size < 1 {        size = runtime.NumCPU() * 100    }    return &taskPoolSimple{        work: make(chan task),        sem:  make(chan struct{}, size),        done: make(chan struct{}),    }}

构建了一个缓冲大小为 size (默认为  runtime.NumCPU() * 100) 的 channel sem。再看办法 AddTaskAlways(t task)

func (p *taskPoolSimple) AddTaskAlways(t task) {    select {    case <-p.done:        return    default:    }    select {    case p.work <- t:        return    default:    }    select {    case p.work <- t:    case p.sem <- struct{}{}:        p.wg.Add(1)        go p.worker(t)    default:        goSafely(t)    }}

退出的工作,会先由 len(p.sem) 个 goroutine 去生产,如果没有 goroutine 闲暇,则会启动一个长期的 goroutine 去运行 t()。相当于有  len(p.sem) 个 goroutine 组成了 goroutine pool,pool 中的 goroutine 去解决业务逻辑,而不是由解决网络报文的 goroutine 去运行业务逻辑,从而实现理解耦。写 seata-golang 时遇到的一个坑,就是遗记设置 taskPool 造成了解决业务逻辑和解决底层网络报文逻辑的 goroutine 是同一个,我在业务逻辑中阻塞期待一个工作实现时,阻塞了整个 goroutine,使得阻塞期间收不到任何报文。

4. 具体实现

上面的代码见 getty.go:

_// Reader is used to unmarshal a complete pkg from buffer_type Reader interface {    Read(Session, []byte) (interface{}, int, error)}_// Writer is used to marshal pkg and write to session_type Writer interface {    _// if @Session is udpGettySession, the second parameter is UDPContext._    Write(Session, interface{}) ([]byte, error)}_// ReadWriter interface use for handle application packages_type ReadWriter interface {    Reader    Writer}
_// EventListener is used to process pkg that received from remote session_type EventListener interface {    _// invoked when session opened_    _// If the return error is not nil, @Session will be closed._    OnOpen(Session) error    _// invoked when session closed._    OnClose(Session)    _// invoked when got error._    OnError(Session, error)    _// invoked periodically, its period can be set by (Session)SetCronPeriod_    OnCron(Session)    _// invoked when getty received a package. Pls attention that do not handle long time_    _// logic processing in this func. You'd better set the package's maximum length._    _// If the message's length is greater than it, u should should return err in_    _// Reader{Read} and getty will close this connection soon._    _//_    _// If ur logic processing in this func will take a long time, u should start a goroutine_    _// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u_    _// can do the logic processing in other asynchronous way._    _// !!!In short, ur OnMessage callback func should return asap._    _//_    _// If this is a udp event listener, the second parameter type is UDPContext._    OnMessage(Session, interface{})}

通过对整个 getty 代码的剖析,咱们只有实现  ReadWriter 来对 RPC  音讯编解码,再实现 EventListener 来解决 RPC 音讯的对应的具体逻辑,将 ReadWriter 实现和 EventLister 实现注入到 RPC 的 Client 和 Server 端,则可实现 RPC 通信。

4.1 编解码协定实现

上面是 seata 协定的定义:

在 ReadWriter 接口的实现 RpcPackageHandler 中,调用 Codec 办法对音讯体依照下面的格局编解码:

_// 音讯编码为二进制比特_func MessageEncoder(codecType byte, in interface{}) []byte {    switch codecType {    case SEATA:        return SeataEncoder(in)    default:        log.Errorf("not support codecType, %s", codecType)        return nil    }}_// 二进制比特解码为音讯体_func MessageDecoder(codecType byte, in []byte) (interface{}, int) {    switch codecType {    case SEATA:        return SeataDecoder(in)    default:        log.Errorf("not support codecType, %s", codecType)        return nil, 0    }}

4.2 Client 端实现

再来看 client 端 EventListener 的实现 RpcRemotingClient

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {    go func()         request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{            ApplicationId:           client.conf.ApplicationId,            TransactionServiceGroup: client.conf.TransactionServiceGroup,        }}    _// 建设连贯后向 Transaction Coordinator 发动注册 TransactionManager 的申请_        _, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)        if err == nil {      _// 将与 Transaction Coordinator 建设的连贯保留在连接池供后续应用_            clientSessionManager.RegisterGettySession(session)            client.GettySessionOnOpenChannel <- session.RemoteAddr()        }    }()    return nil}_// OnError ..._func (client *RpcRemoteClient) OnError(session getty.Session, err error) {    clientSessionManager.ReleaseGettySession(session)}_// OnClose ..._func (client *RpcRemoteClient) OnClose(session getty.Session) {    clientSessionManager.ReleaseGettySession(session)}_// OnMessage ..._func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {    log.Info("received message:{%v}", pkg)    rpcMessage, ok := pkg.(protocal.RpcMessage)    if ok {        heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)        if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {            log.Debugf("received PONG from %s", session.RemoteAddr())        }    }    if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||        rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {        log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)              _// 处理事务音讯,提交 or 回滚_        client.onMessage(rpcMessage, session.RemoteAddr())    } else {        resp, loaded := client.futures.Load(rpcMessage.Id)        if loaded {            response := resp.(*getty2.MessageFuture)            response.Response = rpcMessage.Body            response.Done <- true            client.futures.Delete(rpcMessage.Id)        }    }}_// OnCron ..._func (client *RpcRemoteClient) OnCron(session getty.Session) {  _// 发送心跳_    client.defaultSendRequest(session, protocal.HeartBeatMessagePing)}

clientSessionManager.RegisterGettySession(session) 的逻辑将在下文中剖析。

4.3 Server 端 Transaction Coordinator 实现

代码见 DefaultCoordinator

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {    log.Infof("got getty_session:%s", session.Stat())    return nil}func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {    _// 开释 TCP 连贯_  SessionManager.ReleaseGettySession(session)    session.Close()    log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)}func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {    log.Info("getty_session{%s} is closing......", session.Stat())}func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {    log.Debugf("received message:{%v}", pkg)    rpcMessage, ok := pkg.(protocal.RpcMessage)    if ok {        _, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)        if isRegTM {      _// 将 TransactionManager 信息和 TCP 连贯建设映射关系_            coordinator.OnRegTmMessage(rpcMessage, session)            return        }        heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)        if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {            coordinator.OnCheckMessage(rpcMessage, session)            return        }        if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||            rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {            log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)            _, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)            if isRegRM {        _// 将 ResourceManager 信息和 TCP 连贯建设映射关系_                coordinator.OnRegRmMessage(rpcMessage, session)            } else {                if SessionManager.IsRegistered(session) {                    defer func() {                        if err := recover(); err != nil {                            log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err)                        }                    }()          _// 处理事务音讯,全局事务注册、分支事务注册、分支事务提交、全局事务回滚等_                    coordinator.OnTrxMessage(rpcMessage, session)                } else {                    session.Close()                    log.Infof("close a unhandled connection! [%v]", session)                }            }        } else {            resp, loaded := coordinator.futures.Load(rpcMessage.Id)            if loaded {                response := resp.(*getty2.MessageFuture)                response.Response = rpcMessage.Body                response.Done <- true                coordinator.futures.Delete(rpcMessage.Id)            }        }    }}func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {}

coordinator.OnRegTmMessage(rpcMessage, session) 注册 Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session) 注册 Resource Manager。具体逻辑剖析见下文。

音讯进入 coordinator.OnTrxMessage(rpcMessage, session) 办法,将依照音讯的类型码路由到具体的逻辑当中:

 switch msg.GetTypeCode() {    case protocal.TypeGlobalBegin:        req := msg.(protocal.GlobalBeginRequest)        resp := coordinator.doGlobalBegin(req, ctx)        return resp    case protocal.TypeGlobalStatus:        req := msg.(protocal.GlobalStatusRequest)        resp := coordinator.doGlobalStatus(req, ctx)        return resp    case protocal.TypeGlobalReport:        req := msg.(protocal.GlobalReportRequest)        resp := coordinator.doGlobalReport(req, ctx)        return resp    case protocal.TypeGlobalCommit:        req := msg.(protocal.GlobalCommitRequest)        resp := coordinator.doGlobalCommit(req, ctx)        return resp    case protocal.TypeGlobalRollback:        req := msg.(protocal.GlobalRollbackRequest)        resp := coordinator.doGlobalRollback(req, ctx)        return resp    case protocal.TypeBranchRegister:        req := msg.(protocal.BranchRegisterRequest)        resp := coordinator.doBranchRegister(req, ctx)        return resp    case protocal.TypeBranchStatusReport:        req := msg.(protocal.BranchReportRequest)        resp := coordinator.doBranchReport(req, ctx)        return resp    default:        return nil    }

4.4 session manager 剖析

Client 端同 Transaction Coordinator 建设连贯起连贯后,通过 clientSessionManager.RegisterGettySession(session) 将连贯保留在 serverSessions = sync.Map{} 这个 map 中。map 的 key 为从 session 中获取的 RemoteAddress 即 Transaction Coordinator 的地址,value 为 session。这样,Client 端就能够通过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见 getty_client_session_manager.go

Transaction Manager 和 Resource Manager 注册到 Transaction Coordinator 后,一个连贯既有可能用来发送 TM 音讯也有可能用来发送 RM 音讯。咱们通过 RpcContext 来标识一个连贯信息:

type RpcContext struct {    Version                 string    TransactionServiceGroup string    ClientRole              meta.TransactionRole    ApplicationId           string    ClientId                string    ResourceSets            *model.Set    Session                 getty.Session}

当收到事务音讯时,咱们须要结构这样一个 RpcContext 供后续事务处理逻辑应用。所以,咱们会结构下列 map 来缓存映射关系:

var (    _// session -> transactionRole_    _// TM will register before RM, if a session is not the TM registered,_    _// it will be the RM registered_    session_transactionroles = sync.Map{}    _// session -> applicationId_    identified_sessions = sync.Map{}    _// applicationId -> ip -> port -> session_    client_sessions = sync.Map{}    _// applicationId -> resourceIds_    client_resources = sync.Map{})

这样,Transaction Manager 和 Resource Manager 别离通过 coordinator.OnRegTmMessage(rpcMessage, session)coordinator.OnRegRmMessage(rpcMessage, session) 注册到 Transaction Coordinator 时,会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系,在 client_resources map 中缓存 applicationId 与 resourceIds(一个利用可能存在多个 Resource Manager) 的关系。在须要时,咱们就能够通过上述映射关系结构一个 RpcContext。这部分的实现和 java 版 seata 有很大的不同,感兴趣的能够深刻理解一下。具体代码见 getty_session_manager.go

至此,咱们就剖析完了 seata-golang 整个 RPC 通信模型的机制。

三、seata-golang 的将来

seata-golang  从往年 4 月份开始开发,到 8 月份根本实现和 java 版 seata 1.2 协定的互通,对 mysql 数据库实现了 AT 模式(主动协调分布式事务的提交回滚),实现了 TCC 模式,TC 端应用 mysql 存储数据,使 TC 变成一个无状态利用反对高可用部署。下图展现了 AT 模式的原理:

后续,还有许多工作能够做,比方:对注册核心的反对、对配置核心的反对、和 java 版 seata 1.4 的协定互通、其余数据库的反对、raft transaction coordinator 的实现等,心愿对分布式事务问题感兴趣的开发者能够退出进来一起来打造一个欠缺的 golang 的分布式事务框架。如果你有任何疑难,欢送退出交换群【钉钉群号 33069364】。

另外,欢送对 dubbogo 感兴趣的敌人到 dubbogo 社区钉钉群(钉钉群号 31363295)沟通 dubbogo 技术问题。

原文链接
本文为阿里云原创内容,未经容许不得转载。