序本文主要研究一下puma的DefaultDataHandler
DataHandlerpuma/puma/src/main/java/com/dianping/puma/datahandler/DataHandler.java
public interface DataHandler extends LifeCycle { DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context);}DataHandler继承了LifeCycle,它定义了process方法AbstractDataHandlerpuma/puma/src/main/java/com/dianping/puma/datahandler/AbstractDataHandler.java
@ThreadUnSafepublic abstract class AbstractDataHandler implements DataHandler { private static final Logger log = Logger.getLogger(AbstractDataHandler.class); private TableMetaInfoFetcher tableMetasInfoFetcher; /** * @return the tableMetasInfoFetcher */ public TableMetaInfoFetcher getTableMetasInfoFetcher() { return tableMetasInfoFetcher; } /** * @param tableMetasInfoFetcher the tableMetasInfoFetcher to set */ public void setTableMetasInfoFetcher(TableMetaInfoFetcher tableMetasInfoFetcher) { this.tableMetasInfoFetcher = tableMetasInfoFetcher; } @Override public void start() { } @Override public void stop() { } protected Object convertUnsignedValueIfNeeded(int pos, Object value, TableMetaInfo tableMeta) { Object newValue = value; if (value != null) { switch (tableMeta.getRawTypeCodes().get(pos)) { case BinlogConstants.MYSQL_TYPE_TINY: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 8)); } break; case BinlogConstants.MYSQL_TYPE_INT24: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 24)); } break; case BinlogConstants.MYSQL_TYPE_SHORT: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 16)); } break; case BinlogConstants.MYSQL_TYPE_INT: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Long.valueOf((Integer) value) + (1L << 32); } else { if (value instanceof Integer) { newValue = Long.valueOf((Integer) value); } } break; case BinlogConstants.MYSQL_TYPE_LONGLONG: if ((value instanceof Long) && (Long) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = BigInteger.valueOf((Long) value).add(BigInteger.ONE.shiftLeft(64)); } else { if (value instanceof Long) { newValue = BigInteger.valueOf((Long) value); } } break; default: break; } } return newValue; } @Override public DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context) { DataHandlerResult result = new DataHandlerResult(); if (binlogEvent instanceof PumaIgnoreEvent) { log.info("Ingore one unknown event. eventType: " + binlogEvent.getHeader().getEventType()); result.setEmpty(true); result.setFinished(true); return result; } byte eventType = binlogEvent.getHeader().getEventType(); if (log.isDebugEnabled()) { log.debug("event#" + eventType); } if (eventType == BinlogConstants.STOP_EVENT || eventType == BinlogConstants.ROTATE_EVENT) { result.setEmpty(true); result.setFinished(true); } else if (eventType == BinlogConstants.FORMAT_DESCRIPTION_EVENT) { result.setEmpty(true); result.setFinished(true); } else if (eventType == BinlogConstants.QUERY_EVENT) { handleQueryEvent(binlogEvent, result); } else { doProcess(result, binlogEvent, context, eventType); } if (result != null && !result.isEmpty() && result.getData() != null) { BinlogInfo binlogInfo = new BinlogInfo(context.getDBServerId(), context.getBinlogFileName(), context.getBinlogStartPos(), context.getEventIndex(), binlogEvent.getHeader().getTimestamp()); result.getData().setBinlogInfo(binlogInfo); result.getData().setServerId(binlogEvent.getHeader().getServerId()); } return result; } protected void handleQueryEvent(BinlogEvent binlogEvent, DataHandlerResult result) { QueryEvent queryEvent = (QueryEvent) binlogEvent; String sql = StringUtils.normalizeSpace(queryEvent.getSql()); if (StringUtils.startsWithIgnoreCase(sql, "ALTER ") || StringUtils.startsWithIgnoreCase(sql, "CREATE ") || StringUtils.startsWithIgnoreCase(sql, "DROP ") || StringUtils.startsWithIgnoreCase(sql, "RENAME ") || StringUtils.startsWithIgnoreCase(sql, "TRUNCATE ")) { handleDDlEvent(result, queryEvent, sql); } else if (StringUtils.equalsIgnoreCase(sql, "BEGIN")) { handleTransactionBeginEvent(binlogEvent, result, queryEvent); } else { result.setEmpty(true); result.setFinished(true); // log.info("QueryEvent sql=" + queryEvent.getSql()); } } protected void handleTransactionBeginEvent(BinlogEvent binlogEvent, DataHandlerResult result, QueryEvent queryEvent) { // BEGIN事件,发送一个begin transaction的事件 ChangedEvent dataChangedEvent = new RowChangedEvent(); ((RowChangedEvent) dataChangedEvent).setTransactionBegin(true); dataChangedEvent.setExecuteTime(binlogEvent.getHeader().getTimestamp()); dataChangedEvent.setDatabase(queryEvent.getDatabaseName()); result.setData(dataChangedEvent); result.setEmpty(false); result.setFinished(true); } /** * @param result * @param queryEvent * @param sql */ protected void handleDDlEvent(DataHandlerResult result, QueryEvent queryEvent, String sql) { ChangedEvent dataChangedEvent = new DdlEvent(); DdlEvent ddlEvent = (DdlEvent) dataChangedEvent; ddlEvent.setSql(sql); ddlEvent.setDdlEventType(SimpleDdlParser.getEventType(sql)); ddlEvent.setDdlEventSubType(SimpleDdlParser.getEventSubType(ddlEvent.getDdlEventType(), sql)); if (ddlEvent.getDdlEventType() == DdlEventType.DDL_DEFAULT || ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_SUB_DEFAULT) { log.info("DdlEvent Type do not found. ddl sql=" + sql); } SimpleDdlParser.DdlResult ddlResult = SimpleDdlParser .getDdlResult(ddlEvent.getDdlEventType(), ddlEvent.getDdlEventSubType(), sql); if (ddlResult != null) { ddlEvent.setDatabase(StringUtils.isNotBlank(ddlResult.getDatabase()) ? ddlResult.getDatabase() : StringUtils.EMPTY); ddlEvent.setTable(StringUtils.isNotBlank(ddlResult.getTable()) ? ddlResult.getTable() : StringUtils.EMPTY); if (ddlEvent.getDdlEventType() != DdlEventType.DDL_CREATE) { log.info("DDL event, sql=" + sql + " ,database =" + ddlResult.getDatabase() + " table =" + ddlResult.getTable() + " queryEvent.getDatabaseName()" + queryEvent.getDatabaseName()); } } if (StringUtils.isBlank(ddlEvent.getDatabase())) { ddlEvent.setDatabase(queryEvent.getDatabaseName()); } if (ddlEvent.getDdlEventType() == DdlEventType.DDL_ALTER && ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_ALTER_TABLE) { ddlEvent.setDDLType(DDLType.ALTER_TABLE); } tableMetasInfoFetcher.refreshTableMeta(ddlEvent.getDatabase(), ddlEvent.getTable()); ddlEvent.setExecuteTime(queryEvent.getHeader().getTimestamp()); result.setData(dataChangedEvent); result.setEmpty(false); result.setFinished(true); } protected abstract void doProcess(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context, byte eventType);}AbstractDataHandler声明实现了DataHandler接口,其process方法针对BinlogConstants.QUERY_EVENT执行handleQueryEvent,非STOP_EVENT、ROTATE_EVENT、FORMAT_DESCRIPTION_EVENT的event执行doProcess,该方法由子类去实现;handleQueryEvent方法针对ALTER、CREATE、DROP、RENAME、TRUNCATE的执行handleDDlEvent方法,针对BEGIN的执行handleTransactionBeginEvent方法;handleDDlEvent方法会执行tableMetasInfoFetcher.refreshTableMeta;handleTransactionBeginEvent方法标记result的finished为trueDefaultDataHandlerpuma/puma/src/main/java/com/dianping/puma/datahandler/DefaultDataHandler.java
...