关于java:聊聊cheddar的tx

3次阅读

共计 8106 个字符,预计需要花费 21 分钟才能阅读完成。

本文次要钻研一下 cheddar 的 tx

MessageAction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessageAction.java

public class MessageAction {

    private final TypedMessage typedMessage;
    private final int delaySeconds;

    public MessageAction(final TypedMessage typedMessage, final int delaySeconds) {
        this.typedMessage = typedMessage;
        this.delaySeconds = delaySeconds;
    }

    public TypedMessage message() {return typedMessage;}

    public int delay() {return delaySeconds;}

    public void apply(final MessageSender<TypedMessage> messageSender) {if (delay() > 0) {messageSender.sendDelayedMessage(typedMessage, delaySeconds);
        } else {messageSender.send(typedMessage);
        }
    }

    public void apply(final MessagePublisher<TypedMessage> messagePublisher) {messagePublisher.publish(typedMessage);
    }

}

MessageAction 定义了 typedMessage、delaySeconds 属性,它提供了两个 apply 办法,接管 messageSender 参数的 apply 办法当 delay 大于 0 时执行 messageSender.sendDelayedMessage,否则执行 messageSender.send(typedMessage);接管 messagePublisher 参数的 apply 办法执行 messagePublisher.publish

MessageSender

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

public interface MessageSender<T extends Message> {

    /**
     * Send a message
     * @param message Message to send
     * @throws MessageSendException
     */
    void send(T message) throws MessageSendException;

    /**
     * Send a message, where the message is not visible to receivers for the specified delay duration
     * @param message Message to send
     * @param delaySeconds Duration for which sent message is invisible to receivers
     * @throws MessageSendException
     */
    void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
}

MessageSender 接口定义了 send、sendDelayedMessage 办法

TransactionalResource

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/TransactionalResource.java

public interface TransactionalResource {void begin() throws TransactionException;

    void commit() throws TransactionException;

    void abort() throws TransactionException;}

TransactionalResource 接口定义了 begin、commit、abort 办法

TransactionalMessageSender

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessageSender.java

public class TransactionalMessageSender implements MessageSender<TypedMessage>, TransactionalResource {private final Logger logger = LoggerFactory.getLogger(getClass());

    private final MessageSender<TypedMessage> messageSender;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessageSender(final MessageSender<TypedMessage> messageSender) {this.messageSender = messageSender;}

    private MessagingTransaction getCurrentTransaction() {if (currentTransaction.get() == null) {throw new NonExistentTransactionException();
        }
        return currentTransaction.get();}

    @Override
    public void begin() throws TransactionException {if (currentTransaction.get() != null) {throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction:" + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction:" + transaction.transactionId());
        transaction.applyActions(messageSender);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed:" + transaction.transactionId());
    }

    @Override
    public void send(final TypedMessage typedMessage) throws MessageSendException {final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void sendDelayedMessage(final TypedMessage typedMessage, final int delay) throws MessageSendException {final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addDelayedMessage(typedMessage, delay);
    }

    @Override
    public void abort() throws TransactionException {currentTransaction.remove();
    }
}

TransactionalMessageSender 实现了 MessageSender、TransactionalResource 接口;begin 办法给 currentTransaction 设置新的 MessagingTransaction;commit 办法获取 MessagingTransaction,执行 applyActions 办法,最初执行 currentTransaction.remove();abort 办法执行 currentTransaction.remove() 办法;send 办法执行 transaction.addMessage;sendDelayedMessage 办法执行 addDelayedMessage

MessagePublisher

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessagePublisher.java

public interface MessagePublisher<T extends Message> {

    /**
     * Forward a message for publication
     * @param message
     * @throws MessagePublishException
     */
    void publish(T message) throws MessagePublishException;

}

MessagePublisher 接口定义了 publish 办法

TransactionalMessagePublisher

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessagePublisher.java

public class TransactionalMessagePublisher implements MessagePublisher<TypedMessage>, TransactionalResource {private final Logger logger = LoggerFactory.getLogger(getClass());

    private final MessagePublisher<TypedMessage> messagePublisher;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessagePublisher(final MessagePublisher<TypedMessage> messagePublisher) {this.messagePublisher = messagePublisher;}

    private MessagingTransaction getCurrentTransaction() {if (currentTransaction.get() == null) {throw new NonExistentTransactionException();
        }
        return currentTransaction.get();}

    @Override
    public void begin() throws TransactionException {if (currentTransaction.get() != null) {throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction:" + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction:" + transaction.transactionId());
        transaction.applyActions(messagePublisher);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed:" + transaction.transactionId());
    }

    @Override
    public void publish(final TypedMessage typedMessage) throws MessagePublishException {final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void abort() throws TransactionException {currentTransaction.remove();
    }
}

TransactionalMessagePublisher 实现了 MessagePublisher、TransactionalResource 接口;begin 办法给 currentTransaction 设置新的 MessagingTransaction;commit 办法获取 MessagingTransaction,执行 applyActions 办法,最初执行 currentTransaction.remove();abort 办法执行 currentTransaction.remove() 办法;publish 办法执行 transaction.addMessage

Transaction

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/Transaction.java

public interface Transaction {String transactionId();

}

Transaction 接口定义了 transactionId 办法

MessagingTransaction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessagingTransaction.java

public class MessagingTransaction implements Transaction {

    private final Queue<MessageAction> messageActions;

    private final String transactionId;

    public MessagingTransaction() {messageActions = new LinkedList<>();
        transactionId = UUID.randomUUID().toString();
    }

    @Override
    public String transactionId() {return transactionId;}

    public void applyActions(final MessagePublisher<TypedMessage> messagePublisher) {while (!messageActions.isEmpty()) {final MessageAction messageAction = messageActions.remove();
            messagePublisher.publish(messageAction.message());
        }
    }

    public void applyActions(final MessageSender<TypedMessage> messageSender) {while (!messageActions.isEmpty()) {final MessageAction messageAction = messageActions.remove();
            messageAction.apply(messageSender);
        }
    }

    public void addMessage(final TypedMessage typedMessage) {messageActions.add(new MessageAction(typedMessage, 0));
    }

    public void addDelayedMessage(final TypedMessage typedMessage, final int delay) {messageActions.add(new MessageAction(typedMessage, delay));
    }

}

MessagingTransaction 办法实现了 Transaction 接口;其 transactionId 办法返回的是结构器生成的 UUID;applyActions 办法遍历 messageAction,别离执行 messagePublisher.publish 及 messageAction.apply(messageSender)

小结

cheddar 的 tx 提供了 TransactionalMessagePublisher、TransactionalMessageSender,它们都实现了 TransactionalResource 接口;其 commit 办法都执行了 transaction.applyActions;MessageAction 提供了两个 apply 办法,接管 messageSender 参数的 apply 办法当 delay 大于 0 时执行 messageSender.sendDelayedMessage,否则执行 messageSender.send(typedMessage);接管 messagePublisher 参数的 apply 办法执行 messagePublisher.publish。

doc

  • Cheddar
正文完
 0