关于springboot:springbootroute十三整合RabbitMQ

56次阅读

共计 4518 个字符,预计需要花费 12 分钟才能阅读完成。

这篇是 SpringBoot 整合音讯队列的第一篇文章,咱们具体介绍下音讯队列的相干内容。

音讯队列简介

1. 什么是音讯队列

MQ(Message Quene):通过典型的生产者和消费者模型,生产者一直向音讯队列中产生音讯,消费者一直的从队列中获取音讯。因为生产者和消费者都是异步的,而且生产者只关怀音讯的发送,消费者只关怀音讯的接管,没有业务逻辑的侵入,轻松实现业务解耦。

2. 音讯队列有什么用

  • 异步解决

场景形容:某商场具备注册性能,注册的时候须要发送短信验证码。

传统的做法是用户提交信息到用户服务,用户服务调用短信服务发送短信,而后给用户返回响应,这种是同步的解决形式,耗时较长。退出音讯队列后,用户间接提交信息到用户服务,将信息写入音讯队列,间接给用户返回响应,短信服务从音讯队列中读取音讯进行发送短信。

  • 利用解耦

场景形容:某商场下单流程。

传统做法是用户下单,订单零碎去查问库存零碎,如果库存零碎宕机了,则下单失败,损失订单量。退出音讯队列后,用户下单,订单零碎记录订单,将订单信息写入音讯队列,下单胜利,而后库存零碎恢复正常后去操作数据库库存(不思考库存为 0 的状况)。这样订单零碎和库存零碎就达到松耦合的目标了

  • 流量削峰

场景形容:秒杀流动。

流量过大必定会导致响应超时或零碎宕机,退出音讯队列,用户秒杀申请写入音讯队列,设置音讯队列的长度等属性,达到音讯队列最大长度后,间接返回秒杀失败,而后再去生产音讯队列的数据,实现秒杀。

RabbitMQ 简介

RabbitMQ 是用 Erlang 语言编写的,实现了高级音讯队列协定(AMQP)的消息中间件。

1. AMQP 协定概念

AMQPAMQP是一种链接协定,间接定义网络替换的数据格式,这使得实现了 AMQPprovider自身就是跨平台的。以下是 AMQP 协定模型:

  • server – 又称 broker,接管客户端的链接,实现 amqp 实体服务。
  • Connection – 链接,应用程序跟 broker 的网络链接。
  • channel – 网络信道,简直所有的操作都是在 channel 中进行,数据的流转都要在 channel 上进行。channel 是进行音讯读写的通道。客户端能够建设多个 channel,每个 channel 代表一个会话工作。
  • message – 音讯,服务器与应用程序之间传送的数据。由 properties 和 body 组成。properties 能够对音讯进行润饰,比方音讯的降级,提早等高级个性。body 就是音讯体的内容。
  • virtual host – 虚拟主机,用于进行逻辑隔离,最上层的音讯路由,一个虚拟地址外面能够有多个交换机。exchange 和音讯队列 message quene。
  • exchange – 交换机,接管音讯,依据路由器转发音讯到绑定的队列。
  • binding – 绑定,交换机和队列之间的虚构链接,绑定中能够蕴含 routing key。
  • routing key – 一个路由规定,虚拟机能够用它来确定 jiekyi 如何路由一个特定音讯。
  • quene – 音讯队列,保留音讯并将它们转发给消费者。

2. RabbitMQ 的音讯模型

1. 简略模型

在上图中:

  • p:生成者
  • C:消费者
  • 红色局部:quene,音讯队列

2. 工作模型

在上图中:

  • p:生成者
  • C1、C2:消费者
  • 红色局部:quene,音讯队列

当音讯解决比拟耗时时,就会呈现生产音讯的速度远远大于生产音讯的速度,这样就会呈现音讯沉积,无奈及时处理。这时就能够让 多个消费者绑定一个队列,去生产音讯,队列中的音讯一旦生产就会失落,因而工作不会反复执行。

3. 播送模型(fanout)

这种模型中生产者发送的音讯所有消费者都能够生产。

在上图中:

  • p:生成者
  • X:交换机
  • C1、C2:消费者
  • 红色局部:quene,音讯队列

4. 路由模型(routing)

这种模型消费者发送的音讯,不同类型的音讯能够由不同的消费者去生产。

在上图中:

  • p:生成者
  • X:交换机,接管到生产者的音讯后将音讯投递给与 routing key 齐全匹配的队列
  • C1、C2:消费者
  • 红色局部:quene,音讯队列

5. 订阅模型(topic)

这种模型和 direct 模型一样,都是能够依据 routing key 将音讯路由到不同的队列,只不过这种模型能够让队列绑定 routing key 的时候应用通配符。这种类型的 routing key 都是由一个或多个单词组成,多个单词之间用 . 宰割。

通配符介绍:

*:只匹配一个单词

#:匹配一个或多个单词

6. RPC 模型

这种模式须要告诉近程计算机运行性能并期待返回运行后果。这个过程是阻塞的。

当客户端启动时,它创立一个匿名独占回调队列。并提供名字为 call 的函数,这个 call 会发送 RPC 申请并且阻塞直到收到 RPC 运算的后果。

Spring Boot 整合 RabbitMQ

第一步:引入 pom 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:减少 RabbitMQ 服务配置信息

spring:
  rabbitmq:
    virtual-host: javatrip
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest

这里咱们用播送模型来举例应用,播送模型 (fanout) 比拟好了解,就像公众号一样,我每天推文章后,会推送给每个关注用户,他们都能够看到这条音讯。

播送模型留神点:

  1. 能够有多个队列
  2. 每个队列都须要绑定交换机
  3. 每个消费者有本人的队列
  4. 交换机把音讯发送给绑定过的所有队列

1. 定义两个队列

@Configuration
public class RabbitConfig {

    final static String queueNameA = "first-queue";
    final static String queueNameB = "second-queue";

    /***
     * 定义一个队列,设置队列属性
     * @return
     */
    @Bean("queueA")
    public Queue queueA(){Map<String,Object> map = new HashMap<>();
        // 音讯过期时长,10 秒过期
        map.put("x-message-ttl",10000);
        // 队列中最大音讯条数,10 条
        map.put("x-max-length",10);
        // 第一个参数,队列名称
        // 第二个参数,durable:长久化
        // 第三个参数,exclusive:排外的,// 第四个参数,autoDelete:主动删除
        Queue queue = new Queue(queueNameA,true,false,false,map);
        return queue;
    }
    
    @Bean("queueB")
    public Queue queueB(){Map<String,Object> map = new HashMap<>();
        // 音讯过期时长,10 秒过期
        map.put("x-message-ttl",10000);
        // 队列中最大音讯条数,10 条
        map.put("x-max-length",10);
        // 第一个参数,队列名称
        // 第二个参数,durable:长久化
        // 第三个参数,exclusive:排外的,// 第四个参数,autoDelete:主动删除
        Queue queue = new Queue(queueNameB,true,false,false,map);
        return queue;
    }
}

2. 定义扇形交换机

@Bean
public FanoutExchange fanoutExchange(){

    // 第一个参数,交换机名称
    // 第二个参数,durable,是否长久化
    // 第三个参数,autoDelete,是否主动删除
    FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false);
    return fanoutExchange;
}

3. 交换机和队列绑定

@Bean
public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
    return binding;
}

@Bean
public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
    return binding;
}

4. 创立两个消费者别离监听两个队列

@RabbitListener(queues = RabbitConfig.queueNameA)
@Component
@Slf4j
public class ConsumerA {

    @RabbitHandler
    public void receive(String message){log.info("消费者 A 接管到的音讯:"+message);
    }
}
@RabbitListener(queues = RabbitConfig.queueNameB)
@Component
@Slf4j
public class ConsumerB {

    @RabbitHandler
    public void receive(String message){log.info("消费者 B 接管到的音讯:"+message);
    }
}

5. 创立生产者生产音讯

@RestController
public class provider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("send")
    public void sendMessage(){

        String message = "你好,我是 Java 旅途";
        rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message);
    }
}

这样生产者发送一条音讯后,两个消费者就能同时生产到音讯了。

此是 spring-boot-route 系列的第十三篇文章,这个系列的文章都比较简单,次要目标就是为了帮忙首次接触 Spring Boot 的同学有一个零碎的意识。本文已收录至我的 github,欢送各位小伙伴star

github:https://github.com/binzh303/s…

点关注、不迷路

如果感觉文章不错,欢送 关注 点赞 珍藏,你们的反对是我创作的能源,感激大家。

如果文章写的有问题,请不要悭吝,欢送留言指出,我会及时核查批改。

如果你还想更加深刻的理解我,能够微信搜寻「Java 旅途」进行关注。回复「1024」即可取得学习视频及精美电子书。每天 7:30 准时推送技术文章,让你的下班路不在孤单,而且每月还有送书流动,助你晋升硬实力!

正文完
 0