乐趣区

关于程序员:5-分钟完成RabbitMQ入门

本篇文章将讲述 springboot 整合 rabbitmq 过程,实用于初学者疾速入门。内容包含对几种常见的队列模型阐述以及实际(路由模式、公布订阅 (播送) 模式、主题模式);因为这几种模式的队列后果类似,区别在于交换机类型的不同,因而交换机的类型决定了它们之间的工作模式。交换机类型别离对应:直连交换机(Direct Exchange)、扇型交换机(Fanout Exchange)、Topic Exchange(主题交换机)。因而以下波及的常识和代码多围绕 交换机 去开展阐述。

三种模式

路由模式

生产者将携带路由键的音讯发送给直连交换机,交换机依据路由键值转发到队列名为该值的队列

订阅模式(播送)

生产者将音讯发送给扇型交换机,交换机将音讯分发给所有绑定的队列,但留神同一队列多个消费者只有其中一个生产了音讯。

主题模式

与直连交换机相似,但它的 路由建(routing key) 和 绑定值(key)是有规定的;领有下面两种交换机性能。

规定:*:必须存在一个单词;#:存在零个或者多个单词。

理解了以上三种模式流程之后,上面将对三种模式进行代码实战,以加深印象。

代码实战

目录构造
│  RabbitmqApplication.java        # 我的项目启动文件
│
├─config
│      RabbitConfig.java        # 生产者确认音讯发送配置
│      RabbitConstants.java        # 交换机、路由键、队列名常量
│
├─controller
│      MessageController.java    # 音讯测试 API
│
├─message
│      DirectMessage.java        # 路由模式音讯
│      FanoutMessage.java        # 播送音讯
│      RabbitMessage.java        # 音讯基类
│      TopicMessage.java        # 主题音讯
│
├─receiver                    
│      MessageReceiver.java        # 音讯接管类
│
└─service
    │  RabbitService.java        
    │
    └─impl
            RabbitServiceImpl.java    # 发送音讯业务
外围代码解析
  • 音讯基类:RabbitMessage.java
    /**
     * 音讯内容
     */
    private T data;
    public RabbitMessage<T> setData(T data) {
        this.data = data;
        return this;
    }

    /**
     * 音讯惟一编号
     */
    private String messageId = UUID.randomUUID().toString();

    /**
     * 音讯创立工夫
     */
    private String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

    /**
     * 交换机
     * @return
     */
    public abstract String exchange();

    /**
     * 路由键
     * @return
     */
    public abstract String routingKey();
  • 发送音讯:RabbitServiceImpl.java
@Override
public void send(RabbitMessage message) {log.info("【生产者】发送音讯:ID:{},交换机:{},路由键:{},内容:{}", message.getMessageId(), message.exchange(), message.routingKey(), message.getData());
    if (null == message.exchange()) {rabbitTemplate.convertAndSend(message.routingKey(), message, new CorrelationData(message.getMessageId()));
    } else {rabbitTemplate.convertAndSend(message.exchange(), message.routingKey(), message, new CorrelationData(message.getMessageId()));
    }
}
  • 接管音讯:MessageReceiver.java
@RabbitListener(
    bindings = @QueueBinding(value = @Queue(value = QUEUE_DIRECT, durable = "true"),
        exchange = @Exchange(name = EXCHANGE_DIRECT, type = ExchangeTypes.DIRECT)
    )
)
@RabbitHandler
public void direct(DirectMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.info("【消费者】收到音讯:ID:{},交换机:{},路由键:{},内容:{}", message.getMessageId(), message.exchange(), message.routingKey(), message.getData());
    channel.basicAck(tag, false);
}
  • 依赖:pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 具体请看源码,记得批改application.yml 配置哦!!!

拓展

无论是播送还是主题模式,在多节点利用中,当音讯发送至同一队列多个消费者时,只有其中一个节点生产了音讯,如何拓展为播送成果?

在 fanout 模式下,应用 redis 的公布订阅模式来告诉其它节点

如何避免音讯失落?

1、生产者确认音讯发送
2、交换机、路由、队列设置长久化模式,保障音讯不会失落
3、消费者确认生产音讯,手动 ack

如何避免音讯积压?

不举荐应用channel.basicReject(deliveryTag, false),第二个参数为是否从新放回队列,如果使用不当会导致音讯始终在生产 - 入列中循环。

可依据业务设置缓存,当缓存达到一定量时再发送音讯。

好,这篇 springboot 整合 rabbitmq 教程就暂且到此。

始终在谋求思路的传递而非代码的 COPY

退出移动版