Amazon SQS是AWS上支流的音讯队列服务,按理说它是有SDK的,那么为什么还要本人编写客户端呢?因为它提供的SDK太简略,就几个Web API,没有方法间接用。咱们具体来说一说。

SQS SDK中的API,咱们次要用到的也就是getQueueUrl, sendMessage, receiveMessage等。getQueueUrl能依据传入的queueName查找到queueUrl,后续用这个queueUrl来拜访相应的queue(即:调用sendMessage发消息,或调用receiveMessage收音讯)。次要复杂度在于收音讯:这个API是要被动调用的,可是你怎么晓得有没有新音讯须要你去收呢?事实上,这个receiveMessage API是基于拉模式(pull mode)的,你须要轮询来不停地拉取新音讯,这个比拟像Kafka。随之而来的,就须要线程治理,须要一个对SDK做了进一步包装的客户端库。

Spring Cloud Messaging提供了SQS的客户端库。然而当咱们在2023年3月构建基于SQS的应用程序时,咱们用的是AWS SDK V2,而Spring Cloud Messaging尚未正式反对AWS SDK V2。因而,咱们决定本人编写SQS的客户端库。而且咱们的设计也与Spring Cloud Messaging的有所不同:咱们同时应用多个AWS账号,为此,咱们间接在配置中援用queueUrl(它其实是动态值,可间接援用);而Spring Cloud Messaging只能在配置中援用queueName,而后再运行时获取以后AWS账号中相应的queueUrl。

当初就来讲一讲设计与实现。音讯队列客户端遵循生产者-消费者模型,分为Producer和Consumer。SQS的音讯体必须是不大于256KB的文本,因而能够把音讯体当成一个String。

Producer

Producer很简略,把音讯收回去就行了,顺便对超时和异样做适当的解决。库的用户能够自行决定音讯体的序列化和反序列化形式,咱们不干预这件事。

Producer的应用形式很简略:

new SqsMessageProducer(queueUrl)    .produce(yourMessagePayload);

Producer的残缺实现代码大抵如下:

/** How to use: Call produce() with your serialized message string. */public class SqsMessageProducer {  private final String queueUrl;  private final int timeoutSeconds;  private final SqsAsyncClient client;  public SqsMessageProducer(String queueUrl, int timeoutSeconds) {    this.queueUrl = queueUrl;    this.timeoutSeconds = timeoutSeconds;    client = new SqsClientFactory().createSqsAsyncClient();  }  public void produce(String payload) {    var sendMessageFuture =        client.sendMessage(            SendMessageRequest.builder().queueUrl(queueUrl).messageBody(payload).build());    // 不能有限期待future,要有超时机制    try {      sendMessageFuture.get(timeoutSeconds, TimeUnit.SECONDS);    } catch (InterruptedException | ExecutionException | TimeoutException e) {      throw new ProducerException(e);    }  }  public static class ProducerException extends RuntimeException {    public ProducerException(Throwable cause) {      super(cause);    }  }}

如果想进一步提高Producer的性能,能够让它异步获取sendMessageFuture的后果,不必同步期待。然而这么做会升高可靠性,不能保障调用了Producer就肯定胜利发送了音讯,因而须要衡量。

Consumer

Consumer的应用形式很简略,无效利用了函数式编程格调,不须要编写派生类,只须要创立Consumer的实例,传入一个音讯处理函数,而后启动就能够。示例代码如下:

new SqsMessageConsumer(queueUrl, yourCustomizedThreadNamePrefix, yourMessageHandler)  .runAsync();

Consumer的实现要简单一些,须要实现音讯驱动的异步计算格调。解决音讯个别会比收取音讯更花工夫,因而它创立一个主循环线程用来轮询音讯队列,创立一个工作线程池用来解决音讯。主循环线程每次可能收到0~n个音讯,把收到的音讯分发给工作线程池来解决。因为工作线程池自带工作队列用于缓冲,所以这两种线程之间是互不阻塞的:如果工作线程慢了,主循环线程能够照常收取和散发新音讯;如果主循环线程慢了,工作线程能够照常解决已有的音讯。

留神一个要点:SQS不会主动清理已被收取的音讯,因为它不晓得你是否胜利解决了音讯。当一个音讯被收取后,它会临时被暗藏,免得其余消费者收到它,如果此音讯始终没有被清理,它会在一段时间后(默认30秒,可配置)从新呈现,被某个消费者再度收取。你须要一个机制来被动告知SQS某条音讯已被解决,这个机制就是deleteMessage API:胜利解决一个音讯后,被动调deleteMessage来从队列中删除此音讯;如果解决失败,什么都不必做,SQS会在一段时间后再次让消费者收取到此音讯。

外围代码这么写:

private volatile boolean shouldShutdown = false;// 只有没有敞开,主循环就始终收取音讯while (!shouldShutdown) {  List<Message> messages;  try {    messages = receiveMessages();  } catch (Throwable e) {    logger.error("failed to receive", e);    continue;  }  try {    dispatchMessages(queueUrl, messages);  } catch (Throwable e) {    logger.error("failed to dispatch", e);  }}// 收音讯的具体实现private List<Message> receiveMessages() throws ExecutionException, InterruptedException {  // visibilityTimeout = message handling timeout  // It is usually set at infrastructure level  var receiveMessageFuture =      client.receiveMessage(          ReceiveMessageRequest.builder()              .queueUrl(queueUrl)              .waitTimeSeconds(10)              .maxNumberOfMessages(maxParallelism)              .build());  // 下面已在申请中设置waitTimeSeconds=10,所以这里能够不设置超时  return receiveMessageFuture.get().messages();}// 把收到音讯分发给工作线程池做解决// 要显式地把解决好的音讯从队列中删除// 如果不删除,会在将来再次被主循环收取到private void dispatchMessages(String queueUrl, List<Message> messages) {  for (Message message : messages) {    workerThreadPool.execute(        () -> {          String messageId = message.messageId();          try {            logger.info("Started handling message with id={}", messageId);            messageHandler.accept(message);            logger.info("Completed handling message with id={}", messageId);            // Should delete the succeeded message            client.deleteMessage(                DeleteMessageRequest.builder()                    .queueUrl(queueUrl)                    .receiptHandle(message.receiptHandle())                    .build());            logger.info("Deleted handled message with id={}", messageId);          } catch (Throwable e) {            // Logging is enough. Failed message is not deleted, and will be retried on a future polling.            logger.error("Failed to handle message with id=$messageId", e);          }        });  }}

在以上代码中,每次receiveMessage时设置waitTimeSeconds=10,即最多期待10秒,若没有新音讯就返回0条音讯;若有新音讯,就提前返回所收到的1或多条音讯。之所以不有限期待,是怕网关主动敞开长时间静默的网络连接。

还须要一个优雅敞开机制,让服务器能顺利敞开和清理资源:

Thread mainLoopThread = Thread.currentThread();// JVM awaits all shutdown hooks to complete// https://stackoverflow.com/questions/8663107/how-does-the-jvm-terminate-daemon-threads-or-how-to-write-daemon-threads-that-tRuntime.getRuntime()    .addShutdownHook(        new Thread(            () -> {              shouldShutdown = true;              mainLoopThread.interrupt();              try {                workerThreadPool.shutdown();                boolean terminated = workerThreadPool.awaitTermination(1, TimeUnit.MINUTES);                if (!terminated) {                  List<Runnable> runnables = workerThreadPool.shutdownNow();                  logger.info("shutdownNow with {} runnables undone", runnables.size());                }              } catch (RuntimeException e) {                logger.error("shutdown failed", e);                throw e;              } catch (InterruptedException e) {                logger.error("shutdown interrupted", e);                throw new IllegalStateException(e);              }            }));

有时网络连接不稳固,主循环频繁报错比拟noisy,改成指数退却的重试:

while (!shouldShutdown) {  List<Message> messages;  try {    messages = receiveMessages();    // after success, restore backoff to the initial value    receiveBackoffSeconds = 1;  } catch (Throwable e) {    logger.error("failed to receive", e);    logger.info("Gonna sleep {} seconds for backoff", receiveBackoffSeconds);    try {      //noinspection BusyWait      Thread.sleep(receiveBackoffSeconds * 1000L);    } catch (InterruptedException ex) {      logger.error("backoff sleep interrupted", ex);    }    // after failure, increment next backoff (≤ limit)    receiveBackoffSeconds = exponentialBackoff(receiveBackoffSeconds, 60);    continue;  }  try {    dispatchMessages(queueUrl, messages);  } catch (Throwable e) {    logger.error("failed to dispatch", e);  }}private int exponentialBackoff(int current, int limit) {  int next = current * 2;  return Math.min(next, limit);}

工作线程池是一个ThreadPoolExecutor,应用一个有界的BlockingQueue来实现回压(back-pressure),当这个queue一满,主循环线程就会被迫暂停,以避免本地的音讯积压过多:如果积压过多,既会节约内存,又会导致很多音讯被收取却得不到及时处理,这时还不如让给其余消费者实例去收取。创立工作线程池的相干代码如下:

workerThreadPool =    new ThreadPoolExecutor(        maxParallelism,        maxParallelism,        0,        TimeUnit.SECONDS,        // bounded queue for back pressure        new LinkedBlockingQueue<>(100),        new CustomizableThreadFactory(threadPoolPrefix + "-pool-"),        new TimeoutBlockingPolicy(30));// Used by workerThreadPoolprivate static class TimeoutBlockingPolicy implements RejectedExecutionHandler {  private final long timeoutSeconds;  public TimeoutBlockingPolicy(long timeoutSeconds) {    this.timeoutSeconds = timeoutSeconds;  }  @Override  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {    try {      BlockingQueue<Runnable> queue = executor.getQueue();      if (!queue.offer(r, this.timeoutSeconds, TimeUnit.SECONDS)) {        throw new RejectedExecutionException("Timeout after " + timeoutSeconds + " seconds");      }    } catch (InterruptedException e) {      throw new IllegalStateException(e);    }  }}

Consumer的残缺实现代码大抵如下:

/** * How to use: * 1. create a consumer instance with a queue name and a stateless messageHandler function. * 2. call runAsync() method to start listening to the queue. */public class SqsMessageConsumer implements Runnable {  private static final Logger logger = LoggerFactory.getLogger(SqsMessageConsumer.class);  private final String queueUrl;  private final Consumer<Message> messageHandler;  private final int maxParallelism;  private final SqsAsyncClient client;  private final ExecutorService workerThreadPool;  private volatile boolean shouldShutdown = false;  public SqsMessageConsumer(      String queueUrl,      String threadPoolPrefix,      Consumer<Message> messageHandler) {    this(queueUrl, threadPoolPrefix, messageHandler, 8);  }  public SqsMessageConsumer(      String queueUrl,      String threadPoolPrefix,      Consumer<Message> messageHandler,      int maxParallelism) {    this.queueUrl = queueUrl;    this.messageHandler = messageHandler;    this.maxParallelism = maxParallelism;    client = new SqsClientFactory().createSqsAsyncClient();    workerThreadPool =        new ThreadPoolExecutor(            maxParallelism,            maxParallelism,            0,            TimeUnit.SECONDS,            // bounded queue for back pressure            new LinkedBlockingQueue<>(100),            new CustomizableThreadFactory(threadPoolPrefix + "-pool-"),            new TimeoutBlockingPolicy(30));  }  /** Use this method by default, it is asynchronous and handles threading for you. */  public void runAsync() {    Thread mainLoopThread = new Thread(this);    mainLoopThread.start();  }  /**   * Use this method only if you run it in your own thread pool, it runs synchronously in the   * contextual thread.   */  @Override  public void run() {      Thread mainLoopThread = Thread.currentThread();      // JVM awaits all shutdown hooks to complete      // https://stackoverflow.com/questions/8663107/how-does-the-jvm-terminate-daemon-threads-or-how-to-write-daemon-threads-that-t      Runtime.getRuntime()        .addShutdownHook(            new Thread(                () -> {                  shouldShutdown = true;                  mainLoopThread.interrupt();                  try {                    workerThreadPool.shutdown();                    boolean terminated = workerThreadPool.awaitTermination(1, TimeUnit.MINUTES);                    if (!terminated) {                      List<Runnable> runnables = workerThreadPool.shutdownNow();                      logger.info("shutdownNow with {} runnables undone", runnables.size());                    }                  } catch (RuntimeException e) {                    logger.error("shutdown failed", e);                    throw e;                  } catch (InterruptedException e) {                    logger.error("shutdown interrupted", e);                    throw new IllegalStateException(e);                  }                }));    logger.info("polling loop started");    int receiveBackoffSeconds = 1;    // "shouldShutdown" state is more reliable than Thread interrupted state    while (!shouldShutdown) {      List<Message> messages;      try {        messages = receiveMessages();        // after success, restore backoff to the initial value        receiveBackoffSeconds = 1;      } catch (Throwable e) {        logger.error("failed to receive", e);        logger.info("Gonna sleep {} seconds for backoff", receiveBackoffSeconds);        try {          //noinspection BusyWait          Thread.sleep(receiveBackoffSeconds * 1000L);        } catch (InterruptedException ex) {          logger.error("backoff sleep interrupted", ex);        }        // after failure, increment next backoff (≤ limit)        receiveBackoffSeconds = exponentialBackoff(receiveBackoffSeconds, 60);        continue;      }      try {        dispatchMessages(queueUrl, messages);      } catch (Throwable e) {        logger.error("failed to dispatch", e);      }    }  }  private int exponentialBackoff(int current, int limit) {    int next = current * 2;    return Math.min(next, limit);  }  private List<Message> receiveMessages() throws ExecutionException, InterruptedException {    // visibilityTimeout = message handling timeout    // It has usually been set at infrastructure level    var receiveMessageFuture =        client.receiveMessage(            ReceiveMessageRequest.builder()                .queueUrl(queueUrl)                .waitTimeSeconds(10)                .maxNumberOfMessages(maxParallelism)                .build());    // Consumer can wait infinitely for the next message, rely on library default timeout.    return receiveMessageFuture.get().messages();  }  private void dispatchMessages(String queueUrl, List<Message> messages) {    for (Message message : messages) {      workerThreadPool.execute(          () -> {            String messageId = message.messageId();            try {              logger.info("Started handling message with id={}", messageId);              messageHandler.accept(message);              logger.info("Completed handling message with id={}", messageId);              // Should delete the succeeded message              client.deleteMessage(                  DeleteMessageRequest.builder()                      .queueUrl(queueUrl)                      .receiptHandle(message.receiptHandle())                      .build());              logger.info("Deleted handled message with id={}", messageId);            } catch (Throwable e) {              // Logging is enough. Failed message is not deleted, will be retried at next polling.              logger.error("Failed to handle message with id=$messageId", e);            }          });    }  }  // Used by workerThreadPool  private static class TimeoutBlockingPolicy implements RejectedExecutionHandler {    private final long timeoutSeconds;    public TimeoutBlockingPolicy(long timeoutSeconds) {      this.timeoutSeconds = timeoutSeconds;    }    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {      try {        BlockingQueue<Runnable> queue = executor.getQueue();        if (!queue.offer(r, this.timeoutSeconds, TimeUnit.SECONDS)) {          throw new RejectedExecutionException("Timeout after " + timeoutSeconds + " seconds");        }      } catch (InterruptedException e) {        throw new IllegalStateException(e);      }    }  }}