共计 7764 个字符,预计需要花费 20 分钟才能阅读完成。
序
本文次要钻研一下 cheddar 的 MessageSender
MessageSender
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java
public interface MessageSender<T extends Message> {
/**
* Send a message
* @param message Message to send
* @throws MessageSendException
*/
void send(T message) throws MessageSendException;
/**
* Send a message, where the message is not visible to receivers for the specified delay duration
* @param message Message to send
* @param delaySeconds Duration for which sent message is invisible to receivers
* @throws MessageSendException
*/
void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
}
MessageSender 接口定义了 send、sendDelayedMessage 办法
MessageSenderImpl
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSenderImpl.java
public class MessageSenderImpl<T extends Message> implements MessageSender<T> {
private final MessageQueue<T> messageQueue;
public MessageSenderImpl(final MessageQueue<T> messageQueue) {this.messageQueue = messageQueue;}
@Override
public void send(final T message) throws MessageSendException {messageQueue.send(message);
}
@Override
public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException {messageQueue.sendDelayedMessage(message, delaySeconds);
}
}
MessageSenderImpl 实现了 MessageSender 接口,其 send 办法委托给了 messageQueue.send;其 sendDelayedMessage 办法委托给了 messageQueue.sendDelayedMessage
MessageQueue
Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageQueue.java
public interface MessageQueue<T extends Message> {
/**
* @return The queue name
*/
String getName();
/**
* Send a message to this message queue
* @param message Message to send
* @throws MessageSendException
*/
void send(T message) throws MessageSendException;
/**
* Send a message to this message queue; the message is not visible to receivers for the specified delay duration
* @param message Message to send
* @param delaySeconds Duration for which sent message is invisible to receivers
* @throws MessageSendException
*/
void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
/**
* Receives any number of messages on this queue, but does not delete them. No order or priority of messages is
* guaranteed.
* @return List of received {@code Message}s
* @throws MessageReceiveException
*/
List<T> receive() throws MessageReceiveException;
/**
* Receives any number of messages on this queue up to the maximum specified, but does not delete them. No order or
* priority of messages is guaranteed. This call will spend up to the wait time given for a message to arrive in the
* queue before returning.
* @param waitTimeSeconds The duration (in seconds) for which the call will wait for a message to arrive in the
* queue before returning. If a message is available, the call will return sooner.
* @param maxMessages The maximum number of messages to return. Will never return more messages than this value but
* may return fewer. Values can be from 1 to 10.
* @return List of received {@code Message}s
* @throws MessageReceiveException
*/
List<T> receive(int waitTimeSeconds, int maxMessages) throws MessageReceiveException;
/**
* Deletes a message previously received from this queue.
* @param typedMessage {@code Message} to delete
* @throws MessageDeleteException
*/
void delete(T message) throws MessageDeleteException;
}
MessageQueue 接口定义了 getName、send、sendDelayedMessage、receive、delete 办法
InMemoryMessageQueue
Cheddar/cheddar/cheddar-integration-mocks/src/main/java/com/clicktravel/infrastructure/messaging/inmemory/InMemoryMessageQueue.java
public class InMemoryMessageQueue<T extends Message> implements MessageQueue<T>, Resettable {private final Logger logger = LoggerFactory.getLogger(getClass());
private final Queue<T> queue = new ConcurrentLinkedQueue<>();
private final String name;
private final InMemoryMessageQueuePoller inMemoryMessageQueuePoller;
@SuppressWarnings("unchecked")
public InMemoryMessageQueue(final String name, final InMemoryMessageQueuePoller inMemoryMessageQueuePoller,
final InMemoryExchange<T>... inMemoryExchanges) {
this.name = name;
this.inMemoryMessageQueuePoller = inMemoryMessageQueuePoller;
final List<String> exchangeNames = new ArrayList<>();
for (final InMemoryExchange<T> inMemoryExchange : inMemoryExchanges) {inMemoryExchange.addSubscriber(this);
exchangeNames.add(inMemoryExchange.getName());
}
logger.info("Using in memory message queue:" + name + "with subscriptions to these exchanges: ["
+ StringUtils.join(exchangeNames) + "]");
}
@Override
public void send(final T message) {queue.add(message);
inMemoryMessageQueuePoller.poll();}
@Override
public void sendDelayedMessage(final T message, final int delaySeconds) {send(message); // delay not supported
}
@Override
public String getName() {return name;}
@Override
public List<T> receive(final int waitTimeSeconds, final int maxMessages) {return receive();
}
@Override
public List<T> receive() {final T message = queue.peek();
final List<T> messages = new ArrayList<T>(1);
if (message != null) {messages.add(message);
}
return messages;
}
@Override
public void delete(final T message) {queue.remove(message);
}
@Override
public String toString() {return "InMemoryMessageQueue [name=" + name + ", queue=" + queue + "]";
}
@Override
public void reset() {queue.clear();
}
}
InMemoryMessageQueue 实现了 MessageQueue、Resettable 接口,它定义了 ConcurrentLinkedQueue 及 InMemoryMessageQueuePoller 两个属性;send 办法会往 queue 增加 message,而后执行 inMemoryMessageQueuePoller.poll();sendDelayedMessage 办法目前不反对
SqsMessageQueue
Cheddar/cheddar/cheddar-integration-aws/src/main/java/com/clicktravel/infrastructure/messaging/aws/sqs/SqsMessageQueue.java
public abstract class SqsMessageQueue<T extends Message> implements MessageQueue<T> {
private final SqsQueueResource sqsQueueResource;
public SqsMessageQueue(final SqsQueueResource sqsQueueResource) {this.sqsQueueResource = sqsQueueResource;}
protected abstract String toSqsMessageBody(final T message);
protected abstract T toMessage(final com.amazonaws.services.sqs.model.Message sqsMessage);
@Override
public String getName() {return sqsQueueResource.getQueueName();
}
@Override
public void send(final T message) throws MessageSendException {
try {sqsQueueResource.sendMessage(toSqsMessageBody(message));
} catch (final AmazonClientException e) {throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName()
+ "]", e);
}
}
@Override
public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException {
try {sqsQueueResource.sendDelayedMessage(toSqsMessageBody(message), delaySeconds);
} catch (final AmazonClientException e) {throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName()
+ "]", e);
}
}
@Override
public List<T> receive() throws MessageReceiveException {
try {return toMessages(sqsQueueResource.receiveMessages());
} catch (final AmazonClientException e) {
throw new MessageReceiveException("Unable to receive messages on SQS queue:["
+ sqsQueueResource.getQueueName() + "]", e);
}
}
@Override
public List<T> receive(final int waitTimeSeconds, final int maxMessages) throws MessageReceiveException {
try {return toMessages(sqsQueueResource.receiveMessages(waitTimeSeconds, maxMessages));
} catch (final AmazonClientException e) {
throw new MessageReceiveException("Unable to receive messages on SQS queue:["
+ sqsQueueResource.getQueueName() + "]", e);
}
}
private List<T> toMessages(final List<com.amazonaws.services.sqs.model.Message> sqsMessages) {final ArrayList<T> messages = new ArrayList<>();
for (final com.amazonaws.services.sqs.model.Message sqsMessage : sqsMessages) {messages.add(toMessage(sqsMessage));
}
return messages;
}
@Override
public void delete(final T message) throws MessageDeleteException {
try {sqsQueueResource.deleteMessage(message.getReceiptHandle());
} catch (final AmazonClientException e) {
throw new MessageDeleteException("Unable to delete message on SQS queue:["
+ sqsQueueResource.getQueueName() + "]", e);
}
}
public SqsQueueResource getSqsQueue() {return sqsQueueResource;}
}
SqsMessageQueue 是个抽象类,申明实现 MessageQueue 接口,其 send 办法委托给了 sqsQueueResource.sendMessage;其 sendDelayedMessage 办法委托给了 sqsQueueResource.sendDelayedMessage
小结
cheddar 的 MessageSender 接口定义了 send、sendDelayedMessage 办法;MessageSenderImpl 实现了 MessageSender 接口,其 send 办法委托给了 messageQueue.send;其 sendDelayedMessage 办法委托给了 messageQueue.sendDelayedMessage;InMemoryMessageQueue 和 SqsMessageQueue 提供了两种实现,其中 inMemory 的实现不反对 sendDelayedMessage 办法。
doc
- Cheddar