序
本文次要钻研一下cheddar的MessageSender
MessageSender
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java
public interface MessageSender<T extends Message> { /** * Send a message * @param message Message to send * @throws MessageSendException */ void send(T message) throws MessageSendException; /** * Send a message, where the message is not visible to receivers for the specified delay duration * @param message Message to send * @param delaySeconds Duration for which sent message is invisible to receivers * @throws MessageSendException */ void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;}
MessageSender接口定义了send、sendDelayedMessage办法
MessageSenderImpl
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSenderImpl.java
public class MessageSenderImpl<T extends Message> implements MessageSender<T> { private final MessageQueue<T> messageQueue; public MessageSenderImpl(final MessageQueue<T> messageQueue) { this.messageQueue = messageQueue; } @Override public void send(final T message) throws MessageSendException { messageQueue.send(message); } @Override public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException { messageQueue.sendDelayedMessage(message, delaySeconds); }}
MessageSenderImpl实现了MessageSender接口,其send办法委托给了messageQueue.send;其sendDelayedMessage办法委托给了messageQueue.sendDelayedMessage
MessageQueue
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageQueue.java
public interface MessageQueue<T extends Message> { /** * @return The queue name */ String getName(); /** * Send a message to this message queue * @param message Message to send * @throws MessageSendException */ void send(T message) throws MessageSendException; /** * Send a message to this message queue; the message is not visible to receivers for the specified delay duration * @param message Message to send * @param delaySeconds Duration for which sent message is invisible to receivers * @throws MessageSendException */ void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException; /** * Receives any number of messages on this queue, but does not delete them. No order or priority of messages is * guaranteed. * @return List of received {@code Message}s * @throws MessageReceiveException */ List<T> receive() throws MessageReceiveException; /** * Receives any number of messages on this queue up to the maximum specified, but does not delete them. No order or * priority of messages is guaranteed. This call will spend up to the wait time given for a message to arrive in the * queue before returning. * @param waitTimeSeconds The duration (in seconds) for which the call will wait for a message to arrive in the * queue before returning. If a message is available, the call will return sooner. * @param maxMessages The maximum number of messages to return. Will never return more messages than this value but * may return fewer. Values can be from 1 to 10. * @return List of received {@code Message}s * @throws MessageReceiveException */ List<T> receive(int waitTimeSeconds, int maxMessages) throws MessageReceiveException; /** * Deletes a message previously received from this queue. * @param typedMessage {@code Message} to delete * @throws MessageDeleteException */ void delete(T message) throws MessageDeleteException;}
MessageQueue接口定义了getName、send、sendDelayedMessage、receive、delete办法
InMemoryMessageQueue
Cheddar/cheddar/cheddar-integration-mocks/src/main/java/com/clicktravel/infrastructure/messaging/inmemory/InMemoryMessageQueue.java
public class InMemoryMessageQueue<T extends Message> implements MessageQueue<T>, Resettable { private final Logger logger = LoggerFactory.getLogger(getClass()); private final Queue<T> queue = new ConcurrentLinkedQueue<>(); private final String name; private final InMemoryMessageQueuePoller inMemoryMessageQueuePoller; @SuppressWarnings("unchecked") public InMemoryMessageQueue(final String name, final InMemoryMessageQueuePoller inMemoryMessageQueuePoller, final InMemoryExchange<T>... inMemoryExchanges) { this.name = name; this.inMemoryMessageQueuePoller = inMemoryMessageQueuePoller; final List<String> exchangeNames = new ArrayList<>(); for (final InMemoryExchange<T> inMemoryExchange : inMemoryExchanges) { inMemoryExchange.addSubscriber(this); exchangeNames.add(inMemoryExchange.getName()); } logger.info("Using in memory message queue: " + name + " with subscriptions to these exchanges: [" + StringUtils.join(exchangeNames) + "]"); } @Override public void send(final T message) { queue.add(message); inMemoryMessageQueuePoller.poll(); } @Override public void sendDelayedMessage(final T message, final int delaySeconds) { send(message); // delay not supported } @Override public String getName() { return name; } @Override public List<T> receive(final int waitTimeSeconds, final int maxMessages) { return receive(); } @Override public List<T> receive() { final T message = queue.peek(); final List<T> messages = new ArrayList<T>(1); if (message != null) { messages.add(message); } return messages; } @Override public void delete(final T message) { queue.remove(message); } @Override public String toString() { return "InMemoryMessageQueue [name=" + name + ", queue=" + queue + "]"; } @Override public void reset() { queue.clear(); }}
InMemoryMessageQueue实现了MessageQueue、Resettable接口,它定义了ConcurrentLinkedQueue及InMemoryMessageQueuePoller两个属性;send办法会往queue增加message,而后执行inMemoryMessageQueuePoller.poll();sendDelayedMessage办法目前不反对
SqsMessageQueue
Cheddar/cheddar/cheddar-integration-aws/src/main/java/com/clicktravel/infrastructure/messaging/aws/sqs/SqsMessageQueue.java
public abstract class SqsMessageQueue<T extends Message> implements MessageQueue<T> { private final SqsQueueResource sqsQueueResource; public SqsMessageQueue(final SqsQueueResource sqsQueueResource) { this.sqsQueueResource = sqsQueueResource; } protected abstract String toSqsMessageBody(final T message); protected abstract T toMessage(final com.amazonaws.services.sqs.model.Message sqsMessage); @Override public String getName() { return sqsQueueResource.getQueueName(); } @Override public void send(final T message) throws MessageSendException { try { sqsQueueResource.sendMessage(toSqsMessageBody(message)); } catch (final AmazonClientException e) { throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } @Override public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException { try { sqsQueueResource.sendDelayedMessage(toSqsMessageBody(message), delaySeconds); } catch (final AmazonClientException e) { throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } @Override public List<T> receive() throws MessageReceiveException { try { return toMessages(sqsQueueResource.receiveMessages()); } catch (final AmazonClientException e) { throw new MessageReceiveException("Unable to receive messages on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } @Override public List<T> receive(final int waitTimeSeconds, final int maxMessages) throws MessageReceiveException { try { return toMessages(sqsQueueResource.receiveMessages(waitTimeSeconds, maxMessages)); } catch (final AmazonClientException e) { throw new MessageReceiveException("Unable to receive messages on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } private List<T> toMessages(final List<com.amazonaws.services.sqs.model.Message> sqsMessages) { final ArrayList<T> messages = new ArrayList<>(); for (final com.amazonaws.services.sqs.model.Message sqsMessage : sqsMessages) { messages.add(toMessage(sqsMessage)); } return messages; } @Override public void delete(final T message) throws MessageDeleteException { try { sqsQueueResource.deleteMessage(message.getReceiptHandle()); } catch (final AmazonClientException e) { throw new MessageDeleteException("Unable to delete message on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } public SqsQueueResource getSqsQueue() { return sqsQueueResource; }}
SqsMessageQueue是个抽象类,申明实现MessageQueue接口,其send办法委托给了sqsQueueResource.sendMessage;其sendDelayedMessage办法委托给了sqsQueueResource.sendDelayedMessage
小结
cheddar的MessageSender接口定义了send、sendDelayedMessage办法;MessageSenderImpl实现了MessageSender接口,其send办法委托给了messageQueue.send;其sendDelayedMessage办法委托给了messageQueue.sendDelayedMessage;InMemoryMessageQueue和SqsMessageQueue提供了两种实现,其中inMemory的实现不反对sendDelayedMessage办法。
doc
- Cheddar