Preface
This article takes a look at SpinalTap's MysqlEventFilter.
MysqlEventFilter
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/filter/MysqlEventFilter.java
public abstract class MysqlEventFilter implements Filter<BinlogEvent> {
  public static Filter<BinlogEvent> create(
      @NonNull final TableCache tableCache,
      @NonNull final Set<String> tableNames,
      @NonNull final AtomicReference<SourceState> state) {
    return ChainedFilter.<BinlogEvent>builder()
        .addFilter(new EventTypeFilter())
        .addFilter(new TableFilter(tableCache, tableNames))
        .addFilter(new DuplicateFilter(state))
        .build();
  }
}
- MysqlEventFilter provides a static create method that uses ChainedFilter to build a Filter for BinlogEvent; the chain is made up of EventTypeFilter, TableFilter, and DuplicateFilter, added in that order
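To make the chaining idea concrete, here is a minimal sketch of a builder-style filter chain. ChainedFilterSketch is a hypothetical stand-in built on java.util.function.Predicate, not SpinalTap's actual ChainedFilter, and it assumes that an item passes the chain only when every filter accepts it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical stand-in for a chained filter; not SpinalTap's actual ChainedFilter. */
final class ChainedFilterSketch<T> implements Predicate<T> {
  private final List<Predicate<T>> filters;

  private ChainedFilterSketch(List<Predicate<T>> filters) {
    this.filters = filters;
  }

  static <T> Builder<T> builder() {
    return new Builder<T>();
  }

  /** Assumption: an item passes only if every filter in the chain accepts it. */
  @Override
  public boolean test(T item) {
    for (Predicate<T> filter : filters) {
      if (!filter.test(item)) {
        return false; // short-circuit: later filters are not consulted
      }
    }
    return true;
  }

  static final class Builder<T> {
    private final List<Predicate<T>> filters = new ArrayList<>();

    /** Filters are evaluated in the order they are added. */
    Builder<T> addFilter(Predicate<T> filter) {
      filters.add(filter);
      return this;
    }

    ChainedFilterSketch<T> build() {
      // Copy the list so later builder changes do not affect the built chain.
      return new ChainedFilterSketch<T>(new ArrayList<>(filters));
    }
  }
}
```

With this sketch, ChainedFilterSketch.<String>builder().addFilter(s -> !s.isEmpty()).addFilter(s -> s.startsWith("db")).build() accepts "db:users" and rejects "other". Putting the cheapest filter first, as create does with EventTypeFilter, lets most irrelevant events be dropped before the more involved checks run.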
EventTypeFilter
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/filter/EventTypeFilter.java
@RequiredArgsConstructor
final class EventTypeFilter extends MysqlEventFilter {
  @SuppressWarnings("unchecked")
  private static final Set<Class<? extends BinlogEvent>> WHITELISTED_EVENT_TYPES =
      ImmutableSet.of(
          TableMapEvent.class,
          WriteEvent.class,
          UpdateEvent.class,
          DeleteEvent.class,
          XidEvent.class,
          QueryEvent.class,
          StartEvent.class,
          GTIDEvent.class);

  public boolean apply(@NonNull final BinlogEvent event) {
    return WHITELISTED_EVENT_TYPES.contains(event.getClass());
  }
}
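- EventTypeFilter extends MysqlEventFilter; its apply method accepts an event only if the event's class is in WHITELISTED_EVENT_TYPES (TableMapEvent, WriteEvent, UpdateEvent, DeleteEvent, XidEvent, QueryEvent, StartEvent, GTIDEvent)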
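One detail of this approach is that Set.contains(event.getClass()) matches the exact runtime class only, so a subclass of a whitelisted type would not pass. The sketch below illustrates that behavior with hypothetical event classes; none of them are SpinalTap types, and BulkWriteEvent in particular is invented for the illustration.

```java
import java.util.HashSet;
import java.util.Set;

final class EventTypeWhitelistSketch {
  // Hypothetical event types standing in for a BinlogEvent hierarchy.
  static class Event {}
  static class WriteEvent extends Event {}
  static class DeleteEvent extends Event {}
  static class BulkWriteEvent extends WriteEvent {} // subclass of a whitelisted type
  static class RotateEvent extends Event {}         // not whitelisted

  private static final Set<Class<?>> WHITELIST = new HashSet<>();

  static {
    WHITELIST.add(WriteEvent.class);
    WHITELIST.add(DeleteEvent.class);
  }

  /** Accepts an event only when its exact runtime class is whitelisted. */
  static boolean apply(Event event) {
    return WHITELIST.contains(event.getClass());
  }
}
```

Here apply(new WriteEvent()) is accepted, while apply(new RotateEvent()) and apply(new BulkWriteEvent()) are both rejected. A class-set lookup is a constant-time check, at the cost of having to list every concrete type explicitly.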
TableFilter
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/filter/TableFilter.java
@RequiredArgsConstructor
final class TableFilter extends MysqlEventFilter {
  @NonNull private final TableCache tableCache;
  @NonNull private final Set<String> tableNames;

  public boolean apply(@NonNull final BinlogEvent event) {
    if (event instanceof TableMapEvent) {
      TableMapEvent tableMap = (TableMapEvent) event;
      return tableNames.contains(
          Table.canonicalNameOf(tableMap.getDatabase(), tableMap.getTable()));
    } else if (event.isMutation()) {
      return tableCache.contains(event.getTableId());
    }
    return true;
  }
}
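- TableFilter extends MysqlEventFilter; when the event is a TableMapEvent, its apply method checks whether the canonical name built from the database and table is in tableNames; when event.isMutation() is true, it checks tableCache.contains(event.getTableId()); in all other cases it returns true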
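The two branches can be read as a two-stage gate: table-map events are matched by name, and mutation events are matched by a table id that is already known. Below is a simplified sketch of that idea. The "db:table" canonical format and the plain Set<Long> standing in for TableCache are assumptions made for illustration, and how the real cache gets populated is outside what the excerpt above shows.

```java
import java.util.HashSet;
import java.util.Set;

final class TableFilterSketch {
  private final Set<String> tableNames;   // canonical names of the tables to stream
  private final Set<Long> cachedTableIds; // hypothetical stand-in for TableCache

  TableFilterSketch(Set<String> tableNames, Set<Long> cachedTableIds) {
    this.tableNames = new HashSet<>(tableNames);
    this.cachedTableIds = new HashSet<>(cachedTableIds);
  }

  /** Assumed canonical format; the real Table.canonicalNameOf may differ. */
  static String canonicalNameOf(String database, String table) {
    return database + ":" + table;
  }

  /** Table-map events are accepted by configured name. */
  boolean acceptTableMap(String database, String table) {
    return tableNames.contains(canonicalNameOf(database, table));
  }

  /** Mutation events are accepted only if their table id is already cached. */
  boolean acceptMutation(long tableId) {
    return cachedTableIds.contains(tableId);
  }
}
```

Mutation events carry only a numeric table id, which is why the name-based check happens on the table-map event and the id-based check on the mutation.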
DuplicateFilter
SpinalTap/spinaltap-mysql/src/main/java/com/airbnb/spinaltap/mysql/event/filter/DuplicateFilter.java
@RequiredArgsConstructor
public final class DuplicateFilter extends MysqlEventFilter {
  @NonNull private final AtomicReference<SourceState> state;

  public boolean apply(@NonNull final BinlogEvent event) {
    // Only applies to mutation events
    if (!event.isMutation()) {
      return true;
    }

    // We need to tell if position in `event` and in `state` are from the same source
    // MySQL server, because a failover may have happened and we are currently streaming
    // from the new master.
    // If they are from the same source server, we can just use the binlog filename and
    // position (offset) to tell whether we should skip this event.
    BinlogFilePos eventBinlogPos = event.getBinlogFilePos();
    BinlogFilePos savedBinlogPos = state.get().getLastPosition();
    // Use the same logic in BinlogFilePos.compareTo() here...
    if (BinlogFilePos.shouldCompareUsingFilePosition(eventBinlogPos, savedBinlogPos)) {
      return event.getOffset() > state.get().getLastOffset();
    }

    // If this point is reached, a master failover might have happened.
    // We can only use GTIDSet to tell whether this event should be skipped.
    // We should only skip this event if GTIDSet in event is a "proper subset" of the GTIDSet
    // in saved state, because it is possible that the last transaction we streamed before the
    // failover is in the middle of a transaction.
    GtidSet eventGtidSet = eventBinlogPos.getGtidSet();
    GtidSet savedGtidSet = savedBinlogPos.getGtidSet();
    return !eventGtidSet.isContainedWithin(savedGtidSet) && !eventGtidSet.equals(savedGtidSet);
  }
}
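- DuplicateFilter extends MysqlEventFilter; it only checks mutation events and lets everything else through. When BinlogFilePos.shouldCompareUsingFilePosition returns true (the event position and the saved position can be compared by binlog file position), it keeps the event only if event.getOffset() is greater than the last offset saved in state; otherwise a master failover may have happened, and it falls back to the GtidSet, keeping the event only if the event's GtidSet is neither contained within nor equal to the saved GtidSet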
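The decision can be sketched in a few lines of plain Java. Everything here is a simplification for illustration: Position is a hypothetical type, the "same server" test is assumed to be a server UUID comparison (the real logic lives in BinlogFilePos.shouldCompareUsingFilePosition, which is not shown above), and a GTID set is modeled as a plain set of transaction numbers.

```java
import java.util.HashSet;
import java.util.Set;

final class DuplicateFilterSketch {
  /** Hypothetical, simplified position: source server, offset, executed transactions. */
  static final class Position {
    final String serverUuid;
    final long offset;
    final Set<Long> gtids;

    Position(String serverUuid, long offset, Set<Long> gtids) {
      this.serverUuid = serverUuid;
      this.offset = offset;
      this.gtids = gtids;
    }
  }

  /** Convenience helper for building a simplified GTID set. */
  static Set<Long> gtids(long... ids) {
    Set<Long> set = new HashSet<>();
    for (long id : ids) {
      set.add(id);
    }
    return set;
  }

  /** Returns true if the event should be kept, false if it is a duplicate. */
  static boolean shouldKeep(Position event, Position saved) {
    // Assumption: positions from the same server are comparable by offset.
    if (event.serverUuid.equals(saved.serverUuid)) {
      return event.offset > saved.offset;
    }
    // Different server (possible failover): fall back to the GTID sets.
    boolean contained = saved.gtids.containsAll(event.gtids);
    boolean equal = event.gtids.equals(saved.gtids);
    return !contained && !equal;
  }
}
```

For example, with a saved position on server "a" at offset 100 with transactions {1, 2, 3}, an event from "a" at offset 150 is kept and one at offset 100 is skipped; after a switch to server "b", an event with transactions {1, 2} is skipped while one with {1, 2, 3, 4} is kept. In this simplified model an equal set is always contained, so the equality check is redundant; it is kept only to mirror the shape of the original condition.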
Summary
MysqlEventFilter provides a create method that uses ChainedFilter to build a Filter for BinlogEvent; by default the chain contains EventTypeFilter, TableFilter, and DuplicateFilter.
doc
- MysqlEventFilter