本文主要研究一下canal-go的SimpleCanalConnector

SimpleCanalConnector

canal-go-v1.0.7/client/simple_canal_connector.go

type SimpleCanalConnector struct {    Address           string    Port              int    UserName          string    PassWord          string    SoTime            int32    IdleTimeOut       int32    ClientIdentity    pb.ClientIdentity    Connected         bool    Running           bool    Filter            string    RollbackOnConnect bool    LazyParseEntry    bool}
  • SimpleCanalConnector定义了Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry属性

NewSimpleCanalConnector

canal-go-v1.0.7/client/simple_canal_connector.go

//NewSimpleCanalConnector 创建SimpleCanalConnector实例func NewSimpleCanalConnector(address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector {    s := &SimpleCanalConnector{        Address:           address,        Port:              port,        UserName:          username,        PassWord:          password,        ClientIdentity:    pb.ClientIdentity{Destination: destination, ClientId: 1001},        SoTime:            soTimeOut,        IdleTimeOut:       idleTimeOut,        RollbackOnConnect: true,    }    return s}
  • NewSimpleCanalConnector方法创建了SimpleCanalConnector实例

Connect

canal-go-v1.0.7/client/simple_canal_connector.go

//Connect 连接Canal-serverfunc (c *SimpleCanalConnector) Connect() error {    if c.Connected {        return nil    }    if c.Running {        return nil    }    err := c.doConnect()    if err != nil {        return err    }    if c.Filter != "" {        c.Subscribe(c.Filter)    }    if c.RollbackOnConnect {        c.waitClientRunning()        c.RollBack(0)    }    c.Connected = true    return nil}
  • Connect方法在Connected或Running为true时直接返回nil;否则先执行c.doConnect(),若Filter非空则执行c.Subscribe(c.Filter),若RollbackOnConnect为true则再执行c.waitClientRunning()及c.RollBack(0)方法,最后将Connected设置为true

DisConnection

canal-go-v1.0.7/client/simple_canal_connector.go

//DisConnection 关闭连接func (c *SimpleCanalConnector) DisConnection() {    if c.RollbackOnConnect && c.Connected == true {        c.RollBack(0)    }    c.Connected = false    quitelyClose()}//quitelyClose 优雅关闭func quitelyClose() {    if conn != nil {        conn.Close()    }}
  • DisConnection方法在RollbackOnConnect为true且Connected为true时先执行c.RollBack(0),然后将Connected设置为false,最后通过quitelyClose执行conn.Close()

doConnect

canal-go-v1.0.7/client/simple_canal_connector.go

//doConnect 去连接Canal-Serverfunc (c SimpleCanalConnector) doConnect() error {    address := c.Address + ":" + fmt.Sprintf("%d", c.Port)    con, err := net.Dial("tcp", address)    if err != nil {        return err    }    conn = con    p := new(pb.Packet)    data, err := readNextPacket()    if err != nil {        return err    }    err = proto.Unmarshal(data, p)    if err != nil {        return err    }    if p != nil {        if p.GetVersion() != 1 {            panic("unsupported version at this client.")        }        if p.GetType() != pb.PacketType_HANDSHAKE {            panic("expect handshake but found other type.")        }        handshake := &pb.Handshake{}        err = proto.Unmarshal(p.GetBody(), handshake)        if err != nil {            return err        }        pas := []byte(c.PassWord)        ca := &pb.ClientAuth{            Username:               c.UserName,            Password:               pas,            NetReadTimeoutPresent:  &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},            NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},        }        caByteArray, _ := proto.Marshal(ca)        packet := &pb.Packet{            Type: pb.PacketType_CLIENTAUTHENTICATION,            Body: caByteArray,        }        packArray, _ := proto.Marshal(packet)        WriteWithHeader(packArray)        pp, err := readNextPacket()        if err != nil {            return err        }        pk := &pb.Packet{}        err = proto.Unmarshal(pp, pk)        if err != nil {            return err        }        if pk.Type != pb.PacketType_ACK {            panic("unexpected packet type when ack is expected")        }        ackBody := &pb.Ack{}        err = proto.Unmarshal(pk.GetBody(), ackBody)        if err != nil {            return err        }        if ackBody.GetErrorCode() > 0 {            panic(errors.New(fmt.Sprintf("something goes wrong when doing authentication:%s", ackBody.GetErrorMessage())))        
}        c.Connected = true    }    return nil}
  • doConnect方法通过net.Dial("tcp", address)建立连接,然后通过readNextPacket读取data,然后通过proto.Unmarshal(data, p)来解析,之后发送PacketType_CLIENTAUTHENTICATION数据进行鉴权,若ack成功则设置c.Connected为true

GetWithOutAck

canal-go-v1.0.7/client/simple_canal_connector.go

//GetWithOutAck 获取数据不Ackfunc (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {    c.waitClientRunning()    if !c.Running {        return nil, nil    }    var size int32    if batchSize < 0 {        size = 1000    } else {        size = batchSize    }    var time *int64    var t int64    t = -1    if timeOut == nil {        time = &t    } else {        time = timeOut    }    var i int32    i = -1    if units == nil {        units = &i    }    get := new(pb.Get)    get.AutoAckPresent = &pb.Get_AutoAck{AutoAck: false}    get.Destination = c.ClientIdentity.Destination    get.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)    get.FetchSize = size    get.TimeoutPresent = &pb.Get_Timeout{Timeout: *time}    get.UnitPresent = &pb.Get_Unit{Unit: *units}    getBody, err := proto.Marshal(get)    if err != nil {        return nil, err    }    packet := new(pb.Packet)    packet.Type = pb.PacketType_GET    packet.Body = getBody    pa, err := proto.Marshal(packet)    if err != nil {        return nil, err    }    WriteWithHeader(pa)    message, err := c.receiveMessages()    if err != nil {        return nil, err    }    return message, nil}
  • GetWithOutAck方法主要是执行WriteWithHeader(pa)以及c.receiveMessages()

Get

canal-go-v1.0.7/client/simple_canal_connector.go

//Get 获取数据并且Ack数据func (c *SimpleCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {    message, err := c.GetWithOutAck(batchSize, timeOut, units)    if err != nil {        return nil, err    }    err = c.Ack(message.Id)    if err != nil {        return nil, err    }    return message, nil}
  • Get方法先执行c.GetWithOutAck(batchSize, timeOut, units),再执行c.Ack(message.Id)

Ack

canal-go-v1.0.7/client/simple_canal_connector.go

//Ack Ack Canal-server的数据(就是昨晚某些逻辑操作后删除canal-server端的数据)func (c *SimpleCanalConnector) Ack(batchId int64) error {    c.waitClientRunning()    if !c.Running {        return nil    }    ca := new(pb.ClientAck)    ca.Destination = c.ClientIdentity.Destination    ca.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)    ca.BatchId = batchId    clientAck, err := proto.Marshal(ca)    if err != nil {        return err    }    pa := new(pb.Packet)    pa.Type = pb.PacketType_CLIENTACK    pa.Body = clientAck    pack, err := proto.Marshal(pa)    if err != nil {        return err    }    WriteWithHeader(pack)    return nil}
  • Ack方法主要是发送pb.PacketType_CLIENTACK

Subscribe

canal-go-v1.0.7/client/simple_canal_connector.go

//Subscribe 订阅func (c *SimpleCanalConnector) Subscribe(filter string) error {    c.waitClientRunning()    if !c.Running {        return nil    }    body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})    pack := new(pb.Packet)    pack.Type = pb.PacketType_SUBSCRIPTION    pack.Body = body    packet, _ := proto.Marshal(pack)    WriteWithHeader(packet)    p := new(pb.Packet)    paBytes, err := readNextPacket()    if err != nil {        return err    }    err = proto.Unmarshal(paBytes, p)    if err != nil {        return err    }    ack := new(pb.Ack)    err = proto.Unmarshal(p.Body, ack)    if err != nil {        return err    }    if ack.GetErrorCode() > 0 {        return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage())    }    c.Filter = filter    return nil}
  • Subscribe方法主要是发送pb.PacketType_SUBSCRIPTION

UnSubscribe

canal-go-v1.0.7/client/simple_canal_connector.go

//UnSubscribe 取消订阅func (c *SimpleCanalConnector) UnSubscribe() error {    c.waitClientRunning()    if c.Running {        return nil    }    us := new(pb.Unsub)    us.Destination = c.ClientIdentity.Destination    us.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)    unSub, err := proto.Marshal(us)    if err != nil {        return err    }    pa := new(pb.Packet)    pa.Type = pb.PacketType_UNSUBSCRIPTION    pa.Body = unSub    pack, err := proto.Marshal(pa)    WriteWithHeader(pack)    p, err := readNextPacket()    if err != nil {        return err    }    pa = nil    err = proto.Unmarshal(p, pa)    if err != nil {        return err    }    ack := new(pb.Ack)    err = proto.Unmarshal(pa.Body, ack)    if err != nil {        return err    }    if ack.GetErrorCode() > 0 {        panic(errors.New(fmt.Sprintf("failed to unSubscribe with reason:%s", ack.GetErrorMessage())))    }    return nil}
  • UnSubscribe方法主要是发送pb.PacketType_UNSUBSCRIPTION

RollBack

canal-go-v1.0.7/client/simple_canal_connector.go

//RollBack 回滚操作func (c *SimpleCanalConnector) RollBack(batchId int64) error {    c.waitClientRunning()    cb := new(pb.ClientRollback)    cb.Destination = c.ClientIdentity.Destination    cb.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)    cb.BatchId = batchId    clientBollBack, err := proto.Marshal(cb)    if err != nil {        return err    }    pa := new(pb.Packet)    pa.Type = pb.PacketType_CLIENTROLLBACK    pa.Body = clientBollBack    pack, err := proto.Marshal(pa)    if err != nil {        return err    }    WriteWithHeader(pack)    return nil}
  • RollBack方法主要是发送pb.PacketType_CLIENTROLLBACK

小结

SimpleCanalConnector定义了Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry属性;它提供了Connect、DisConnection、GetWithOutAck、Get、Ack、Subscribe、UnSubscribe、RollBack方法

doc

  • simple_canal_connector