关注公众号:Java课代表,每日文章更新,及时获取更多常识。

2 工作队列(Work Queue)

在第一篇教程中,咱们写了两个程序用来从指定的 queue 中发送和接管音讯。这篇教程,咱们将创立一个工作队列,用来给多个 worker 散发一些"耗时的"工作。

工作队列(或者称之为工作队列)背地的思维,是用来防止立刻解决那些很耗资源并且须要期待其运行完结的工作(课代表注:说白了就是削峰)。取而代之的是,将工作安顿到稍后进行(课代表注:说白了就是异步执行)。一个后盾运行的工作程序将会接管到并执行该工作。当你运行了多个工作程序,工作队列中的工作将会被他们独特分担解决。

这个思维在web利用中十分有用,因为在web利用中,通过一个短的http申请窗口无奈解决简单的工作。

筹备工作(Preparation)

在后面的教程中,咱们发送了一个字符串音讯:“"Hello World!”。接下来咱们发送一些用来代表工作很简单的字符串。咱们并没有真实世界中那些像图片缩放,PDF文件渲染之类的简单工作,所以,让咱们应用Thread.sleep()办法来伪装很忙。用字符串中点号的个数当做工作的复杂度:每个点号代表一秒钟的“工作”。例如:由字符串Hello...代表的工作将耗时3秒钟。

将后面例子中Send.java的代码略微扭转一下,使其容许任意音讯从终端输出。该利用会将工作安顿到咱们的工作队列,所以给它命名为:NewTask.java

String message = String.join(" ", argv);channel.basicPublish("", "hello", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");

老的 Recv.java 利用也要做一些改变:它须要为音讯中的每个点号伪造一秒钟的工作。它将负责接管音讯并解决工作,所以将它命名为Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> {  String message = new String(delivery.getBody(), "UTF-8");  System.out.println(" [x] Received '" + message + "'");  try {    doWork(message);  } finally {    System.out.println(" [x] Done");  }};boolean autoAck = true; // acknowledgment is covered belowchannel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

用来模仿执行工夫的假工作:

private static void doWork(String task) throws InterruptedException {    for (char ch: task.toCharArray()) {        if (ch == '.') Thread.sleep(1000);    }}

像教程1中那样编译一下(确保须要的jar包都在工作目录中,并且设置了环境变量:CP):

javac -cp $CP NewTask.java Worker.java
Windows下自行将 $CP 替换为 %CP%,下同。——课代表注

轮询散发(Round-robin dispatching)

应用工作队列的劣势之一是不便横向扩大。假如工作积压了,咱们能够减少更多的 worker 程序,轻松扩大。

首先,让咱们同时运行两个 worker 实例。他们都将从队列中获取音讯,但具体是怎么运行的呢?咱们一起探索一下。

你须要关上三个终端。两个用来运行worker程序。这两个将会是消费者——C1和C2

# shell 1java -cp $CP Worker# => [*] Waiting for messages. To exit press CTRL+C
# shell 2java -cp $CP Worker# => [*] Waiting for messages. To exit press CTRL+C

第三个终端用来公布新工作。当消费者启动之后,能够发送几个音讯:

# shell 3java -cp $CP NewTask First message.# => [x] Sent 'First message.'java -cp $CP NewTask Second message..# => [x] Sent 'Second message..'java -cp $CP NewTask Third message...# => [x] Sent 'Third message...'java -cp $CP NewTask Fourth message....# => [x] Sent 'Fourth message....'java -cp $CP NewTask Fifth message.....# => [x] Sent 'Fifth message.....'

让咱们看一看运行 worker 的终端打印了什么:

java -cp $CP Worker# => [*] Waiting for messages. To exit press CTRL+C# => [x] Received 'First message.'# => [x] Received 'Third message...'# => [x] Received 'Fifth message.....'
java -cp $CP Worker# => [*] Waiting for messages. To exit press CTRL+C# => [x] Received 'Second message..'# => [x] Received 'Fourth message....'

默认状况下,RabbitMQ 会将每个音讯按程序发送给下一个消费者。每个消费者都会被平均分配到雷同数量的音讯。这种音讯散发机制称为轮询

能够多运行几个 worker 实例自行尝试。

音讯确认(Message acknowledgment)

执行工作可能须要一段时间。你有没有想过,如果工作还没执行完,利用挂掉了怎么办?以咱们目前的代码,一旦 RabbitMQ 将音讯分发给了消费者,它会立即将该音讯标记为已删除。如此看来,一旦终止 worker 程序,就会失落它正在解决的音讯,以及它曾经接管,但还没开始解决的音讯。

但咱们并不心愿失落工作。如果一个 worker 利用挂掉了,咱们心愿他所解决的工作能交给给别的 worker 解决。

为了确保音讯不会失落,RabbitMQ 提供音讯确认机制。音讯确认由消费者发回,通知 RabbitMQ 某个指定的音讯曾经被接管、解决,并且 RabbitMQ 能够删掉该音讯了。

如果某个消费者没有返回确认(ack) 就挂掉了(channel 敞开,链接敞开或者TCP连贯失落了),RabbitMQ 将会认为该音讯没有被正确处理,会将其从新入队(re-queue)。如果此时有其余消费者在线,RabbitMQ 会迅速将该音讯发送给他们。这样就能够保障,即便 worker 忽然挂了,音讯也不会失落。

音讯不会超时:RabbitMQ 将会在某个消费者挂掉时从新发送该音讯。即便解决一条音讯须要破费很长时间也无所谓。

手工音讯确认 默认开启。在后面的示例中咱们通过设置autoAck=true将其敞开了。当初咱们将标记位设为false,并让worker 在工作实现时发送确认信息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)DeliverCallback deliverCallback = (consumerTag, delivery) -> {  String message = new String(delivery.getBody(), "UTF-8");  System.out.println(" [x] Received '" + message + "'");  try {    doWork(message);  } finally {    System.out.println(" [x] Done");    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  }};boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

下面的代码能够确保即便你应用 CTRL+C 进行一个正在解决音讯的worker,也不会失落任何音讯。worker 挂掉后未被确认的音讯将会很快被从新投递。

确认音讯的发送必须和接管音讯时的 channel 雷同。尝试应用不同的 channel 返回确认将会报 channel 协定异样。具体参见确认机制的参考文档

遗记确认

一个常见的谬误就是遗记调用basicAck。这个简略谬误,将会导致严重后果。当你的程序处理完音讯,却遗记发送确认,音讯将会被从新投递,RabbitMQ 因为无奈删除未被确认的音讯,导致内存占用越来越多。

为了不便排查此类问题,能够应用 rabbitmqctl 工具打印 messages_unacknowledged 字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Windows上来掉 sudo :

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

音讯长久化(Message durability)

咱们曾经学习了如何在消费者挂掉的状况下保障工作不失落。然而,如果 RabbitMQ 服务进行了,工作还是会丢。

如果没有通过配置,当 RabbitMQ 进行或解体时,它将会失落 队列(queue) 中已有的音讯。为了防止这种状况,咱们须要将队列(queue) 和音讯(message) 都设置为长久化(durable)

boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);

只管下面的命令是对的,但目前还不能正确工作。因为咱们曾经在 RabbitMQ 中申明了一个名为“hello”的非长久化队列。RabbitMQ 无奈批改已存在队列的参数。咱们能够换个思路,命名一个新的,开启长久化的队列,比方task_queue

boolean durable = true;channel.queueDeclare("task_queue", durable, false, false, null);

长久化参数为truequeueDeclare 办法须要在生产者和消费者代码中都加上。

此时,咱们能够确定,即便 RabbitMQ 重启,task_queue 这个队列也不会丢。接下来咱们通过将MessageProperties 的值设置为PERSISTENT_TEXT_PLAIN,从而将音讯设置为长久化。

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",            MessageProperties.PERSISTENT_TEXT_PLAIN,            message.getBytes());

音讯长久化的注意事项

将音讯标记为长久化并不能齐全保障音讯不失落。只管通知了RabbitMQ将音讯保留到磁盘,依然存在一段小的窗口期RabbitMQ接管了音讯但还没来得及保留。此外,RabbitMQ不会对每条音讯都执行 fsync(2) —— 它可能刚刚被写入缓存,还没真正写到磁盘上。长久化机制并不强壮,但对于task 来说队列足够了。如果须要更牢靠的长久化,你须要应用 publisher confirms。

偏心散发(Fair dispatch)

轮询散发有时候并不能满足咱们的须要。比方在只有两个 worker 的场景下,序号为奇数的音讯波及大量运算,而序号为偶数的音讯都很简略。RabbitMQ 并不知道音讯的难易水平,他只会平均分发给两个 worker。

呈现这种状况是因为,RabbitMQ 只负责将队列中收到的音讯散发进来,他并不关怀消费者未确认的音讯数量。它只是自觉地将第N的音讯发给第N个消费者。

为了解决这个问题,咱们能够调用 basicQos办法,将它的参数 prefetchCount 设置为 1。这将通知 RabbitMQ 同一时间内给 worker 的音讯数量不要超过 1。换句话说,在 worker 没有返回确认之前,不要给他散发新音讯。这样一来,RabbitMQ 会将音讯发送给其余不忙的 worker。

int prefetchCount = 1;channel.basicQos(prefetchCount);

对于队列大小

如果所有 worker 都很忙,队列有可能被塞满。你须要实时监控他的大小,或者减少 worker 的数量,或者采纳其余策略(课代表注:比方管制生产者和消费者的比例)

代码整合(Putting it all together)

最终的 NewTask.java 代码如下:

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;public class NewTask {  private static final String TASK_QUEUE_NAME = "task_queue";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    try (Connection connection = factory.newConnection();         Channel channel = connection.createChannel()) {        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);        String message = String.join(" ", argv);        channel.basicPublish("", TASK_QUEUE_NAME,                MessageProperties.PERSISTENT_TEXT_PLAIN,                message.getBytes("UTF-8"));        System.out.println(" [x] Sent '" + message + "'");    }  }}

(NewTask.java 源文件)")

Worker.java:

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;public class Worker {  private static final String TASK_QUEUE_NAME = "task_queue";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    final Connection connection = factory.newConnection();    final Channel channel = connection.createChannel();    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");    channel.basicQos(1);    DeliverCallback deliverCallback = (consumerTag, delivery) -> {        String message = new String(delivery.getBody(), "UTF-8");        System.out.println(" [x] Received '" + message + "'");        try {            doWork(message);        } finally {            System.out.println(" [x] Done");            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    };    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });  }  private static void doWork(String task) {    for (char ch : task.toCharArray()) {        if (ch == '.') {            try {                Thread.sleep(1000);            } catch (InterruptedException _ignored) {                Thread.currentThread().interrupt();            }        }    }  }}

(Worker.java 源文件)")

应用音讯确认并设置prefetchCount参数建设的工作队列。其长久化设置能够让音讯在 RabbitMQ 重启后仍然存在。

更多对于 ChannelMessageProperties 的内容,请拜访:JavaDocs online.

接下来咱们进入教程3,学习如何将同一个音讯发送给多个消费者。


举荐浏览
RabbitMQ教程 1.“Hello World”

Freemarker 教程(一)-模板开发手册

下载的附件名总乱码?你该去读一下 RFC 文档了!

应用Spring Validation优雅地校验参数

深入浅出 MySQL 优先队列(你肯定会踩到的order by limit 问题)


码字不易,欢送点赞分享。
搜寻:【Java课代表】,关注公众号,及时获取更多Java干货。