序
本文主要研究一下puma的DefaultDataHandler
DataHandler
puma/puma/src/main/java/com/dianping/puma/datahandler/DataHandler.java
/**
 * Contract for components that turn one binlog event into a {@link DataHandlerResult}.
 * Implementations also take part in the {@link LifeCycle} this interface extends.
 */
public interface DataHandler extends LifeCycle {

    /**
     * Processes a single binlog event.
     *
     * @param binlogEvent the binlog event to process
     * @param context     the puma context handed to the handler alongside the event
     * @return the outcome of handling {@code binlogEvent}
     */
    DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context);
}
- DataHandler继承了LifeCycle,它定义了process方法
AbstractDataHandler
puma/puma/src/main/java/com/dianping/puma/datahandler/AbstractDataHandler.java
@ThreadUnSafepublic abstract class AbstractDataHandler implements DataHandler { private static final Logger log = Logger.getLogger(AbstractDataHandler.class); private TableMetaInfoFetcher tableMetasInfoFetcher; /** * @return the tableMetasInfoFetcher */ public TableMetaInfoFetcher getTableMetasInfoFetcher() { return tableMetasInfoFetcher; } /** * @param tableMetasInfoFetcher the tableMetasInfoFetcher to set */ public void setTableMetasInfoFetcher(TableMetaInfoFetcher tableMetasInfoFetcher) { this.tableMetasInfoFetcher = tableMetasInfoFetcher; } @Override public void start() { } @Override public void stop() { } protected Object convertUnsignedValueIfNeeded(int pos, Object value, TableMetaInfo tableMeta) { Object newValue = value; if (value != null) { switch (tableMeta.getRawTypeCodes().get(pos)) { case BinlogConstants.MYSQL_TYPE_TINY: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 8)); } break; case BinlogConstants.MYSQL_TYPE_INT24: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 24)); } break; case BinlogConstants.MYSQL_TYPE_SHORT: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 16)); } break; case BinlogConstants.MYSQL_TYPE_INT: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Long.valueOf((Integer) value) + (1L << 32); } else { if (value instanceof Integer) { newValue = Long.valueOf((Integer) value); } } break; case BinlogConstants.MYSQL_TYPE_LONGLONG: if ((value instanceof Long) && (Long) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = BigInteger.valueOf((Long) value).add(BigInteger.ONE.shiftLeft(64)); } else { if (value instanceof Long) { newValue = BigInteger.valueOf((Long) value); } } 
break; default: break; } } return newValue; } @Override public DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context) { DataHandlerResult result = new DataHandlerResult(); if (binlogEvent instanceof PumaIgnoreEvent) { log.info("Ingore one unknown event. eventType: " + binlogEvent.getHeader().getEventType()); result.setEmpty(true); result.setFinished(true); return result; } byte eventType = binlogEvent.getHeader().getEventType(); if (log.isDebugEnabled()) { log.debug("event#" + eventType); } if (eventType == BinlogConstants.STOP_EVENT || eventType == BinlogConstants.ROTATE_EVENT) { result.setEmpty(true); result.setFinished(true); } else if (eventType == BinlogConstants.FORMAT_DESCRIPTION_EVENT) { result.setEmpty(true); result.setFinished(true); } else if (eventType == BinlogConstants.QUERY_EVENT) { handleQueryEvent(binlogEvent, result); } else { doProcess(result, binlogEvent, context, eventType); } if (result != null && !result.isEmpty() && result.getData() != null) { BinlogInfo binlogInfo = new BinlogInfo(context.getDBServerId(), context.getBinlogFileName(), context.getBinlogStartPos(), context.getEventIndex(), binlogEvent.getHeader().getTimestamp()); result.getData().setBinlogInfo(binlogInfo); result.getData().setServerId(binlogEvent.getHeader().getServerId()); } return result; } protected void handleQueryEvent(BinlogEvent binlogEvent, DataHandlerResult result) { QueryEvent queryEvent = (QueryEvent) binlogEvent; String sql = StringUtils.normalizeSpace(queryEvent.getSql()); if (StringUtils.startsWithIgnoreCase(sql, "ALTER ") || StringUtils.startsWithIgnoreCase(sql, "CREATE ") || StringUtils.startsWithIgnoreCase(sql, "DROP ") || StringUtils.startsWithIgnoreCase(sql, "RENAME ") || StringUtils.startsWithIgnoreCase(sql, "TRUNCATE ")) { handleDDlEvent(result, queryEvent, sql); } else if (StringUtils.equalsIgnoreCase(sql, "BEGIN")) { handleTransactionBeginEvent(binlogEvent, result, queryEvent); } else { result.setEmpty(true); result.setFinished(true); // 
log.info("QueryEvent sql=" + queryEvent.getSql()); } } protected void handleTransactionBeginEvent(BinlogEvent binlogEvent, DataHandlerResult result, QueryEvent queryEvent) { // BEGIN事件,发送一个begin transaction的事件 ChangedEvent dataChangedEvent = new RowChangedEvent(); ((RowChangedEvent) dataChangedEvent).setTransactionBegin(true); dataChangedEvent.setExecuteTime(binlogEvent.getHeader().getTimestamp()); dataChangedEvent.setDatabase(queryEvent.getDatabaseName()); result.setData(dataChangedEvent); result.setEmpty(false); result.setFinished(true); } /** * @param result * @param queryEvent * @param sql */ protected void handleDDlEvent(DataHandlerResult result, QueryEvent queryEvent, String sql) { ChangedEvent dataChangedEvent = new DdlEvent(); DdlEvent ddlEvent = (DdlEvent) dataChangedEvent; ddlEvent.setSql(sql); ddlEvent.setDdlEventType(SimpleDdlParser.getEventType(sql)); ddlEvent.setDdlEventSubType(SimpleDdlParser.getEventSubType(ddlEvent.getDdlEventType(), sql)); if (ddlEvent.getDdlEventType() == DdlEventType.DDL_DEFAULT || ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_SUB_DEFAULT) { log.info("DdlEvent Type do not found. ddl sql=" + sql); } SimpleDdlParser.DdlResult ddlResult = SimpleDdlParser .getDdlResult(ddlEvent.getDdlEventType(), ddlEvent.getDdlEventSubType(), sql); if (ddlResult != null) { ddlEvent.setDatabase(StringUtils.isNotBlank(ddlResult.getDatabase()) ? ddlResult.getDatabase() : StringUtils.EMPTY); ddlEvent.setTable(StringUtils.isNotBlank(ddlResult.getTable()) ? 
ddlResult.getTable() : StringUtils.EMPTY); if (ddlEvent.getDdlEventType() != DdlEventType.DDL_CREATE) { log.info("DDL event, sql=" + sql + " ,database =" + ddlResult.getDatabase() + " table =" + ddlResult.getTable() + " queryEvent.getDatabaseName()" + queryEvent.getDatabaseName()); } } if (StringUtils.isBlank(ddlEvent.getDatabase())) { ddlEvent.setDatabase(queryEvent.getDatabaseName()); } if (ddlEvent.getDdlEventType() == DdlEventType.DDL_ALTER && ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_ALTER_TABLE) { ddlEvent.setDDLType(DDLType.ALTER_TABLE); } tableMetasInfoFetcher.refreshTableMeta(ddlEvent.getDatabase(), ddlEvent.getTable()); ddlEvent.setExecuteTime(queryEvent.getHeader().getTimestamp()); result.setData(dataChangedEvent); result.setEmpty(false); result.setFinished(true); } protected abstract void doProcess(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context, byte eventType);}
- AbstractDataHandler声明实现了DataHandler接口;其process方法先忽略PumaIgnoreEvent,对STOP_EVENT、ROTATE_EVENT、FORMAT_DESCRIPTION_EVENT直接返回empty且finished的result,对BinlogConstants.QUERY_EVENT执行handleQueryEvent,其余的event执行doProcess,该方法由子类去实现;最后对非empty且data不为null的result补充BinlogInfo及serverId;handleQueryEvent方法针对以ALTER、CREATE、DROP、RENAME、TRUNCATE开头的sql执行handleDDlEvent方法,针对BEGIN的执行handleTransactionBeginEvent方法,其他sql则返回empty且finished的result;handleDDlEvent方法会构建DdlEvent并执行tableMetasInfoFetcher.refreshTableMeta;handleTransactionBeginEvent方法构建transactionBegin为true的RowChangedEvent,并标记result的finished为true
DefaultDataHandler
puma/puma/src/main/java/com/dianping/puma/datahandler/DefaultDataHandler.java
@ThreadUnSafepublic class DefaultDataHandler extends AbstractDataHandler { private Logger log = Logger.getLogger(DefaultDataHandler.class); private Map<Long, TableMetaInfo> tableMetaInfos; private int rowPos = 0; @Override protected void doProcess(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context, byte eventType) { if (log.isDebugEnabled()) { log.debug("event:" + eventType); } switch (eventType) { case BinlogConstants.TABLE_MAP_EVENT: TableMapEvent tableMapEvent = (TableMapEvent) binlogEvent; if (tableMetaInfos == null) { tableMetaInfos = new HashMap<Long, TableMetaInfo>(); } TableMetaInfo tableMetaInfo = getTableMetasInfoFetcher().getTableMetaInfo(tableMapEvent.getDatabaseName(), tableMapEvent.getTableName()); if (tableMetaInfo != null) { tableMetaInfos.put(tableMapEvent.getTableId(), tableMetaInfo); if (log.isDebugEnabled()) { log.debug("put meta info for table id:" + tableMapEvent.getTableId()); } } else { if (log.isDebugEnabled()) { log.debug("meta info not found for:" + tableMapEvent.getDatabaseName() + "-" + tableMapEvent.getTableName()); } skipEvent(BinlogConstants.TABLE_MAP_EVENT, result, context); return; } fillRawTypeCodes(tableMapEvent, tableMetaInfo); fillRawNullAbilities(tableMapEvent, tableMetaInfo); rowPos = 0; result.setEmpty(true); result.setFinished(true); break; case BinlogConstants.WRITE_ROWS_EVENT_V1: case BinlogConstants.WRITE_ROWS_EVENT: if (tableMetaInfos == null || tableMetaInfos.isEmpty()) { skipEvent(BinlogConstants.WRITE_ROWS_EVENT, result, context); return; } processWriteRowEvent(result, binlogEvent, context); break; case BinlogConstants.UPDATE_ROWS_EVENT_V1: case BinlogConstants.UPDATE_ROWS_EVENT: if (tableMetaInfos == null || tableMetaInfos.isEmpty()) { skipEvent(BinlogConstants.UPDATE_ROWS_EVENT, result, context); return; } processUpdateRowEvent(result, binlogEvent, context); break; case BinlogConstants.DELETE_ROWS_EVENT_V1: case BinlogConstants.DELETE_ROWS_EVENT: if (tableMetaInfos == null || 
tableMetaInfos.isEmpty()) { skipEvent(BinlogConstants.DELETE_ROWS_EVENT, result, context); return; } processDeleteRowEvent(result, binlogEvent, context); break; case BinlogConstants.XID_EVENT: if (tableMetaInfos == null || tableMetaInfos.isEmpty()) { skipEvent(BinlogConstants.XID_EVENT, result, context); return; } processTransactionCommitEvent(binlogEvent, result); break; default: result.setEmpty(true); result.setFinished(true); break; } } //......}
- DefaultDataHandler继承了AbstractDataHandler,其doProcess方法针对不同的eventType做不同的处理;对于TABLE_MAP_EVENT,通过tableMetasInfoFetcher获取tableMetaInfo并按tableId缓存到tableMetaInfos,同时重置rowPos(获取不到tableMetaInfo时执行skipEvent);对于WRITE_ROWS_EVENT_V1、WRITE_ROWS_EVENT执行processWriteRowEvent;对于UPDATE_ROWS_EVENT_V1、UPDATE_ROWS_EVENT执行processUpdateRowEvent;对于DELETE_ROWS_EVENT_V1、DELETE_ROWS_EVENT执行processDeleteRowEvent;对于XID_EVENT执行processTransactionCommitEvent;当tableMetaInfos为null或为空时,上述行事件及XID_EVENT都会执行skipEvent而不做处理
processWriteRowEvent
/**
 * Emits the row at the current row cursor of a write-rows event as an INSERT
 * {@link RowChangedEvent}.
 * <p>
 * Each call handles one row and leaves the result non-empty and unfinished. Once the cursor
 * has passed the last row, the cursor is reset and an empty, finished result is returned.
 * The event is skipped when no metadata is cached for its table id. For every used column
 * the converted binlog value is passed as the third {@code ColumnInfo} constructor argument
 * and {@code null} as the second.
 *
 * @param result      the result to populate
 * @param binlogEvent the event; cast to {@link WriteRowsEvent}
 * @param context     passed to {@code skipEvent} and {@code checkUnknownColumnName}
 */
protected void processWriteRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
    WriteRowsEvent insertEvent = (WriteRowsEvent) binlogEvent;

    // All rows of this event have been emitted: rewind the cursor and close the event.
    if (rowPos >= insertEvent.getRows().size()) {
        rowPos = 0;
        result.setEmpty(true);
        result.setFinished(true);
        return;
    }

    TableMetaInfo meta = tableMetaInfos.get(insertEvent.getTableId());
    if (meta == null) {
        skipEvent(BinlogConstants.WRITE_ROWS_EVENT, result, context);
        return;
    }

    RowChangedEvent rowEvent = new RowChangedEvent();
    Map<String, ColumnInfo> columnsByName = initColumns(insertEvent, rowEvent, DMLType.INSERT, meta);

    final int totalColumns = insertEvent.getColumnCount().intValue();
    // valueIndex walks the values actually present in the row; it only advances for used columns.
    int valueIndex = 0;
    for (int tablePos = 0; tablePos < totalColumns; tablePos++) {
        if (!insertEvent.getUsedColumns().get(tablePos)) {
            continue;
        }

        Column binlogColumn = insertEvent.getRows().get(rowPos).getColumns().get(valueIndex);
        // Table metadata is looked up with a 1-based position.
        int metaPos = tablePos + 1;
        String columnName = meta.getColumns().get(metaPos);
        if (!checkUnknownColumnName(result, context, columnName, metaPos)) {
            return;
        }

        boolean isKey = meta.getKeys().contains(columnName);
        Object newValue = convertUnsignedValueIfNeeded(metaPos, binlogColumn.getValue(), meta);
        columnsByName.put(columnName, new ColumnInfo(isKey, null, newValue));
        valueIndex++;
    }

    rowPos++;
    result.setData(rowEvent);
    result.setEmpty(false);
    result.setFinished(false);
}
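- processWriteRowEvent方法主要构建rowChangedEvent,并填充其columns;每次调用只处理rowPos指向的一行,然后递增rowPos并将result的finished置为false;当rowPos超出rows的范围时重置rowPos为0,并返回empty且finished的result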
processUpdateRowEvent
/**
 * Emits the row at the current row cursor of an update-rows event as an UPDATE
 * {@link RowChangedEvent}.
 * <p>
 * Each call handles one row and leaves the result non-empty and unfinished. Once the cursor
 * has passed the last row, the cursor is reset and an empty, finished result is returned.
 * The event is skipped when no metadata is cached for its table id. Every column position
 * gets a {@code ColumnInfo}; the before-image value is passed as the second constructor
 * argument and the after-image value as the third, each {@code null} when the column is
 * not flagged as used in that image.
 *
 * @param result      the result to populate
 * @param binlogEvent the event; cast to {@link UpdateRowsEvent}
 * @param context     passed to {@code skipEvent} and {@code checkUnknownColumnName}
 */
protected void processUpdateRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
    UpdateRowsEvent changeEvent = (UpdateRowsEvent) binlogEvent;

    // All rows of this event have been emitted: rewind the cursor and close the event.
    if (rowPos >= changeEvent.getRows().size()) {
        rowPos = 0;
        result.setEmpty(true);
        result.setFinished(true);
        return;
    }

    TableMetaInfo meta = tableMetaInfos.get(changeEvent.getTableId());
    if (meta == null) {
        skipEvent(BinlogConstants.UPDATE_ROWS_EVENT, result, context);
        return;
    }

    RowChangedEvent rowEvent = new RowChangedEvent();
    Map<String, ColumnInfo> columnsByName = initColumns(changeEvent, rowEvent, DMLType.UPDATE, meta);

    if (log.isDebugEnabled()) {
        log.debug("update from " + meta.getDatabase() + "." + meta.getTable());
    }

    final int totalColumns = changeEvent.getColumnCount().intValue();
    // The before and after images are indexed independently; each index only
    // advances for columns flagged as used in that image.
    int afterIndex = 0;
    int beforeIndex = 0;
    for (int tablePos = 0; tablePos < totalColumns; tablePos++) {
        // Table metadata is looked up with a 1-based position.
        int metaPos = tablePos + 1;
        String columnName = meta.getColumns().get(metaPos);
        if (!checkUnknownColumnName(result, context, columnName, metaPos)) {
            return;
        }

        Column afterColumn = null;
        if (changeEvent.getUsedColumnsAfter().get(tablePos)) {
            afterColumn = changeEvent.getRows().get(rowPos).getAfter().getColumns().get(afterIndex);
            afterIndex++;
        }

        Column beforeColumn = null;
        if (changeEvent.getUsedColumnsBefore().get(tablePos)) {
            beforeColumn = changeEvent.getRows().get(rowPos).getBefore().getColumns().get(beforeIndex);
            beforeIndex++;
        }

        boolean isKey = meta.getKeys().contains(columnName);
        Object oldValue = null;
        if (beforeColumn != null) {
            oldValue = convertUnsignedValueIfNeeded(metaPos, beforeColumn.getValue(), meta);
        }
        Object newValue = null;
        if (afterColumn != null) {
            newValue = convertUnsignedValueIfNeeded(metaPos, afterColumn.getValue(), meta);
        }
        columnsByName.put(columnName, new ColumnInfo(isKey, oldValue, newValue));
    }

    rowPos++;
    result.setData(rowEvent);
    result.setEmpty(false);
    result.setFinished(false);
}
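- processUpdateRowEvent主要构建rowChangedEvent,并填充其columns,其中每个column同时带有before及after的值(对应的usedColumns未标记时为null);与processWriteRowEvent一样,每次调用只处理rowPos指向的一行并将finished置为false,rowPos超出rows的范围时重置rowPos并返回empty且finished的result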
processDeleteRowEvent
/**
 * Emits the row at the current row cursor of a delete-rows event as a DELETE
 * {@link RowChangedEvent}.
 * <p>
 * Each call handles one row and leaves the result non-empty and unfinished. Once the cursor
 * has passed the last row, the cursor is reset and an empty, finished result is returned.
 * The event is skipped when no metadata is cached for its table id. For every used column
 * the converted binlog value is passed as the second {@code ColumnInfo} constructor argument
 * and {@code null} as the third.
 *
 * @param result      the result to populate
 * @param binlogEvent the event; cast to {@link DeleteRowsEvent}
 * @param context     passed to {@code skipEvent} and {@code checkUnknownColumnName}
 */
protected void processDeleteRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
    DeleteRowsEvent removeEvent = (DeleteRowsEvent) binlogEvent;

    // All rows of this event have been emitted: rewind the cursor and close the event.
    if (rowPos >= removeEvent.getRows().size()) {
        rowPos = 0;
        result.setEmpty(true);
        result.setFinished(true);
        return;
    }

    TableMetaInfo meta = tableMetaInfos.get(removeEvent.getTableId());
    if (meta == null) {
        skipEvent(BinlogConstants.DELETE_ROWS_EVENT, result, context);
        return;
    }

    RowChangedEvent rowEvent = new RowChangedEvent();
    Map<String, ColumnInfo> columnsByName = initColumns(removeEvent, rowEvent, DMLType.DELETE, meta);

    final int totalColumns = removeEvent.getColumnCount().intValue();
    // valueIndex walks the values actually present in the row; it only advances for used columns.
    int valueIndex = 0;
    for (int tablePos = 0; tablePos < totalColumns; tablePos++) {
        if (!removeEvent.getUsedColumns().get(tablePos)) {
            continue;
        }

        Column binlogColumn = removeEvent.getRows().get(rowPos).getColumns().get(valueIndex);
        // Table metadata is looked up with a 1-based position.
        int metaPos = tablePos + 1;
        String columnName = meta.getColumns().get(metaPos);
        if (!checkUnknownColumnName(result, context, columnName, metaPos)) {
            return;
        }

        boolean isKey = meta.getKeys().contains(columnName);
        Object oldValue = convertUnsignedValueIfNeeded(metaPos, binlogColumn.getValue(), meta);
        columnsByName.put(columnName, new ColumnInfo(isKey, oldValue, null));
        valueIndex++;
    }

    rowPos++;
    result.setData(rowEvent);
    result.setEmpty(false);
    result.setFinished(false);
}
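- processDeleteRowEvent主要构建rowChangedEvent,并填充其columns,其中每个column只带有删除前的值;每次调用只处理rowPos指向的一行并将finished置为false,rowPos超出rows的范围时重置rowPos并返回empty且finished的result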
processTransactionCommitEvent
protected void processTransactionCommitEvent(BinlogEvent binlogEvent, DataHandlerResult result) { // commit事件,发送一个commit transaction的事件 ChangedEvent dataChangedEvent = new RowChangedEvent(); ((RowChangedEvent) dataChangedEvent).setTransactionCommit(true); dataChangedEvent.setExecuteTime(binlogEvent.getHeader().getTimestamp()); dataChangedEvent.setDatabase(tableMetaInfos.values().iterator().next().getDatabase()); result.setData(dataChangedEvent); result.setEmpty(false); result.setFinished(true); tableMetaInfos.clear(); tableMetaInfos = null; }
- processTransactionCommitEvent方法主要是构建transactionCommit为true的dataChangedEvent(其database取自tableMetaInfos中的第一个tableMetaInfo),并标记result的finished为true,最后清空tableMetaInfos并将其置为null
小结
DefaultDataHandler继承了AbstractDataHandler,其doProcess方法针对不同的eventType做不同的处理;对于TABLE_MAP_EVENT,通过tableMetasInfoFetcher获取tableMetaInfo并按tableId缓存到tableMetaInfos,同时重置rowPos;对于WRITE_ROWS_EVENT_V1、WRITE_ROWS_EVENT执行processWriteRowEvent;对于UPDATE_ROWS_EVENT_V1、UPDATE_ROWS_EVENT执行processUpdateRowEvent;对于DELETE_ROWS_EVENT_V1、DELETE_ROWS_EVENT执行processDeleteRowEvent;对于XID_EVENT执行processTransactionCommitEvent;行事件每次调用只输出一行,通过rowPos及result的finished标记来驱动对同一个event的多次处理
doc
- DefaultDataHandler