本文主要研究一下puma的DefaultDataHandler

DataHandler

puma/puma/src/main/java/com/dianping/puma/datahandler/DataHandler.java

/**
 * Handler contract: extends {@link LifeCycle} and declares a single method,
 * {@code process(BinlogEvent, PumaContext)}, which takes a binlog event plus the
 * current context and returns a {@link DataHandlerResult}.
 */
public interface DataHandler extends LifeCycle {    DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context);}
  • DataHandler继承了LifeCycle,它定义了process方法

AbstractDataHandler

puma/puma/src/main/java/com/dianping/puma/datahandler/AbstractDataHandler.java

@ThreadUnSafepublic abstract class AbstractDataHandler implements DataHandler {    private static final Logger log = Logger.getLogger(AbstractDataHandler.class);    private TableMetaInfoFetcher tableMetasInfoFetcher;    /**     * @return the tableMetasInfoFetcher     */    public TableMetaInfoFetcher getTableMetasInfoFetcher() {        return tableMetasInfoFetcher;    }    /**     * @param tableMetasInfoFetcher the tableMetasInfoFetcher to set     */    public void setTableMetasInfoFetcher(TableMetaInfoFetcher tableMetasInfoFetcher) {        this.tableMetasInfoFetcher = tableMetasInfoFetcher;    }    @Override    public void start() {    }    @Override    public void stop() {    }    protected Object convertUnsignedValueIfNeeded(int pos, Object value, TableMetaInfo tableMeta) {        Object newValue = value;        if (value != null) {            switch (tableMeta.getRawTypeCodes().get(pos)) {                case BinlogConstants.MYSQL_TYPE_TINY:                    if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) {                        newValue = Integer.valueOf((Integer) value + (1 << 8));                    }                    break;                case BinlogConstants.MYSQL_TYPE_INT24:                    if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) {                        newValue = Integer.valueOf((Integer) value + (1 << 24));                    }                    break;                case BinlogConstants.MYSQL_TYPE_SHORT:                    if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) {                        newValue = Integer.valueOf((Integer) value + (1 << 16));                    }                    break;                case BinlogConstants.MYSQL_TYPE_INT:                    if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) {                        newValue = 
Long.valueOf((Integer) value) + (1L << 32);                    } else {                        if (value instanceof Integer) {                            newValue = Long.valueOf((Integer) value);                        }                    }                    break;                case BinlogConstants.MYSQL_TYPE_LONGLONG:                    if ((value instanceof Long) && (Long) value < 0 && !tableMeta.getSignedInfos().get(pos)) {                        newValue = BigInteger.valueOf((Long) value).add(BigInteger.ONE.shiftLeft(64));                    } else {                        if (value instanceof Long) {                            newValue = BigInteger.valueOf((Long) value);                        }                    }                    break;                default:                    break;            }        }        return newValue;    }    @Override    public DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context) {        DataHandlerResult result = new DataHandlerResult();        if (binlogEvent instanceof PumaIgnoreEvent) {            log.info("Ingore one unknown event. 
eventType: " + binlogEvent.getHeader().getEventType());            result.setEmpty(true);            result.setFinished(true);            return result;        }        byte eventType = binlogEvent.getHeader().getEventType();        if (log.isDebugEnabled()) {            log.debug("event#" + eventType);        }        if (eventType == BinlogConstants.STOP_EVENT || eventType == BinlogConstants.ROTATE_EVENT) {            result.setEmpty(true);            result.setFinished(true);        } else if (eventType == BinlogConstants.FORMAT_DESCRIPTION_EVENT) {            result.setEmpty(true);            result.setFinished(true);        } else if (eventType == BinlogConstants.QUERY_EVENT) {            handleQueryEvent(binlogEvent, result);        } else {            doProcess(result, binlogEvent, context, eventType);        }        if (result != null && !result.isEmpty() && result.getData() != null) {            BinlogInfo binlogInfo = new BinlogInfo(context.getDBServerId(), context.getBinlogFileName(),                    context.getBinlogStartPos(), context.getEventIndex(), binlogEvent.getHeader().getTimestamp());            result.getData().setBinlogInfo(binlogInfo);            result.getData().setServerId(binlogEvent.getHeader().getServerId());        }        return result;    }    protected void handleQueryEvent(BinlogEvent binlogEvent, DataHandlerResult result) {        QueryEvent queryEvent = (QueryEvent) binlogEvent;        String sql = StringUtils.normalizeSpace(queryEvent.getSql());        if (StringUtils.startsWithIgnoreCase(sql, "ALTER ") || StringUtils.startsWithIgnoreCase(sql, "CREATE ")                || StringUtils.startsWithIgnoreCase(sql, "DROP ") || StringUtils.startsWithIgnoreCase(sql, "RENAME ")                || StringUtils.startsWithIgnoreCase(sql, "TRUNCATE ")) {            handleDDlEvent(result, queryEvent, sql);        } else if (StringUtils.equalsIgnoreCase(sql, "BEGIN")) {            handleTransactionBeginEvent(binlogEvent, result, queryEvent); 
       } else {            result.setEmpty(true);            result.setFinished(true);            // log.info("QueryEvent  sql=" + queryEvent.getSql());        }    }    protected void handleTransactionBeginEvent(BinlogEvent binlogEvent, DataHandlerResult result,                                               QueryEvent queryEvent) {        // BEGIN事件,发送一个begin transaction的事件        ChangedEvent dataChangedEvent = new RowChangedEvent();        ((RowChangedEvent) dataChangedEvent).setTransactionBegin(true);        dataChangedEvent.setExecuteTime(binlogEvent.getHeader().getTimestamp());        dataChangedEvent.setDatabase(queryEvent.getDatabaseName());        result.setData(dataChangedEvent);        result.setEmpty(false);        result.setFinished(true);    }    /**     * @param result     * @param queryEvent     * @param sql     */    protected void handleDDlEvent(DataHandlerResult result, QueryEvent queryEvent, String sql) {        ChangedEvent dataChangedEvent = new DdlEvent();        DdlEvent ddlEvent = (DdlEvent) dataChangedEvent;        ddlEvent.setSql(sql);        ddlEvent.setDdlEventType(SimpleDdlParser.getEventType(sql));        ddlEvent.setDdlEventSubType(SimpleDdlParser.getEventSubType(ddlEvent.getDdlEventType(), sql));        if (ddlEvent.getDdlEventType() == DdlEventType.DDL_DEFAULT                || ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_SUB_DEFAULT) {            log.info("DdlEvent Type do not found. ddl sql=" + sql);        }        SimpleDdlParser.DdlResult ddlResult = SimpleDdlParser                .getDdlResult(ddlEvent.getDdlEventType(), ddlEvent.getDdlEventSubType(), sql);        if (ddlResult != null) {            ddlEvent.setDatabase(StringUtils.isNotBlank(ddlResult.getDatabase()) ? ddlResult.getDatabase()                    : StringUtils.EMPTY);            ddlEvent.setTable(StringUtils.isNotBlank(ddlResult.getTable()) ? 
ddlResult.getTable() : StringUtils.EMPTY);            if (ddlEvent.getDdlEventType() != DdlEventType.DDL_CREATE) {                log.info("DDL event, sql=" + sql + "  ,database =" + ddlResult.getDatabase() + " table ="                        + ddlResult.getTable() + " queryEvent.getDatabaseName()" + queryEvent.getDatabaseName());            }        }        if (StringUtils.isBlank(ddlEvent.getDatabase())) {            ddlEvent.setDatabase(queryEvent.getDatabaseName());        }        if (ddlEvent.getDdlEventType() == DdlEventType.DDL_ALTER                && ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_ALTER_TABLE) {            ddlEvent.setDDLType(DDLType.ALTER_TABLE);        }        tableMetasInfoFetcher.refreshTableMeta(ddlEvent.getDatabase(), ddlEvent.getTable());        ddlEvent.setExecuteTime(queryEvent.getHeader().getTimestamp());        result.setData(dataChangedEvent);        result.setEmpty(false);        result.setFinished(true);    }    protected abstract void doProcess(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context,                                      byte eventType);}
  • AbstractDataHandler声明实现了DataHandler接口;其process方法对PumaIgnoreEvent以及STOP_EVENT、ROTATE_EVENT、FORMAT_DESCRIPTION_EVENT直接将result标记为empty且finished,对QUERY_EVENT执行handleQueryEvent,其余的event执行doProcess(该方法由子类去实现);当result非empty且data不为null时,process还会为data设置BinlogInfo和serverId;handleQueryEvent方法针对以ALTER、CREATE、DROP、RENAME、TRUNCATE开头的sql执行handleDDlEvent方法,针对BEGIN执行handleTransactionBeginEvent方法,其他sql则将result标记为empty且finished;handleDDlEvent方法会执行tableMetasInfoFetcher.refreshTableMeta;handleTransactionBeginEvent方法构建transactionBegin为true的RowChangedEvent,并标记result的finished为true

DefaultDataHandler

puma/puma/src/main/java/com/dianping/puma/datahandler/DefaultDataHandler.java

@ThreadUnSafepublic class DefaultDataHandler extends AbstractDataHandler {    private Logger log = Logger.getLogger(DefaultDataHandler.class);    private Map<Long, TableMetaInfo> tableMetaInfos;    private int rowPos = 0;    @Override    protected void doProcess(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context, byte eventType) {        if (log.isDebugEnabled()) {            log.debug("event:" + eventType);        }        switch (eventType) {            case BinlogConstants.TABLE_MAP_EVENT:                TableMapEvent tableMapEvent = (TableMapEvent) binlogEvent;                if (tableMetaInfos == null) {                    tableMetaInfos = new HashMap<Long, TableMetaInfo>();                }                TableMetaInfo tableMetaInfo = getTableMetasInfoFetcher().getTableMetaInfo(tableMapEvent.getDatabaseName(),                        tableMapEvent.getTableName());                if (tableMetaInfo != null) {                    tableMetaInfos.put(tableMapEvent.getTableId(), tableMetaInfo);                    if (log.isDebugEnabled()) {                        log.debug("put meta info for table id:" + tableMapEvent.getTableId());                    }                } else {                    if (log.isDebugEnabled()) {                        log.debug("meta info not found for:" + tableMapEvent.getDatabaseName() + "-"                                + tableMapEvent.getTableName());                    }                    skipEvent(BinlogConstants.TABLE_MAP_EVENT, result, context);                    return;                }                fillRawTypeCodes(tableMapEvent, tableMetaInfo);                fillRawNullAbilities(tableMapEvent, tableMetaInfo);                rowPos = 0;                result.setEmpty(true);                result.setFinished(true);                break;            case BinlogConstants.WRITE_ROWS_EVENT_V1:            case BinlogConstants.WRITE_ROWS_EVENT:                if (tableMetaInfos == null || 
tableMetaInfos.isEmpty()) {                    skipEvent(BinlogConstants.WRITE_ROWS_EVENT, result, context);                    return;                }                processWriteRowEvent(result, binlogEvent, context);                break;            case BinlogConstants.UPDATE_ROWS_EVENT_V1:            case BinlogConstants.UPDATE_ROWS_EVENT:                if (tableMetaInfos == null || tableMetaInfos.isEmpty()) {                    skipEvent(BinlogConstants.UPDATE_ROWS_EVENT, result, context);                    return;                }                processUpdateRowEvent(result, binlogEvent, context);                break;            case BinlogConstants.DELETE_ROWS_EVENT_V1:            case BinlogConstants.DELETE_ROWS_EVENT:                if (tableMetaInfos == null || tableMetaInfos.isEmpty()) {                    skipEvent(BinlogConstants.DELETE_ROWS_EVENT, result, context);                    return;                }                processDeleteRowEvent(result, binlogEvent, context);                break;            case BinlogConstants.XID_EVENT:                if (tableMetaInfos == null || tableMetaInfos.isEmpty()) {                    skipEvent(BinlogConstants.XID_EVENT, result, context);                    return;                }                processTransactionCommitEvent(binlogEvent, result);                break;            default:                result.setEmpty(true);                result.setFinished(true);                break;        }    }    //......}
  • DefaultDataHandler继承了AbstractDataHandler,其doProcess方法针对不同的eventType做不同的处理;对于TABLE_MAP_EVENT更新tableMetaInfo;对于WRITE_ROWS_EVENT_V1、WRITE_ROWS_EVENT执行processWriteRowEvent;对于UPDATE_ROWS_EVENT_V1、UPDATE_ROWS_EVENT执行processUpdateRowEvent;对于DELETE_ROWS_EVENT_V1、DELETE_ROWS_EVENT执行processDeleteRowEvent;对于XID_EVENT执行processTransactionCommitEvent

processWriteRowEvent

    /**
     * Emits the row at {@code rowPos} of a write-rows event as an INSERT {@code RowChangedEvent}.
     *
     * <p>Each call handles one row and leaves the result unfinished; once the cursor has passed
     * the last row, the cursor is reset and an empty, finished result is produced. The event is
     * skipped when no metadata is cached for its table id, and the method returns early when
     * {@code checkUnknownColumnName} rejects a column.
     *
     * @param result      the result to populate
     * @param binlogEvent the event, cast to {@code WriteRowsEvent}
     * @param context     the current context, passed through to the helpers
     */
    protected void processWriteRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
        WriteRowsEvent insertEvent = (WriteRowsEvent) binlogEvent;

        // Cursor is past the last row: rewind and close out this event.
        if (rowPos >= insertEvent.getRows().size()) {
            rowPos = 0;
            result.setEmpty(true);
            result.setFinished(true);
            return;
        }

        TableMetaInfo meta = tableMetaInfos.get(insertEvent.getTableId());
        if (meta == null) {
            skipEvent(BinlogConstants.WRITE_ROWS_EVENT, result, context);
            return;
        }

        RowChangedEvent rowChange = new RowChangedEvent();
        Map<String, ColumnInfo> columnsByName = initColumns(insertEvent, rowChange, DMLType.INSERT, meta);

        // valueIndex walks the row's stored values, which only exist for used columns.
        int valueIndex = 0;
        for (int columnPos = 0; columnPos < insertEvent.getColumnCount().intValue(); columnPos++) {
            if (!insertEvent.getUsedColumns().get(columnPos)) {
                continue;
            }

            Column cell = insertEvent.getRows().get(rowPos).getColumns().get(valueIndex);
            String columnName = meta.getColumns().get(columnPos + 1);
            if (!checkUnknownColumnName(result, context, columnName, columnPos + 1)) {
                return;
            }

            // Insert: no old value, only the new one.
            boolean isKey = meta.getKeys().contains(columnName);
            Object newValue = convertUnsignedValueIfNeeded(columnPos + 1, cell.getValue(), meta);
            columnsByName.put(columnName, new ColumnInfo(isKey, null, newValue));
            valueIndex++;
        }

        rowPos++;
        result.setData(rowChange);
        result.setEmpty(false);
        result.setFinished(false);
    }
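  • processWriteRowEvent方法每次调用只处理rowPos指向的一行:构建DMLType.INSERT的rowChangedEvent,把usedColumns中标记的列以新值填充到columns(旧值为null),随后rowPos递增并将result的finished置为false;当rowPos不小于rows的大小时,将rowPos重置为0,并标记result为empty且finished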

processUpdateRowEvent

    /**
     * Emits the row at {@code rowPos} of an update-rows event as an UPDATE {@code RowChangedEvent}.
     *
     * <p>Each call handles one row and leaves the result unfinished; once the cursor has passed
     * the last row, the cursor is reset and an empty, finished result is produced. For every
     * column the before/after values are taken only when the corresponding used-columns flag is
     * set; otherwise that side is {@code null}.
     *
     * @param result      the result to populate
     * @param binlogEvent the event, cast to {@code UpdateRowsEvent}
     * @param context     the current context, passed through to the helpers
     */
    protected void processUpdateRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
        UpdateRowsEvent changeEvent = (UpdateRowsEvent) binlogEvent;

        // Cursor is past the last row: rewind and close out this event.
        if (rowPos >= changeEvent.getRows().size()) {
            rowPos = 0;
            result.setEmpty(true);
            result.setFinished(true);
            return;
        }

        TableMetaInfo meta = tableMetaInfos.get(changeEvent.getTableId());
        if (meta == null) {
            skipEvent(BinlogConstants.UPDATE_ROWS_EVENT, result, context);
            return;
        }

        RowChangedEvent rowChange = new RowChangedEvent();
        Map<String, ColumnInfo> columnsByName = initColumns(changeEvent, rowChange, DMLType.UPDATE, meta);

        if (log.isDebugEnabled()) {
            log.debug("update from " + meta.getDatabase() + "." + meta.getTable());
        }

        // The before and after images each keep their own index, advanced only for used columns.
        int afterIndex = 0;
        int beforeIndex = 0;
        for (int columnPos = 0; columnPos < changeEvent.getColumnCount().intValue(); columnPos++) {
            String columnName = meta.getColumns().get(columnPos + 1);
            if (!checkUnknownColumnName(result, context, columnName, columnPos + 1)) {
                return;
            }

            Column afterCell = null;
            if (changeEvent.getUsedColumnsAfter().get(columnPos)) {
                afterCell = changeEvent.getRows().get(rowPos).getAfter().getColumns().get(afterIndex);
                afterIndex++;
            }

            Column beforeCell = null;
            if (changeEvent.getUsedColumnsBefore().get(columnPos)) {
                beforeCell = changeEvent.getRows().get(rowPos).getBefore().getColumns().get(beforeIndex);
                beforeIndex++;
            }

            boolean isKey = meta.getKeys().contains(columnName);

            Object oldValue = null;
            if (beforeCell != null) {
                oldValue = convertUnsignedValueIfNeeded(columnPos + 1, beforeCell.getValue(), meta);
            }

            Object newValue = null;
            if (afterCell != null) {
                newValue = convertUnsignedValueIfNeeded(columnPos + 1, afterCell.getValue(), meta);
            }

            columnsByName.put(columnName, new ColumnInfo(isKey, oldValue, newValue));
        }

        rowPos++;
        result.setData(rowChange);
        result.setEmpty(false);
        result.setFinished(false);
    }
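  • processUpdateRowEvent每次调用只处理rowPos指向的一行:构建DMLType.UPDATE的rowChangedEvent,并根据usedColumnsBefore、usedColumnsAfter为每一列填充旧值与新值(未标记的一侧为null),随后rowPos递增并将result的finished置为false;当rowPos不小于rows的大小时,将rowPos重置为0,并标记result为empty且finished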

processDeleteRowEvent

    /**
     * Emits the row at {@code rowPos} of a delete-rows event as a DELETE {@code RowChangedEvent}.
     *
     * <p>Each call handles one row and leaves the result unfinished; once the cursor has passed
     * the last row, the cursor is reset and an empty, finished result is produced. The event is
     * skipped when no metadata is cached for its table id, and the method returns early when
     * {@code checkUnknownColumnName} rejects a column.
     *
     * @param result      the result to populate
     * @param binlogEvent the event, cast to {@code DeleteRowsEvent}
     * @param context     the current context, passed through to the helpers
     */
    protected void processDeleteRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
        DeleteRowsEvent removalEvent = (DeleteRowsEvent) binlogEvent;

        // Cursor is past the last row: rewind and close out this event.
        if (rowPos >= removalEvent.getRows().size()) {
            rowPos = 0;
            result.setEmpty(true);
            result.setFinished(true);
            return;
        }

        TableMetaInfo meta = tableMetaInfos.get(removalEvent.getTableId());
        if (meta == null) {
            skipEvent(BinlogConstants.DELETE_ROWS_EVENT, result, context);
            return;
        }

        RowChangedEvent rowChange = new RowChangedEvent();
        Map<String, ColumnInfo> columnsByName = initColumns(removalEvent, rowChange, DMLType.DELETE, meta);

        // valueIndex walks the row's stored values, which only exist for used columns.
        int valueIndex = 0;
        for (int columnPos = 0; columnPos < removalEvent.getColumnCount().intValue(); columnPos++) {
            if (!removalEvent.getUsedColumns().get(columnPos)) {
                continue;
            }

            Column cell = removalEvent.getRows().get(rowPos).getColumns().get(valueIndex);
            String columnName = meta.getColumns().get(columnPos + 1);
            if (!checkUnknownColumnName(result, context, columnName, columnPos + 1)) {
                return;
            }

            // Delete: only the old value exists, there is no new one.
            boolean isKey = meta.getKeys().contains(columnName);
            Object oldValue = convertUnsignedValueIfNeeded(columnPos + 1, cell.getValue(), meta);
            columnsByName.put(columnName, new ColumnInfo(isKey, oldValue, null));
            valueIndex++;
        }

        rowPos++;
        result.setData(rowChange);
        result.setEmpty(false);
        result.setFinished(false);
    }
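  • processDeleteRowEvent每次调用只处理rowPos指向的一行:构建DMLType.DELETE的rowChangedEvent,把usedColumns中标记的列以旧值填充到columns(新值为null),随后rowPos递增并将result的finished置为false;当rowPos不小于rows的大小时,将rowPos重置为0,并标记result为empty且finished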

processTransactionCommitEvent

    protected void processTransactionCommitEvent(BinlogEvent binlogEvent, DataHandlerResult result) {        // commit事件,发送一个commit transaction的事件        ChangedEvent dataChangedEvent = new RowChangedEvent();        ((RowChangedEvent) dataChangedEvent).setTransactionCommit(true);        dataChangedEvent.setExecuteTime(binlogEvent.getHeader().getTimestamp());        dataChangedEvent.setDatabase(tableMetaInfos.values().iterator().next().getDatabase());        result.setData(dataChangedEvent);        result.setEmpty(false);        result.setFinished(true);        tableMetaInfos.clear();        tableMetaInfos = null;    }
  • processTransactionCommitEvent方法主要是构建dataChangedEvent,并标记result的finished为true,并清空tableMetaInfos

小结

DefaultDataHandler继承了AbstractDataHandler,其doProcess方法针对不同的eventType做不同的处理;对于TABLE_MAP_EVENT更新tableMetaInfo;对于WRITE_ROWS_EVENT_V1、WRITE_ROWS_EVENT执行processWriteRowEvent;对于UPDATE_ROWS_EVENT_V1、UPDATE_ROWS_EVENT执行processUpdateRowEvent;对于DELETE_ROWS_EVENT_V1、DELETE_ROWS_EVENT执行processDeleteRowEvent;对于XID_EVENT执行processTransactionCommitEvent

doc

  • DefaultDataHandler