Preface
This article takes a look at the tx module of Cheddar.
MessageAction
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessageAction.java
public class MessageAction {

    private final TypedMessage typedMessage;
    private final int delaySeconds;

    public MessageAction(final TypedMessage typedMessage, final int delaySeconds) {
        this.typedMessage = typedMessage;
        this.delaySeconds = delaySeconds;
    }

    public TypedMessage message() {
        return typedMessage;
    }

    public int delay() {
        return delaySeconds;
    }

    public void apply(final MessageSender<TypedMessage> messageSender) {
        if (delay() > 0) {
            messageSender.sendDelayedMessage(typedMessage, delaySeconds);
        } else {
            messageSender.send(typedMessage);
        }
    }

    public void apply(final MessagePublisher<TypedMessage> messagePublisher) {
        messagePublisher.publish(typedMessage);
    }
}
MessageAction holds two fields, typedMessage and delaySeconds, and provides two apply methods. The overload that takes a MessageSender calls messageSender.sendDelayedMessage when delay() is greater than 0 and messageSender.send(typedMessage) otherwise; the overload that takes a MessagePublisher calls messagePublisher.publish and ignores the delay.
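The branch on the delay can be tried out in isolation. The following is a minimal sketch, not Cheddar code: SketchSender, RecordingSender and SketchAction are hypothetical stand-ins invented for this example, and plain String values are assumed in place of TypedMessage.

```java
import java.util.ArrayList;
import java.util.List;

class MessageActionSketch {

    // Hypothetical stand-in for MessageSender, using String instead of TypedMessage.
    interface SketchSender {
        void send(String message);

        void sendDelayedMessage(String message, int delaySeconds);
    }

    // Records every call so the chosen branch can be inspected afterwards.
    static final class RecordingSender implements SketchSender {
        final List<String> calls = new ArrayList<>();

        @Override
        public void send(String message) {
            calls.add("send:" + message);
        }

        @Override
        public void sendDelayedMessage(String message, int delaySeconds) {
            calls.add("delayed(" + delaySeconds + "):" + message);
        }
    }

    // Simplified copy of the dispatch logic in MessageAction.apply(MessageSender).
    static final class SketchAction {
        private final String message;
        private final int delaySeconds;

        SketchAction(String message, int delaySeconds) {
            this.message = message;
            this.delaySeconds = delaySeconds;
        }

        void apply(SketchSender sender) {
            if (delaySeconds > 0) {
                sender.sendDelayedMessage(message, delaySeconds);
            } else {
                sender.send(message);
            }
        }
    }

    static List<String> run() {
        RecordingSender sender = new RecordingSender();
        new SketchAction("immediate", 0).apply(sender);
        new SketchAction("later", 30).apply(sender);
        return sender.calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [send:immediate, delayed(30):later]
    }
}
```

A delay of 0 takes the plain send path, which is why the transaction class further down can queue ordinary messages as actions with a delay of 0.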
MessageSender
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java
public interface MessageSender<T extends Message> {

    /**
     * Send a message
     * @param message Message to send
     * @throws MessageSendException
     */
    void send(T message) throws MessageSendException;

    /**
     * Send a message, where the message is not visible to receivers for the specified delay duration
     * @param message Message to send
     * @param delaySeconds Duration for which sent message is invisible to receivers
     * @throws MessageSendException
     */
    void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
}
The MessageSender interface defines the send and sendDelayedMessage methods.
TransactionalResource
Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/TransactionalResource.java
public interface TransactionalResource {
    void begin() throws TransactionException;
    void commit() throws TransactionException;
    void abort() throws TransactionException;
}
The TransactionalResource interface defines the begin, commit, and abort methods.
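The article does not show how Cheddar itself drives these three methods. As an assumption about typical usage only, a caller might wrap a unit of work as sketched below. Resource, LoggingResource and inTransaction are hypothetical names, and unchecked exceptions stand in for TransactionException to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

class ResourceLifecycleSketch {

    // Hypothetical mirror of the begin/commit/abort contract (unchecked exceptions only).
    interface Resource {
        void begin();

        void commit();

        void abort();
    }

    // Records the lifecycle calls it receives.
    static final class LoggingResource implements Resource {
        final List<String> events = new ArrayList<>();

        @Override
        public void begin() {
            events.add("begin");
        }

        @Override
        public void commit() {
            events.add("commit");
        }

        @Override
        public void abort() {
            events.add("abort");
        }
    }

    // Assumed driver: commit when the work succeeds, abort and rethrow when it fails.
    static void inTransaction(Resource resource, Runnable work) {
        resource.begin();
        try {
            work.run();
        } catch (RuntimeException e) {
            resource.abort();
            throw e;
        }
        resource.commit();
    }

    static List<String> run() {
        LoggingResource resource = new LoggingResource();
        inTransaction(resource, () -> { });
        try {
            inTransaction(resource, () -> {
                throw new IllegalArgumentException("boom");
            });
        } catch (IllegalArgumentException expected) {
            // the failed unit of work was aborted above
        }
        return resource.events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [begin, commit, begin, abort]
    }
}
```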
TransactionalMessageSender
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessageSender.java
public class TransactionalMessageSender implements MessageSender<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final MessageSender<TypedMessage> messageSender;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessageSender(final MessageSender<TypedMessage> messageSender) {
        this.messageSender = messageSender;
    }

    private MessagingTransaction getCurrentTransaction() {
        if (currentTransaction.get() == null) {
            throw new NonExistentTransactionException();
        }
        return currentTransaction.get();
    }

    @Override
    public void begin() throws TransactionException {
        if (currentTransaction.get() != null) {
            throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction:" + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction:" + transaction.transactionId());
        transaction.applyActions(messageSender);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed:" + transaction.transactionId());
    }

    @Override
    public void send(final TypedMessage typedMessage) throws MessageSendException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void sendDelayedMessage(final TypedMessage typedMessage, final int delay) throws MessageSendException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addDelayedMessage(typedMessage, delay);
    }

    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}
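TransactionalMessageSender implements both MessageSender and TransactionalResource. begin stores a new MessagingTransaction in the currentTransaction ThreadLocal and throws NestedTransactionException if one is already present. commit fetches the current MessagingTransaction, calls applyActions on it with the wrapped messageSender, and finally calls currentTransaction.remove(). abort only calls currentTransaction.remove(), which discards any queued messages. send and sendDelayedMessage do not send anything themselves: they call transaction.addMessage and transaction.addDelayedMessage respectively, so messages are only queued until commit.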
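The queue-until-commit behaviour can be illustrated with a reduced model. This is a sketch under stated assumptions rather than the Cheddar class: Transport and BufferingSender are hypothetical names, messages are plain String values, and IllegalStateException stands in for NonExistentTransactionException and NestedTransactionException.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class BufferedSenderSketch {

    // Hypothetical delegate that records what actually reaches the transport.
    static final class Transport {
        final List<String> delivered = new ArrayList<>();

        void send(String message) {
            delivered.add(message);
        }
    }

    // Reduced analogue of TransactionalMessageSender: one pending queue per thread.
    static final class BufferingSender {
        private final Transport transport;
        private final ThreadLocal<Queue<String>> current = new ThreadLocal<>();

        BufferingSender(Transport transport) {
            this.transport = transport;
        }

        void begin() {
            if (current.get() != null) {
                throw new IllegalStateException("nested transaction");
            }
            current.set(new LinkedList<>());
        }

        // Only queues the message; nothing is delivered here.
        void send(String message) {
            pending().add(message);
        }

        // Drains the queue into the transport, then clears the thread-local.
        void commit() {
            Queue<String> queue = pending();
            while (!queue.isEmpty()) {
                transport.send(queue.remove());
            }
            current.remove();
        }

        // Drops the queue without delivering anything.
        void abort() {
            current.remove();
        }

        private Queue<String> pending() {
            Queue<String> queue = current.get();
            if (queue == null) {
                throw new IllegalStateException("no transaction");
            }
            return queue;
        }
    }

    static List<String> run() {
        Transport transport = new Transport();
        BufferingSender sender = new BufferingSender(transport);

        sender.begin();
        sender.send("discarded");
        sender.abort(); // the queued message never reaches the transport

        sender.begin();
        sender.send("first");
        sender.send("second");
        sender.commit(); // both messages are delivered here, in order
        return transport.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [first, second]
    }
}
```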
MessagePublisher
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessagePublisher.java
public interface MessagePublisher<T extends Message> {

    /**
     * Forward a message for publication
     * @param message
     * @throws MessagePublishException
     */
    void publish(T message) throws MessagePublishException;
}
The MessagePublisher interface defines the publish method.
TransactionalMessagePublisher
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessagePublisher.java
public class TransactionalMessagePublisher implements MessagePublisher<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final MessagePublisher<TypedMessage> messagePublisher;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessagePublisher(final MessagePublisher<TypedMessage> messagePublisher) {
        this.messagePublisher = messagePublisher;
    }

    private MessagingTransaction getCurrentTransaction() {
        if (currentTransaction.get() == null) {
            throw new NonExistentTransactionException();
        }
        return currentTransaction.get();
    }

    @Override
    public void begin() throws TransactionException {
        if (currentTransaction.get() != null) {
            throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction:" + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction:" + transaction.transactionId());
        transaction.applyActions(messagePublisher);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed:" + transaction.transactionId());
    }

    @Override
    public void publish(final TypedMessage typedMessage) throws MessagePublishException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}
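TransactionalMessagePublisher implements both MessagePublisher and TransactionalResource, and mirrors TransactionalMessageSender. begin stores a new MessagingTransaction in currentTransaction; commit fetches the current MessagingTransaction, calls applyActions on it with the wrapped messagePublisher, and finally calls currentTransaction.remove(); abort only calls currentTransaction.remove(). publish calls transaction.addMessage, so a message is queued rather than published until commit. Calling publish or commit without an active transaction throws NonExistentTransactionException.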
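Because the current transaction lives in a ThreadLocal, the guards in begin and getCurrentTransaction apply per thread. The sketch below illustrates that with a hypothetical GuardedPublisher; it is not Cheddar code, message buffering is left out, and IllegalStateException stands in for the two Cheddar exception types.

```java
import java.util.UUID;

class TransactionGuardSketch {

    // Reduced analogue of the begin/publish guards; buffering is omitted.
    static final class GuardedPublisher {
        private final ThreadLocal<String> currentTransactionId = new ThreadLocal<>();

        void begin() {
            if (currentTransactionId.get() != null) {
                throw new IllegalStateException("nested transaction");
            }
            currentTransactionId.set(UUID.randomUUID().toString());
        }

        void publish(String message) {
            if (currentTransactionId.get() == null) {
                throw new IllegalStateException("no transaction");
            }
        }

        void abort() {
            currentTransactionId.remove();
        }
    }

    // Returns true when the action fails with IllegalStateException.
    static boolean throwsIllegalState(Runnable action) {
        try {
            action.run();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    static boolean publishOutsideTransactionFails() {
        GuardedPublisher publisher = new GuardedPublisher();
        return throwsIllegalState(() -> publisher.publish("x"));
    }

    static boolean nestedBeginFails() {
        GuardedPublisher publisher = new GuardedPublisher();
        publisher.begin();
        boolean failed = throwsIllegalState(publisher::begin);
        publisher.abort();
        return failed;
    }

    // A transaction begun on this thread is not visible to a second thread.
    static boolean otherThreadSeesNoTransaction() {
        GuardedPublisher publisher = new GuardedPublisher();
        publisher.begin();
        boolean[] failed = new boolean[1];
        Thread other = new Thread(() -> failed[0] = throwsIllegalState(() -> publisher.publish("x")));
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        publisher.abort();
        return failed[0];
    }

    public static void main(String[] args) {
        System.out.println(publishOutsideTransactionFails()); // prints true
        System.out.println(nestedBeginFails());               // prints true
        System.out.println(otherThreadSeesNoTransaction());   // prints true
    }
}
```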
Transaction
Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/Transaction.java
public interface Transaction {
    String transactionId();
}
The Transaction interface defines the transactionId method.
MessagingTransaction
Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessagingTransaction.java
public class MessagingTransaction implements Transaction {

    private final Queue<MessageAction> messageActions;
    private final String transactionId;

    public MessagingTransaction() {
        messageActions = new LinkedList<>();
        transactionId = UUID.randomUUID().toString();
    }

    @Override
    public String transactionId() {
        return transactionId;
    }

    public void applyActions(final MessagePublisher<TypedMessage> messagePublisher) {
        while (!messageActions.isEmpty()) {
            final MessageAction messageAction = messageActions.remove();
            messagePublisher.publish(messageAction.message());
        }
    }

    public void applyActions(final MessageSender<TypedMessage> messageSender) {
        while (!messageActions.isEmpty()) {
            final MessageAction messageAction = messageActions.remove();
            messageAction.apply(messageSender);
        }
    }

    public void addMessage(final TypedMessage typedMessage) {
        messageActions.add(new MessageAction(typedMessage, 0));
    }

    public void addDelayedMessage(final TypedMessage typedMessage, final int delay) {
        messageActions.add(new MessageAction(typedMessage, delay));
    }
}
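The MessagingTransaction class implements the Transaction interface. Its transactionId method returns the UUID generated in the constructor. The two applyActions overloads drain the messageActions queue in insertion order: the MessagePublisher overload calls messagePublisher.publish for each queued message, and the MessageSender overload calls messageAction.apply(messageSender). addMessage queues an action with a delay of 0, and addDelayedMessage queues one with the given delay.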
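The drain loop removes each action before applying it, so the queue is empty afterwards and a second applyActions call has nothing left to do. A minimal sketch of that loop follows, assuming a hypothetical PendingMessages class with String messages and a Consumer in place of the sender or publisher.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class DrainSketch {

    // Reduced analogue of the messageActions queue in MessagingTransaction.
    static final class PendingMessages {
        private final Queue<String> actions = new LinkedList<>();

        void add(String message) {
            actions.add(message);
        }

        int size() {
            return actions.size();
        }

        // Removes each message from the head of the queue, then hands it to the sink.
        void applyActions(Consumer<String> sink) {
            while (!actions.isEmpty()) {
                sink.accept(actions.remove());
            }
        }
    }

    static List<String> run() {
        PendingMessages pending = new PendingMessages();
        pending.add("a");
        pending.add("b");
        pending.add("c");

        List<String> applied = new ArrayList<>();
        pending.applyActions(applied::add);
        pending.applyActions(applied::add); // queue is already empty, so nothing is applied twice
        return applied;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b, c]
    }
}
```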
Summary
The tx module of Cheddar provides TransactionalMessagePublisher and TransactionalMessageSender, both of which implement the TransactionalResource interface, and both of whose commit methods call transaction.applyActions. MessageAction provides two apply methods: the overload that takes a MessageSender calls messageSender.sendDelayedMessage when the delay is greater than 0 and messageSender.send(typedMessage) otherwise, while the overload that takes a MessagePublisher calls messagePublisher.publish.
doc
- Cheddar