Amazon SQS 是 AWS 上支流的音讯队列服务,按理说它是有 SDK 的,那么为什么还要本人编写客户端呢?因为它提供的 SDK 太简略,就几个 Web API,没有方法间接用。咱们具体来说一说。
SQS SDK 中的 API,咱们次要用到的也就是 getQueueUrl, sendMessage, receiveMessage 等。getQueueUrl 能依据传入的 queueName 查找到 queueUrl,后续用这个 queueUrl 来拜访相应的 queue(即:调用 sendMessage 发消息,或调用 receiveMessage 收音讯)。次要复杂度在于收音讯:这个 API 是要被动调用的,可是你怎么晓得有没有新音讯须要你去收呢?事实上,这个 receiveMessage API 是基于拉模式 (pull mode) 的,你须要轮询来不停地拉取新音讯,这个比拟像 Kafka。随之而来的,就须要线程治理,须要一个对 SDK 做了进一步包装的客户端库。
Spring Cloud Messaging 提供了 SQS 的客户端库。然而当咱们在 2023 年 3 月构建基于 SQS 的应用程序时,咱们用的是 AWS SDK V2,而 Spring Cloud Messaging 尚未正式反对 AWS SDK V2。因而,咱们决定本人编写 SQS 的客户端库。而且咱们的设计也与 Spring Cloud Messaging 的有所不同:咱们同时应用多个 AWS 账号,为此,咱们间接在配置中援用 queueUrl(它其实是动态值,可间接援用);而 Spring Cloud Messaging 只能在配置中援用 queueName,而后再运行时获取以后 AWS 账号中相应的 queueUrl。
当初就来讲一讲设计与实现。音讯队列客户端遵循生产者 - 消费者模型,分为 Producer 和 Consumer。SQS 的音讯体必须是不大于 256KB 的文本,因而能够把音讯体当成一个 String。
Producer
Producer 很简略,把音讯收回去就行了,顺便对超时和异样做适当的解决。库的用户能够自行决定音讯体的序列化和反序列化形式,咱们不干预这件事。
Producer 的应用形式很简略:
new SqsMessageProducer(queueUrl)
.produce(yourMessagePayload);
Producer 的残缺实现代码大抵如下:
/** How to use: Call produce() with your serialized message string. */
public class SqsMessageProducer {
private final String queueUrl;
private final int timeoutSeconds;
private final SqsAsyncClient client;
public SqsMessageProducer(String queueUrl, int timeoutSeconds) {
this.queueUrl = queueUrl;
this.timeoutSeconds = timeoutSeconds;
client = new SqsClientFactory().createSqsAsyncClient();
}
public void produce(String payload) {
var sendMessageFuture =
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody(payload).build());
// 不能有限期待 future,要有超时机制
try {sendMessageFuture.get(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {throw new ProducerException(e);
}
}
public static class ProducerException extends RuntimeException {public ProducerException(Throwable cause) {super(cause);
}
}
}
如果想进一步提高 Producer 的性能,能够让它异步获取 sendMessageFuture 的后果,不必同步期待。然而这么做会升高可靠性,不能保障调用了 Producer 就肯定胜利发送了音讯,因而须要衡量。
Consumer
Consumer 的应用形式很简略,无效利用了函数式编程格调,不须要编写派生类,只须要创立 Consumer 的实例,传入一个音讯处理函数,而后启动就能够。示例代码如下:
new SqsMessageConsumer(queueUrl, yourCustomizedThreadNamePrefix, yourMessageHandler)
.runAsync();
Consumer 的实现要简单一些,须要实现音讯驱动的异步计算格调。解决音讯个别会比收取音讯更花工夫,因而它创立一个主循环线程用来轮询音讯队列,创立一个工作线程池用来解决音讯。主循环线程每次可能收到 0~n 个音讯,把收到的音讯分发给工作线程池来解决。因为工作线程池自带工作队列用于缓冲,所以这两种线程之间是互不阻塞的:如果工作线程慢了,主循环线程能够照常收取和散发新音讯;如果主循环线程慢了,工作线程能够照常解决已有的音讯。
留神一个要点:SQS 不会主动清理已被收取的音讯,因为它不晓得你是否胜利解决了音讯。当一个音讯被收取后,它会临时被暗藏,免得其余消费者收到它,如果此音讯始终没有被清理,它会在一段时间后 (默认 30 秒,可配置) 从新呈现,被某个消费者再度收取。你须要一个机制来被动告知 SQS 某条音讯已被解决,这个机制就是 deleteMessage API:胜利解决一个音讯后,被动调 deleteMessage 来从队列中删除此音讯;如果解决失败,什么都不必做,SQS 会在一段时间后再次让消费者收取到此音讯。
外围代码这么写:
private volatile boolean shouldShutdown = false;
// 只有没有敞开,主循环就始终收取音讯
while (!shouldShutdown) {
List<Message> messages;
try {messages = receiveMessages();
} catch (Throwable e) {logger.error("failed to receive", e);
continue;
}
try {dispatchMessages(queueUrl, messages);
} catch (Throwable e) {logger.error("failed to dispatch", e);
}
}
// 收音讯的具体实现
private List<Message> receiveMessages() throws ExecutionException, InterruptedException {
// visibilityTimeout = message handling timeout
// It is usually set at infrastructure level
var receiveMessageFuture =
client.receiveMessage(ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.waitTimeSeconds(10)
.maxNumberOfMessages(maxParallelism)
.build());
// 下面已在申请中设置 waitTimeSeconds=10,所以这里能够不设置超时
return receiveMessageFuture.get().messages();
}
// 把收到音讯分发给工作线程池做解决
// 要显式地把解决好的音讯从队列中删除
// 如果不删除,会在将来再次被主循环收取到
private void dispatchMessages(String queueUrl, List<Message> messages) {for (Message message : messages) {
workerThreadPool.execute(() -> {String messageId = message.messageId();
try {logger.info("Started handling message with id={}", messageId);
messageHandler.accept(message);
logger.info("Completed handling message with id={}", messageId);
// Should delete the succeeded message
client.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build());
logger.info("Deleted handled message with id={}", messageId);
} catch (Throwable e) {
// Logging is enough. Failed message is not deleted, and will be retried on a future polling.
logger.error("Failed to handle message with id=$messageId", e);
}
});
}
}
在以上代码中,每次 receiveMessage 时设置 waitTimeSeconds=10,即最多期待 10 秒,若没有新音讯就返回 0 条音讯;若有新音讯,就提前返回所收到的 1 或多条音讯。之所以不有限期待,是怕网关主动敞开长时间静默的网络连接。
还须要一个优雅敞开机制,让服务器能顺利敞开和清理资源:
Thread mainLoopThread = Thread.currentThread();
// JVM awaits all shutdown hooks to complete
// https://stackoverflow.com/questions/8663107/how-does-the-jvm-terminate-daemon-threads-or-how-to-write-daemon-threads-that-t
Runtime.getRuntime()
.addShutdownHook(
new Thread(() -> {
shouldShutdown = true;
mainLoopThread.interrupt();
try {workerThreadPool.shutdown();
boolean terminated = workerThreadPool.awaitTermination(1, TimeUnit.MINUTES);
if (!terminated) {List<Runnable> runnables = workerThreadPool.shutdownNow();
logger.info("shutdownNow with {} runnables undone", runnables.size());
}
} catch (RuntimeException e) {logger.error("shutdown failed", e);
throw e;
} catch (InterruptedException e) {logger.error("shutdown interrupted", e);
throw new IllegalStateException(e);
}
}));
有时网络连接不稳固,主循环频繁报错比拟 noisy,改成指数退却的重试:
while (!shouldShutdown) {
List<Message> messages;
try {messages = receiveMessages();
// after success, restore backoff to the initial value
receiveBackoffSeconds = 1;
} catch (Throwable e) {logger.error("failed to receive", e);
logger.info("Gonna sleep {} seconds for backoff", receiveBackoffSeconds);
try {
//noinspection BusyWait
Thread.sleep(receiveBackoffSeconds * 1000L);
} catch (InterruptedException ex) {logger.error("backoff sleep interrupted", ex);
}
// after failure, increment next backoff (≤ limit)
receiveBackoffSeconds = exponentialBackoff(receiveBackoffSeconds, 60);
continue;
}
try {dispatchMessages(queueUrl, messages);
} catch (Throwable e) {logger.error("failed to dispatch", e);
}
}
private int exponentialBackoff(int current, int limit) {
int next = current * 2;
return Math.min(next, limit);
}
工作线程池是一个 ThreadPoolExecutor,应用一个有界的 BlockingQueue 来实现回压(back-pressure),当这个 queue 一满,主循环线程就会被迫暂停,以避免本地的音讯积压过多:如果积压过多,既会节约内存,又会导致很多音讯被收取却得不到及时处理,这时还不如让给其余消费者实例去收取。创立工作线程池的相干代码如下:
workerThreadPool =
new ThreadPoolExecutor(
maxParallelism,
maxParallelism,
0,
TimeUnit.SECONDS,
// bounded queue for back pressure
new LinkedBlockingQueue<>(100),
new CustomizableThreadFactory(threadPoolPrefix + "-pool-"),
new TimeoutBlockingPolicy(30));
// Used by workerThreadPool
private static class TimeoutBlockingPolicy implements RejectedExecutionHandler {
private final long timeoutSeconds;
public TimeoutBlockingPolicy(long timeoutSeconds) {this.timeoutSeconds = timeoutSeconds;}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {BlockingQueue<Runnable> queue = executor.getQueue();
if (!queue.offer(r, this.timeoutSeconds, TimeUnit.SECONDS)) {throw new RejectedExecutionException("Timeout after" + timeoutSeconds + "seconds");
}
} catch (InterruptedException e) {throw new IllegalStateException(e);
}
}
}
Consumer 的残缺实现代码大抵如下:
/**
* How to use:
* 1. create a consumer instance with a queue name and a stateless messageHandler function.
* 2. call runAsync() method to start listening to the queue.
*/
public class SqsMessageConsumer implements Runnable {private static final Logger logger = LoggerFactory.getLogger(SqsMessageConsumer.class);
private final String queueUrl;
private final Consumer<Message> messageHandler;
private final int maxParallelism;
private final SqsAsyncClient client;
private final ExecutorService workerThreadPool;
private volatile boolean shouldShutdown = false;
public SqsMessageConsumer(
String queueUrl,
String threadPoolPrefix,
Consumer<Message> messageHandler) {this(queueUrl, threadPoolPrefix, messageHandler, 8);
}
public SqsMessageConsumer(
String queueUrl,
String threadPoolPrefix,
Consumer<Message> messageHandler,
int maxParallelism) {
this.queueUrl = queueUrl;
this.messageHandler = messageHandler;
this.maxParallelism = maxParallelism;
client = new SqsClientFactory().createSqsAsyncClient();
workerThreadPool =
new ThreadPoolExecutor(
maxParallelism,
maxParallelism,
0,
TimeUnit.SECONDS,
// bounded queue for back pressure
new LinkedBlockingQueue<>(100),
new CustomizableThreadFactory(threadPoolPrefix + "-pool-"),
new TimeoutBlockingPolicy(30));
}
/** Use this method by default, it is asynchronous and handles threading for you. */
public void runAsync() {Thread mainLoopThread = new Thread(this);
mainLoopThread.start();}
/**
* Use this method only if you run it in your own thread pool, it runs synchronously in the
* contextual thread.
*/
@Override
public void run() {Thread mainLoopThread = Thread.currentThread();
// JVM awaits all shutdown hooks to complete
// https://stackoverflow.com/questions/8663107/how-does-the-jvm-terminate-daemon-threads-or-how-to-write-daemon-threads-that-t
Runtime.getRuntime()
.addShutdownHook(
new Thread(() -> {
shouldShutdown = true;
mainLoopThread.interrupt();
try {workerThreadPool.shutdown();
boolean terminated = workerThreadPool.awaitTermination(1, TimeUnit.MINUTES);
if (!terminated) {List<Runnable> runnables = workerThreadPool.shutdownNow();
logger.info("shutdownNow with {} runnables undone", runnables.size());
}
} catch (RuntimeException e) {logger.error("shutdown failed", e);
throw e;
} catch (InterruptedException e) {logger.error("shutdown interrupted", e);
throw new IllegalStateException(e);
}
}));
logger.info("polling loop started");
int receiveBackoffSeconds = 1;
// "shouldShutdown" state is more reliable than Thread interrupted state
while (!shouldShutdown) {
List<Message> messages;
try {messages = receiveMessages();
// after success, restore backoff to the initial value
receiveBackoffSeconds = 1;
} catch (Throwable e) {logger.error("failed to receive", e);
logger.info("Gonna sleep {} seconds for backoff", receiveBackoffSeconds);
try {
//noinspection BusyWait
Thread.sleep(receiveBackoffSeconds * 1000L);
} catch (InterruptedException ex) {logger.error("backoff sleep interrupted", ex);
}
// after failure, increment next backoff (≤ limit)
receiveBackoffSeconds = exponentialBackoff(receiveBackoffSeconds, 60);
continue;
}
try {dispatchMessages(queueUrl, messages);
} catch (Throwable e) {logger.error("failed to dispatch", e);
}
}
}
private int exponentialBackoff(int current, int limit) {
int next = current * 2;
return Math.min(next, limit);
}
private List<Message> receiveMessages() throws ExecutionException, InterruptedException {
// visibilityTimeout = message handling timeout
// It has usually been set at infrastructure level
var receiveMessageFuture =
client.receiveMessage(ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.waitTimeSeconds(10)
.maxNumberOfMessages(maxParallelism)
.build());
// Consumer can wait infinitely for the next message, rely on library default timeout.
return receiveMessageFuture.get().messages();
}
private void dispatchMessages(String queueUrl, List<Message> messages) {for (Message message : messages) {
workerThreadPool.execute(() -> {String messageId = message.messageId();
try {logger.info("Started handling message with id={}", messageId);
messageHandler.accept(message);
logger.info("Completed handling message with id={}", messageId);
// Should delete the succeeded message
client.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build());
logger.info("Deleted handled message with id={}", messageId);
} catch (Throwable e) {
// Logging is enough. Failed message is not deleted, will be retried at next polling.
logger.error("Failed to handle message with id=$messageId", e);
}
});
}
}
// Used by workerThreadPool
private static class TimeoutBlockingPolicy implements RejectedExecutionHandler {
private final long timeoutSeconds;
public TimeoutBlockingPolicy(long timeoutSeconds) {this.timeoutSeconds = timeoutSeconds;}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {BlockingQueue<Runnable> queue = executor.getQueue();
if (!queue.offer(r, this.timeoutSeconds, TimeUnit.SECONDS)) {throw new RejectedExecutionException("Timeout after" + timeoutSeconds + "seconds");
}
} catch (InterruptedException e) {throw new IllegalStateException(e);
}
}
}
}