序
本文主要研究一下如何使用 reactor-rabbitmq
maven
<dependency>
<groupId>io.projectreactor.rabbitmq</groupId>
<artifactId>reactor-rabbitmq</artifactId>
<version>1.0.0.M2</version>
</dependency>
rabbitmq
参考 docker 搭建 rabbitmq 集群
当前使用的镜像是 bijukunjummen/rabbitmq-server:3.7.0,docker-compose 文件配置的账号密码为 myuser/mypass
访问 http://192.168.99.100:15672 可以查看界面
实例
@Test
public void testProducer() throws InterruptedException {
int count = 100;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
connectionFactory.setUsername(“myuser”);
connectionFactory.setPassword(“mypass”);
SenderOptions senderOptions = new SenderOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection(
new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)},
“reactive-sender”))
.resourceCreationScheduler(Schedulers.elastic());
Sender sender = ReactorRabbitMq.createSender(senderOptions);
Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count)
.map(i -> new OutboundMessage(“”, QUEUE, (“Message_” + i).getBytes())));
CountDownLatch latch = new CountDownLatch(count);
sender.declareQueue(QueueSpecification.queue(QUEUE))
.thenMany(confirmations)
.doOnError(e -> LOGGER.error(“Send failed”, e))
.subscribe(r -> {
if (r.isAck()) {
LOGGER.info(“Message {} sent successfully”, new String(r.getOutboundMessage().getBody()));
latch.countDown();
}
});
latch.await(10, TimeUnit.SECONDS);
sender.close();
}
@Test
public void testConsumer() throws InterruptedException {
int count = 100;
CountDownLatch latch = new CountDownLatch(count);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
connectionFactory.setUsername(“myuser”);
connectionFactory.setPassword(“mypass”);
SenderOptions senderOptions = new SenderOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection(
new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)},
“reactive-sender”))
.resourceCreationScheduler(Schedulers.elastic());
Sender sender = ReactorRabbitMq.createSender(senderOptions);
Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));
ReceiverOptions receiverOptions = new ReceiverOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection(
new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)},
“reactive-receiver”))
.connectionSubscriptionScheduler(Schedulers.elastic());
Receiver receiver = ReactorRabbitMq.createReceiver(receiverOptions);
Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
Disposable disposable = queueDeclaration.thenMany(messages).subscribe(m -> {
LOGGER.info(“Received message {}”, new String(m.getBody()));
latch.countDown();
});
latch.await(10, TimeUnit.SECONDS);
disposable.dispose();
sender.close();
receiver.close();
}
由于设置了账号密码,因而需要在 ConnectionFactory 那里指定账号密码
另外由于使用了 rabbitmq 集群,因而通过 connectionSupplier 指定要连接的多个 rabbitmq 地址
这里不管是 producer 还是 consumer,都通过 queueDeclaration 进行操作
小结
reactor-rabbitmq 对 rabbitmq 的 api 进行封装,改造为 reactive streams 模式,提供了 Non-blocking Back-pressure 以及 End-to-end Reactive Pipeline 特性。
doc
reactor-rabbitmq-samples
Reactor RabbitMQ Reference Guide