
A Look at puma's Sender

This article takes a look at the Sender in puma.

Sender

puma/puma/src/main/java/com/dianping/puma/sender/Sender.java

public interface Sender extends LifeCycle {
    String getName();

    void send(ChangedEvent event, PumaContext context) throws SenderException;
}
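  • Sender extends LifeCycle and declares two methods: getName and send. send takes a ChangedEvent plus a PumaContext and may throw SenderException.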

AbstractSender

puma/puma/src/main/java/com/dianping/puma/sender/AbstractSender.java

public abstract class AbstractSender implements Sender {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractSender.class);

    private String name;

    private int maxTryTimes = 3;

    private boolean canMissEvent = false;

    private volatile boolean stopped = true;

    private final String MSG_SKIP = "[Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d]";

    private final String MSG_LOOP_FAILED = "[Can't Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] ";

    /**
     * @return the stop
     */
    public boolean isStop() {
        return stopped;
    }

    /**
     * @return the maxTryTimes
     */
    public int getMaxTryTimes() {
        return maxTryTimes;
    }

    /**
     * @param maxTryTimes the maxTryTimes to set
     */
    public void setMaxTryTimes(int maxTryTimes) {
        this.maxTryTimes = maxTryTimes;
    }

    /**
     * @return the canMissEvent
     */
    public boolean isCanMissEvent() {
        return canMissEvent;
    }

    /**
     * @param canMissEvent the canMissEvent to set
     */
    public void setCanMissEvent(boolean canMissEvent) {
        this.canMissEvent = canMissEvent;
    }

    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#start()
     */
    @Override
    public void start() {
        stopped = false;
    }

    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#stop()
     */
    @Override
    public void stop() {
        stopped = true;
    }

    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.sender.Sender#getName()
     */
    @Override
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public void send(ChangedEvent event, PumaContext context) throws SenderException {
        long retryCount = 0;

        while (true) {
            if (isStop()) {
                break;
            }

            try {
                doSend(event, context);
                break;
            } catch (Exception e) {
                LOG.error("Send error!", e);

                if (retryCount++ > maxTryTimes) {
                    if (canMissEvent) {
                        LOG.error(String.format(MSG_SKIP, maxTryTimes, context.getPumaServerName(),
                                context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));
                        return;
                    } else {
                        if (retryCount % 100 == 0) {
                            LOG.error(String.format(MSG_LOOP_FAILED, maxTryTimes, context.getPumaServerName(),
                                    context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));
                        }
                    }
                }

                try {
                    Thread.sleep(((retryCount % 15) + 1) * 300);
                } catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                    throw new SenderException("Interrupted", e1);
                }
            }
        }
    }

    protected abstract void doSend(ChangedEvent event, PumaContext context) throws SenderException;
}
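  • AbstractSender implements the Sender interface. Its send method calls doSend(event, context) inside a while loop and leaves the loop as soon as the call succeeds or the sender has been stopped. When doSend throws an Exception, the error is logged; as long as retryCount has not exceeded maxTryTimes, the thread sleeps ((retryCount % 15) + 1) * 300 milliseconds and tries again. Once retryCount exceeds maxTryTimes, the outcome depends on canMissEvent: if it is true, the event is logged as missed and send returns; if it is false, the loop keeps retrying with the same sleep and logs MSG_LOOP_FAILED whenever retryCount reaches a multiple of 100.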
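The retry arithmetic described above is easy to misread, because the check uses a post-increment (retryCount++ > maxTryTimes). The following is a minimal sketch that replays only that arithmetic, assuming every doSend call fails and canMissEvent is true. The class RetryScheduleSketch and its method names are hypothetical and not part of puma; nothing is actually sent and nothing sleeps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of puma: replays the counter logic of AbstractSender.send().
class RetryScheduleSketch {

    // Same expression as the Thread.sleep argument in send().
    static long sleepMillis(long retryCount) {
        return ((retryCount % 15) + 1) * 300;
    }

    // Sleep durations between attempts, assuming every doSend fails and canMissEvent is true.
    static List<Long> delaysBeforeSkip(int maxTryTimes) {
        List<Long> delays = new ArrayList<>();
        long retryCount = 0;
        while (true) {
            // Post-increment: the comparison sees the old value, the sleep sees the new one.
            if (retryCount++ > maxTryTimes) {
                return delays; // this is where send() logs MSG_SKIP and returns
            }
            delays.add(sleepMillis(retryCount));
        }
    }

    // Every failed attempt except the last one is followed by a sleep.
    static int attemptsBeforeSkip(int maxTryTimes) {
        return delaysBeforeSkip(maxTryTimes).size() + 1;
    }

    public static void main(String[] args) {
        System.out.println(attemptsBeforeSkip(3)); // prints 5
        System.out.println(delaysBeforeSkip(3));   // prints [600, 900, 1200, 1500]
    }
}
```

Under these assumptions, the default maxTryTimes of 3 means an event is given up on its fifth failed attempt, after about 4.2 seconds of sleeping in total. Because of the modulo, a single sleep never exceeds 4500 ms, which matters in the canMissEvent = false case where the loop runs indefinitely.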

FileDumpSender

puma/puma/src/main/java/com/dianping/puma/sender/FileDumpSender.java

public class FileDumpSender extends AbstractSender {
    private Map<String, WriteChannel> writeChannels = new ConcurrentHashMap<String, WriteChannel>();

    private ChangedEvent transactionBegin;

    private EventFilterChain storageEventFilterChain;

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        for (WriteChannel channel : writeChannels.values()) {
            channel.stop();
        }
        super.stop();
    }

    @Override
    protected void doSend(ChangedEvent event, PumaContext context) throws SenderException {
        // Storage filter.
        storageEventFilterChain.reset();
        if (!storageEventFilterChain.doNext(event)) {
            return;
        }

        try {
            String database = event.getDatabase();

            if (database != null && database.length() > 0) {
                WriteChannel writeChannel = this.writeChannels.get(database);

                if (writeChannel == null) {
                    writeChannel = buildEventStorage(database);
                    this.writeChannels.put(database, writeChannel);
                }

                boolean isTransactionBegin = false;

                if (event instanceof RowChangedEvent) {
                    isTransactionBegin = ((RowChangedEvent) event).isTransactionBegin();
                }

                if (transactionBegin != null && !isTransactionBegin) {
                    //readChannel.store(transactionBegin);
                    transactionBegin = null;
                }

                writeChannel.append(event);
            } else {
                if (event instanceof RowChangedEvent) {
                    if (((RowChangedEvent) event).isTransactionBegin()) {
                        transactionBegin = event;
                    } else {
                        Cat.logEvent("Puma", "RowChangeEvent-Has-No-Database");
                        LOG.error(String.format("RowChangeEvent[%s] has no database", event.toString()));
                    }
                } else {
                    Cat.logEvent("Puma", "ChangeEvent-Has-No-Database");
                    LOG.error(String.format("ChangeEvent[%s] has no database", event.toString()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private WriteChannel buildEventStorage(String database) {
        WriteChannel writeChannel = ChannelFactory.newWriteChannel(database);
        writeChannel.start();
        return writeChannel;
    }

    public void setStorageEventFilterChain(EventFilterChain storageEventFilterChain) {
        this.storageEventFilterChain = storageEventFilterChain;
    }
}
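  • FileDumpSender extends AbstractSender and keeps a writeChannels map keyed by database name. Its stop method iterates over writeChannels and calls channel.stop() on each one before calling super.stop(). Its doSend method first resets storageEventFilterChain and calls doNext(event), returning immediately if the result is false. It then gets, or creates and starts, the WriteChannel for the event's database and calls writeChannel.append(event). For an event whose database is null or empty, it checks whether the event is a RowChangedEvent: if so and isTransactionBegin is true, the event is stored in transactionBegin; in every other case an error is logged and reported through Cat.logEvent. Note that doSend catches IOException and only prints the stack trace, so in the code shown such a failure does not propagate to the retry loop in AbstractSender.send.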
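The get-or-create step for the per-database channel can be sketched in isolation. The example below is an illustration under stated assumptions, not puma's code: StubChannel is a hypothetical stand-in for WriteChannel, events are plain strings, and ConcurrentHashMap.computeIfAbsent is swapped in for the separate get and put calls of the original so that lookup and creation happen as one atomic step. Whether doSend is ever called from more than one thread is not shown in the source, so treat the atomicity as a possible refinement rather than a required fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the per-database channel lookup in FileDumpSender.doSend().
class ChannelRegistrySketch {

    // Stand-in for puma's WriteChannel; it only records what was appended.
    static class StubChannel {
        final String database;
        final List<String> events = new ArrayList<>();
        boolean started;

        StubChannel(String database) {
            this.database = database;
        }

        void start() { started = true; }

        void stop() { started = false; }

        void append(String event) { events.add(event); }
    }

    private final Map<String, StubChannel> channels = new ConcurrentHashMap<>();

    // Returns the channel for a database, creating and starting it on first use.
    StubChannel channelFor(String database) {
        return channels.computeIfAbsent(database, db -> {
            StubChannel channel = new StubChannel(db);
            channel.start();
            return channel;
        });
    }

    // Appends the event to its database's channel; returns false when there is no database name.
    boolean route(String database, String event) {
        if (database == null || database.isEmpty()) {
            return false;
        }
        channelFor(database).append(event);
        return true;
    }

    // Mirrors FileDumpSender.stop(): stop every channel that was created.
    void stopAll() {
        for (StubChannel channel : channels.values()) {
            channel.stop();
        }
    }

    int channelCount() {
        return channels.size();
    }

    public static void main(String[] args) {
        ChannelRegistrySketch registry = new ChannelRegistrySketch();
        registry.route("orders", "e1");
        registry.route("orders", "e2");
        registry.route("users", "e3");
        System.out.println(registry.channelCount());                    // prints 2
        System.out.println(registry.channelFor("orders").events.size()); // prints 2
        System.out.println(registry.route("", "e4"));                   // prints false
        registry.stopAll();
    }
}
```

Each database gets exactly one channel no matter how many events arrive for it, and stopAll only touches channels that were actually created, which matches the lazy creation in the original listing.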

Summary

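Sender declares the getName and send methods. AbstractSender implements the Sender interface; its send method calls doSend(event, context) in a while loop and, when an Exception is thrown while retryCount has not exceeded maxTryTimes, sleeps ((retryCount % 15) + 1) * 300 milliseconds before retrying. After that point it either drops the event (canMissEvent is true) or keeps retrying indefinitely (canMissEvent is false). FileDumpSender is a concrete subclass that appends each event to a per-database WriteChannel.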

doc

  • Sender