This article takes a look at Cheddar's MessageSender.

MessageSender

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

    public interface MessageSender<T extends Message> {

        /**
         * Send a message
         * @param message Message to send
         * @throws MessageSendException
         */
        void send(T message) throws MessageSendException;

        /**
         * Send a message, where the message is not visible to receivers for the specified delay duration
         * @param message Message to send
         * @param delaySeconds Duration for which sent message is invisible to receivers
         * @throws MessageSendException
         */
        void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
    }

The MessageSender interface defines two methods: send and sendDelayedMessage.

MessageSenderImpl

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSenderImpl.java

    public class MessageSenderImpl<T extends Message> implements MessageSender<T> {

        private final MessageQueue<T> messageQueue;

        public MessageSenderImpl(final MessageQueue<T> messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void send(final T message) throws MessageSendException {
            messageQueue.send(message);
        }

        @Override
        public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException {
            messageQueue.sendDelayedMessage(message, delaySeconds);
        }
    }

MessageSenderImpl implements the MessageSender interface. Its send method delegates to messageQueue.send, and its sendDelayedMessage method delegates to messageQueue.sendDelayedMessage.
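The delegation can be seen in isolation with a minimal sketch. The interfaces below are simplified stand-ins for the Cheddar types (no exceptions, no receive side), and TextMessage and RecordingQueue are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Cheddar types so the sketch compiles on its own.
class SenderDelegationSketch {

    interface Message {}

    interface MessageQueue<T extends Message> {
        void send(T message);
        void sendDelayedMessage(T message, int delaySeconds);
    }

    interface MessageSender<T extends Message> {
        void send(T message);
        void sendDelayedMessage(T message, int delaySeconds);
    }

    // Same shape as MessageSenderImpl: every call is forwarded to the queue.
    static class MessageSenderImpl<T extends Message> implements MessageSender<T> {
        private final MessageQueue<T> messageQueue;

        MessageSenderImpl(final MessageQueue<T> messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void send(final T message) {
            messageQueue.send(message);
        }

        @Override
        public void sendDelayedMessage(final T message, final int delaySeconds) {
            messageQueue.sendDelayedMessage(message, delaySeconds);
        }
    }

    // Hypothetical message type carrying a plain text body.
    static class TextMessage implements Message {
        final String body;

        TextMessage(final String body) {
            this.body = body;
        }
    }

    // Hypothetical queue that only records which method was called.
    static class RecordingQueue implements MessageQueue<TextMessage> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void send(final TextMessage message) {
            calls.add("send(" + message.body + ")");
        }

        @Override
        public void sendDelayedMessage(final TextMessage message, final int delaySeconds) {
            calls.add("sendDelayedMessage(" + message.body + ", " + delaySeconds + ")");
        }
    }

    static List<String> run() {
        final RecordingQueue queue = new RecordingQueue();
        final MessageSender<TextMessage> sender = new MessageSenderImpl<>(queue);
        sender.send(new TextMessage("hello"));
        sender.sendDelayedMessage(new TextMessage("later"), 30);
        return queue.calls;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

Because the sender only depends on the queue interface, swapping the in-memory queue for the SQS-backed one should not require touching the calling code.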

MessageQueue

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageQueue.java

    public interface MessageQueue<T extends Message> {

        /**
         * @return The queue name
         */
        String getName();

        /**
         * Send a message to this message queue
         * @param message Message to send
         * @throws MessageSendException
         */
        void send(T message) throws MessageSendException;

        /**
         * Send a message to this message queue; the message is not visible to receivers for the specified delay duration
         * @param message Message to send
         * @param delaySeconds Duration for which sent message is invisible to receivers
         * @throws MessageSendException
         */
        void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;

        /**
         * Receives any number of messages on this queue, but does not delete them. No order or priority of messages is
         * guaranteed.
         * @return List of received {@code Message}s
         * @throws MessageReceiveException
         */
        List<T> receive() throws MessageReceiveException;

        /**
         * Receives any number of messages on this queue up to the maximum specified, but does not delete them. No order or
         * priority of messages is guaranteed. This call will spend up to the wait time given for a message to arrive in the
         * queue before returning.
         * @param waitTimeSeconds The duration (in seconds) for which the call will wait for a message to arrive in the
         *            queue before returning. If a message is available, the call will return sooner.
         * @param maxMessages The maximum number of messages to return. Will never return more messages than this value but
         *            may return fewer. Values can be from 1 to 10.
         * @return List of received {@code Message}s
         * @throws MessageReceiveException
         */
        List<T> receive(int waitTimeSeconds, int maxMessages) throws MessageReceiveException;

        /**
         * Deletes a message previously received from this queue.
         * @param typedMessage {@code Message} to delete
         * @throws MessageDeleteException
         */
        void delete(T message) throws MessageDeleteException;
    }

The MessageQueue interface defines the getName, send, sendDelayedMessage, receive, and delete methods. Note that receive does not remove messages from the queue; a received message stays there until delete is called for it.
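Since receive leaves messages on the queue, a consumer is expected to delete each message once it has been handled. The following is a rough sketch of that receive-then-delete loop, not Cheddar's own consumer code. The interface is a trimmed stand-in, and drain, Note, and ListQueue are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ReceiveThenDeleteSketch {

    interface Message {}

    // Trimmed stand-in: only the receiving half of MessageQueue, without exceptions.
    interface MessageQueue<T extends Message> {
        List<T> receive(int waitTimeSeconds, int maxMessages);
        void delete(T message);
    }

    // Hypothetical helper: handle each received message, then delete it.
    // If the handler throws, the exception propagates and the message stays on the queue.
    static <T extends Message> int drain(final MessageQueue<T> queue, final Consumer<T> handler) {
        int handled = 0;
        List<T> batch = queue.receive(0, 10);
        while (!batch.isEmpty()) {
            for (final T message : batch) {
                handler.accept(message);
                queue.delete(message);
                handled++;
            }
            batch = queue.receive(0, 10);
        }
        return handled;
    }

    // Hypothetical message type.
    static class Note implements Message {
        final String text;

        Note(final String text) {
            this.text = text;
        }
    }

    // Hypothetical list-backed queue; receive returns a copy and removes nothing.
    static class ListQueue implements MessageQueue<Note> {
        private final List<Note> pending = new ArrayList<>();

        void add(final Note note) {
            pending.add(note);
        }

        @Override
        public List<Note> receive(final int waitTimeSeconds, final int maxMessages) {
            return new ArrayList<>(pending.subList(0, Math.min(maxMessages, pending.size())));
        }

        @Override
        public void delete(final Note note) {
            pending.remove(note);
        }

        int size() {
            return pending.size();
        }
    }

    static List<String> run() {
        final ListQueue queue = new ListQueue();
        queue.add(new Note("a"));
        queue.add(new Note("b"));
        queue.add(new Note("c"));
        final List<String> handled = new ArrayList<>();
        final int count = drain(queue, note -> handled.add(note.text));
        handled.add("count=" + count + ", left=" + queue.size());
        return handled;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```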

InMemoryMessageQueue

Cheddar/cheddar/cheddar-integration-mocks/src/main/java/com/clicktravel/infrastructure/messaging/inmemory/InMemoryMessageQueue.java

    public class InMemoryMessageQueue<T extends Message> implements MessageQueue<T>, Resettable {

        private final Logger logger = LoggerFactory.getLogger(getClass());
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final String name;
        private final InMemoryMessageQueuePoller inMemoryMessageQueuePoller;

        @SuppressWarnings("unchecked")
        public InMemoryMessageQueue(final String name, final InMemoryMessageQueuePoller inMemoryMessageQueuePoller,
                final InMemoryExchange<T>... inMemoryExchanges) {
            this.name = name;
            this.inMemoryMessageQueuePoller = inMemoryMessageQueuePoller;
            final List<String> exchangeNames = new ArrayList<>();
            for (final InMemoryExchange<T> inMemoryExchange : inMemoryExchanges) {
                inMemoryExchange.addSubscriber(this);
                exchangeNames.add(inMemoryExchange.getName());
            }
            logger.info("Using in memory message queue: " + name + " with subscriptions to these exchanges: ["
                    + StringUtils.join(exchangeNames) + "]");
        }

        @Override
        public void send(final T message) {
            queue.add(message);
            inMemoryMessageQueuePoller.poll();
        }

        @Override
        public void sendDelayedMessage(final T message, final int delaySeconds) {
            send(message); // delay not supported
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public List<T> receive(final int waitTimeSeconds, final int maxMessages) {
            return receive();
        }

        @Override
        public List<T> receive() {
            final T message = queue.peek();
            final List<T> messages = new ArrayList<T>(1);
            if (message != null) {
                messages.add(message);
            }
            return messages;
        }

        @Override
        public void delete(final T message) {
            queue.remove(message);
        }

        @Override
        public String toString() {
            return "InMemoryMessageQueue [name=" + name + ", queue=" + queue + "]";
        }

        @Override
        public void reset() {
            queue.clear();
        }
    }

InMemoryMessageQueue implements the MessageQueue and Resettable interfaces. Besides its logger and name, it holds a ConcurrentLinkedQueue and an InMemoryMessageQueuePoller. The send method adds the message to the queue and then calls inMemoryMessageQueuePoller.poll(). The sendDelayedMessage method does not support delays: it ignores delaySeconds and simply calls send. Both receive variants only peek at the head of the queue, so they return at most one message and ignore waitTimeSeconds and maxMessages.
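The peek-based receive and the ignored delay are easy to miss when reading the class, so here is a trimmed sketch of just that behaviour. SimpleInMemoryQueue is a hypothetical stand-in that copies the queue logic from the class above and leaves out the poller, the exchanges, and the logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class InMemoryQueueSketch {

    // Hypothetical stand-in for InMemoryMessageQueue: poller, exchanges and logging are omitted.
    static class SimpleInMemoryQueue<T> {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();

        void send(final T message) {
            queue.add(message);
        }

        void sendDelayedMessage(final T message, final int delaySeconds) {
            send(message); // the delay is ignored, as in the original class
        }

        List<T> receive() {
            final T message = queue.peek(); // look at the head without removing it
            final List<T> messages = new ArrayList<>(1);
            if (message != null) {
                messages.add(message);
            }
            return messages;
        }

        void delete(final T message) {
            queue.remove(message);
        }
    }

    static List<String> run() {
        final SimpleInMemoryQueue<String> queue = new SimpleInMemoryQueue<>();
        final List<String> observed = new ArrayList<>();
        queue.sendDelayedMessage("first", 60); // visible immediately despite the 60 second delay
        queue.send("second");
        observed.addAll(queue.receive()); // head of the queue
        observed.addAll(queue.receive()); // same head again, since nothing was deleted
        queue.delete("first");
        observed.addAll(queue.receive()); // next message becomes the head
        queue.delete("second");
        observed.add("remaining=" + queue.receive().size());
        return observed;
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints [first, first, second, remaining=0]
    }
}
```

Tests that rely on a delayed message staying invisible for a while would therefore behave differently against this mock than against SQS.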

SqsMessageQueue

Cheddar/cheddar/cheddar-integration-aws/src/main/java/com/clicktravel/infrastructure/messaging/aws/sqs/SqsMessageQueue.java

    public abstract class SqsMessageQueue<T extends Message> implements MessageQueue<T> {

        private final SqsQueueResource sqsQueueResource;

        public SqsMessageQueue(final SqsQueueResource sqsQueueResource) {
            this.sqsQueueResource = sqsQueueResource;
        }

        protected abstract String toSqsMessageBody(final T message);

        protected abstract T toMessage(final com.amazonaws.services.sqs.model.Message sqsMessage);

        @Override
        public String getName() {
            return sqsQueueResource.getQueueName();
        }

        @Override
        public void send(final T message) throws MessageSendException {
            try {
                sqsQueueResource.sendMessage(toSqsMessageBody(message));
            } catch (final AmazonClientException e) {
                throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName()
                        + "]", e);
            }
        }

        @Override
        public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException {
            try {
                sqsQueueResource.sendDelayedMessage(toSqsMessageBody(message), delaySeconds);
            } catch (final AmazonClientException e) {
                throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName()
                        + "]", e);
            }
        }

        @Override
        public List<T> receive() throws MessageReceiveException {
            try {
                return toMessages(sqsQueueResource.receiveMessages());
            } catch (final AmazonClientException e) {
                throw new MessageReceiveException("Unable to receive messages on SQS queue:["
                        + sqsQueueResource.getQueueName() + "]", e);
            }
        }

        @Override
        public List<T> receive(final int waitTimeSeconds, final int maxMessages) throws MessageReceiveException {
            try {
                return toMessages(sqsQueueResource.receiveMessages(waitTimeSeconds, maxMessages));
            } catch (final AmazonClientException e) {
                throw new MessageReceiveException("Unable to receive messages on SQS queue:["
                        + sqsQueueResource.getQueueName() + "]", e);
            }
        }

        private List<T> toMessages(final List<com.amazonaws.services.sqs.model.Message> sqsMessages) {
            final ArrayList<T> messages = new ArrayList<>();
            for (final com.amazonaws.services.sqs.model.Message sqsMessage : sqsMessages) {
                messages.add(toMessage(sqsMessage));
            }
            return messages;
        }

        @Override
        public void delete(final T message) throws MessageDeleteException {
            try {
                sqsQueueResource.deleteMessage(message.getReceiptHandle());
            } catch (final AmazonClientException e) {
                throw new MessageDeleteException("Unable to delete message on SQS queue:["
                        + sqsQueueResource.getQueueName() + "]", e);
            }
        }

        public SqsQueueResource getSqsQueue() {
            return sqsQueueResource;
        }
    }

SqsMessageQueue is an abstract class that declares it implements the MessageQueue interface. Subclasses supply the conversion between T and the SQS message body through toSqsMessageBody and toMessage. Its send method delegates to sqsQueueResource.sendMessage, and its sendDelayedMessage method delegates to sqsQueueResource.sendDelayedMessage; in both cases an AmazonClientException is wrapped in a MessageSendException that names the queue.
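The two ideas in this class, an abstract body-mapping hook and the translation of SDK exceptions into messaging exceptions, can be sketched without the AWS SDK. Everything below is a hypothetical stand-in: ClientException plays the role of AmazonClientException (assumed unchecked here), QueueResource the role of SqsQueueResource, and BodyMappingQueue, UpperCaseQueue, and FakeResource exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;

class ExceptionWrappingSketch {

    // Stand-in for AmazonClientException, assumed to be unchecked.
    static class ClientException extends RuntimeException {
        ClientException(final String message) {
            super(message);
        }
    }

    // Stand-in for Cheddar's MessageSendException.
    static class MessageSendException extends RuntimeException {
        MessageSendException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for SqsQueueResource, reduced to what send needs.
    interface QueueResource {
        String getQueueName();
        void sendMessage(String body);
    }

    // Same pattern as SqsMessageQueue.send: map the message, delegate, translate failures.
    abstract static class BodyMappingQueue<T> {
        private final QueueResource resource;

        BodyMappingQueue(final QueueResource resource) {
            this.resource = resource;
        }

        protected abstract String toBody(T message);

        void send(final T message) {
            try {
                resource.sendMessage(toBody(message));
            } catch (final ClientException e) {
                throw new MessageSendException(
                        "Unable to send message on queue:[" + resource.getQueueName() + "]", e);
            }
        }
    }

    // Hypothetical concrete subclass with a trivial body mapping.
    static class UpperCaseQueue extends BodyMappingQueue<String> {
        UpperCaseQueue(final QueueResource resource) {
            super(resource);
        }

        @Override
        protected String toBody(final String message) {
            return message.toUpperCase();
        }
    }

    // Hypothetical resource that either records the body or fails.
    static class FakeResource implements QueueResource {
        private final boolean fail;
        String lastBody;

        FakeResource(final boolean fail) {
            this.fail = fail;
        }

        @Override
        public String getQueueName() {
            return "orders";
        }

        @Override
        public void sendMessage(final String body) {
            if (fail) {
                throw new ClientException("boom");
            }
            lastBody = body;
        }
    }

    static List<String> run() {
        final List<String> out = new ArrayList<>();
        final FakeResource working = new FakeResource(false);
        new UpperCaseQueue(working).send("hi");
        out.add("sent body: " + working.lastBody);
        try {
            new UpperCaseQueue(new FakeResource(true)).send("hi");
            out.add("no exception");
        } catch (final MessageSendException e) {
            out.add(e.getMessage() + " caused by: " + e.getCause().getMessage());
        }
        return out;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

Keeping the original exception as the cause means callers only need to catch the messaging exception while the SDK error remains available for diagnosis.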

Summary

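Cheddar's MessageSender interface defines the send and sendDelayedMessage methods. MessageSenderImpl implements MessageSender; its send method delegates to messageQueue.send, and its sendDelayedMessage method delegates to messageQueue.sendDelayedMessage. InMemoryMessageQueue and SqsMessageQueue provide two MessageQueue implementations. The in-memory one does not support delayed delivery: its sendDelayedMessage sends the message immediately.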

doc

  • Cheddar