序
本文次要钻研一下cheddar的tx
MessageAction
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessageAction.java
public class MessageAction { private final TypedMessage typedMessage; private final int delaySeconds; public MessageAction(final TypedMessage typedMessage, final int delaySeconds) { this.typedMessage = typedMessage; this.delaySeconds = delaySeconds; } public TypedMessage message() { return typedMessage; } public int delay() { return delaySeconds; } public void apply(final MessageSender<TypedMessage> messageSender) { if (delay() > 0) { messageSender.sendDelayedMessage(typedMessage, delaySeconds); } else { messageSender.send(typedMessage); } } public void apply(final MessagePublisher<TypedMessage> messagePublisher) { messagePublisher.publish(typedMessage); }}
MessageAction定义了typedMessage、delaySeconds属性,它提供了两个apply办法,接管messageSender参数的apply办法当delay大于0时执行messageSender.sendDelayedMessage,否则执行messageSender.send(typedMessage);接管messagePublisher参数的apply办法执行messagePublisher.publish
MessageSender
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java
public interface MessageSender<T extends Message> { /** * Send a message * @param message Message to send * @throws MessageSendException */ void send(T message) throws MessageSendException; /** * Send a message, where the message is not visible to receivers for the specified delay duration * @param message Message to send * @param delaySeconds Duration for which sent message is invisible to receivers * @throws MessageSendException */ void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;}
MessageSender接口定义了send、sendDelayedMessage办法
TransactionalResource
Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/TransactionalResource.java
public interface TransactionalResource { void begin() throws TransactionException; void commit() throws TransactionException; void abort() throws TransactionException;}
TransactionalResource接口定义了begin、commit、abort办法
TransactionalMessageSender
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessageSender.java
public class TransactionalMessageSender implements MessageSender<TypedMessage>, TransactionalResource { private final Logger logger = LoggerFactory.getLogger(getClass()); private final MessageSender<TypedMessage> messageSender; private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>(); public TransactionalMessageSender(final MessageSender<TypedMessage> messageSender) { this.messageSender = messageSender; } private MessagingTransaction getCurrentTransaction() { if (currentTransaction.get() == null) { throw new NonExistentTransactionException(); } return currentTransaction.get(); } @Override public void begin() throws TransactionException { if (currentTransaction.get() != null) { throw new NestedTransactionException(currentTransaction.get()); } currentTransaction.set(new MessagingTransaction()); logger.trace("Beginning transaction: " + currentTransaction.get().transactionId()); } @Override public void commit() throws TransactionException { final MessagingTransaction transaction = getCurrentTransaction(); logger.trace("Committing transaction: " + transaction.transactionId()); transaction.applyActions(messageSender); currentTransaction.remove(); logger.trace("Transaction successfully committed: " + transaction.transactionId()); } @Override public void send(final TypedMessage typedMessage) throws MessageSendException { final MessagingTransaction transaction = getCurrentTransaction(); transaction.addMessage(typedMessage); } @Override public void sendDelayedMessage(final TypedMessage typedMessage, final int delay) throws MessageSendException { final MessagingTransaction transaction = getCurrentTransaction(); transaction.addDelayedMessage(typedMessage, delay); } @Override public void abort() throws TransactionException { currentTransaction.remove(); }}
TransactionalMessageSender实现了MessageSender、TransactionalResource接口;begin办法给currentTransaction设置新的MessagingTransaction;commit办法获取MessagingTransaction,执行applyActions办法,最初执行currentTransaction.remove();abort办法执行currentTransaction.remove()办法;send办法执行transaction.addMessage;sendDelayedMessage办法执行addDelayedMessage
MessagePublisher
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessagePublisher.java
public interface MessagePublisher<T extends Message> { /** * Forward a message for publication * @param message * @throws MessagePublishException */ void publish(T message) throws MessagePublishException;}
MessagePublisher接口定义了publish办法
TransactionalMessagePublisher
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessagePublisher.java
public class TransactionalMessagePublisher implements MessagePublisher<TypedMessage>, TransactionalResource { private final Logger logger = LoggerFactory.getLogger(getClass()); private final MessagePublisher<TypedMessage> messagePublisher; private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>(); public TransactionalMessagePublisher(final MessagePublisher<TypedMessage> messagePublisher) { this.messagePublisher = messagePublisher; } private MessagingTransaction getCurrentTransaction() { if (currentTransaction.get() == null) { throw new NonExistentTransactionException(); } return currentTransaction.get(); } @Override public void begin() throws TransactionException { if (currentTransaction.get() != null) { throw new NestedTransactionException(currentTransaction.get()); } currentTransaction.set(new MessagingTransaction()); logger.trace("Beginning transaction: " + currentTransaction.get().transactionId()); } @Override public void commit() throws TransactionException { final MessagingTransaction transaction = getCurrentTransaction(); logger.trace("Committing transaction: " + transaction.transactionId()); transaction.applyActions(messagePublisher); currentTransaction.remove(); logger.trace("Transaction successfully committed: " + transaction.transactionId()); } @Override public void publish(final TypedMessage typedMessage) throws MessagePublishException { final MessagingTransaction transaction = getCurrentTransaction(); transaction.addMessage(typedMessage); } @Override public void abort() throws TransactionException { currentTransaction.remove(); }}
TransactionalMessagePublisher实现了MessagePublisher、TransactionalResource接口;begin办法给currentTransaction设置新的MessagingTransaction;commit办法获取MessagingTransaction,执行applyActions办法,最初执行currentTransaction.remove();abort办法执行currentTransaction.remove()办法;publish办法执行transaction.addMessage
Transaction
Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/Transaction.java
public interface Transaction { String transactionId();}
Transaction接口定义了transactionId办法
MessagingTransaction
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessagingTransaction.java
public class MessagingTransaction implements Transaction { private final Queue<MessageAction> messageActions; private final String transactionId; public MessagingTransaction() { messageActions = new LinkedList<>(); transactionId = UUID.randomUUID().toString(); } @Override public String transactionId() { return transactionId; } public void applyActions(final MessagePublisher<TypedMessage> messagePublisher) { while (!messageActions.isEmpty()) { final MessageAction messageAction = messageActions.remove(); messagePublisher.publish(messageAction.message()); } } public void applyActions(final MessageSender<TypedMessage> messageSender) { while (!messageActions.isEmpty()) { final MessageAction messageAction = messageActions.remove(); messageAction.apply(messageSender); } } public void addMessage(final TypedMessage typedMessage) { messageActions.add(new MessageAction(typedMessage, 0)); } public void addDelayedMessage(final TypedMessage typedMessage, final int delay) { messageActions.add(new MessageAction(typedMessage, delay)); }}
MessagingTransaction办法实现了Transaction接口;其transactionId办法返回的是结构器生成的UUID;applyActions办法遍历messageAction,别离执行messagePublisher.publish及messageAction.apply(messageSender)
小结
cheddar的tx提供了TransactionalMessagePublisher、TransactionalMessageSender,它们都实现了TransactionalResource接口;其commit办法都执行了transaction.applyActions;MessageAction提供了两个apply办法,接管messageSender参数的apply办法当delay大于0时执行messageSender.sendDelayedMessage,否则执行messageSender.send(typedMessage);接管messagePublisher参数的apply办法执行messagePublisher.publish。
doc
- Cheddar