This article takes a look at puma's Sender.

Sender

puma/puma/src/main/java/com/dianping/puma/sender/Sender.java

public interface Sender extends LifeCycle {
    String getName();

    void send(ChangedEvent event, PumaContext context) throws SenderException;
}

  • Sender extends LifeCycle and defines two methods, getName and send
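To make the contract concrete, here is a minimal sketch of a sender that keeps events in memory. Everything in it is a simplified stand-in and not puma code: LifeCycle is assumed to expose only start and stop, String replaces ChangedEvent, the PumaContext parameter and the checked SenderException are left out, and InMemorySender is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: simplified stand-ins for puma's LifeCycle and Sender. */
class SenderContractSketch {
    /** Assumed shape of LifeCycle: just start and stop. */
    interface LifeCycle {
        void start();
        void stop();
    }

    /**
     * Mirrors the shape of Sender. String stands in for ChangedEvent; the real
     * method also takes a PumaContext and declares "throws SenderException".
     */
    interface Sender extends LifeCycle {
        String getName();
        void send(String event);
    }

    /** Hypothetical sender that stores events in a list. */
    static class InMemorySender implements Sender {
        private final String name;
        private volatile boolean stopped = true; // senders start in the stopped state
        final List<String> received = new ArrayList<>();

        InMemorySender(String name) {
            this.name = name;
        }

        @Override
        public void start() {
            stopped = false;
        }

        @Override
        public void stop() {
            stopped = true;
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public void send(String event) {
            if (stopped) {
                return; // a stopped sender ignores events in this sketch
            }
            received.add(event);
        }
    }

    public static void main(String[] args) {
        InMemorySender sender = new InMemorySender("memory");
        sender.start();
        sender.send("row changed");
        sender.stop();
        System.out.println(sender.getName() + " received " + sender.received.size() + " event(s)");
    }
}
```

The stopped flag defaults to true so that nothing is accepted before start is called.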

AbstractSender

puma/puma/src/main/java/com/dianping/puma/sender/AbstractSender.java

public abstract class AbstractSender implements Sender {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractSender.class);

    private String name;

    private int maxTryTimes = 3;

    private boolean canMissEvent = false;

    private volatile boolean stopped = true;

    private final String MSG_SKIP = "[Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] ";

    private final String MSG_LOOP_FAILED = "[Can't Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] ";

    /**
     * @return the stop
     */
    public boolean isStop() {
        return stopped;
    }

    /**
     * @return the maxTryTimes
     */
    public int getMaxTryTimes() {
        return maxTryTimes;
    }

    /**
     * @param maxTryTimes the maxTryTimes to set
     */
    public void setMaxTryTimes(int maxTryTimes) {
        this.maxTryTimes = maxTryTimes;
    }

    /**
     * @return the canMissEvent
     */
    public boolean isCanMissEvent() {
        return canMissEvent;
    }

    /**
     * @param canMissEvent the canMissEvent to set
     */
    public void setCanMissEvent(boolean canMissEvent) {
        this.canMissEvent = canMissEvent;
    }

    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#start()
     */
    @Override
    public void start() {
        stopped = false;
    }

    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#stop()
     */
    @Override
    public void stop() {
        stopped = true;
    }

    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.sender.Sender#getName()
     */
    @Override
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public void send(ChangedEvent event, PumaContext context) throws SenderException {
        long retryCount = 0;
        while (true) {
            if (isStop()) {
                break;
            }
            try {
                doSend(event, context);
                break;
            } catch (Exception e) {
                LOG.error("Send error!", e);
                if (retryCount++ > maxTryTimes) {
                    if (canMissEvent) {
                        LOG.error(String.format(MSG_SKIP, maxTryTimes, context.getPumaServerName(),
                                context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));
                        return;
                    } else {
                        if (retryCount % 100 == 0) {
                            LOG.error(String.format(MSG_LOOP_FAILED, maxTryTimes, context.getPumaServerName(),
                                    context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));
                        }
                    }
                }
                try {
                    Thread.sleep(((retryCount % 15) + 1) * 300);
                } catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                    throw new SenderException("Interrupted", e1);
                }
            }
        }
    }

    protected abstract void doSend(ChangedEvent event, PumaContext context) throws SenderException;
}
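  • AbstractSender implements the Sender interface. Its send method calls doSend(event, context) in a while loop until the call succeeds or the sender is stopped. On an Exception it logs the error and increments retryCount. Once retryCount has exceeded maxTryTimes, it either logs MSG_SKIP and returns, dropping the event (canMissEvent is true), or keeps retrying and logs MSG_LOOP_FAILED whenever retryCount is a multiple of 100 (canMissEvent is false). Unless the event is dropped, it sleeps ((retryCount % 15) + 1) * 300 milliseconds before the next attempt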
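The retry schedule is easier to see with numbers. The sketch below is a simplified stand-in for the loop in send, not puma code: the Attempt interface, the safetyCap parameter, and the decision to record delays instead of calling Thread.sleep are assumptions made so that the example runs instantly. The condition retryCount++ > maxTryTimes and the delay formula are copied from the listing above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the retry loop in AbstractSender.send; not puma code. */
class RetryLoopSketch {
    /** Hypothetical stand-in for doSend: returns normally on success, throws on failure. */
    interface Attempt {
        void run() throws Exception;
    }

    final List<Long> delaysMs = new ArrayList<>(); // delays the real loop would sleep
    int attempts = 0;

    /** Same formula as the argument passed to Thread.sleep in the listing. */
    static long delayMs(long retryCount) {
        return ((retryCount % 15) + 1) * 300;
    }

    /**
     * Returns true if the attempt succeeded, false if the event was dropped
     * or the safety cap (an addition of this sketch) was reached.
     */
    boolean send(Attempt attempt, int maxTryTimes, boolean canMissEvent, int safetyCap) {
        long retryCount = 0;
        while (attempts < safetyCap) {
            attempts++;
            try {
                attempt.run();
                return true;
            } catch (Exception e) {
                // retryCount is compared first and incremented afterwards, as in the listing
                if (retryCount++ > maxTryTimes && canMissEvent) {
                    return false; // the event is dropped
                }
                delaysMs.add(delayMs(retryCount)); // recorded instead of sleeping
            }
        }
        return false; // the real loop has no cap and would keep retrying
    }

    public static void main(String[] args) {
        RetryLoopSketch sketch = new RetryLoopSketch();
        boolean sent = sketch.send(() -> {
            throw new Exception("always fails");
        }, 3, true, 1000);
        // prints "false 5 [600, 900, 1200, 1500]"
        System.out.println(sent + " " + sketch.attempts + " " + sketch.delaysMs);
    }
}
```

With the default maxTryTimes of 3 and canMissEvent set to true, an event that always fails is dropped on the fifth attempt, because the comparison happens before the increment. The delay grows by 300 ms per retry up to 4500 ms and falls back to 300 ms whenever retryCount is a multiple of 15.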

FileDumpSender

puma/puma/src/main/java/com/dianping/puma/sender/FileDumpSender.java

public class FileDumpSender extends AbstractSender {
    private Map<String, WriteChannel> writeChannels = new ConcurrentHashMap<String, WriteChannel>();

    private ChangedEvent transactionBegin;

    private EventFilterChain storageEventFilterChain;

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        for (WriteChannel channel : writeChannels.values()) {
            channel.stop();
        }
        super.stop();
    }

    @Override
    protected void doSend(ChangedEvent event, PumaContext context) throws SenderException {
        // Storage filter.
        storageEventFilterChain.reset();
        if (!storageEventFilterChain.doNext(event)) {
            return;
        }

        try {
            String database = event.getDatabase();
            if (database != null && database.length() > 0) {
                WriteChannel writeChannel = this.writeChannels.get(database);
                if (writeChannel == null) {
                    writeChannel = buildEventStorage(database);
                    this.writeChannels.put(database, writeChannel);
                }

                boolean isTransactionBegin = false;
                if (event instanceof RowChangedEvent) {
                    isTransactionBegin = ((RowChangedEvent) event).isTransactionBegin();
                }

                if (transactionBegin != null && !isTransactionBegin) {
                    //readChannel.store(transactionBegin);
                    transactionBegin = null;
                }
                writeChannel.append(event);
            } else {
                if (event instanceof RowChangedEvent) {
                    if (((RowChangedEvent) event).isTransactionBegin()) {
                        transactionBegin = event;
                    } else {
                        Cat.logEvent("Puma", "RowChangeEvent-Has-No-Database");
                        LOG.error(String.format("RowChangeEvent[%s] has no database", event.toString()));
                    }
                } else {
                    Cat.logEvent("Puma", "ChangeEvent-Has-No-Database");
                    LOG.error(String.format("ChangeEvent[%s] has no database", event.toString()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private WriteChannel buildEventStorage(String database) {
        WriteChannel writeChannel = ChannelFactory.newWriteChannel(database);
        writeChannel.start();
        return writeChannel;
    }

    public void setStorageEventFilterChain(EventFilterChain storageEventFilterChain) {
        this.storageEventFilterChain = storageEventFilterChain;
    }
}
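  • FileDumpSender extends AbstractSender and keeps a writeChannels map keyed by database name. Its stop method iterates over writeChannels and calls channel.stop() on each one before calling super.stop(). Its doSend method first resets storageEventFilterChain and calls storageEventFilterChain.doNext(event), returning immediately if the result is false. It then gets or creates the writeChannel for the event's database and calls writeChannel.append(event). If the database is null or empty and the event is a RowChangedEvent whose isTransactionBegin is true, it stores the event in transactionBegin; any other event without a database is only logged as an error. An IOException is caught and printed inside doSend, so it does not reach the retry loop in AbstractSender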
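The per-database routing in doSend can be sketched in isolation. The example below uses hypothetical stand-ins (MemoryChannel instead of WriteChannel, String instead of ChangedEvent) and is not puma code. It also swaps the get/null-check/put sequence from the listing for ConcurrentHashMap.computeIfAbsent, which performs the lookup and creation atomically; that substitution is a choice made for this sketch, not something FileDumpSender does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: hypothetical stand-ins, not puma's WriteChannel or ChannelFactory. */
class ChannelRoutingSketch {
    /** Stand-in channel that remembers what was appended to it. */
    static class MemoryChannel {
        final String database;
        final List<String> events = new ArrayList<>();
        boolean started = false;

        MemoryChannel(String database) {
            this.database = database;
        }

        void start() {
            started = true;
        }

        void stop() {
            started = false;
        }

        void append(String event) {
            events.add(event);
        }
    }

    private final Map<String, MemoryChannel> channels = new ConcurrentHashMap<>();

    /** Routes an event to its database's channel, creating and starting the channel on first use. */
    boolean send(String database, String event) {
        if (database == null || database.isEmpty()) {
            return false; // the listing logs an error here unless the event begins a transaction
        }
        MemoryChannel channel = channels.computeIfAbsent(database, db -> {
            MemoryChannel created = new MemoryChannel(db);
            created.start();
            return created;
        });
        channel.append(event);
        return true;
    }

    /** Stops every channel, like the loop in FileDumpSender.stop. */
    void stop() {
        for (MemoryChannel channel : channels.values()) {
            channel.stop();
        }
    }

    int channelCount() {
        return channels.size();
    }

    MemoryChannel channel(String database) {
        return channels.get(database);
    }

    public static void main(String[] args) {
        ChannelRoutingSketch sketch = new ChannelRoutingSketch();
        sketch.send("shop", "insert 1");
        sketch.send("shop", "insert 2");
        sketch.send("user", "update 1");
        System.out.println("channels: " + sketch.channelCount());
        sketch.stop();
    }
}
```

One channel per database means events for the same database stay in append order within a single channel, while a new database costs only one channel creation on its first event.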

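Summary

Sender defines the getName and send methods. AbstractSender implements Sender: its send method calls doSend(event, context) in a while loop and, on an Exception, sleeps ((retryCount % 15) + 1) * 300 milliseconds before retrying; once retryCount has exceeded maxTryTimes, the event is dropped if canMissEvent is true, otherwise the loop keeps retrying. FileDumpSender extends AbstractSender and appends each event that passes the filter chain to the WriteChannel of its database.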
