本文主要研究一下SpinalTap的BinlogEvent

BinlogEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/BinlogEvent.java

@Getter@ToStringpublic abstract class BinlogEvent extends SourceEvent {  private final long tableId;  private final long serverId;  private final BinlogFilePos binlogFilePos;  public BinlogEvent(long tableId, long serverId, long timestamp, BinlogFilePos binlogFilePos) {    super(timestamp);    this.tableId = tableId;    this.serverId = serverId;    this.binlogFilePos = binlogFilePos;  }  public long getOffset() {    return (binlogFilePos.getFileNumber() << 32) | binlogFilePos.getPosition();  }  public boolean isMutation() {    return this instanceof WriteEvent || this instanceof DeleteEvent || this instanceof UpdateEvent;  }}
  • BinlogEvent继承了SourceEvent,它定义了tableId、serverId、binlogFilePos属性

StartEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/StartEvent.java

public class StartEvent extends BinlogEvent {  public StartEvent(long serverId, long timestamp, BinlogFilePos filePos) {    super(0L, serverId, timestamp, filePos);  }}
  • StartEvent继承了BinlogEvent,其构造器需要serverId、timestamp、filePos参数

TableMapEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/TableMapEvent.java

@Getterpublic final class TableMapEvent extends BinlogEvent {  private final String database;  private final String table;  private final List<ColumnDataType> columnTypes;  public TableMapEvent(      long tableId,      long serverId,      long timestamp,      BinlogFilePos filePos,      String database,      String table,      byte[] columnTypeCodes) {    super(tableId, serverId, timestamp, filePos);    this.database = database;    this.table = table;    this.columnTypes = new ArrayList<>();    for (byte code : columnTypeCodes) {      columnTypes.add(ColumnDataType.byCode(code));    }  }}
  • TableMapEvent继承了BinlogEvent,它定义了database、table、columnTypes属性

WriteEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/WriteEvent.java

@Getterpublic final class WriteEvent extends BinlogEvent {  private final List<Serializable[]> rows;  public WriteEvent(      long tableId,      long serverId,      long timestamp,      BinlogFilePos filePos,      List<Serializable[]> rows) {    super(tableId, serverId, timestamp, filePos);    this.rows = rows;  }  @Override  public int size() {    return rows.size();  }}
  • WriteEvent继承了BinlogEvent,它定义了类型为List<Serializable[]>的rows属性

DeleteEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/DeleteEvent.java

@Getterpublic final class DeleteEvent extends BinlogEvent {  private final List<Serializable[]> rows;  public DeleteEvent(      long tableId,      long serverId,      long timestamp,      BinlogFilePos filePos,      List<Serializable[]> rows) {    super(tableId, serverId, timestamp, filePos);    this.rows = rows;  }  @Override  public int size() {    return rows.size();  }}
  • DeleteEvent继承了BinlogEvent,它定义了类型为List<Serializable[]>的rows属性

UpdateEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/UpdateEvent.java

@Getterpublic class UpdateEvent extends BinlogEvent {  private final List<Map.Entry<Serializable[], Serializable[]>> rows;  public UpdateEvent(      long tableId,      long serverId,      long timestamp,      BinlogFilePos filePos,      List<Map.Entry<Serializable[], Serializable[]>> rows) {    super(tableId, serverId, timestamp, filePos);    this.rows = rows;  }  @Override  public int size() {    return rows.size();  }}
  • UpdateEvent继承了BinlogEvent,它定义了类型为List<Map.Entry<Serializable[], Serializable[]>>的rows属性

QueryEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/QueryEvent.java

@Getterpublic class QueryEvent extends BinlogEvent {  private final String database;  private final String sql;  public QueryEvent(      long serverId, long timestamp, BinlogFilePos filePos, String database, String sql) {    super(0l, serverId, timestamp, filePos);    this.database = database;    this.sql = sql;  }}
  • QueryEvent继承了BinlogEvent,它定义了database、sql属性

XidEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/XidEvent.java

@Getterpublic class XidEvent extends BinlogEvent {  private final long xid;  public XidEvent(long serverId, long timestamp, BinlogFilePos filePos, long xid) {    super(0l, serverId, timestamp, filePos);    this.xid = xid;  }}
  • XidEvent继承了BinlogEvent,它定义了xid属性

GTIDEvent

SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/GTIDEvent.java

@Getterpublic class GTIDEvent extends BinlogEvent {  private final String gtid;  public GTIDEvent(long serverId, long timestamp, BinlogFilePos filePos, String gtid) {    super(0, serverId, timestamp, filePos);    this.gtid = gtid;  }}
  • GTIDEvent继承了BinlogEvent,它定义了gtid属性

小结

BinlogEvent继承了SourceEvent,它定义了tableId、serverId、binlogFilePos属性;其子类主要有StartEvent、TableMapEvent、WriteEvent、DeleteEvent、UpdateEvent、QueryEvent、XidEvent、GTIDEvent

doc

  • BinlogEvent