聊聊puma的ClientPositionService

序本文主要研究一下puma的ClientPositionService ClientPositionServicepuma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java public interface ClientPositionService { List<ClientPositionEntity> findAll(); ClientPositionEntity find(String clientName); void update(ClientPositionEntity clientPositionEntity, boolean flush); void flush(); void cleanUpTestClients();}ClientPositionService定义了findAll、find、update、flush、cleanUpTestClients方法ClientPositionServiceImplpuma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java @Servicepublic class ClientPositionServiceImpl implements ClientPositionService { private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class); @Autowired private ClientPositionDao clientPositionDao; private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>(); @Override public List<ClientPositionEntity> findAll() { return clientPositionDao.findAll(); } @Override public ClientPositionEntity find(String clientName) { return clientPositionDao.findByClientName(clientName); } @Override public void update(ClientPositionEntity clientPositionEntity, boolean flush) { if (flush) { positionEntityMap.remove(clientPositionEntity.getClientName()); insertOrUpdate(clientPositionEntity); } else { positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity); } } @Scheduled(fixedDelay = 5000) public void flush() { Set<String> keys = positionEntityMap.keySet(); for (String key : keys) { ClientPositionEntity entity = positionEntityMap.remove(key); if (entity == null) { continue; } insertOrUpdate(entity); } } private void insertOrUpdate(ClientPositionEntity entity) { try { entity.setUpdateTime(new Date()); int updateRow = clientPositionDao.update(entity); if (updateRow == 0) { clientPositionDao.insert(entity); } } catch (Exception e) { logger.error(e.getMessage(), e); } } public void cleanUpTestClients() { 
List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient(); for (ClientPositionEntity entity : clients) { clientPositionDao.delete(entity.getId()); } }}
ClientPositionServiceImpl实现了ClientPositionService接口:其findAll方法执行clientPositionDao.findAll();其find方法执行clientPositionDao.findByClientName(clientName);其update方法在flush为true时执行positionEntityMap.remove及insertOrUpdate,在flush为false时执行positionEntityMap.put;其flush方法由@Scheduled(fixedDelay = 5000)定时触发,遍历positionEntityMap的key,挨个移除,然后执行insertOrUpdate(entity);其cleanUpTestClients方法通过clientPositionDao.findOldTestClient()查出旧的测试client并挨个删除。
ClientPositionDao
puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java ...

June 9, 2020 · 2 min · jiezi

聊聊puma的Sender

序本文主要研究一下puma的Sender Senderpuma/puma/src/main/java/com/dianping/puma/sender/Sender.java public interface Sender extends LifeCycle { String getName(); void send(ChangedEvent event, PumaContext context) throws SenderException;}Sender定义了getName、send方法AbstractSenderpuma/puma/src/main/java/com/dianping/puma/sender/AbstractSender.java public abstract class AbstractSender implements Sender { protected static final Logger LOG = LoggerFactory.getLogger(AbstractSender.class); private String name; private int maxTryTimes = 3; private boolean canMissEvent = false; private volatile boolean stopped = true; private final String MSG_SKIP = "[Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] "; private final String MSG_LOOP_FAILED = "[Can't Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] "; /** * @return the stop */ public boolean isStop() { return stopped; } /** * @return the maxTryTimes */ public int getMaxTryTimes() { return maxTryTimes; } /** * @param maxTryTimes the maxTryTimes to set */ public void setMaxTryTimes(int maxTryTimes) { this.maxTryTimes = maxTryTimes; } /** * @return the canMissEvent */ public boolean isCanMissEvent() { return canMissEvent; } /** * @param canMissEvent the canMissEvent to set */ public void setCanMissEvent(boolean canMissEvent) { this.canMissEvent = canMissEvent; } /* * (non-Javadoc) * * @see com.dianping.puma.common.LifeCycle#start() */ @Override public void start() { stopped = false; } /* * (non-Javadoc) * * @see com.dianping.puma.common.LifeCycle#stop() */ @Override public void stop() { stopped = true; } /* * (non-Javadoc) * * @see com.dianping.puma.sender.Sender#getName() */ @Override public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public void send(ChangedEvent event, PumaContext context) throws SenderException { long retryCount = 0; while (true) { if 
(isStop()) { break; } try { doSend(event, context); break; } catch (Exception e) { LOG.error("Send error!", e); if (retryCount++ > maxTryTimes) { if (canMissEvent) { LOG.error(String.format(MSG_SKIP, maxTryTimes, context.getPumaServerName(), context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos())); return; } else { if (retryCount % 100 == 0) { LOG.error(String.format(MSG_LOOP_FAILED, maxTryTimes, context.getPumaServerName(), context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos())); } } } try { Thread.sleep(((retryCount % 15) + 1) * 300); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); throw new SenderException("Interrupted", e1); } } } } protected abstract void doSend(ChangedEvent event, PumaContext context) throws SenderException;}
AbstractSender声明实现了Sender接口,其send方法通过while循环执行doSend(event, context)方法,发送成功或isStop()为true时跳出循环;出现Exception时,若retryCount已大于maxTryTimes且canMissEvent为true,则记录日志并放弃该事件直接返回;其余情况(包括canMissEvent为false时的持续重试,此时每100次记录一次错误日志)都会sleep ((retryCount % 15) + 1) * 300毫秒之后再次重试。
FileDumpSender
puma/puma/src/main/java/com/dianping/puma/sender/FileDumpSender.java ...

June 7, 2020 · 3 min · jiezi

聊聊puma的Dispatcher

序
本文主要研究一下puma的Dispatcher。
Dispatcher
puma/puma/src/main/java/com/dianping/puma/sender/dispatcher/Dispatcher.java
public interface Dispatcher extends LifeCycle { String getName(); void dispatch(ChangedEvent event, PumaContext context) throws DispatcherException; List<Sender> getSenders();}
Dispatcher继承了LifeCycle,它定义了getName、dispatch、getSenders方法。
AbstractDispatcher
puma/puma/src/main/java/com/dianping/puma/sender/dispatcher/AbstractDispatcher.java
public abstract class AbstractDispatcher implements Dispatcher { private String name; /* * (non-Javadoc) * * @see com.dianping.puma.common.LifeCycle#start() */ @Override public void start() { } /* * (non-Javadoc) * * @see com.dianping.puma.common.LifeCycle#stop() */ @Override public void stop() { } /* * (non-Javadoc) * * @see com.dianping.puma.sender.dispatcher.Dispatcher#getName() */ @Override public String getName() { return name; } public void setName(String name) { this.name = name; } protected void throwExceptionIfNeeded(List<Throwable> exceptionList) throws DispatcherException { if (exceptionList != null && !exceptionList.isEmpty()) { StringWriter buffer = new StringWriter(); PrintWriter out = null; try { out = new PrintWriter(buffer); for (Throwable exception : exceptionList) { exception.printStackTrace(out); } } finally { if (out != null) { out.close(); } } throw new DispatcherException(buffer.toString()); } }}
AbstractDispatcher定义了throwExceptionIfNeeded方法:当exceptionList非空时,它将其中各个异常的堆栈信息拼接成字符串,并以此构造DispatcherException抛出;exceptionList为null或为空时不做任何处理。
SimpleDispatcherImpl
puma/puma/src/main/java/com/dianping/puma/sender/dispatcher/SimpleDispatcherImpl.java ...

June 6, 2020 · 2 min · jiezi

聊聊puma的ChangedEvent

序
本文主要研究一下puma的ChangedEvent。
Event
puma/core/src/main/java/com/dianping/puma/core/event/Event.java
public abstract class Event implements Serializable { private static final long serialVersionUID = 7986284681273254505L; private long seq; public void setSeq(long seq) { this.seq = seq; } public long getSeq() { return seq; } public abstract BinlogInfo getBinlogInfo(); public abstract EventType getEventType();}
Event定义了seq属性及其getter、setter,并定义了getBinlogInfo、getEventType抽象方法。
ChangedEvent
puma/core/src/main/java/com/dianping/puma/core/event/ChangedEvent.java
public abstract class ChangedEvent extends Event implements Serializable { private static final long serialVersionUID = -2358086827502066009L; protected long executeTime; protected String database; protected String table; protected long serverId; protected BinlogInfo binlogInfo; //......}
ChangedEvent继承了Event,它定义了executeTime、database、table、serverId、binlogInfo属性。
DdlEvent
puma/core/src/main/java/com/dianping/puma/core/event/DdlEvent.java ...

June 6, 2020 · 2 min · jiezi

聊聊puma的DefaultDataHandler

序本文主要研究一下puma的DefaultDataHandler DataHandlerpuma/puma/src/main/java/com/dianping/puma/datahandler/DataHandler.java public interface DataHandler extends LifeCycle { DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context);}DataHandler继承了LifeCycle,它定义了process方法AbstractDataHandlerpuma/puma/src/main/java/com/dianping/puma/datahandler/AbstractDataHandler.java @ThreadUnSafepublic abstract class AbstractDataHandler implements DataHandler { private static final Logger log = Logger.getLogger(AbstractDataHandler.class); private TableMetaInfoFetcher tableMetasInfoFetcher; /** * @return the tableMetasInfoFetcher */ public TableMetaInfoFetcher getTableMetasInfoFetcher() { return tableMetasInfoFetcher; } /** * @param tableMetasInfoFetcher the tableMetasInfoFetcher to set */ public void setTableMetasInfoFetcher(TableMetaInfoFetcher tableMetasInfoFetcher) { this.tableMetasInfoFetcher = tableMetasInfoFetcher; } @Override public void start() { } @Override public void stop() { } protected Object convertUnsignedValueIfNeeded(int pos, Object value, TableMetaInfo tableMeta) { Object newValue = value; if (value != null) { switch (tableMeta.getRawTypeCodes().get(pos)) { case BinlogConstants.MYSQL_TYPE_TINY: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 8)); } break; case BinlogConstants.MYSQL_TYPE_INT24: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 24)); } break; case BinlogConstants.MYSQL_TYPE_SHORT: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Integer.valueOf((Integer) value + (1 << 16)); } break; case BinlogConstants.MYSQL_TYPE_INT: if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = Long.valueOf((Integer) value) + (1L << 32); } else { if (value 
instanceof Integer) { newValue = Long.valueOf((Integer) value); } } break; case BinlogConstants.MYSQL_TYPE_LONGLONG: if ((value instanceof Long) && (Long) value < 0 && !tableMeta.getSignedInfos().get(pos)) { newValue = BigInteger.valueOf((Long) value).add(BigInteger.ONE.shiftLeft(64)); } else { if (value instanceof Long) { newValue = BigInteger.valueOf((Long) value); } } break; default: break; } } return newValue; } @Override public DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context) { DataHandlerResult result = new DataHandlerResult(); if (binlogEvent instanceof PumaIgnoreEvent) { log.info("Ingore one unknown event. eventType: " + binlogEvent.getHeader().getEventType()); result.setEmpty(true); result.setFinished(true); return result; } byte eventType = binlogEvent.getHeader().getEventType(); if (log.isDebugEnabled()) { log.debug("event#" + eventType); } if (eventType == BinlogConstants.STOP_EVENT || eventType == BinlogConstants.ROTATE_EVENT) { result.setEmpty(true); result.setFinished(true); } else if (eventType == BinlogConstants.FORMAT_DESCRIPTION_EVENT) { result.setEmpty(true); result.setFinished(true); } else if (eventType == BinlogConstants.QUERY_EVENT) { handleQueryEvent(binlogEvent, result); } else { doProcess(result, binlogEvent, context, eventType); } if (result != null && !result.isEmpty() && result.getData() != null) { BinlogInfo binlogInfo = new BinlogInfo(context.getDBServerId(), context.getBinlogFileName(), context.getBinlogStartPos(), context.getEventIndex(), binlogEvent.getHeader().getTimestamp()); result.getData().setBinlogInfo(binlogInfo); result.getData().setServerId(binlogEvent.getHeader().getServerId()); } return result; } protected void handleQueryEvent(BinlogEvent binlogEvent, DataHandlerResult result) { QueryEvent queryEvent = (QueryEvent) binlogEvent; String sql = StringUtils.normalizeSpace(queryEvent.getSql()); if (StringUtils.startsWithIgnoreCase(sql, "ALTER ") || StringUtils.startsWithIgnoreCase(sql, "CREATE ") || 
StringUtils.startsWithIgnoreCase(sql, "DROP ") || StringUtils.startsWithIgnoreCase(sql, "RENAME ") || StringUtils.startsWithIgnoreCase(sql, "TRUNCATE ")) { handleDDlEvent(result, queryEvent, sql); } else if (StringUtils.equalsIgnoreCase(sql, "BEGIN")) { handleTransactionBeginEvent(binlogEvent, result, queryEvent); } else { result.setEmpty(true); result.setFinished(true); // log.info("QueryEvent sql=" + queryEvent.getSql()); } } protected void handleTransactionBeginEvent(BinlogEvent binlogEvent, DataHandlerResult result, QueryEvent queryEvent) { // BEGIN事件,发送一个begin transaction的事件 ChangedEvent dataChangedEvent = new RowChangedEvent(); ((RowChangedEvent) dataChangedEvent).setTransactionBegin(true); dataChangedEvent.setExecuteTime(binlogEvent.getHeader().getTimestamp()); dataChangedEvent.setDatabase(queryEvent.getDatabaseName()); result.setData(dataChangedEvent); result.setEmpty(false); result.setFinished(true); } /** * @param result * @param queryEvent * @param sql */ protected void handleDDlEvent(DataHandlerResult result, QueryEvent queryEvent, String sql) { ChangedEvent dataChangedEvent = new DdlEvent(); DdlEvent ddlEvent = (DdlEvent) dataChangedEvent; ddlEvent.setSql(sql); ddlEvent.setDdlEventType(SimpleDdlParser.getEventType(sql)); ddlEvent.setDdlEventSubType(SimpleDdlParser.getEventSubType(ddlEvent.getDdlEventType(), sql)); if (ddlEvent.getDdlEventType() == DdlEventType.DDL_DEFAULT || ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_SUB_DEFAULT) { log.info("DdlEvent Type do not found. ddl sql=" + sql); } SimpleDdlParser.DdlResult ddlResult = SimpleDdlParser .getDdlResult(ddlEvent.getDdlEventType(), ddlEvent.getDdlEventSubType(), sql); if (ddlResult != null) { ddlEvent.setDatabase(StringUtils.isNotBlank(ddlResult.getDatabase()) ? ddlResult.getDatabase() : StringUtils.EMPTY); ddlEvent.setTable(StringUtils.isNotBlank(ddlResult.getTable()) ? 
ddlResult.getTable() : StringUtils.EMPTY); if (ddlEvent.getDdlEventType() != DdlEventType.DDL_CREATE) { log.info("DDL event, sql=" + sql + " ,database =" + ddlResult.getDatabase() + " table =" + ddlResult.getTable() + " queryEvent.getDatabaseName()" + queryEvent.getDatabaseName()); } } if (StringUtils.isBlank(ddlEvent.getDatabase())) { ddlEvent.setDatabase(queryEvent.getDatabaseName()); } if (ddlEvent.getDdlEventType() == DdlEventType.DDL_ALTER && ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_ALTER_TABLE) { ddlEvent.setDDLType(DDLType.ALTER_TABLE); } tableMetasInfoFetcher.refreshTableMeta(ddlEvent.getDatabase(), ddlEvent.getTable()); ddlEvent.setExecuteTime(queryEvent.getHeader().getTimestamp()); result.setData(dataChangedEvent); result.setEmpty(false); result.setFinished(true); } protected abstract void doProcess(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context, byte eventType);}AbstractDataHandler声明实现了DataHandler接口,其process方法针对BinlogConstants.QUERY_EVENT执行handleQueryEvent,非STOP_EVENT、ROTATE_EVENT、FORMAT_DESCRIPTION_EVENT的event执行doProcess,该方法由子类去实现;handleQueryEvent方法针对ALTER、CREATE、DROP、RENAME、TRUNCATE的执行handleDDlEvent方法,针对BEGIN的执行handleTransactionBeginEvent方法;handleDDlEvent方法会执行tableMetasInfoFetcher.refreshTableMeta;handleTransactionBeginEvent方法标记result的finished为trueDefaultDataHandlerpuma/puma/src/main/java/com/dianping/puma/datahandler/DefaultDataHandler.java ...

June 4, 2020 · 6 min · jiezi