原文:juejin.cn/post/6998363970037874724

前言

Rabbitmq 是应用 Erlang 语言开发的开源音讯队列零碎,基于 AMQP 实现,是一种应用程序对应用程序的通信办法,应用程序通过读写出入队列的音讯来通信,而无需专用连贯来链接它们。消息传递指的是应用程序之间通过在音讯中发送数据进行通信,而不是通过间接调用彼此通信,间接调用通常是指近程过程调用的技术。

外围组成

  • Server:又称 Broker,接管客户端的连贯,实现 AMQP 实体服务,装置 rabbitmq-server
  • Connection:连贯,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手
  • Channel:网络信道,简直所有操作都在 Channel 中进行,Channel 是进行音讯读写的通道,客户端能够建设多个 Channel,每个 Channel 代表一个会话工作。
  • Message:音讯,服务与应用程序之间传送的数据,由 Properties 和 Body 组成,Properties 能够对音讯进行润饰,比方音讯的优先级,提早等高级个性,Body 则是音讯体的内容。
  • Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的音讯路由,一个虚拟主机能够有若干个 exchange 和 queue,同一个虚拟主机外面不能有雷同名称的 exchange
  • Exchange:交换机,接管音讯,依据路由键发送音讯到绑定的队列(不具备音讯存储能力)
  • Bindings:exchange 和 queue 之间的虚构连贯,binding 中能够保留多个 routing key
  • Routing key:是一个路由规定,虚拟机能够用它来确定如何路由一个特定音讯
  • Queue:队列,也称为 Message Queue,音讯队列,保留音讯并将它们转发给消费者

Rabbitmq 音讯模式

3.1 Simple 模式

Simple 模式是最简略的一个模式,由一个生产者,一个队列,一个消费者组成,生产者将音讯通过交换机(此时,图中并没有交换机的概念,如不定义交换机,会应用默认的交换机)把音讯存储到队列,消费者从队列中取出音讯进行解决。

用 Java demo 实现此模式,举荐一个开源收费的 Spring Boot 最全教程:

https://github.com/javastacks/spring-boot-best-practice

Productor

public class Send {    private final static String QUEUE_NAME = "queue1";    public static void main(String[] args) {        // 1、创立连贯工程        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.96.109");        factory.setVirtualHost("/");        Connection connection = null;        Channel channel = null;        try {            // 2、创立连贯、通道            connection = factory.newConnection();            channel = connection.createChannel();            // 3、申明队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            // 音讯内容            String message = "Hello world";            // 4、发送音讯到指定队列            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));            System.out.println(" [x] Sent '" + message + "'");        } catch (TimeoutException | IOException e) {            e.printStackTrace();        } finally {            // 敞开通道            if (channel != null && channel.isOpen()) {                try {                    channel.close();                } catch (Exception e) {                    e.printStackTrace();                }            }            // 敞开连贯            if (connection != null && connection.isOpen()) {                try {                    connection.close();                } catch (Exception e) {                    e.printStackTrace();                }            }        }    }}

Customer

public class Recv {    private final static String QUEUE_NAME = "queue1";    public static void main(String[] args) throws IOException, TimeoutException {        // 1、创立连贯工程        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.96.109");        factory.setVirtualHost("/");        // 2、获取 Connection和 Channel        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        // 3、申明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody(), "UTF-8");            System.out.println(" [x] Received '" + message + "'");        };        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {        });    }}

察看可视化界面,会看到音讯先会被写入到队列中,随后又被消费者生产了。

3.2 Fanout 模式

Fanout——公布订阅模式,是一种播送机制。

此模式包含:一个生产者、一个交换机 (exchange)、多个队列、多个消费者。生产者将音讯发送到交换机,交换机不存储音讯,将音讯存储到队列,消费者从队列中取音讯。如果生产者将音讯发送到没有绑定队列的交换机上,音讯将失落。

用 Java demo 实现此模式

Productor

public class Productor {   private static final String EXCHANGE_NAME = "fanout_exchange";   public static void main(String[] args) {       // 1、创立连贯工程       ConnectionFactory factory = new ConnectionFactory();       factory.setHost("192.168.96.109");       factory.setUsername("admin");       factory.setPassword("admin");       factory.setVirtualHost("/");       Connection connection = null;       Channel channel = null;       try {           // 2、获取连贯、通道           connection = factory.newConnection();           channel = connection.createChannel();           // 音讯内容           String message = "hello fanout mode";           // 指定路由key           String routeKey = "";           String type = "fanout";           // 3、申明交换机           channel.exchangeDeclare(EXCHANGE_NAME, type);           // 4、申明队列           channel.queueDeclare("queue1", true, false, false, null);           channel.queueDeclare("queue2", true, false, false, null);           channel.queueDeclare("queue3", true, false, false, null);           channel.queueDeclare("queue4", true, false, false, null);           // 5、绑定 channel 与 queue           channel.queueBind("queue1", EXCHANGE_NAME, routeKey);           channel.queueBind("queue2", EXCHANGE_NAME, routeKey);           channel.queueBind("queue3", EXCHANGE_NAME, routeKey);           channel.queueBind("queue4", EXCHANGE_NAME, routeKey);           // 6、公布音讯           channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));           System.out.println("音讯发送胜利!");       } catch (IOException | TimeoutException e) {           e.printStackTrace();           System.out.println("音讯发送异样");       }finally {           // 敞开通道和连贯......       }   }}

Customer

public class Customer {    private static Runnable runnable = new Runnable() {        @Override        public void run() {            // 创立连贯工厂            ConnectionFactory factory = new ConnectionFactory();            factory.setHost("192.168.96.109");            factory.setUsername("admin");         factory.setPassword("admin");         factory.setVirtualHost("/");            final String queueName = Thread.currentThread().getName();            Connection connection = null;            Channel channel = null;            try {                // 获取连贯、通道                connection = factory.newConnection();                channel = connection.createChannel();                Channel finalChannel = channel;                finalChannel.basicConsume(queueName, true, new DeliverCallback() {                    @Override                    public void handle(String consumerTag, Delivery delivery) throws IOException {                        System.out.println(delivery.getEnvelope().getDeliveryTag());                        System.out.println(queueName + ":收到音讯是:" + new String(delivery.getBody(), "UTF-8"));                    }                }, new CancelCallback() {                    @Override                    public void handle(String consumerTag) throws IOException {                    }                });                System.out.println(queueName + ":开始接管音讯");            } catch (IOException |                    TimeoutException e) {                e.printStackTrace();            } finally {                // 敞开通道和连贯......            }        }    };    public static void main(String[] args) throws IOException, TimeoutException {     // 创立线程别离从四个队列中获取音讯        new Thread(runnable, "queue1").start();        new Thread(runnable, "queue2").start();        new Thread(runnable, "queue3").start();        new Thread(runnable, "queue4").start();    }}

执行完 Productor 发现四个队列中别离减少了一条音讯,而执行完 Customer 后四个队列中的音讯都被消费者生产了。

3.3 Direct 模式

Direct 模式是在 Fanout 模式根底上增加了 routing key,Fanout(公布/订阅)模式是交换机将音讯存储到所有绑定的队列中,而 Direct 模式是在此基础上,增加了过滤条件,交换机只会将音讯存储到满足 routing key 的队列中。

在上图中,咱们能够看到交换机绑定了两个队列,其中队列 Q1绑定的 routing key 为 “orange” ,队列Q2绑定的routing key 为 “black” 和 “green”。在这样的设置中,公布 routing key 为 “orange” 的音讯将被路由到 Q1,routing key 为 “black” 或 “green” 的音讯将被路由到 Q2

在 rabbitmq 中给队列绑定 routing_key,routing_key 必须是单词列表

用 Java demo 实现此模式

Productor

public class Productor {    private static final String EXCHANGE_NAME = "direct_exchange";    public static void main(String[] args) {        // 1、创立连贯工程        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.96.109");        factory.setUsername("admin");        factory.setPassword("admin");        factory.setVirtualHost("/");        Connection connection = null;        Channel channel = null;        try {            // 2、获取连贯、通道            connection = factory.newConnection();            channel = connection.createChannel();            // 音讯内容            String message = "hello direct mode";            // 指定路由key            String routeKey = "email";            String type = "direct";            // 3、申明交换机            channel.exchangeDeclare(EXCHANGE_NAME, type);            // 4、申明队列            channel.queueDeclare("queue1", true, false, false, null);            channel.queueDeclare("queue2", true, false, false, null);            channel.queueDeclare("queue3", true, false, false, null);            // 5、绑定 channel 与 queue            channel.queueBind("queue1", EXCHANGE_NAME, "email");            channel.queueBind("queue2", EXCHANGE_NAME, "sms");            channel.queueBind("queue3", EXCHANGE_NAME, "vx");   // 6、公布音讯            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));            System.out.println("音讯发送胜利!");        } catch (IOException | TimeoutException e) {            e.printStackTrace();            System.out.println("音讯发送异样");        } finally {            // 敞开通道和连贯......        }    }}

能够通过可视化页面查看,各队列绑定的 routing_key

因为设置的 routing_key为 “email”,所以,应该只有 queue1 存储了一条音讯。

Customer 与上述 fanout 示例统一。

3.4 Topic 模式

Topic 模式是生产者通过交换机将音讯存储到队列后,交换机依据绑定队列的 routing key 的值进行通配符匹配,如果匹配通过,音讯将被存储到该队列,如果 routing key 的值匹配到了多个队列,音讯将会被发送到多个队列;如果一个队列也没匹配上,该音讯将失落。

routing_key 必须是单词列表,用点分隔,其中 * 和 # 的含意为:

  • *:1个单词
  • \#:0个或多个单词

用Java demo 实现此模式

Productor

public class Productor {    private static final String EXCHANGE_NAME = "topic_exchange";    public static void main(String[] args) {        // 1、创立连贯工程        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.96.109");        factory.setUsername("admin");        factory.setPassword("admin");        factory.setVirtualHost("/");        Connection connection = null;        Channel channel = null;        try {           // 2、获取连贯、通道            connection = factory.newConnection();            channel = connection.createChannel();            // 音讯内容            String message = "hello topic mode";            // 指定路由key            String routeKey = "com.order.test.xxx";            String type = "topic";            // 3、申明交换机            channel.exchangeDeclare(EXCHANGE_NAME, type);            // 4、申明队列            channel.queueDeclare("queue5",true,false,false,null);            channel.queueDeclare("queue6",true,false,false,null);            // 5、绑定 channel 与 queue            channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#");            channel.queueBind("queue6", EXCHANGE_NAME, "#.test.*");            // 6、公布音讯            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));            System.out.println("音讯发送胜利!");        } catch (IOException | TimeoutException e) {            e.printStackTrace();            System.out.println("音讯发送异样");        } finally {            // 敞开通道和连贯......        }    }}

执行完 Productor 后,通过可视化页面查看到,queue 绑定的 routing_key

因为上述例子中,routing_key为:“com.order.test.xxx”,那么 queue5 和 queue6 都将接管到音讯。

Customer 与上述实例一样,执行完 Customer 后,再次查看队列信息,queue5 和 queue6 的音讯都被生产了。

3.5 Work 模式

当有多个消费者时,如何平衡音讯者生产音讯的多少,次要有两种模式:

  • 轮询模式散发:按程序轮询散发,每个消费者取得雷同数量的音讯
  • 偏心散发:依据消费者生产能力偏心散发,解决快的解决的多,解决慢的解决的少,无功受禄

3.5.1 轮询散发

在这种模式下,rabbitmq 采纳轮询的形式将任务分配给多个消费者,但可能呈现一种状况,当调配给某一个消费者的工作很简单时,而有些消费者接管的工作较轻量,会呈现有的消费者很忙,而有的消费者处于闲暇的状态,而 rabbitmq 不会感知到这种状况的产生,rabbitmq 不思考消费者未确认音讯的数量,只是自觉的分配任务。

用 Java demo 实现此模式

Productor

public class Productor {    public static void main(String[] args) {        // 1、创立连贯工程        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.96.109");        factory.setUsername("admin");        factory.setPassword("admin");        factory.setVirtualHost("/");        Connection connection = null;        Channel channel = null;        try {            // 2、获取连贯、通道            connection = factory.newConnection();            channel = connection.createChannel();            // 3、向 Queue1 公布20个音讯            for (int i = 0; i < 20; i++) {                String msg = "feiyangyang: " + i;                channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));            }            System.out.println("音讯发送胜利!");        } catch (IOException | TimeoutException e) {            e.printStackTrace();            System.out.println("音讯发送异样");        } finally {            // 敞开通道和连贯......        }    }}

Worker1

public class Worker1 {    public static void main(String[] args) {        // 1、创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.96.109");        factory.setUsername("admin");        factory.setPassword("admin");        factory.setVirtualHost("/");        Connection connection = null;        Channel channel = null;        try {            // 获取连贯、通道            connection = factory.newConnection();            channel = connection.createChannel();            Channel finalChannel = channel;            finalChannel.basicConsume("queue1", true, new DeliverCallback() {                @Override                public void handle(String consumerTag, Delivery delivery) throws IOException {                    System.out.println("Worker1" + ":收到音讯是:" + new String(delivery.getBody(), "UTF-8"));                    try {                        Thread.sleep(2000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }, new CancelCallback() {                @Override                public void handle(String consumerTag) throws IOException {                }            });            System.out.println("Worker1 开始接管音讯");            System.in.read();        } catch (IOException |                TimeoutException e) {            e.printStackTrace();        } finally {            // 敞开通道和连贯......        }    }}

Worker2 与 Worker1 雷同

咱们看下音讯散发后果:

Worker1 开始接管音讯Worker1:收到音讯是:feiyangyang: 0Worker1:收到音讯是:feiyangyang: 2Worker1:收到音讯是:feiyangyang: 4Worker1:收到音讯是:feiyangyang: 6Worker1:收到音讯是:feiyangyang: 8Worker1:收到音讯是:feiyangyang: 10Worker1:收到音讯是:feiyangyang: 12Worker1:收到音讯是:feiyangyang: 14Worker1:收到音讯是:feiyangyang: 16Worker1:收到音讯是:feiyangyang: 18Worker2 开始接管音讯Worker2:收到音讯是:feiyangyang: 1Worker2:收到音讯是:feiyangyang: 3Worker2:收到音讯是:feiyangyang: 5Worker2:收到音讯是:feiyangyang: 7Worker2:收到音讯是:feiyangyang: 9Worker2:收到音讯是:feiyangyang: 11Worker2:收到音讯是:feiyangyang: 13Worker2:收到音讯是:feiyangyang: 15Worker2:收到音讯是:feiyangyang: 17Worker2:收到音讯是:feiyangyang: 19

能够看出,轮询散发模式就是将音讯平衡的调配所有消费者。

3.5.2 偏心散发

为了解决 Work 轮询散发模式 这个问题,rabbitmq 应用带有 perfetchCount = 1 设置的 basicQos 办法。当消费者承受解决并确认前一条音讯前,不向此消费者发送新音讯,会调配给其余闲暇的消费者。

Productor 代码与上述轮询模式雷同,而 Customer 中稍作批改

Worker1

// Channel 应用 Qos 机制finalChannel.basicQos(1);finalChannel.basicConsume("queue1", false, new DeliverCallback() {    @Override    public void handle(String consumerTag, Delivery delivery) throws IOException {        System.out.println("Worker1" + ":收到音讯是:" + new String(delivery.getBody(), "UTF-8"));        try {            Thread.sleep(1000);            // 改成手动应答            finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}, new CancelCallback() {    @Override    public void handle(String consumerTag) throws IOException {    }});

上述实例相较于轮询散发模式,增加了 Qos 机制,设置值为1,代表消费者每次从队列中获取几条音讯,将 Worker1 的 sleep 工夫设置为 1s,将 Worker2 的 sleep 工夫设置为 2s,查看音讯散发后果

Worker1 开始接管音讯Worker1:收到音讯是:feiyangyang: 0Worker1:收到音讯是:feiyangyang: 2Worker1:收到音讯是:feiyangyang: 4Worker1:收到音讯是:feiyangyang: 5Worker1:收到音讯是:feiyangyang: 7Worker1:收到音讯是:feiyangyang: 8Worker1:收到音讯是:feiyangyang: 10Worker1:收到音讯是:feiyangyang: 11Worker1:收到音讯是:feiyangyang: 13Worker1:收到音讯是:feiyangyang: 14Worker1:收到音讯是:feiyangyang: 16Worker1:收到音讯是:feiyangyang: 17Worker1:收到音讯是:feiyangyang: 19Worker2 开始接管音讯Worker2:收到音讯是:feiyangyang: 1Worker2:收到音讯是:feiyangyang: 3Worker2:收到音讯是:feiyangyang: 6Worker2:收到音讯是:feiyangyang: 9Worker2:收到音讯是:feiyangyang: 12Worker2:收到音讯是:feiyangyang: 15Worker2:收到音讯是:feiyangyang: 18

当应用 Work 偏心散发模式时,要设置消费者为手动应答,并且开启 Qos 机制。

避免音讯失落机制

4.1 音讯确认

消费者实现一项工作可能须要几秒钟,如果其中一个消费者开始了一项长期工作并且只实现了局部工作而死亡,如果将 autoAck 设置为 true ,一旦 RabbitMQ 将消息传递给消费者,它会立刻将其标记为删除,在这种状况下,咱们将失落所有已分派给该特定消费者但尚未解决的音讯。

如果其中一个消费者宕了,rabbitmq 能够将其音讯调配给其余消费者。为了确保音讯不会失落,rabbitmq 采纳音讯确认,消费者发回确认音讯,通知 rabbitmq 音讯曾经被接管并解决,此时,rabbitmq 能够释怀的删除这条音讯。

如果消费者在没有发送 ack 的状况下宕了,rabbitmq 将了解为该条音讯未被消费者解决完,如果有其余消费者在线,将迅速从新交付给其余消费者,这样就能够确保不会失落音讯了。

默认状况下rabbitmq 会启用手动音讯确认,也就是 autoAck 默认为 false,一旦咱们实现了一项工作,须要手动的进行音讯确认,所以 autoAck 须要放弃为默认值 false,并应用如下办法进行手动应答。

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

4.2 长久化

rabbitmq 的音讯确认机制能够保障音讯不会失落,然而如果 rabbitmq 服务器进行,咱们的工作依然会失落。

当 rabbitmq 退出或解体时,如果不进行长久化,队列和音讯都会隐没。须要做两件事来确保音讯不会失落,将队列和音讯都标记为长久的。

  1. 设置队列长久
boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);
  1. 设置音讯长久
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

将音讯标记为持久性并不能齐全保障音讯不会失落,当 rabbitmq 接管到音讯并且还没保留时,依然有很短的工夫窗口会使音讯失落,如果须要更强的保障,能够应用发布者确认机制。

应用场景

解耦、削峰、异步

解耦

在微服务架构体系中,微服务A须要与微服务B进行通信,传统的做法是A调用B的接口。但这样做如果零碎B无法访问或连贯超时,零碎A须要期待,直到零碎B做出响应,并且A与B存在重大的耦合景象。如果引入音讯队列进行零碎AB的通信,流程是这样的:

  • 零碎A将音讯存储到音讯队列中,返回胜利信息
  • 零碎B从队列中获取音讯,进行解决操作

零碎A将音讯放到队列中,就不必关怀零碎B是否能够获取等其余事件了,实现了两个零碎间的解耦。

应用场景:

  • 短信、邮件告诉

削峰

零碎A每秒申请100个,零碎能够稳固运行,但如果在秒杀流动中,每秒并发达到1w个,但零碎最大解决能力只能每秒解决 1000 个,所以,在秒杀流动中,零碎服务器会呈现宕机的景象。如果引入 MQ ,能够解决这个问题。每秒 1w个申请会导致系统解体,那咱们让用户发送的申请都存储到队列中,因为零碎最大解决能力是每秒1000个申请,让零碎A每秒只从队列中拉取1000个申请,保证系统能稳固运行,在秒杀期间,申请大量进入到队列,积压到MQ中,而零碎每秒只从队列中取1000个申请解决。这种短暂的高峰期积压是没问题的,因为高峰期一旦过来,每秒申请数迅速递加,而零碎每秒还是从队列中取1000个申请进行解决,零碎会疾速将积压的音讯生产掉。

应用场景:

  • 秒杀流动
  • 团抢流动

异步

用户注册,须要发送注册邮件和注册短信,传统的做法有两种:串行、并行。

  • 串行形式:将注册信息写库后(50ms),发送邮件(50ms),再发送短信(50ms),工作实现后,返回客户端,共耗时(150ms)
  • 并行形式:将注册信息写库后(50ms),开启子线程让发送邮件和发送短信同时进行(50ms),返回客户端,共耗时(100ms)
  • 引入MQ,将注册信息写库(50ms),将发送邮件和短信的操作写入队列(5s),返回客户端,而消费者什么时候从队列中取音讯进行解决,不必关怀,共耗时(55ms)

应用场景:

  • 将不是必须期待响应后果的业务逻辑进行异步解决

近期热文举荐:

1.1,000+ 道 Java面试题及答案整顿(2022最新版)

2.劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4.别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!