序
本文主要研究一下 canal-go 的 SimpleCanalConnector
SimpleCanalConnector
canal-go-v1.0.7/client/simple_canal_connector.go
type SimpleCanalConnector struct {
Address string
Port int
UserName string
PassWord string
SoTime int32
IdleTimeOut int32
ClientIdentity pb.ClientIdentity
Connected bool
Running bool
Filter string
RollbackOnConnect bool
LazyParseEntry bool
}
- SimpleCanalConnector 定义了 Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry 属性
NewSimpleCanalConnector
canal-go-v1.0.7/client/simple_canal_connector.go
//NewSimpleCanalConnector 创建 SimpleCanalConnector 实例
func NewSimpleCanalConnector(address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector {
s := &SimpleCanalConnector{
Address: address,
Port: port,
UserName: username,
PassWord: password,
ClientIdentity: pb.ClientIdentity{Destination: destination, ClientId: 1001},
SoTime: soTimeOut,
IdleTimeOut: idleTimeOut,
RollbackOnConnect: true,
}
return s
}
- NewSimpleCanalConnector 方法创建了 SimpleCanalConnector 实例
Connect
canal-go-v1.0.7/client/simple_canal_connector.go
//Connect 连接 Canal-server
func (c *SimpleCanalConnector) Connect() error {
if c.Connected {return nil}
if c.Running {return nil}
err := c.doConnect()
if err != nil {return err}
if c.Filter != "" {c.Subscribe(c.Filter)
}
if c.RollbackOnConnect {c.waitClientRunning()
c.RollBack(0)
}
c.Connected = true
return nil
}
- Connect 方法主要执行 c.doConnect()、c.Subscribe(c.Filter) 方法,若 RollbackOnConnect 为 true 则再执行 c.waitClientRunning() 及 c.RollBack(0) 方法
DisConnection
canal-go-v1.0.7/client/simple_canal_connector.go
//DisConnection 关闭连接
func (c *SimpleCanalConnector) DisConnection() {
if c.RollbackOnConnect && c.Connected == true {c.RollBack(0)
}
c.Connected = false
quitelyClose()}
//quitelyClose 优雅关闭
func quitelyClose() {
if conn != nil {conn.Close()
}
}
- DisConnection 方法主要是执行 conn.Close()
doConnect
canal-go-v1.0.7/client/simple_canal_connector.go
//doConnect 去连接 Canal-Server
func (c SimpleCanalConnector) doConnect() error {address := c.Address + ":" + fmt.Sprintf("%d", c.Port)
con, err := net.Dial("tcp", address)
if err != nil {return err}
conn = con
p := new(pb.Packet)
data, err := readNextPacket()
if err != nil {return err}
err = proto.Unmarshal(data, p)
if err != nil {return err}
if p != nil {if p.GetVersion() != 1 {panic("unsupported version at this client.")
}
if p.GetType() != pb.PacketType_HANDSHAKE {panic("expect handshake but found other type.")
}
handshake := &pb.Handshake{}
err = proto.Unmarshal(p.GetBody(), handshake)
if err != nil {return err}
pas := []byte(c.PassWord)
ca := &pb.ClientAuth{
Username: c.UserName,
Password: pas,
NetReadTimeoutPresent: &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
}
caByteArray, _ := proto.Marshal(ca)
packet := &pb.Packet{
Type: pb.PacketType_CLIENTAUTHENTICATION,
Body: caByteArray,
}
packArray, _ := proto.Marshal(packet)
WriteWithHeader(packArray)
pp, err := readNextPacket()
if err != nil {return err}
pk := &pb.Packet{}
err = proto.Unmarshal(pp, pk)
if err != nil {return err}
if pk.Type != pb.PacketType_ACK {panic("unexpected packet type when ack is expected")
}
ackBody := &pb.Ack{}
err = proto.Unmarshal(pk.GetBody(), ackBody)
if err != nil {return err}
if ackBody.GetErrorCode() > 0 {panic(errors.New(fmt.Sprintf("something goes wrong when doing authentication:%s", ackBody.GetErrorMessage())))
}
c.Connected = true
}
return nil
}
- doConnect 方法通过 net.Dial(“tcp”, address) 建立连接,然后通过 readNextPacket 读取 data,然后通过 proto.Unmarshal(data, p) 来解析,之后发送 PacketType_CLIENTAUTHENTICATION 数据进行鉴权,若 ack 成功则设置 c.Connected 为 true
GetWithOutAck
canal-go-v1.0.7/client/simple_canal_connector.go
//GetWithOutAck 获取数据不 Ack
func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {c.waitClientRunning()
if !c.Running {return nil, nil}
var size int32
if batchSize < 0 {size = 1000} else {size = batchSize}
var time *int64
var t int64
t = -1
if timeOut == nil {time = &t} else {time = timeOut}
var i int32
i = -1
if units == nil {units = &i}
get := new(pb.Get)
get.AutoAckPresent = &pb.Get_AutoAck{AutoAck: false}
get.Destination = c.ClientIdentity.Destination
get.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
get.FetchSize = size
get.TimeoutPresent = &pb.Get_Timeout{Timeout: *time}
get.UnitPresent = &pb.Get_Unit{Unit: *units}
getBody, err := proto.Marshal(get)
if err != nil {return nil, err}
packet := new(pb.Packet)
packet.Type = pb.PacketType_GET
packet.Body = getBody
pa, err := proto.Marshal(packet)
if err != nil {return nil, err}
WriteWithHeader(pa)
message, err := c.receiveMessages()
if err != nil {return nil, err}
return message, nil
}
- GetWithOutAck 方法主要是执行 WriteWithHeader(pa) 以及 c.receiveMessages()
Get
canal-go-v1.0.7/client/simple_canal_connector.go
//Get 获取数据并且 Ack 数据
func (c *SimpleCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {message, err := c.GetWithOutAck(batchSize, timeOut, units)
if err != nil {return nil, err}
err = c.Ack(message.Id)
if err != nil {return nil, err}
return message, nil
}
- Get 方法先执行 c.GetWithOutAck(batchSize, timeOut, units),再执行 c.Ack(message.Id)
Ack
canal-go-v1.0.7/client/simple_canal_connector.go
//Ack Ack Canal-server 的数据(就是昨晚某些逻辑操作后删除 canal-server 端的数据)func (c *SimpleCanalConnector) Ack(batchId int64) error {c.waitClientRunning()
if !c.Running {return nil}
ca := new(pb.ClientAck)
ca.Destination = c.ClientIdentity.Destination
ca.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
ca.BatchId = batchId
clientAck, err := proto.Marshal(ca)
if err != nil {return err}
pa := new(pb.Packet)
pa.Type = pb.PacketType_CLIENTACK
pa.Body = clientAck
pack, err := proto.Marshal(pa)
if err != nil {return err}
WriteWithHeader(pack)
return nil
}
- Ack 方法主要是发送 pb.PacketType_CLIENTACK
Subscribe
canal-go-v1.0.7/client/simple_canal_connector.go
//Subscribe 订阅
func (c *SimpleCanalConnector) Subscribe(filter string) error {c.waitClientRunning()
if !c.Running {return nil}
body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
pack := new(pb.Packet)
pack.Type = pb.PacketType_SUBSCRIPTION
pack.Body = body
packet, _ := proto.Marshal(pack)
WriteWithHeader(packet)
p := new(pb.Packet)
paBytes, err := readNextPacket()
if err != nil {return err}
err = proto.Unmarshal(paBytes, p)
if err != nil {return err}
ack := new(pb.Ack)
err = proto.Unmarshal(p.Body, ack)
if err != nil {return err}
if ack.GetErrorCode() > 0 {return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage())
}
c.Filter = filter
return nil
}
- Subscribe 方法主要是发送 pb.PacketType_SUBSCRIPTION
UnSubscribe
canal-go-v1.0.7/client/simple_canal_connector.go
//UnSubscribe 取消订阅
func (c *SimpleCanalConnector) UnSubscribe() error {c.waitClientRunning()
if c.Running {return nil}
us := new(pb.Unsub)
us.Destination = c.ClientIdentity.Destination
us.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
unSub, err := proto.Marshal(us)
if err != nil {return err}
pa := new(pb.Packet)
pa.Type = pb.PacketType_UNSUBSCRIPTION
pa.Body = unSub
pack, err := proto.Marshal(pa)
WriteWithHeader(pack)
p, err := readNextPacket()
if err != nil {return err}
pa = nil
err = proto.Unmarshal(p, pa)
if err != nil {return err}
ack := new(pb.Ack)
err = proto.Unmarshal(pa.Body, ack)
if err != nil {return err}
if ack.GetErrorCode() > 0 {panic(errors.New(fmt.Sprintf("failed to unSubscribe with reason:%s", ack.GetErrorMessage())))
}
return nil
}
- UnSubscribe 方法主要是发送 pb.PacketType_UNSUBSCRIPTION
RollBack
canal-go-v1.0.7/client/simple_canal_connector.go
//RollBack 回滚操作
func (c *SimpleCanalConnector) RollBack(batchId int64) error {c.waitClientRunning()
cb := new(pb.ClientRollback)
cb.Destination = c.ClientIdentity.Destination
cb.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
cb.BatchId = batchId
clientBollBack, err := proto.Marshal(cb)
if err != nil {return err}
pa := new(pb.Packet)
pa.Type = pb.PacketType_CLIENTROLLBACK
pa.Body = clientBollBack
pack, err := proto.Marshal(pa)
if err != nil {return err}
WriteWithHeader(pack)
return nil
}
- RollBack 方法主要是发送 pb.PacketType_CLIENTROLLBACK
小结
SimpleCanalConnector 定义了 Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry 属性;它提供了 Connect、DisConnection、GetWithOutAck、Get、Ack、Subscribe、UnSubscribe、RollBack 方法
doc
- simple_canal_connector