This article takes a look at Cheddar's tx module.

MessageAction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessageAction.java

    public class MessageAction {

        private final TypedMessage typedMessage;
        private final int delaySeconds;

        public MessageAction(final TypedMessage typedMessage, final int delaySeconds) {
            this.typedMessage = typedMessage;
            this.delaySeconds = delaySeconds;
        }

        public TypedMessage message() {
            return typedMessage;
        }

        public int delay() {
            return delaySeconds;
        }

        public void apply(final MessageSender<TypedMessage> messageSender) {
            if (delay() > 0) {
                messageSender.sendDelayedMessage(typedMessage, delaySeconds);
            } else {
                messageSender.send(typedMessage);
            }
        }

        public void apply(final MessagePublisher<TypedMessage> messagePublisher) {
            messagePublisher.publish(typedMessage);
        }
    }

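MessageAction holds two fields, typedMessage and delaySeconds, and provides two apply methods. The overload that takes a MessageSender calls messageSender.sendDelayedMessage when delay() is greater than 0 and messageSender.send(typedMessage) otherwise. The overload that takes a MessagePublisher calls messagePublisher.publish and ignores the delay.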
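The branching in apply(MessageSender) can be exercised in isolation. The following is a minimal sketch, not Cheddar code: SketchSender, RecordingSender, SketchMessageAction and MessageActionDemo are hypothetical stand-ins, and messages are plain Strings instead of TypedMessage so the example compiles without the Cheddar dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageSender<TypedMessage>; messages are plain Strings here.
interface SketchSender {
    void send(String message);

    void sendDelayedMessage(String message, int delaySeconds);
}

// Records each call so the dispatch decision can be inspected afterwards.
class RecordingSender implements SketchSender {
    final List<String> calls = new ArrayList<>();

    @Override
    public void send(final String message) {
        calls.add("send:" + message);
    }

    @Override
    public void sendDelayedMessage(final String message, final int delaySeconds) {
        calls.add("delayed:" + message + ":" + delaySeconds);
    }
}

// Mirrors the branching of MessageAction.apply(MessageSender).
class SketchMessageAction {
    private final String message;
    private final int delaySeconds;

    SketchMessageAction(final String message, final int delaySeconds) {
        this.message = message;
        this.delaySeconds = delaySeconds;
    }

    void apply(final SketchSender sender) {
        if (delaySeconds > 0) {
            sender.sendDelayedMessage(message, delaySeconds);
        } else {
            sender.send(message);
        }
    }
}

class MessageActionDemo {
    static List<String> run() {
        final RecordingSender sender = new RecordingSender();
        new SketchMessageAction("order-created", 0).apply(sender);  // delay 0: plain send
        new SketchMessageAction("send-reminder", 30).apply(sender); // delay > 0: delayed send
        return sender.calls;
    }

    public static void main(final String[] args) {
        System.out.println(run()); // [send:order-created, delayed:send-reminder:30]
    }
}
```

A delay of 0 (or any non-positive value) falls through to the plain send call, which is why addMessage can reuse the same action type by passing 0.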

MessageSender

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

    public interface MessageSender<T extends Message> {

        /**
         * Send a message
         * @param message Message to send
         * @throws MessageSendException
         */
        void send(T message) throws MessageSendException;

        /**
         * Send a message, where the message is not visible to receivers for the specified delay duration
         * @param message Message to send
         * @param delaySeconds Duration for which sent message is invisible to receivers
         * @throws MessageSendException
         */
        void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
    }

The MessageSender interface defines the send and sendDelayedMessage methods.

TransactionalResource

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/TransactionalResource.java

    public interface TransactionalResource {

        void begin() throws TransactionException;

        void commit() throws TransactionException;

        void abort() throws TransactionException;
    }

The TransactionalResource interface defines the begin, commit, and abort methods.
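How a caller drives these three methods is not shown in the code quoted here. One plausible pattern, given purely as an assumption, is begin, then the unit of work, then commit on success or abort on failure. In the sketch below, SketchTransactionalResource is a simplified copy of the interface without the checked TransactionException, and LoggingResource and TransactionTemplate.runInTransaction are hypothetical helpers that do not exist in Cheddar.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified copy of TransactionalResource; the checked exception is dropped for brevity.
interface SketchTransactionalResource {
    void begin();

    void commit();

    void abort();
}

// Records the lifecycle calls it receives.
class LoggingResource implements SketchTransactionalResource {
    final List<String> events = new ArrayList<>();

    @Override
    public void begin() {
        events.add("begin");
    }

    @Override
    public void commit() {
        events.add("commit");
    }

    @Override
    public void abort() {
        events.add("abort");
    }
}

// Hypothetical helper, not part of Cheddar: one way a caller might use the interface.
class TransactionTemplate {
    static void runInTransaction(final SketchTransactionalResource resource, final Runnable work) {
        resource.begin();
        try {
            work.run();
        } catch (final RuntimeException e) {
            resource.abort(); // discard whatever the work queued up
            throw e;
        }
        resource.commit(); // only reached when the work finished normally
    }
}

class TransactionTemplateDemo {
    public static void main(final String[] args) {
        final LoggingResource resource = new LoggingResource();
        TransactionTemplate.runInTransaction(resource, () -> { });
        System.out.println(resource.events); // [begin, commit]
    }
}
```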

TransactionalMessageSender

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessageSender.java

    public class TransactionalMessageSender implements MessageSender<TypedMessage>, TransactionalResource {

        private final Logger logger = LoggerFactory.getLogger(getClass());
        private final MessageSender<TypedMessage> messageSender;
        private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

        public TransactionalMessageSender(final MessageSender<TypedMessage> messageSender) {
            this.messageSender = messageSender;
        }

        private MessagingTransaction getCurrentTransaction() {
            if (currentTransaction.get() == null) {
                throw new NonExistentTransactionException();
            }
            return currentTransaction.get();
        }

        @Override
        public void begin() throws TransactionException {
            if (currentTransaction.get() != null) {
                throw new NestedTransactionException(currentTransaction.get());
            }
            currentTransaction.set(new MessagingTransaction());
            logger.trace("Beginning transaction: " + currentTransaction.get().transactionId());
        }

        @Override
        public void commit() throws TransactionException {
            final MessagingTransaction transaction = getCurrentTransaction();
            logger.trace("Committing transaction: " + transaction.transactionId());
            transaction.applyActions(messageSender);
            currentTransaction.remove();
            logger.trace("Transaction successfully committed: " + transaction.transactionId());
        }

        @Override
        public void send(final TypedMessage typedMessage) throws MessageSendException {
            final MessagingTransaction transaction = getCurrentTransaction();
            transaction.addMessage(typedMessage);
        }

        @Override
        public void sendDelayedMessage(final TypedMessage typedMessage, final int delay) throws MessageSendException {
            final MessagingTransaction transaction = getCurrentTransaction();
            transaction.addDelayedMessage(typedMessage, delay);
        }

        @Override
        public void abort() throws TransactionException {
            currentTransaction.remove();
        }
    }

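TransactionalMessageSender implements both the MessageSender and TransactionalResource interfaces. begin stores a new MessagingTransaction in the currentTransaction ThreadLocal and throws NestedTransactionException if one is already set. commit fetches the current MessagingTransaction, calls applyActions with the wrapped messageSender, and finally calls currentTransaction.remove(). abort only calls currentTransaction.remove(). send calls transaction.addMessage and sendDelayedMessage calls transaction.addDelayedMessage, so messages are queued rather than sent until commit; calling either without an active transaction throws NonExistentTransactionException.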
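The effect of this design is that nothing reaches the wrapped sender until commit, and abort silently drops whatever was queued. A minimal sketch of that buffering behaviour follows. BufferingSender and BufferingSenderDemo are hypothetical names, a plain List stands in for the wrapped MessageSender, and IllegalStateException stands in for Cheddar's NonExistentTransactionException and NestedTransactionException.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Hypothetical simplification of TransactionalMessageSender using String messages.
class BufferingSender {
    private final List<String> delivered; // stands in for the wrapped MessageSender
    private final ThreadLocal<Queue<String>> current = new ThreadLocal<>();

    BufferingSender(final List<String> delivered) {
        this.delivered = delivered;
    }

    private Queue<String> currentTransaction() {
        final Queue<String> transaction = current.get();
        if (transaction == null) {
            throw new IllegalStateException("no transaction in progress");
        }
        return transaction;
    }

    void begin() {
        if (current.get() != null) {
            throw new IllegalStateException("nested transaction");
        }
        current.set(new LinkedList<>());
    }

    void send(final String message) {
        currentTransaction().add(message); // queued only, nothing is delivered yet
    }

    void commit() {
        final Queue<String> transaction = currentTransaction();
        while (!transaction.isEmpty()) {
            delivered.add(transaction.remove()); // flush in FIFO order
        }
        current.remove();
    }

    void abort() {
        current.remove(); // queued messages are simply dropped
    }
}

class BufferingSenderDemo {
    static String run() {
        final List<String> delivered = new ArrayList<>();
        final BufferingSender sender = new BufferingSender(delivered);

        sender.begin();
        sender.send("a");
        sender.abort(); // "a" never reaches the delivered list

        sender.begin();
        sender.send("b");
        sender.send("c");
        final int beforeCommit = delivered.size();
        sender.commit();

        return "beforeCommit=" + beforeCommit + ", delivered=" + delivered;
    }

    public static void main(final String[] args) {
        System.out.println(run()); // beforeCommit=0, delivered=[b, c]
    }
}
```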

MessagePublisher

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessagePublisher.java

    public interface MessagePublisher<T extends Message> {

        /**
         * Forward a message for publication
         * @param message
         * @throws MessagePublishException
         */
        void publish(T message) throws MessagePublishException;
    }

The MessagePublisher interface defines the publish method.

TransactionalMessagePublisher

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessagePublisher.java

    public class TransactionalMessagePublisher implements MessagePublisher<TypedMessage>, TransactionalResource {

        private final Logger logger = LoggerFactory.getLogger(getClass());
        private final MessagePublisher<TypedMessage> messagePublisher;
        private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

        public TransactionalMessagePublisher(final MessagePublisher<TypedMessage> messagePublisher) {
            this.messagePublisher = messagePublisher;
        }

        private MessagingTransaction getCurrentTransaction() {
            if (currentTransaction.get() == null) {
                throw new NonExistentTransactionException();
            }
            return currentTransaction.get();
        }

        @Override
        public void begin() throws TransactionException {
            if (currentTransaction.get() != null) {
                throw new NestedTransactionException(currentTransaction.get());
            }
            currentTransaction.set(new MessagingTransaction());
            logger.trace("Beginning transaction: " + currentTransaction.get().transactionId());
        }

        @Override
        public void commit() throws TransactionException {
            final MessagingTransaction transaction = getCurrentTransaction();
            logger.trace("Committing transaction: " + transaction.transactionId());
            transaction.applyActions(messagePublisher);
            currentTransaction.remove();
            logger.trace("Transaction successfully committed: " + transaction.transactionId());
        }

        @Override
        public void publish(final TypedMessage typedMessage) throws MessagePublishException {
            final MessagingTransaction transaction = getCurrentTransaction();
            transaction.addMessage(typedMessage);
        }

        @Override
        public void abort() throws TransactionException {
            currentTransaction.remove();
        }
    }

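TransactionalMessagePublisher implements both the MessagePublisher and TransactionalResource interfaces. begin stores a new MessagingTransaction in the currentTransaction ThreadLocal; commit fetches the current MessagingTransaction, calls applyActions with the wrapped messagePublisher, and finally calls currentTransaction.remove(); abort only calls currentTransaction.remove(); publish calls transaction.addMessage, so the message is queued until commit.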
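Because the current transaction lives in a ThreadLocal, each thread has its own transaction slot: a transaction begun on one thread is not visible from another. The sketch below illustrates only that ThreadLocal property. ThreadLocalTransactionDemo is a hypothetical class, and a String id stands in for MessagingTransaction.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ThreadLocalTransactionDemo {
    // Stand-in for the currentTransaction field; a String id replaces MessagingTransaction.
    private static final ThreadLocal<String> CURRENT_TRANSACTION = new ThreadLocal<>();

    // Returns true if a second thread can see the transaction set by the calling thread.
    static boolean otherThreadSeesTransaction() {
        CURRENT_TRANSACTION.set("tx-1"); // "begin" on the calling thread
        final AtomicBoolean seen = new AtomicBoolean(true);
        final Thread other = new Thread(() -> seen.set(CURRENT_TRANSACTION.get() != null));
        other.start();
        try {
            other.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TRANSACTION.remove(); // "abort" on the calling thread
        }
        return seen.get();
    }

    public static void main(final String[] args) {
        System.out.println(otherThreadSeesTransaction()); // false
    }
}
```

In practice this means begin, publish, and commit have to run on the same thread; a publish call made from a different thread would find no transaction and fail.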

Transaction

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/Transaction.java

    public interface Transaction {

        String transactionId();
    }

The Transaction interface defines the transactionId method.

MessagingTransaction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessagingTransaction.java

    public class MessagingTransaction implements Transaction {

        private final Queue<MessageAction> messageActions;
        private final String transactionId;

        public MessagingTransaction() {
            messageActions = new LinkedList<>();
            transactionId = UUID.randomUUID().toString();
        }

        @Override
        public String transactionId() {
            return transactionId;
        }

        public void applyActions(final MessagePublisher<TypedMessage> messagePublisher) {
            while (!messageActions.isEmpty()) {
                final MessageAction messageAction = messageActions.remove();
                messagePublisher.publish(messageAction.message());
            }
        }

        public void applyActions(final MessageSender<TypedMessage> messageSender) {
            while (!messageActions.isEmpty()) {
                final MessageAction messageAction = messageActions.remove();
                messageAction.apply(messageSender);
            }
        }

        public void addMessage(final TypedMessage typedMessage) {
            messageActions.add(new MessageAction(typedMessage, 0));
        }

        public void addDelayedMessage(final TypedMessage typedMessage, final int delay) {
            messageActions.add(new MessageAction(typedMessage, delay));
        }
    }

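The MessagingTransaction class implements the Transaction interface. Its transactionId method returns the UUID generated in the constructor. The two applyActions overloads drain the messageActions queue in FIFO order: the MessagePublisher overload calls messagePublisher.publish with each action's message, and the MessageSender overload calls messageAction.apply(messageSender). addMessage queues a MessageAction with a delay of 0, and addDelayedMessage queues one with the given delay.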
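One consequence of the remove-then-apply loop is worth spelling out: if the sender throws partway through, the messages already sent stay sent, the failing action has already been removed from the queue, and only the later actions remain. The sketch below reproduces just that loop shape. DrainDemo is a hypothetical class, String messages replace MessageAction, and a Consumer stands in for the sender.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class DrainDemo {
    // Same loop shape as applyActions: remove the head first, then hand it to the sender.
    static void applyActions(final Queue<String> actions, final Consumer<String> sender) {
        while (!actions.isEmpty()) {
            final String action = actions.remove();
            sender.accept(action);
        }
    }

    static String run() {
        final Queue<String> actions = new LinkedList<>(Arrays.asList("m1", "m2", "m3"));
        final List<String> delivered = new ArrayList<>();
        try {
            applyActions(actions, message -> {
                if (message.equals("m2")) {
                    throw new IllegalStateException("send failed"); // simulated sender failure
                }
                delivered.add(message);
            });
        } catch (final IllegalStateException e) {
            // In the real classes this exception would propagate out of commit().
        }
        return "delivered=" + delivered + ", remaining=" + actions;
    }

    public static void main(final String[] args) {
        System.out.println(run()); // delivered=[m1], remaining=[m3]
    }
}
```

So commit here is a deferred flush rather than an atomic operation: it gives all-or-nothing behaviour up to the point of commit, but not across a failure during the flush itself.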

Summary

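Cheddar's tx module provides TransactionalMessagePublisher and TransactionalMessageSender, both of which implement the TransactionalResource interface. Their commit methods call transaction.applyActions, which drains the queued MessageAction objects. MessageAction provides two apply methods: the overload that takes a MessageSender calls messageSender.sendDelayedMessage when the delay is greater than 0 and messageSender.send(typedMessage) otherwise, and the overload that takes a MessagePublisher calls messagePublisher.publish.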

doc

  • Cheddar