关注公众号:Java 课代表,每日文章更新,及时获取更多常识。
2 工作队列(Work Queue)
在第一篇教程中,咱们写了两个程序用来从指定的 queue 中发送和接管音讯。这篇教程,咱们将创立一个工作队列,用来给多个 worker 散发一些 ” 耗时的 ” 工作。
工作队列 (或者称之为工作队列) 背地的思维,是用来防止立刻解决那些很耗资源并且须要期待其运行完结的工作(课代表注:说白了就是削峰)。取而代之的是,将工作安顿到稍后进行(课代表注:说白了就是异步执行)。一个后盾运行的工作程序将会接管到并执行该工作。当你运行了多个工作程序,工作队列中的工作将会被他们独特分担解决。
这个思维在 web 利用中十分有用,因为在 web 利用中,通过一个短的 http 申请窗口无奈解决简单的工作。
筹备工作(Preparation)
在后面的教程中,咱们发送了一个字符串音讯:“”Hello World!”。接下来咱们发送一些用来代表工作很简单的字符串。咱们并没有真实世界中那些像图片缩放,PDF 文件渲染之类的简单工作,所以,让咱们应用 Thread.sleep()
办法来伪装很忙。用字符串中点号的个数当做工作的复杂度:每个点号代表一秒钟的“工作”。例如:由字符串 Hello...
代表的工作将耗时 3 秒钟。
将后面例子中 Send.java 的代码略微扭转一下,使其容许任意音讯从终端输出。该利用会将工作安顿到咱们的工作队列,所以给它命名为:NewTask.java
String message = String.join(" ", argv);
channel.basicPublish("","hello", null, message.getBytes());
System.out.println("[x] Sent'" + message + "'");
老的 Recv.java 利用也要做一些改变:它须要为音讯中的每个点号伪造一秒钟的工作。它将负责接管音讯并解决工作,所以将它命名为Worker.java
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received'" + message + "'");
try {doWork(message);
} finally {System.out.println("[x] Done");
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
用来模仿执行工夫的假工作:
private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);
}
}
像教程 1 中那样编译一下(确保须要的 jar 包都在工作目录中,并且设置了环境变量:CP):
javac -cp $CP NewTask.java Worker.java
Windows 下自行将 $CP 替换为 %CP%,下同。——课代表注
轮询散发(Round-robin dispatching)
应用工作队列的劣势之一是不便横向扩大。假如工作积压了,咱们能够减少更多的 worker 程序,轻松扩大。
首先,让咱们同时运行两个 worker
实例。他们都将从队列中获取音讯,但具体是怎么运行的呢?咱们一起探索一下。
你须要关上三个终端。两个用来运行 worker
程序。这两个将会是消费者——C1 和 C2
# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
第三个终端用来公布新工作。当消费者启动之后,能够发送几个音讯:
# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'
让咱们看一看运行 worker 的终端打印了什么:
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认状况下,RabbitMQ 会将每个音讯按程序发送给下一个消费者。每个消费者都会被平均分配到雷同数量的音讯。这种音讯散发机制称为 轮询。
能够多运行几个 worker 实例自行尝试。
音讯确认(Message acknowledgment)
执行工作可能须要一段时间。你有没有想过,如果工作还没执行完,利用挂掉了怎么办?以咱们目前的代码,一旦 RabbitMQ 将音讯分发给了消费者,它会立即将该音讯标记为已删除。如此看来,一旦终止 worker 程序,就会失落它正在解决的音讯,以及它曾经接管,但还没开始解决的音讯。
但咱们并不心愿失落工作。如果一个 worker 利用挂掉了,咱们心愿他所解决的工作能交给给别的 worker 解决。
为了确保音讯不会失落,RabbitMQ 提供音讯确认机制。音讯确认由消费者发回,通知 RabbitMQ 某个指定的音讯曾经被接管、解决,并且 RabbitMQ 能够删掉该音讯了。
如果某个消费者没有返回 确认(ack) 就挂掉了(channel 敞开,链接敞开或者 TCP 连贯失落了),RabbitMQ 将会认为该音讯没有被正确处理,会将其从新入队(re-queue)。如果此时有其余消费者在线,RabbitMQ 会迅速将该音讯发送给他们。这样就能够保障,即便 worker 忽然挂了,音讯也不会失落。
音讯不会超时:RabbitMQ 将会在某个消费者挂掉时从新发送该音讯。即便解决一条音讯须要破费很长时间也无所谓。
手工音讯确认 默认开启。在后面的示例中咱们通过设置 autoAck=true
将其敞开了。当初咱们将标记位设为false
,并让 worker 在工作实现时发送确认信息。
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received'" + message + "'");
try {doWork(message);
} finally {System.out.println("[x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
下面的代码能够确保即便你应用 CTRL+C 进行一个正在解决音讯的 worker,也不会失落任何音讯。worker 挂掉后未被确认的音讯将会很快被从新投递。
确认音讯的发送必须和接管音讯时的 channel 雷同。尝试应用不同的 channel 返回确认将会报 channel 协定异样。具体参见确认机制的参考文档
遗记确认
一个常见的谬误就是遗记调用
basicAck
。这个简略谬误,将会导致严重后果。当你的程序处理完音讯,却遗记发送确认,音讯将会被从新投递,RabbitMQ 因为无奈删除未被确认的音讯,导致内存占用越来越多。为了不便排查此类问题,能够应用
rabbitmqctl
工具打印messages_unacknowledged
字段:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows 上来掉 sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
音讯长久化(Message durability)
咱们曾经学习了如何在消费者挂掉的状况下保障工作不失落。然而,如果 RabbitMQ 服务进行了,工作还是会丢。
如果没有通过配置,当 RabbitMQ 进行或解体时,它将会失落 队列 (queue) 中已有的音讯。为了防止这种状况,咱们须要将队列(queue) 和音讯(message) 都设置为 长久化(durable):
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
只管下面的命令是对的,但目前还不能正确工作。因为咱们曾经在 RabbitMQ 中申明了一个名为“hello”的非长久化队列。RabbitMQ 无奈批改已存在队列的参数。咱们能够换个思路,命名一个新的,开启长久化的队列,比方task_queue
:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
长久化参数为 true
的queueDeclare
办法须要在生产者和消费者代码中都加上。
此时,咱们能够确定,即便 RabbitMQ 重启,task_queue
这个队列也不会丢。接下来咱们通过将MessageProperties
的值设置为PERSISTENT_TEXT_PLAIN
,从而将音讯设置为长久化。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("","task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
音讯长久化的注意事项
将音讯标记为长久化并不能齐全保障音讯不失落。只管通知了
RabbitMQ
将音讯保留到磁盘,依然存在一段小的窗口期 RabbitMQ 接管了音讯但还没来得及保留。此外,RabbitMQ
不会对每条音讯都执行fsync(2)
—— 它可能刚刚被写入缓存,还没真正写到磁盘上。长久化机制并不强壮,但对于task
来说队列足够了。如果须要更牢靠的长久化,你须要应用 publisher confirms。
偏心散发(Fair dispatch)
轮询散发有时候并不能满足咱们的须要。比方在只有两个 worker 的场景下,序号为奇数的音讯波及大量运算,而序号为偶数的音讯都很简略。RabbitMQ 并不知道音讯的难易水平,他只会平均分发给两个 worker。
呈现这种状况是因为,RabbitMQ 只负责将队列中收到的音讯散发进来,他并不关怀消费者未确认的音讯数量。它只是自觉地将第 N 的音讯发给第 N 个消费者。
为了解决这个问题,咱们能够调用 basicQos
办法,将它的参数 prefetchCount 设置为 1。这将通知 RabbitMQ 同一时间内给 worker 的音讯数量不要超过 1。换句话说,在 worker 没有返回确认之前,不要给他散发新音讯。这样一来,RabbitMQ 会将音讯发送给其余不忙的 worker。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
对于队列大小
如果所有 worker 都很忙,队列有可能被塞满。你须要实时监控他的大小,或者减少 worker 的数量,或者采纳其余策略(课代表注:比方管制生产者和消费者的比例)
代码整合(Putting it all together)
最终的 NewTask.java
代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println("[x] Sent'" + message + "'");
}
}
}
(NewTask.java 源文件)”)
Worker.java:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received'" + message + "'");
try {doWork(message);
} finally {System.out.println("[x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {
try {Thread.sleep(1000);
} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}
}
}
}
}
(Worker.java 源文件)”)
应用音讯确认并设置 prefetchCount
参数建设的工作队列。其长久化设置能够让音讯在 RabbitMQ 重启后仍然存在。
更多对于 Channel
和 MessageProperties
的内容,请拜访:JavaDocs online.
接下来咱们进入教程 3,学习如何将同一个音讯发送给多个消费者。
举荐浏览
RabbitMQ 教程 1.“Hello World”
Freemarker 教程(一)- 模板开发手册
下载的附件名总乱码?你该去读一下 RFC 文档了!
应用 Spring Validation 优雅地校验参数
深入浅出 MySQL 优先队列(你肯定会踩到的 order by limit 问题)
码字不易,欢送点赞分享。
搜寻:【Java 课代表】,关注公众号,及时获取更多 Java 干货。