序
本文主要研究一下 puma 的 Sender
Sender
puma/puma/src/main/java/com/dianping/puma/sender/Sender.java
public interface Sender extends LifeCycle {String getName();
void send(ChangedEvent event, PumaContext context) throws SenderException;
}
- Sender 定义了 getName、send 方法
AbstractSender
puma/puma/src/main/java/com/dianping/puma/sender/AbstractSender.java
public abstract class AbstractSender implements Sender {protected static final Logger LOG = LoggerFactory.getLogger(AbstractSender.class);
private String name;
private int maxTryTimes = 3;
private boolean canMissEvent = false;
private volatile boolean stopped = true;
private final String MSG_SKIP = "[Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d]";
private final String MSG_LOOP_FAILED = "[Can't Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] ";
/**
* @return the stop
*/
public boolean isStop() {return stopped;}
/**
* @return the maxTryTimes
*/
public int getMaxTryTimes() {return maxTryTimes;}
/**
* @param maxTryTimes the maxTryTimes to set
*/
public void setMaxTryTimes(int maxTryTimes) {this.maxTryTimes = maxTryTimes;}
/**
* @return the canMissEvent
*/
public boolean isCanMissEvent() {return canMissEvent;}
/**
* @param canMissEvent the canMissEvent to set
*/
public void setCanMissEvent(boolean canMissEvent) {this.canMissEvent = canMissEvent;}
/*
* (non-Javadoc)
*
* @see com.dianping.puma.common.LifeCycle#start()
*/
@Override
public void start() {stopped = false;}
/*
* (non-Javadoc)
*
* @see com.dianping.puma.common.LifeCycle#stop()
*/
@Override
public void stop() {stopped = true;}
/*
* (non-Javadoc)
*
* @see com.dianping.puma.sender.Sender#getName()
*/
@Override
public String getName() {return name;}
public void setName(String name) {this.name = name;}
@Override
public void send(ChangedEvent event, PumaContext context) throws SenderException {
long retryCount = 0;
while (true) {if (isStop()) {break;}
try {doSend(event, context);
break;
} catch (Exception e) {LOG.error("Send error!", e);
if (retryCount++ > maxTryTimes) {if (canMissEvent) {LOG.error(String.format(MSG_SKIP, maxTryTimes, context.getPumaServerName(),
context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));
return;
} else {if (retryCount % 100 == 0) {LOG.error(String.format(MSG_LOOP_FAILED, maxTryTimes, context.getPumaServerName(),
context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));
}
}
}
try {Thread.sleep(((retryCount % 15) + 1) * 300);
} catch (InterruptedException e1) {Thread.currentThread().interrupt();
throw new SenderException("Interrupted", e1);
}
}
}
}
protected abstract void doSend(ChangedEvent event, PumaContext context) throws SenderException;
}
- AbstractSender 声明实现了 Sender 接口,其 send 方法通过 while 循环执行 doSend(event, context) 方法,出现 Exception 时,在 retryCount 没有大于 maxTryTimes 时则 sleep
((retryCount % 15) + 1) * 300
之后再次重试
FileDumpSender
puma/puma/src/main/java/com/dianping/puma/sender/FileDumpSender.java
public class FileDumpSender extends AbstractSender {private Map<String, WriteChannel> writeChannels = new ConcurrentHashMap<String, WriteChannel>();
private ChangedEvent transactionBegin;
private EventFilterChain storageEventFilterChain;
@Override
public void start() {super.start();
}
@Override
public void stop() {for (WriteChannel channel : writeChannels.values()) {channel.stop();
}
super.stop();}
@Override
protected void doSend(ChangedEvent event, PumaContext context) throws SenderException {
// Storage filter.
storageEventFilterChain.reset();
if (!storageEventFilterChain.doNext(event)) {return;}
try {String database = event.getDatabase();
if (database != null && database.length() > 0) {WriteChannel writeChannel = this.writeChannels.get(database);
if (writeChannel == null) {writeChannel = buildEventStorage(database);
this.writeChannels.put(database, writeChannel);
}
boolean isTransactionBegin = false;
if (event instanceof RowChangedEvent) {isTransactionBegin = ((RowChangedEvent) event).isTransactionBegin();}
if (transactionBegin != null && !isTransactionBegin) {//readChannel.store(transactionBegin);
transactionBegin = null;
}
writeChannel.append(event);
} else {if (event instanceof RowChangedEvent) {if (((RowChangedEvent) event).isTransactionBegin()) {transactionBegin = event;} else {Cat.logEvent("Puma", "RowChangeEvent-Has-No-Database");
LOG.error(String.format("RowChangeEvent[%s] has no database", event.toString()));
}
} else {Cat.logEvent("Puma", "ChangeEvent-Has-No-Database");
LOG.error(String.format("ChangeEvent[%s] has no database", event.toString()));
}
}
} catch (IOException e) {e.printStackTrace();
}
}
private WriteChannel buildEventStorage(String database) {WriteChannel writeChannel = ChannelFactory.newWriteChannel(database);
writeChannel.start();
return writeChannel;
}
public void setStorageEventFilterChain(EventFilterChain storageEventFilterChain) {this.storageEventFilterChain = storageEventFilterChain;}
}
- FileDumpSender 继承了 AbstractSender,它定义了 writeChannels 属性,其 stop 方法会遍历 writeChannels,挨个执行 channel.stop();其 doSend 方法先执行 storageEventFilterChain.doNext(event),返回 false 的话则直接返回,之后获取或者创建指定 database 的 writeChannel,执行 writeChannel.append(event);对于 database 为 null 的则判断是否是 RowChangedEvent,是的话,在 RowChangedEvent 的 isTransactionBegin 时设置 transactionBegin
小结
Sender 定义了 getName、send 方法;AbstractSender 声明实现了 Sender 接口,其 send 方法通过 while 循环执行 doSend(event, context) 方法,出现 Exception 时,在 retryCount 没有大于 maxTryTimes 时则 sleep((retryCount % 15) + 1) * 300
之后再次重试
doc
- Sender