关于java:没用过消息队列一文带你体验RabbitMQ收发消息

37次阅读

共计 7778 个字符,预计需要花费 20 分钟才能阅读完成。

人生终将是场单人旅途,孤单之前是迷茫,孤单过后是成长。

楔子

先给大家说声道歉,最近一周都没有发文,有一些比拟要紧重要的事须要解决。

明天正好无暇,原本说筹备写 SpringIOC 相干的货色,然而发现想要梳理一遍还是须要很多工夫,所以我打算缓缓写,先把 MQ 给写了,再缓缓写其余相干的,毕竟偏实践的货色一遍要比拟难写,像 MQ 这种偏实战的大家能够 clone 代码去玩一玩,还是比拟不便的。

同时 MQ 也是 Java 进阶不用可少的技术栈之一,所以 Java 开发从业者对它是必须要理解的。

当初市面上有三种音讯队列比拟火别离是:RabbitMQRocketMQKafka

明天要讲的音讯队列中我会以 RabbitMQ 作为案例来入门,因为 SpringBoot 的 amqp 中默认只集成了 RabbitMQ,用它来讲会不便许多,且RabbitMQ 的性能和稳定性都很不错,是一款通过工夫考验的开源组件。

祝有好播种。

本文代码: 码云地址 GitHub 地址

1. ???? 音讯队列?

音讯队列(MQ)全称为 Message Queue,是一种应用程序对应用程序的通信办法。

翻译一下就是:在利用之间放一个音讯组件,而后利用单方通过这个音讯组件进行通信。

好端端的为啥要在两头放个组件呢?

小零碎其实是用不到音讯队列的,个别分布式系统才会引入音讯队列,因为分布式系统须要抗住高并发,须要多零碎解耦,更须要对用户比拟敌对的响应速度,而音讯队列的个性能够人造解耦,不便异步更能起到一个顶住高并发的削峰作用,完满解决下面的三个问题。


然万物抱阳负阴,零碎之间忽然加了个中间件,进步零碎复杂度的同时也减少了很多问题:

  • 音讯失落怎么办?
  • 音讯反复生产怎么办?
  • 某些工作须要音讯的程序音讯,程序生产怎么保障?
  • 音讯队列组件的可用性如何保障?

这些都是应用音讯队列过程中须要思考须要思考的中央,音讯队列能给你带来很大的便当,也能给你带来一些对应的麻烦。

下面说了音讯队列带来的益处以及问题,而这些不在咱们明天这篇的探讨范畴之内,我打算之后再写这些,咱们明天要做的是搭建出一个音讯队列环境,让大家感受一下根底的发消息与生产音讯,更高级的问题会放在当前探讨。

2. ????RabbitMQ 一览

RabbitMQ 是一个音讯组件,是一个 erlang 开发的 AMQP(Advanced Message Queue)的开源实现。

AMQP,即 Advanced Message Queuing Protocol, 一个提供对立音讯服务的应用层规范高级音讯队列协定, 是应用层协定的一个凋谢规范, 为面向音讯的中间件设计。

RabbitMQ 采纳了 AMQP 协定,至于这协定怎么怎么样,咱们关怀的是 RabbitMQ 构造如何且怎么用。

还是那句话,学货色须要先观其大貌,咱们要用 RabbitMQ 首先要晓得它整体是怎么样,这样才有利于咱们接下来的学习。

咱们先来看看我刚画的架构图,因为 RabbitMQ 实现了 AMQP 协定,所以这些概念也是 AMQP 中共有的。

  • Broker: 中间件自身。接管和散发音讯的利用,这里指的就是 RabbitMQ Server。
  • Virtual host: 虚拟主机。出于多租户和平安因素设计的,把 AMQP 的根本组件划分到一个虚构的分组中,相似于网络中的 namespace 概念。当多个不同的用户应用同一个 RabbitMQ server 提供的服务时,能够划分出多个 vhost,每个用户在本人的 vhost 创立 exchange/queue 等。
  • Connection: 连贯。publisher/consumer 和 broker 之间的 TCP 连贯。断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非呈现网络故障或 broker 服务呈现问题。
  • Channel: 渠道。如果每一次拜访 RabbitMQ 都建设一个 Connection,在音讯量大的时候建设 TCP Connection 的开销会比拟大且效率也较低。Channel 是在 connection 外部建设的逻辑连贯,如果应用程序反对多线程,通常每个 thread 创立独自的 channel 进行通信,AMQP method 蕴含了 channel id 帮忙客户端和 message broker 辨认 channel,所以 channel 之间是齐全隔离的。Channel 作为轻量级的 Connection 极大缩小了操作系统建设 TCP connection 的开销。
  • Exchange: 路由。依据散发规定,匹配查问表中的 routing key,散发音讯到 queue 中去。
  • Queue: 音讯的队列。音讯最终被送到这里期待生产,一个 message 能够被同时拷贝到多个 queue 中。
  • Binding: 绑定。exchange 和 queue 之间的虚构连贯,binding 中能够蕴含 routing key。Binding 信息被保留到 exchange 中的查问表中,用于 message 的散发根据。

看完了这些概念,我再给大家梳理一遍其流程:

当咱们的生产者端往 Broker(RabbitMQ) 中发送了一条音讯,Broker会依据其音讯的标识送往不同的 Virtual host,而后Exchange 会依据音讯的 路由 key和交换器类型将音讯散发到本人所属的 Queue 中去。

而后消费者端会通过 Connection 中的 Channel 获取刚刚推送的音讯,拉取音讯进行生产。

Tip:某个 Exchange 有哪些属于本人的 Queue,是由Binding 绑定关系决定的。

3. ????RabbitMQ 环境

下面讲了 RabbitMQ 大略的结构图和一个音讯的运行流程,讲完了实践,这里咱们就筹备实操一下吧,先进行 RabbitMQ 装置。

官网下载地址:http://www.rabbitmq.com/downl…

因为我还没有属于本人 MAC 电脑,所以这里的演示就依照 Windows 的来了,不过大家都是程序员,装置个货色总归是难不倒大家的吧????

Windows 下载地址:https://www.rabbitmq.com/inst…

进去之后能够间接找到Direct Downloads,下载相干 EXE 程序进行装置就能够了。

因为 RabbitMQ 是由 erlang 语言编写的,所以装置之前咱们还须要装置 erlang 环境,你下载 RabbitMQ 之后间接点击装置,如果没有相干环境,安装程序会提醒你,而后会让你的浏览器关上 erlang 的下载页面,在这个页面上依据本人的零碎类型点击下载安装即可,装置结束后再去装置RabbitMQ

这两者的装置都只须要始终 NEXT 下一步就能够了。

装置实现之后能够按一下 Windows 键看到成果如下:

Tip:其中 Rabbit-Command 前面会用到,是 RabbitMQ 的命令行操作台。


装置完 RabbitMQ 咱们须要对咱们的开发环境也导入 RabbitMQ 相干的 JAR 包。

为了不便起见,咱们能够间接应用 Spring-boot-start 的形式导入,这外面也会蕴含所有咱们须要用到的 RabbitMQ 相干的 JAR 包。

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
</dependencies>

间接引入 spring-boot-starter-amqp 即可。

4. ✍Hello World

搭建好环境之后,咱们就能够上手了。

思考到这是一个入门文章,读者很多可能没有接触过RabbitMQ,间接应用主动配置的形式可能会令大家很蛊惑,因为主动配置会屏蔽很多细节,导致大家只看到了被封装后的样子,不利于大家了解。

所以在本节 Hello World 这里,我会间接应用最原始的连贯形式就行演示,让大家看到最原始的连贯的样子。

Tip:这种形式演示的代码我都在放在 prototype 包上面。

4.1 生产者

先来看看生产者代码,也就是咱们 push 音讯的代码:

    public static final String QUEUE_NAME = "erduo";

    // 创立连贯工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();

    // 连贯到本地 server
    connectionFactory.setHost("127.0.0.1");

    // 通过连贯工厂创立连贯
    Connection connection = connectionFactory.newConnection();

    // 通过连贯创立通道
    Channel channel = connection.createChannel();

    // 创立一个名为耳朵的队列,该队列非长久(RabbitMQ 重启后会隐没)、非独占(非仅用于此链接)、非主动删除(服务器将不再应用的队列删除)
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    String msg = "hello, 我是耳朵。" + LocalDateTime.now().toString();
    // 公布音讯
    // 四个参数为:指定路由器,指定 key,指定参数,和二进制数据内容
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));

    System.out.println("生产者发送音讯完结,发送内容为:" + msg);
    channel.close();
    connection.close();

代码我都给了正文,然而我还是要给大家解说一遍,梳理一下。

先通过 RabbitMQ 中的 ConnectionFactory 配置一下将要连贯的 server-host,而后创立一个新连贯,再通过此连贯创立通道(Channel),通过这个通道创立队列和发送音讯。

这里看上去还是很好了解的,我须要把创立队列和发送音讯这里再拎进去说一下。

创立队列

    AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

创立队列的办法外面有五个参数,第一个是参数是队列的名称,往后的三个参数代表不同的配置,最初一个参数是额定参数。

  • durable:代表是否将此队列长久化。
  • exclusive:代表是否独占,如果设置为独占队列则此队列仅对首次申明它的连贯可见,并在连贯断开时主动删除。
  • autoDelete:代表断开连接后是否主动删除此队列。
  • arguments:代表其余额定参数。

这些参数中 durable 常常会用到,它代表了咱们能够对队列做长久化,以保障 RabbitMQ 宕机复原后此队列也能够自行复原。

发送音讯

    void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;

发送音讯的办法里是四个参数,第一个是必须的指定 exchange,下面的示例代码中咱们传入了一个空字符串,这代表咱们交由默认的匿名 exchange 去帮咱们路由音讯。

第二个参数是路由 key,exchange 会依据此 key 对音讯进行路由转发,第三个参数是额定参数,讲音讯长久化时会用到一下,最初一个参数就是咱们要发送的数据了,须要将咱们的数据转成字节数组的形式传入。

测试

讲完了这些 API 之后,咱们能够测试一下咱们的代码了,run 一下之后,会在控制台打出如下:

这样之后咱们就把音讯发送到了 RabbitMQ 中去,此时能够关上 RabbitMQ 控制台 (前文装置时提到过) 去应用命令 rabbitmqctl.bat list_queues 去查看音讯队列当初的状况:

能够看到有一条 message 在外面,这就代表咱们的音讯曾经发送胜利了,接下来咱们能够编写一个消费者对外面的 message 进行生产了。

4.2 消费者

消费者代码和生产者的差不多,都须要建设连贯建设通道:

    // 创立连贯工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();

    // 连贯到本地 server
    connectionFactory.setHost("127.0.0.1");

    // 通过连贯工厂创立连贯
    Connection connection = connectionFactory.newConnection();

    // 通过连贯创立通道
    Channel channel = connection.createChannel();

    // 创立消费者,阻塞接管音讯
    com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("-------------------------------------------");
            System.out.println("consumerTag :" + consumerTag);
            System.out.println("exchangeName :" + envelope.getExchange());
            System.out.println("routingKey :" + envelope.getRoutingKey());
            String msg = new String(body, StandardCharsets.UTF_8);
            System.out.println("音讯内容 :" + msg);
        }
    };

    // 启动消费者生产指定队列
    channel.basicConsume(Producer.QUEUE_NAME, consumer);
//        channel.close();
//        connection.close();

建设完通道之后,咱们须要创立一个消费者对象,而后用这个消费者对象去生产指定队列中的音讯。

这个示例中咱们就是新建了一个 consumer,而后用它去生产 队列 -erduo中的音讯。

最初两句代码我给正文掉了,因为一旦把连贯也敞开了,那咱们的消费者就不能放弃生产状态了,所以要开着连贯,监听此队列。

ok,运行这段程序,而后咱们的消费者会去 队列 -erduo拿到外面的音讯,成果如下:

  • consumerTag:是这个音讯的标识。
  • exchangeName:是这个音讯所发送 exchange 的名字,咱们先前传入的是空字符串,所以这里也是空字符串。
  • exchangeName:是这个音讯所发送路由 key。

这样咱们的程序就处在一个监听的状态下,你再次调用生产者发送音讯消费者就会实时的在管制上打印消息内容。

5. ???? 音讯接管确认(ACK)

下面咱们演示了生产者和消费者,咱们生产者发送一条音讯,消费者生产一条信息,这个时候咱们的 RabbitMQ 应该有多少音讯?

实践上来说发送一条,生产一条,当初外面应该是 0 才对,然而当初的状况并不是:

音讯队列外面还是有 1 条信息,咱们重启一下消费者,又打印了一遍咱们生产过的那条音讯,通过音讯下面的工夫咱们能够看进去还是过后咱们发送的那条信息,也就是说咱们消费者生产过了之后这条信息并没有被删除。

这种情况呈现的起因是因为 RabbitMQ 音讯接管确认机制,也就是说一条信息被消费者接管到了之后,须要进行一次确认操作,这条音讯才会被删除。

RabbitMQ中默认生产确认是手动的,也能够将其设置为主动删除,主动删除模式消费者接管到音讯之后就会主动删除这条音讯,如果音讯处理过程中产生了异样,这条音讯就等于没被解决完然而也被删除掉了,所以这里咱们会始终应用手动确认模式。

音讯承受确认 (ACK) 的代码很简略,只有在原来消费者的代码里加上一句就能够了:

    com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("-------------------------------------------");
            System.out.println("consumerTag :" + consumerTag);
            System.out.println("exchangeName :" + envelope.getExchange());
            System.out.println("routingKey :" + envelope.getRoutingKey());
            String msg = new String(body, StandardCharsets.UTF_8);
            System.out.println("音讯内容 :" + msg);

            // 音讯确认
            channel.basicAck(envelope.getDeliveryTag(), false);
            System.out.println("音讯已确认");
        }
    };

咱们将代码改成如此之后,能够再 run 一次消费者,能够看看成果:

再来看看 RabbitMQ 中的队列状况:

从图中咱们能够看出音讯生产后曾经胜利被删除了,其实大胆猜一猜,主动删除应该是在咱们的代码还没执行之前就帮咱们返回了确认,所以这就导致了音讯失落的可能性。

咱们采纳手动确认的形式之后,能够先将逻辑处理完毕之后(可能出现异常的中央能够 try-catch 起来),把手动确认的代码放到最初一行,这样如果出现异常状况导致这条音讯没有被确认,那么这条音讯会在之后被从新生产一遍。

后记

明天的内容就到这里,下一篇将会咱们将会撇弃传统的手动建设连贯的形式进行发消息收音讯,而转用 Spring 帮咱们定义好的 注解 和 Spring 提供的RabbitTemplate,更不便的收发音讯。

音讯队列呢,其实用法都是一样的,只是各个开源音讯队列的侧重点稍有不同,咱们应该依据咱们本人的我的项目需要来决定咱们应该选取什么样的音讯队列来为咱们的我的项目服务,这个我的项目选型的工作个别都是开发组长帮你们做了,个别是轮不到咱们来做的,然而面试的时候可能会考查相干常识,所以这几种音讯队列咱们都应该有所涉猎。

好了,以上就是本期的全部内容,感激你能看到这里,欢送对本文点赞珍藏与评论,???? 你们的每个点赞都是我创作的最大能源。

我是耳朵,一个始终想做常识输入的伪文艺程序员,咱们下期见。

本文代码:码云地址 GitHub 地址

正文完
 0