乐趣区

聊聊canalgo的SimpleCanalConnector

本文主要研究一下 canal-go 的 SimpleCanalConnector

SimpleCanalConnector

canal-go-v1.0.7/client/simple_canal_connector.go

type SimpleCanalConnector struct {
    Address           string
    Port              int
    UserName          string
    PassWord          string
    SoTime            int32
    IdleTimeOut       int32
    ClientIdentity    pb.ClientIdentity
    Connected         bool
    Running           bool
    Filter            string
    RollbackOnConnect bool
    LazyParseEntry    bool
}
  • SimpleCanalConnector 定义了 Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry 属性

NewSimpleCanalConnector

canal-go-v1.0.7/client/simple_canal_connector.go

//NewSimpleCanalConnector 创建 SimpleCanalConnector 实例
func NewSimpleCanalConnector(address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector {
    s := &SimpleCanalConnector{
        Address:           address,
        Port:              port,
        UserName:          username,
        PassWord:          password,
        ClientIdentity:    pb.ClientIdentity{Destination: destination, ClientId: 1001},
        SoTime:            soTimeOut,
        IdleTimeOut:       idleTimeOut,
        RollbackOnConnect: true,
    }
    return s

}
  • NewSimpleCanalConnector 方法创建了 SimpleCanalConnector 实例

Connect

canal-go-v1.0.7/client/simple_canal_connector.go

//Connect 连接 Canal-server
func (c *SimpleCanalConnector) Connect() error {
    if c.Connected {return nil}

    if c.Running {return nil}

    err := c.doConnect()
    if err != nil {return err}
    if c.Filter != "" {c.Subscribe(c.Filter)
    }

    if c.RollbackOnConnect {c.waitClientRunning()

        c.RollBack(0)
    }

    c.Connected = true
    return nil

}
  • Connect 方法主要执行 c.doConnect()、c.Subscribe(c.Filter) 方法,若 RollbackOnConnect 为 true 则再执行 c.waitClientRunning() 及 c.RollBack(0) 方法

DisConnection

canal-go-v1.0.7/client/simple_canal_connector.go

//DisConnection 关闭连接
func (c *SimpleCanalConnector) DisConnection() {
    if c.RollbackOnConnect && c.Connected == true {c.RollBack(0)
    }
    c.Connected = false
    quitelyClose()}

//quitelyClose 优雅关闭
func quitelyClose() {
    if conn != nil {conn.Close()
    }
}
  • DisConnection 方法主要是执行 conn.Close()

doConnect

canal-go-v1.0.7/client/simple_canal_connector.go

//doConnect 去连接 Canal-Server
func (c SimpleCanalConnector) doConnect() error {address := c.Address + ":" + fmt.Sprintf("%d", c.Port)
    con, err := net.Dial("tcp", address)
    if err != nil {return err}
    conn = con

    p := new(pb.Packet)
    data, err := readNextPacket()
    if err != nil {return err}
    err = proto.Unmarshal(data, p)
    if err != nil {return err}
    if p != nil {if p.GetVersion() != 1 {panic("unsupported version at this client.")
        }

        if p.GetType() != pb.PacketType_HANDSHAKE {panic("expect handshake but found other type.")
        }

        handshake := &pb.Handshake{}
        err = proto.Unmarshal(p.GetBody(), handshake)
        if err != nil {return err}
        pas := []byte(c.PassWord)
        ca := &pb.ClientAuth{
            Username:               c.UserName,
            Password:               pas,
            NetReadTimeoutPresent:  &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
            NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
        }
        caByteArray, _ := proto.Marshal(ca)
        packet := &pb.Packet{
            Type: pb.PacketType_CLIENTAUTHENTICATION,
            Body: caByteArray,
        }

        packArray, _ := proto.Marshal(packet)

        WriteWithHeader(packArray)

        pp, err := readNextPacket()
        if err != nil {return err}
        pk := &pb.Packet{}

        err = proto.Unmarshal(pp, pk)
        if err != nil {return err}

        if pk.Type != pb.PacketType_ACK {panic("unexpected packet type when ack is expected")
        }

        ackBody := &pb.Ack{}
        err = proto.Unmarshal(pk.GetBody(), ackBody)
        if err != nil {return err}
        if ackBody.GetErrorCode() > 0 {panic(errors.New(fmt.Sprintf("something goes wrong when doing authentication:%s", ackBody.GetErrorMessage())))
        }

        c.Connected = true

    }
    return nil

}
  • doConnect 方法通过 net.Dial(“tcp”, address) 建立连接,然后通过 readNextPacket 读取 data,然后通过 proto.Unmarshal(data, p) 来解析,之后发送 PacketType_CLIENTAUTHENTICATION 数据进行鉴权,若 ack 成功则设置 c.Connected 为 true

GetWithOutAck

canal-go-v1.0.7/client/simple_canal_connector.go

//GetWithOutAck 获取数据不 Ack
func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {c.waitClientRunning()
    if !c.Running {return nil, nil}
    var size int32

    if batchSize < 0 {size = 1000} else {size = batchSize}
    var time *int64
    var t int64
    t = -1
    if timeOut == nil {time = &t} else {time = timeOut}
    var i int32
    i = -1
    if units == nil {units = &i}
    get := new(pb.Get)
    get.AutoAckPresent = &pb.Get_AutoAck{AutoAck: false}
    get.Destination = c.ClientIdentity.Destination
    get.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
    get.FetchSize = size
    get.TimeoutPresent = &pb.Get_Timeout{Timeout: *time}
    get.UnitPresent = &pb.Get_Unit{Unit: *units}

    getBody, err := proto.Marshal(get)
    if err != nil {return nil, err}
    packet := new(pb.Packet)
    packet.Type = pb.PacketType_GET
    packet.Body = getBody
    pa, err := proto.Marshal(packet)
    if err != nil {return nil, err}
    WriteWithHeader(pa)
    message, err := c.receiveMessages()
    if err != nil {return nil, err}
    return message, nil
}
  • GetWithOutAck 方法主要是执行 WriteWithHeader(pa) 以及 c.receiveMessages()

Get

canal-go-v1.0.7/client/simple_canal_connector.go

//Get 获取数据并且 Ack 数据
func (c *SimpleCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {message, err := c.GetWithOutAck(batchSize, timeOut, units)
    if err != nil {return nil, err}
    err = c.Ack(message.Id)
    if err != nil {return nil, err}
    return message, nil
}
  • Get 方法先执行 c.GetWithOutAck(batchSize, timeOut, units),再执行 c.Ack(message.Id)

Ack

canal-go-v1.0.7/client/simple_canal_connector.go

//Ack Ack Canal-server 的数据(就是昨晚某些逻辑操作后删除 canal-server 端的数据)func (c *SimpleCanalConnector) Ack(batchId int64) error {c.waitClientRunning()
    if !c.Running {return nil}

    ca := new(pb.ClientAck)
    ca.Destination = c.ClientIdentity.Destination
    ca.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
    ca.BatchId = batchId

    clientAck, err := proto.Marshal(ca)
    if err != nil {return err}
    pa := new(pb.Packet)
    pa.Type = pb.PacketType_CLIENTACK
    pa.Body = clientAck
    pack, err := proto.Marshal(pa)
    if err != nil {return err}
    WriteWithHeader(pack)
    return nil

}
  • Ack 方法主要是发送 pb.PacketType_CLIENTACK

Subscribe

canal-go-v1.0.7/client/simple_canal_connector.go

//Subscribe 订阅
func (c *SimpleCanalConnector) Subscribe(filter string) error {c.waitClientRunning()
    if !c.Running {return nil}
    body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
    pack := new(pb.Packet)
    pack.Type = pb.PacketType_SUBSCRIPTION
    pack.Body = body

    packet, _ := proto.Marshal(pack)
    WriteWithHeader(packet)

    p := new(pb.Packet)

    paBytes, err := readNextPacket()
    if err != nil {return err}
    err = proto.Unmarshal(paBytes, p)
    if err != nil {return err}
    ack := new(pb.Ack)
    err = proto.Unmarshal(p.Body, ack)
    if err != nil {return err}

    if ack.GetErrorCode() > 0 {return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage())
    }

    c.Filter = filter

    return nil
}
  • Subscribe 方法主要是发送 pb.PacketType_SUBSCRIPTION

UnSubscribe

canal-go-v1.0.7/client/simple_canal_connector.go

//UnSubscribe 取消订阅
func (c *SimpleCanalConnector) UnSubscribe() error {c.waitClientRunning()
    if c.Running {return nil}

    us := new(pb.Unsub)
    us.Destination = c.ClientIdentity.Destination
    us.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)

    unSub, err := proto.Marshal(us)
    if err != nil {return err}

    pa := new(pb.Packet)
    pa.Type = pb.PacketType_UNSUBSCRIPTION
    pa.Body = unSub

    pack, err := proto.Marshal(pa)
    WriteWithHeader(pack)

    p, err := readNextPacket()
    if err != nil {return err}
    pa = nil
    err = proto.Unmarshal(p, pa)
    if err != nil {return err}
    ack := new(pb.Ack)
    err = proto.Unmarshal(pa.Body, ack)
    if err != nil {return err}
    if ack.GetErrorCode() > 0 {panic(errors.New(fmt.Sprintf("failed to unSubscribe with reason:%s", ack.GetErrorMessage())))
    }
    return nil
}
  • UnSubscribe 方法主要是发送 pb.PacketType_UNSUBSCRIPTION

RollBack

canal-go-v1.0.7/client/simple_canal_connector.go

//RollBack 回滚操作
func (c *SimpleCanalConnector) RollBack(batchId int64) error {c.waitClientRunning()
    cb := new(pb.ClientRollback)
    cb.Destination = c.ClientIdentity.Destination
    cb.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
    cb.BatchId = batchId

    clientBollBack, err := proto.Marshal(cb)
    if err != nil {return err}

    pa := new(pb.Packet)
    pa.Type = pb.PacketType_CLIENTROLLBACK
    pa.Body = clientBollBack
    pack, err := proto.Marshal(pa)
    if err != nil {return err}
    WriteWithHeader(pack)
    return nil
}
  • RollBack 方法主要是发送 pb.PacketType_CLIENTROLLBACK

小结

SimpleCanalConnector 定义了 Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry 属性;它提供了 Connect、DisConnection、GetWithOutAck、Get、Ack、Subscribe、UnSubscribe、RollBack 方法

doc

  • simple_canal_connector
退出移动版