Preface
This article takes a look at BinlogEvent in SpinalTap.
BinlogEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/BinlogEvent.java
@Getter
@ToString
public abstract class BinlogEvent extends SourceEvent {
  private final long tableId;
  private final long serverId;
  private final BinlogFilePos binlogFilePos;

  public BinlogEvent(long tableId, long serverId, long timestamp, BinlogFilePos binlogFilePos) {
    super(timestamp);
    this.tableId = tableId;
    this.serverId = serverId;
    this.binlogFilePos = binlogFilePos;
  }

  public long getOffset() {
    return (binlogFilePos.getFileNumber() << 32) | binlogFilePos.getPosition();
  }

  public boolean isMutation() {
    return this instanceof WriteEvent || this instanceof DeleteEvent || this instanceof UpdateEvent;
  }
}
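- BinlogEvent extends SourceEvent and defines the tableId, serverId, and binlogFilePos properties. getOffset() combines the binlog file number (shifted left by 32 bits) with the position into a single long, and isMutation() returns true only for WriteEvent, DeleteEvent, and UpdateEvent.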
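The bit layout produced by getOffset() can be illustrated with a small standalone sketch. BinlogOffsets and its methods are hypothetical helpers, not part of SpinalTap, and the unpacking assumes the position always fits in 32 bits.

```java
// Hypothetical helper, not part of SpinalTap: mirrors the bit layout of getOffset().
final class BinlogOffsets {
  private BinlogOffsets() {}

  // File number goes into the high 32 bits, position into the low 32 bits.
  static long pack(long fileNumber, long position) {
    return (fileNumber << 32) | position;
  }

  // Recovers the file number from the high 32 bits.
  static long fileNumber(long offset) {
    return offset >>> 32;
  }

  // Recovers the position from the low 32 bits (assumes position < 2^32).
  static long position(long offset) {
    return offset & 0xFFFFFFFFL;
  }
}
```

Under that assumption, offsets built this way compare in binlog order: any position in a later file yields a larger value than any position in an earlier file.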
StartEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/StartEvent.java
public class StartEvent extends BinlogEvent {
  public StartEvent(long serverId, long timestamp, BinlogFilePos filePos) {
    super(0L, serverId, timestamp, filePos);
  }
}
- StartEvent extends BinlogEvent; its constructor takes serverId, timestamp, and filePos, and passes 0L as the tableId.
TableMapEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/TableMapEvent.java
@Getter
public final class TableMapEvent extends BinlogEvent {
  private final String database;
  private final String table;
  private final List<ColumnDataType> columnTypes;

  public TableMapEvent(
      long tableId,
      long serverId,
      long timestamp,
      BinlogFilePos filePos,
      String database,
      String table,
      byte[] columnTypeCodes) {
    super(tableId, serverId, timestamp, filePos);
    this.database = database;
    this.table = table;
    this.columnTypes = new ArrayList<>();
    for (byte code : columnTypeCodes) {
      columnTypes.add(ColumnDataType.byCode(code));
    }
  }
}
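- TableMapEvent extends BinlogEvent and defines the database, table, and columnTypes properties; the constructor converts each byte in columnTypeCodes to a ColumnDataType via ColumnDataType.byCode.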
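The code-to-type conversion in the constructor can be sketched as follows. ColumnKind and ColumnKinds are hypothetical stand-ins for ColumnDataType, covering only a handful of MySQL type codes, and the UNKNOWN fallback is an assumption of this sketch rather than SpinalTap's behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ColumnDataType; only a few MySQL type codes are listed.
enum ColumnKind {
  TINY(1), LONG(3), VARCHAR(15), VAR_STRING(253), UNKNOWN(-1);

  private static final Map<Integer, ColumnKind> BY_CODE = new HashMap<>();

  static {
    for (ColumnKind kind : values()) {
      BY_CODE.put(kind.code, kind);
    }
  }

  private final int code;

  ColumnKind(int code) {
    this.code = code;
  }

  // Looks up a kind by its numeric code; unlisted codes map to UNKNOWN in this sketch.
  static ColumnKind byCode(int code) {
    return BY_CODE.getOrDefault(code, UNKNOWN);
  }
}

final class ColumnKinds {
  private ColumnKinds() {}

  // Converts raw type-code bytes to kinds, one per column, preserving column order.
  static List<ColumnKind> decode(byte[] typeCodes) {
    List<ColumnKind> kinds = new ArrayList<>(typeCodes.length);
    for (byte code : typeCodes) {
      // Java bytes are signed, so mask to read codes above 127 (such as 253) as unsigned.
      kinds.add(ColumnKind.byCode(code & 0xFF));
    }
    return kinds;
  }
}
```

The mask is a choice made for this sketch: without it, a code such as 253 would arrive as -3 after widening from byte to int.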
WriteEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/WriteEvent.java
@Getter
public final class WriteEvent extends BinlogEvent {
  private final List<Serializable[]> rows;

  public WriteEvent(
      long tableId,
      long serverId,
      long timestamp,
      BinlogFilePos filePos,
      List<Serializable[]> rows) {
    super(tableId, serverId, timestamp, filePos);
    this.rows = rows;
  }

  @Override
  public int size() {
    return rows.size();
  }
}
- WriteEvent extends BinlogEvent and defines a rows property of type List<Serializable[]>; its size() override returns the number of rows.
DeleteEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/DeleteEvent.java
@Getter
public final class DeleteEvent extends BinlogEvent {
  private final List<Serializable[]> rows;

  public DeleteEvent(
      long tableId,
      long serverId,
      long timestamp,
      BinlogFilePos filePos,
      List<Serializable[]> rows) {
    super(tableId, serverId, timestamp, filePos);
    this.rows = rows;
  }

  @Override
  public int size() {
    return rows.size();
  }
}
- DeleteEvent extends BinlogEvent and defines a rows property of type List<Serializable[]>; its size() override returns the number of rows.
UpdateEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/UpdateEvent.java
@Getter
public class UpdateEvent extends BinlogEvent {
  private final List<Map.Entry<Serializable[], Serializable[]>> rows;

  public UpdateEvent(
      long tableId,
      long serverId,
      long timestamp,
      BinlogFilePos filePos,
      List<Map.Entry<Serializable[], Serializable[]>> rows) {
    super(tableId, serverId, timestamp, filePos);
    this.rows = rows;
  }

  @Override
  public int size() {
    return rows.size();
  }
}
- UpdateEvent extends BinlogEvent and defines a rows property of type List<Map.Entry<Serializable[], Serializable[]>>; its size() override returns the number of rows.
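To show how a row of this shape might be consumed, here is a minimal sketch. It assumes that the entry key holds the column values before the update and the entry value holds them after, which the excerpt above does not state, and that both arrays have the same length. UpdateRows, pair, and changedColumns are hypothetical names.

```java
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, not part of SpinalTap.
final class UpdateRows {
  private UpdateRows() {}

  // Builds one row in the same shape as UpdateEvent's rows: (before, after).
  static Map.Entry<Serializable[], Serializable[]> pair(
      Serializable[] before, Serializable[] after) {
    return new AbstractMap.SimpleImmutableEntry<>(before, after);
  }

  // Returns the indexes of columns whose value differs between the two images.
  // Assumes key = before image, value = after image, with equal lengths.
  static List<Integer> changedColumns(Map.Entry<Serializable[], Serializable[]> row) {
    Serializable[] before = row.getKey();
    Serializable[] after = row.getValue();
    List<Integer> changed = new ArrayList<>();
    for (int i = 0; i < before.length; i++) {
      if (!Objects.equals(before[i], after[i])) {
        changed.add(i);
      }
    }
    return changed;
  }
}
```

For a row whose third column changes from 30 to 31 while the others stay the same, changedColumns would report only index 2.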
QueryEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/QueryEvent.java
@Getter
public class QueryEvent extends BinlogEvent {
  private final String database;
  private final String sql;

  public QueryEvent(long serverId, long timestamp, BinlogFilePos filePos, String database, String sql) {
    super(0l, serverId, timestamp, filePos);
    this.database = database;
    this.sql = sql;
  }
}
- QueryEvent extends BinlogEvent and defines the database and sql properties; it passes 0 as the tableId.
XidEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/XidEvent.java
@Getter
public class XidEvent extends BinlogEvent {
  private final long xid;

  public XidEvent(long serverId, long timestamp, BinlogFilePos filePos, long xid) {
    super(0l, serverId, timestamp, filePos);
    this.xid = xid;
  }
}
- XidEvent extends BinlogEvent and defines the xid property; it passes 0 as the tableId.
GTIDEvent
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/GTIDEvent.java
@Getter
public class GTIDEvent extends BinlogEvent {
  private final String gtid;

  public GTIDEvent(long serverId, long timestamp, BinlogFilePos filePos, String gtid) {
    super(0, serverId, timestamp, filePos);
    this.gtid = gtid;
  }
}
- GTIDEvent extends BinlogEvent and defines the gtid property; it passes 0 as the tableId.
Summary
BinlogEvent extends SourceEvent and defines the tableId, serverId, and binlogFilePos properties. Its main subclasses are StartEvent, TableMapEvent, WriteEvent, DeleteEvent, UpdateEvent, QueryEvent, XidEvent, and GTIDEvent.
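As a rough illustration of how a consumer could combine isMutation() and size() across this hierarchy, consider the sketch below. Every class in it is a hypothetical, stripped-down stand-in: the Sketch* types carry no fields from the real events, SketchRowEvent is an intermediate class invented for brevity, and the default size() of 1 for non-row events is an assumption.

```java
import java.util.List;

// Hypothetical stand-ins for the SpinalTap event classes; real fields are omitted.
abstract class SketchEvent {
  // Assumed default for events that carry no rows; row events override it.
  int size() {
    return 1;
  }

  // Same instanceof check as BinlogEvent.isMutation(), applied to the stand-ins.
  boolean isMutation() {
    return this instanceof SketchWrite
        || this instanceof SketchDelete
        || this instanceof SketchUpdate;
  }
}

// Invented base class so the three row events can share a row count.
abstract class SketchRowEvent extends SketchEvent {
  private final int rowCount;

  SketchRowEvent(int rowCount) {
    this.rowCount = rowCount;
  }

  @Override
  int size() {
    return rowCount;
  }
}

final class SketchWrite extends SketchRowEvent {
  SketchWrite(int rowCount) { super(rowCount); }
}

final class SketchDelete extends SketchRowEvent {
  SketchDelete(int rowCount) { super(rowCount); }
}

final class SketchUpdate extends SketchRowEvent {
  SketchUpdate(int rowCount) { super(rowCount); }
}

final class SketchTableMap extends SketchEvent {}

final class SketchXid extends SketchEvent {}

final class MutationCounter {
  private MutationCounter() {}

  // Sums the row counts of mutation events and ignores everything else.
  static int countMutatedRows(List<SketchEvent> events) {
    int rows = 0;
    for (SketchEvent event : events) {
      if (event.isMutation()) {
        rows += event.size();
      }
    }
    return rows;
  }
}
```

In this sketch, table map and xid events pass through the loop without contributing, so only the rows carried by write, update, and delete events are counted.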
doc
- BinlogEvent