序
本文主要研究一下puma的Sender
Sender
puma/puma/src/main/java/com/dianping/puma/sender/Sender.java
/**
 * A life-cycle managed component that exposes a name and accepts change
 * events for sending.
 */
public interface Sender extends LifeCycle {

    /**
     * @return the name of this sender
     */
    String getName();

    /**
     * Sends a single change event.
     *
     * @param event   the change event to send
     * @param context the puma context passed along with the event
     * @throws SenderException if the event could not be sent
     */
    void send(ChangedEvent event, PumaContext context) throws SenderException;
}
- Sender定义了getName、send方法
AbstractSender
puma/puma/src/main/java/com/dianping/puma/sender/AbstractSender.java
public abstract class AbstractSender implements Sender { protected static final Logger LOG = LoggerFactory.getLogger(AbstractSender.class); private String name; private int maxTryTimes = 3; private boolean canMissEvent = false; private volatile boolean stopped = true; private final String MSG_SKIP = "[Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] "; private final String MSG_LOOP_FAILED = "[Can't Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] "; /** * @return the stop */ public boolean isStop() { return stopped; } /** * @return the maxTryTimes */ public int getMaxTryTimes() { return maxTryTimes; } /** * @param maxTryTimes the maxTryTimes to set */ public void setMaxTryTimes(int maxTryTimes) { this.maxTryTimes = maxTryTimes; } /** * @return the canMissEvent */ public boolean isCanMissEvent() { return canMissEvent; } /** * @param canMissEvent the canMissEvent to set */ public void setCanMissEvent(boolean canMissEvent) { this.canMissEvent = canMissEvent; } /* * (non-Javadoc) * * @see com.dianping.puma.common.LifeCycle#start() */ @Override public void start() { stopped = false; } /* * (non-Javadoc) * * @see com.dianping.puma.common.LifeCycle#stop() */ @Override public void stop() { stopped = true; } /* * (non-Javadoc) * * @see com.dianping.puma.sender.Sender#getName() */ @Override public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public void send(ChangedEvent event, PumaContext context) throws SenderException { long retryCount = 0; while (true) { if (isStop()) { break; } try { doSend(event, context); break; } catch (Exception e) { LOG.error("Send error!", e); if (retryCount++ > maxTryTimes) { if (canMissEvent) { LOG.error(String.format(MSG_SKIP, maxTryTimes, context.getPumaServerName(), context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos())); 
return; } else { if (retryCount % 100 == 0) { LOG.error(String.format(MSG_LOOP_FAILED, maxTryTimes, context.getPumaServerName(), context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos())); } } } try { Thread.sleep(((retryCount % 15) + 1) * 300); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); throw new SenderException("Interrupted", e1); } } } } protected abstract void doSend(ChangedEvent event, PumaContext context) throws SenderException;}
- AbstractSender声明实现了Sender接口,其send方法通过while循环执行doSend(event, context)方法(sender处于stop状态时直接退出循环);出现Exception时递增retryCount,当重试次数超出maxTryTimes的限制且canMissEvent为true时记录日志并放弃该事件,否则sleep ((retryCount % 15) + 1) * 300 毫秒之后再次重试
FileDumpSender
puma/puma/src/main/java/com/dianping/puma/sender/FileDumpSender.java
public class FileDumpSender extends AbstractSender { private Map<String, WriteChannel> writeChannels = new ConcurrentHashMap<String, WriteChannel>(); private ChangedEvent transactionBegin; private EventFilterChain storageEventFilterChain; @Override public void start() { super.start(); } @Override public void stop() { for (WriteChannel channel : writeChannels.values()) { channel.stop(); } super.stop(); } @Override protected void doSend(ChangedEvent event, PumaContext context) throws SenderException { // Storage filter. storageEventFilterChain.reset(); if (!storageEventFilterChain.doNext(event)) { return; } try { String database = event.getDatabase(); if (database != null && database.length() > 0) { WriteChannel writeChannel = this.writeChannels.get(database); if (writeChannel == null) { writeChannel = buildEventStorage(database); this.writeChannels.put(database, writeChannel); } boolean isTransactionBegin = false; if (event instanceof RowChangedEvent) { isTransactionBegin = ((RowChangedEvent) event).isTransactionBegin(); } if (transactionBegin != null && !isTransactionBegin) { //readChannel.store(transactionBegin); transactionBegin = null; } writeChannel.append(event); } else { if (event instanceof RowChangedEvent) { if (((RowChangedEvent) event).isTransactionBegin()) { transactionBegin = event; } else { Cat.logEvent("Puma", "RowChangeEvent-Has-No-Database"); LOG.error(String.format("RowChangeEvent[%s] has no database", event.toString())); } } else { Cat.logEvent("Puma", "ChangeEvent-Has-No-Database"); LOG.error(String.format("ChangeEvent[%s] has no database", event.toString())); } } } catch (IOException e) { e.printStackTrace(); } } private WriteChannel buildEventStorage(String database) { WriteChannel writeChannel = ChannelFactory.newWriteChannel(database); writeChannel.start(); return writeChannel; } public void setStorageEventFilterChain(EventFilterChain storageEventFilterChain) { this.storageEventFilterChain = storageEventFilterChain; }}
- FileDumpSender继承了AbstractSender,它定义了writeChannels属性,其stop方法会遍历writeChannels,挨个执行channel.stop();其doSend方法先执行storageEventFilterChain.doNext(event),返回false的话则直接返回,之后获取或者创建指定database的writeChannel,执行writeChannel.append(event);对于database为null或空字符串的则判断是否是RowChangedEvent,是的话,在RowChangedEvent的isTransactionBegin时设置transactionBegin,其余情况则通过Cat.logEvent及LOG.error记录该事件没有database
小结
Sender定义了getName、send方法;AbstractSender声明实现了Sender接口,其send方法通过while循环执行doSend(event, context)方法,出现Exception时递增retryCount,当重试次数超出maxTryTimes的限制且canMissEvent为true时记录日志并放弃该事件,否则sleep ((retryCount % 15) + 1) * 300 毫秒之后再次重试
doc
- Sender