关于rabbitmq:RabbitMQ由浅入深入门全总结一

51次阅读

共计 38278 个字符,预计需要花费 96 分钟才能阅读完成。

写在最后面

间隔上一次发文章曾经很久了,其实这段时间始终也没有停笔,只不过在忙着找工作还有学校结课的事件,从新弄了一下博客,前面也会陆陆续续会把文章最近更新进去~

  • 这篇文章有点长,就分了两篇
  • PS:那个 Github 上 Java 常识问答的文章也没有停笔,最近也会陆续更新

1. 浅浅道来

1.1 什么是中间件?

IDC(互联网数据中心)的定义:中间件是一种独立的系统软件服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源,中间件位于客户机服务器的操作系统之上,治理计算资源和网络通信。

首先,中间件是某一类软件的总称,而不是某一种具体的软件 。它是一种位于 平台(操作系统硬件)和 应用程序之间 的通用服务,它屏蔽了底层操作系统的各种复杂性,加重了开发人员的技术累赘,同时它的设计不针对某一具体指标,而是提供具备广泛通用特点的功能模块服务,这些服务具备规范的程序接口和协定,依据平台的不同,也能够有不同的实现。

艰深的例子(仅供参考,并不算完全一致):

  • 我开了一家咖啡店,我身边有 A B C 等 n 家咖啡豆的供应商,然而我必定要筛选价格又实惠,品质还不错的豆子,然而市场是受到多方面因素稳定的,可能我当初的抉择,在一段时间后曾经不是最佳选项了。所以我专门找到一家市场中介,让他帮我操心这一摊子事件,我只和你说清价格和品质要求,你去找就是了,过程我一点也不操心。这个中介的概念,就相似中间件的

1.1.1 分布式的概念(补充)

这一段,来自我之前写的 Dubbo 入门的那篇文章哈

在百度以及维基中的定义都绝对业余且艰涩,大部分博客或者教程常常会应用《分布式系统原理和范型》中的定义,即:“分布式系统是若干独立计算机的汇合,这些计算机对于用户来说就像是单个相干零碎”

上面咱们用一些篇幅来艰深的解释一下什么叫做分布式

1.1.1.1 什么是集中式零碎

提到分布式,不得不提的就是 “集中式零碎”,这个概念最好了解了,它就是将性能,程序等装置在同一台设施上,就由这一台主机设施向外提供服务

举个最简略的例子:你拿一台 PC 主机,将其改装成了一台简略的服务器,配置好各种内容后,你将 MySQL,Web 服务器,FTP,Nginx 等等,全副装置在其中,打包部署我的项目后,就能够对外提供服务了,然而一旦这台机器无论是软件还是硬件呈现了问题,整个零碎都会受到重大的株连谬误,鸡蛋放在一个篮子里,要打就全打了

1.1.12 什么是分布式系统

既然集中式零碎有这样一种牵一发而动全身的问题,那么分布式的其中一个作用,天然是来解决这样的问题了,正如定义中所知,分布式系统在用户的体验感官里,就像传统的单零碎一样,一些变动都是这个零碎自身外部进行的,对于用户并没有什么太大的感觉

例如:淘宝,京东这种大型电商平台,它们的主机都是数以万计的,否则基本没法解决大量的数据和申请,具体其中有什么划分,以及操作,咱们上面会说到,然而对于用户的咱们,咱们不须要也不想关怀这些,咱们仍能够单纯的认为,咱们面对的就是“淘宝”这一台“主机”

所以分布式的一个绝对业余一些的说法是这样的(过程粒度)两个或者多个程序,别离运行在不同的主机过程上,它们互相配合协调,实现独特的性能,那么这几个程序之间形成的零碎就能够叫做分布式系统

  • 这几者都是雷同的程序 —— 分布式
  • 这几者都是不同的程序 —— 集群

1.2 什么是消息中间件 / 音讯队列(MQ)

消息中间件,顾名思义就是用来解决音讯相干服务的中间件,它提供了一种零碎之间通信交互的通道,例如发送方只须要把想传输的信息交给消息中间件,而发送的协定,形式,发送过程中呈现的网络,故障等等问题,都由中间件进行解决,因而它负责保障信息的牢靠传输。

所以消息中间件,就是一种用来承受数据,存储数据,发送数据的技术,它提供了各种性能,能够实现音讯的高可用,高牢靠,也提供了很好的容错机制等。能够程序对系统资源的占用,以及传输效率的晋升有很大帮忙。

  • 常说的 MQ 就是指音讯队列,即 Message Quene,常见的音讯队列有,经典的 ActivieMQ,热门的 Kafka,阿里的 RocketMQ 等等,以及这里解说的 RabbitMQ。

    • 不同的 MQ 有着不同的特点,以及其更加善于的方向,倒也说不上谁好谁坏,只有谁更适合。

1.2.1 音讯队列利用场景

依据业务的须要,其实它能够有多种利用场景,例如解耦,削峰填谷,播送等,咱们举两个场景来梳理一下简略的过程

1.2.1.1 业务解耦

最近在思考买几本书看,就以买书下订单举例,当我点击购买之后,可能会有这么一串业务逻辑执行,① 减去库存容量 ② 生成订单 ③ 领取 ④ 更新订单状态 ⑤ 发送购买胜利短信 ⑥ 更新商品快递揽收状态。在初期阶段,咱们齐全能够让这些业务同步执行,然而前期为了晋升效率,就能够将须要立刻执行的工作和可稍缓执行的工作进行拆散,例如 ⑤ 发送购买胜利短信 ⑥ 更新商品快递揽收状态,都能够思考异执行。在主流程执行完结后,这些可稍缓的业务能够通过给 MQ 发送音讯,就断定曾经执行,保障流程先完结。而后再通过拉取 MQ 音讯,或者 MQ 被动推送去异步执行其余的业务。

1.2.1.2 削峰填谷

例如发送一条带有已读未读标识的布告信息,所以须要对每一个用户都写一条这样的布告音讯,例如存到 MongoDB 中,即使 MongoDB 也撑持不下来刹时写入百万、千万记录的状况,所以能够思考应用音讯队列。比如说咱们能够在 Java 后端系统下面,用异步多线程的办法,向音讯队列 MQ 中发送音讯,这样 Web 零碎发布公告音讯的时候就不占用数据库失常的 CRUD 操作。零碎音讯保留在音讯队列中,咱们只是用它来做削峰填谷,零碎音讯最终还是要存储在数据库下面。于是咱们能够这样设计,在用户登陆零碎的时候,用异步线程从音讯队列 MQ 中,接管该用户的零碎音讯,而后把零碎音讯存储在数据库中,最初音讯队列 MQ 中的该条音讯主动删除。因为用户的错峰登录,所以往数据库中写入音讯的工作也变成了错峰写入。

1.3 什么是 RabbitMQ

RabbitMQ 是一个应用 Erlang 语言编写,且遵循 AMQP 协定的开源音讯队列零碎,反对多种客户端(语言),用于在分布式系统中存储音讯,转发音讯,具备高可用,高可扩性,易用性等特色。

更具体的介绍能够间接看一下官网:

  • https://www.rabbitmq.com/

总之这就是一种常见的音讯队列,它的这些特点,都会在前面逐条解说到,咱们首先从入门下载安装局部先说起,而后再到应用。

2. 下载与装置

一般来说,装置的形式有手动装置和 Docker 装置,大部分场景下,都会应用 Docker 装置,然而作为学习阶段,如果不是特地焦急,学习一下手动装置,也不是什么好事。

注:云服务器和虚拟机都能够,演示的 Linux 版本为 CentOS 7.9

2.1 手动装置

2.1.1 下载安装过程

注:能够在 Linux 中通过 yum 间接下载安装,这里抉择了在本人的 Windows 主机先下载文件,而后再通过 FTP 传到 Linux 上,间接装置。能够防止虚拟机上因为网络而造成的一些下载问题。

  1. 首先关上官网的下载目录,而后依据本人 Linux 的版本,抉择版本。

    • 地址:https://www.rabbitmq.com/down…

  1. 因为 RabbitMQ 是 Erlang 语言编写的,所以还须要提供 Erlang 环境,接着去下载 Erlang。

    • 地址:https://www.erlang-solutions….

      • A:此网站访问速度极慢,请急躁期待,或者须要挂上梯子
      • B:Erlang 版本须要与 RabbitMQ 匹配(如图,有最大和最小版本的限度)

        • 版本查看地址:https://www.rabbitmq.com/whic…
        • 这里抉择了 RabbitMQ 3.8.14 和 Erlang 23.2.3

  1. 将文件上传到 Linux 中(我这里指定地位是 /usr/local/bin/rabbitmq,能够本人更改抉择)

    • 当初很多 Shell 软件都自带内置的 FTP 上传,例如 FinalShell,MobaXterm 等等
    • 上传后的文件和目录地位如下
[root@centos7 rabbitmq]# ls
esl-erlang_23.2.3-1_centos_7_amd64.rpm  rabbitmq-server-3.8.14-1.el7.noarch.rpm
[root@centos7 rabbitmq]# pwd
/usr/local/bin/rabbitmq
  1. 装置 Erlang、Socat 和 RabbitMQ

    • Erlang、Socat 都是 RabbitMQ 所依赖的
# 装置 Erlang,装置后执行 erl -v 显示版本号则代表胜利
rpm -ivh esl-erlang_23.2.3-1_centos_7_amd64.rpm

# 装置 Socat 这里没有下载源文件,而是间接通过 yum 在线装置,因为它并不大
yum install -y socat

# 装置 RabbitMQ
rpm -ivh rabbitmq-server-3.8.14-1.el7.noarch.rpm
  1. 装置完结,启动服务查看 RabbitMQ 是否能够启动胜利
# 启动服务
systemctl start rabbitmq-server
# 开机自启
systemctl enable rabbitmq-server
# 进行服务
systemctl stop rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server.service

如图所示,即装置启动胜利

若装置谬误,解决参考:

  • Linux 之 RabbitMQ 装置各种问题解决
  • rabbitmq ERROR: epmd error for host deb:address (cannot connect to host/port)解决办法

2.1.2 配置 Web 界面治理

下面的装置其实曾经完结了,然而 RabbitMQ 提供给了咱们一个 Web 模式的治理界面,默认是没有的,须要进行装置。

  1. 装置 Web 治理插件,而后重启服务
# 装置命令
rabbitmq-plugins enable rabbitmq_management

# 重启服务
systemctl restart rabbitmq-server
  1. 肯定要凋谢 Linux 防火墙 的 15672 端口,否则就会无法访问,在学习阶段,你甚至能够去查问命令把防火墙关掉

    • 对应服务器(阿里云,腾讯云等)就是在平安组中凋谢 15672 端口
    • 拜访 Linux IP:15672,例如 http://192.168.122.1:15672
# 查问 15672 是否凋谢,个别默认都是 no
firewall-cmd --query-port=15672/tcp
# 凋谢指定端口 15672 
firewall-cmd --add-port=15672/tcp --permanent
# 从新载入
firewall-cmd --reload
# 再次查问,后果就是 yes 了
firewall-cmd --query-port=15672/tcp
  1. 增加近程登录的账户

    • RabbitMQ 有一个默认账号和明码都是 guest 然而只能在 localhost 下拜访
# 新增用户 用户名和明码都是 admin
rabbitmqctl add_user admin admin
  1. 为近程登录的账户增加权限

    • administrator(超级管理员):登录控制台、查看所有信息、操作用户、操作策略
    • monitoring(监控者):登录控制台、查看所有信息
    • policymaker(策略制定者):登录控制台、指定策略
    • managment(一般管理员):登录控制台
# 设置用户调配操作权限,admin 用户的权限为 administrator
rabbitmqctl set_user_tags admin administrator
  1. 为用户增加资源权限

    • 因为 admin 曾经是超级管理员权限了,所以其实不分配资源权限也能够,会默认去做。
# 命令格局为: set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 这里即为 admin 用户开启 配置文件和读写的权限
rabbitmqctl set_permissions -p / admin ".*"".*"".*"
  1. 拜访 Linux IP:15672,例如 http://192.168.122.1:15672,输出方才设置好的用户名明码 admin

    • 如图:拜访胜利

2.1.2.1 命令小结

  1. 增加用户:rabbitmqctl add_user <username> <password>
  2. 批改明码:rabbitmqctl change_password <username> <newpass>
  3. 删除用户:rabbitmqctl delete_user <username>
  4. 用户列表:rabbitmqctl list_users
  5. 设置用户角色:rabbitmqctl set_user_tags <username> <tag1,tag2>
  6. 删除用户所有角色:rabbitmqctl set_user_tags <username>
  7. 为用户增加资源权限:set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

应用:输出 rabbitmqctl,则会提醒可能应用的命令,而后 应用 rabbitmqctl hepl < 命令 > 能够查看具体命令的应用办法和参数。

2.1.3 简略介绍 Web 界面治理

  • Connections(连贯):此处用来治理与 RabbitMQ 建设连贯后的生产者和消费者
  • Channels(通道):连贯建设后,会造成通道,音讯的投递获取依赖通道。
  • Exchanges(交换机):用来实现音讯的路由
  • Queues(队列):寄存音讯的队列,音讯期待被生产,生产后被移除队列。
  • Admin(治理):用于对治理用户,以及对应权限进行设置,如下图所示

Tags 就是用来指定用户的角色

  • administrator(超级管理员):登录控制台、查看所有信息、操作用户、操作策略
  • monitoring(监控者):登录控制台、查看所有信息
  • policymaker(策略制定者):登录控制台、指定策略
  • managment(一般管理员):登录控制台

2.2 Docker 装置

在 Docker 中装置 RabbitMQ 不须要本人去思考版本,环境等的各种抵触不兼容问题,是十分便捷的,我演示的这台虚拟机是一个 CentOS 7.9 裸机,所以咱们从更新 yum,到装置 Docker 和 装置 RabbitMQ 按步骤都讲一下

2.2.1 配置 yum

  1. 更新 yum 到最新版
# 更新 yum
yum update

# 查看 yum 依赖的几个包 yum-utils 提供 yum-config-manager 性能,前面两个是 devicemapper 用到的
yum install -y yum-utils device-mapper-persistent-data lvm2
  1. 设置 yum 源为阿里云
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

2.2.2 装置 docker

2.2.2.1 步骤

  1. 应用 yum 装置 docker

    • docker-ce 是社区版的意思,ee 为企业版
yum install docker-ce -y
  1. 通过查看版本,查看装置是否胜利
docker -v
  1. Docker 镜像减速(这里 < 你的 ID > 要换成本人的哈)
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{"registry-mirrors": ["https://< 你的 ID>.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
  • 国内从 DockerHub 拉取镜像有时会遇到困难,此时能够配置镜像加速器。Docker 官网和国内很多云服务商都提供了国内加速器服务,例如:

    • 科大镜像:https://docker.mirrors.ustc.e…
    • 网易:https://hub-mirror.c.163.com/
    • 阿里云:https://< 你的 ID>.mirror.aliyuncs.com
    • 七牛云加速器:https://reg-mirror.qiniu.com

    当配置某一个加速器地址之后,若发现拉取不到镜像,请切换到另一个加速器地址。国内各大云服务商均提供了 Docker 镜像减速服务,倡议依据运行 Docker 的云平台抉择对应的镜像减速服务。

    阿里云镜像获取地址:https://cr.console.aliyun.com…,登陆后,左侧菜单选中镜像加速器就能够看到你的专属地址了

2.2.2.2 Docker 常见命令

2.2.2.2.1 治理命令

  • 就启动,进行,重启这些简略的命令应用 service 也是能够的,systemctl 性能略微弱小一些
# 启动 docker
systemctl docker start
# 进行 docker
systemctl docker stop
# 重启 docker
systemctl docker restart
# 查看 docker 状态
systemctl status docker
# 开机自启
systemctl enable docker
systemctl unenable docker

2.2.2.2.2 镜像命令

# 导入镜像文件
docker load < xxx.tar.gz
# 查看装置的镜像
docker images
# 删除镜像
docker rmi 镜像名

2.2.3 装置 RabbitMQ(任选其一)

注:间接用 2.2.3.2 一句话装置 会更好一些

2.2.3.1 一步一步装置

  1. 获取 RabbitMQ 的镜像
docker pull rabbitmq:management
  1. 创立并运行容器(具体参数在 3 中介绍)
docker run -id --name 容器名 -p 15672:15672 -p 5672:5672 rabbitmq:management

2.2.3.2 一句话装置

下面的装置形式,就是先获取到 RabbitMQ 镜像后再开始装置,这里是没有问题的,创立时会有一个问题,因为咱们要装置 management 也就是它的 web 治理,如果不做一些解决,默认装好的是没有用户的,所以还须要像后面一样本人进去配置,而 Docker Hub 曾经给出了咱们配置的示例,即应用 -e 代表配置,应用 RABBITMQ_DEFAULT_USERRABBITMQ_DEFAULT_PASS 配置用户名和明码

更多请查看 Docker Hub 官网给予例子中的 Setting default user and password 章节

https://registry.hub.docker.c…

  1. 执行装置
docker run -di --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
  1. 通过容器状态,查看是否运行胜利
# 查看容器运行状态
docker ps -a
# 启动
docker start 容器名
# 进行
docker stop 容器名
# 退出命令行,不进行
exit
# 进入到 node 容器(如果开启了 -t 的状况)docker exec -it 容器名 bash

2.2.3.2.1 参数介绍

上面别离解说一下这些参数的阐明:

  • -i:示意运行容器。
  • -t:示意为容器保留交互的形式(命令行),即调配一个伪终端。所以经常会见到 -it 这样的搭配。
  • --name:为容器起个名字。
  • -v:示意目录映射关系(前者是宿主机目录,后者是映射到宿主机上的目录),能够应用多个 -v 做多个目录或文件映射。留神:举荐做目录映射,在宿主机上做批改,而后共享到容器上。
  • -d:示意创立一个守护式容器在后盾运行(这样创立容器后不会主动登录容器,如果只加 -i -t 两个参数,创立后就会主动进去容器),即后端挂起运行。
  • -p:示意端口映射,前者是宿主机端口,后者是容器内的映射端口。能够应用多个 -p 做多个端口映射,只有做了端口映射,能力被外界拜访。

给大家举个例子:

# 创立容器,把容器 3000 端口映射到宿主机 3000 端口,把 /demo 映射到宿主机的 /demo  face 是我下载好的一个现成的镜像
docker run -d -it -p 3000:3000 -v /demo:/demo --name node face

# 例如,名为 node 的镜像中有一个须要执行的 python 程序,就能够通过如下命令进入方才调配到的命令行中去执行这个程序
docker exec -it node bash
  • 因为应用了 -t 这个参数,所以能够调配到一个伪终端,通过 docker exec -it 容器名 bash 进入命令行
  • -v 目录映射后,进入容器后,也会有一个截然不同的 demo 文件夹,例如在其中能够执行 python 程序

2.2.3.2.1 端口介绍

4369:erlang 发现端口

5672:client 端通信端口

15672:治理界面 ui 端口

25672:server 间外部通信端口

61613:不带 TLS 和带 TLS 的 STOMP 客户端

1883:不启用和启用 TLS 的 MQTT 客户端

比拟要害的就是 5672 和 15672

更多端口详情能够拜访官网文档

  • https://www.rabbitmq.com/netw…

注:如果要通过近程连贯,例如拜访 web 治理页面的 15672 端口,Java 客户端连贯的 5672 端口,肯定要进行一个凋谢操作,否则都连贯不到。

  • 以下为基于 CentOS 7.9 凋谢 15672 端口的例子
# 查问 15672 是否凋谢,个别默认都是 no
firewall-cmd --query-port=15672/tcp
# 凋谢指定端口 15672 
firewall-cmd --add-port=15672/tcp --permanent
# 从新载入
firewall-cmd --reload
# 再次查问,后果就是 yes 了
firewall-cmd --query-port=15672/tcp
  • 以下是敞开防火墙的命令
systemctl disable firewalld
systemctl stop firewalld   

3. RabbitMQ 协定和模型

装置完结后,就要进入主题,即用 Java 或者 Springboot 代码来实现 RabbitMQ 的几种形式,然而想要很好的了解这几种路由替换形式,就须要对它的协定和架构模型有所理解。

3.1 协定

3.1.1 什么是协定?

协定,网络协议的简称,网络协议是通信计算机单方必须独特听从的一组约定。如怎么样建设连贯、怎么样相互辨认等。只有恪守这个约定,计算机之间能力互相通信交换。它的三要素是:语法、语义、时序。

为了使数据在网络上从源达到目标,网络通信的参与方必须遵循雷同的规定,这套规定称为协定(protocol),它最终体现为在网络上传输的数据包的格局。

3.1.1.1 网络协议的三要素

  1. 语法:数据与管制信息的构造和格局,以及数据呈现的程序。
  2. 语义:解释管制信息每个局部的意义,以及规定了须要收回何种管制信息以及实现的动作做出何种响应。
  3. 时序:对事件产生程序的具体阐明。

人们形象地把这三个因素形容为:做什么,怎么做,做的程序。

举个例子 HTTP 协定

语法:HTTP 规定了申请报文和响应报文的格局
语义:客户端被动发动申请称为申请,服务端随之返回数据,称为响应
时序:一个申请对应一个响应,而且先有申请后有响应

3.1.1.1.1 面试题:为什么消息中间件不间接应用 HTTP 协定

对于一个消息中间件来说,其次要责任就是负责数据传递,存储,散发,高性能和简洁才是咱们所谋求的,而 HTTP 申请报文头和响应报文头是比较复杂的,蕴含了 Cookie,数据的加密解密,窗台吗,响应码等附加的性能,咱们并不需要这么简单的性能。

同时大部分状况下 HTTP 大部分都是短链接,在理论的交互过程中,一个申请到响应都很有可能会中断,中断当前就不会执行长久化,就会造成申请的失落。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,呈现问题和故障要对数据或音讯执行长久化等,目标是为了保障音讯和数据的高牢靠和持重的运行

3.1.2 RabbitMQ 的 AMQP 协定

RabbitMQ 的应用的协定是 AMQP(advanced message queuing protocol),它在 2003 年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。

AMQP 更精确的说是一种 binary wire-level protocol(链接协定)。这是其和 JMS 的实质差异,AMQP 不从 API 层进行限定,而是间接定义网络替换的数据格式。这使得实现了 AMQP 的 Provider(Producer)人造性就是跨平台的。

相比拟其它音讯协定,其个性为:

  1. 分布式事务反对
  2. 音讯的长久化反对
  3. 高性能和高牢靠的音讯解决劣势

3.1.3 架构模型

想要学习前面的几种音讯具体的发送模式,这个模型图就必须了解分明,因为这几种形式就是对这个模型不同水平的抉择和缩减

  • Producer:音讯的生产者(发送音讯的程序)。
  • Connection:应用程序与 Broker 之间的网络连接。
  • Channel:信道,即信息传输的通道,能够建设多个 Channel,每个 Channel 代表一个会话工作。

    • 信道是建设在 TCP 连贯内的虚构连贯,信息的读写都通过信道传输,因为对于操纵零碎而言,建设和销毁 TCP 是十分低廉的,所以引入了信道的概念,以复用一条 TCP 连贯。
  • Broker(Server):标识音讯队列服务器实体,例如这里就是 RabbitMQ Server。
  • Virtual Host:虚拟主机,一个 Broker 中能够设置多个 Virtual Host,用作不同用户的权限隔离。

    • Broker 能够了解为整个数据库服务,而 Virtual Host 就是其中每个数据库的感觉,不同我的项目能够对应不同的数据库,其中有着我的项目所属的业务表等等。
    • 每个 Virtual Host 中,能够有若干个 Exchange 和 Queue。
  • Exchange:交换机,用来接管生产者发送的音讯,而后将这些音讯依据路由键发送到队列。
  • Binding:Exchange 和 Queue 之间的虚构连贯,Binding 中能够包含多个 Routing key。
  • Routing key:路由规定,虚拟机用它来确认如何路由一个特定音讯。
  • Queue:音讯队列,它是音讯的容器,用来保留音讯,每一条音讯都能传入一个或者多个队列中,期待消费者生产,即取出这个音讯。
  • Consumer:音讯的消费者(接管音讯的程序)。

4. Java 实现 RabbitMQ

4.1 环境搭建

官网介绍几种模型:https://www.rabbitmq.com/gets…

截止目前为止,官网一共提供了 7 中模型的介绍,咱们次要介绍前五种根本的模式,也有人将 Direct 和 Topic 模式都纳入 Routing 模式,也能够看做四大种。

4.1.1 创立 Java 我的项目

首先创立好一个不应用骨架的 Maven 我的项目,而后引入 RabbitMQ 依赖,还有单元测试依赖即可

<dependency>
   <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
</dependency>

4.1.2 创立虚拟主机(可选)

在这里,咱们创立了一个新的 Virtual Hosts,用来为这个 Java 我的项目服务,大家还能够创立一个新的用户,而后对其开启这个 Virtual Hosts 的拜访权限(行将虚拟主机与用户绑定)。咱们这里还是用 admin(我之前创立的一个管理员权限用户)来演示。

注:这部分不去做也能够,间接用 / 和 admin 用户也行

4.1.3 创立连贯工具类

因为咱们前面要演示多种例子,而每一次获取连贯和开释连贯、敞开资源等操作代码都是统一的,为了避免代码冗余,优化代码,更易了解,提取出一个工具类,这样大家将重心放在不同实现形式的比照上就行了。

  • RabbitMqUtil 工具类
public class RabbitMqUtil {
    /**
     * 主机名 即 Linux IP 地址
     */
    private static String host = "";
    /**
     *  端口号 客户端拜访默认都是 5672
     */
    private static int port = 0;
    /**
     * 虚拟主机 能够设置为默认的 / 或者本人创立出指定的虚拟主机
     */
    private static String virtualHost = "";
    /**
     * 用户名
     */
    private static String username = "";
    /**
     * 明码
     */
    private static String password = "";

    // 应用动态代码块为 Properties 对象赋值
    static {
        try {
            // 实例化对象
            Properties properties = new Properties();
            // 获取 properties 文件的流对象
            InputStream in = RabbitMqUtil.class.getClassLoader().getResourceAsStream("rabbitmq.properties");
            properties.load(in);
            // 别离获取 value
            host = properties.getProperty("host");
            port = Integer.parseInt(properties.getProperty("port"));
            virtualHost = properties.getProperty("virtualHost");
            username = properties.getProperty("username");
            password = properties.getProperty("password");

        } catch (Exception e) {e.printStackTrace();
        }
    }

    /**
     * 获取连贯
     *
     * @return 连贯
     */
    public static Connection getConnection() {
        try {
            // 创立连贯工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置连贯 rabbitmq 主机
            connectionFactory.setHost(host);
            // 设置端口号
            connectionFactory.setPort(port);
            // 设置连贯的虚拟主机(数据库的感觉)connectionFactory.setVirtualHost(virtualHost);
            // 设置拜访虚拟主机的用户名和明码
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            // 返回一个新连贯
            return connectionFactory.newConnection();} catch (Exception e) {e.printStackTrace();
        }
        return null;
    }

    /**
     * 敞开通道和开释连贯
     *
     * @param channel    channel
     * @param connection connection
     */
    public static void close(Channel channel, Connection connection) {
        try {if (channel != null) {channel.close();
            }
            if (connection != null) {connection.close();
            }
        } catch (Exception e) {e.printStackTrace();
        }
    }
}
  • properties
host=192.168.122.1
port=5672
virtualHost=/rabbitmq_maven_01
username=admin
password=admin

4.2 五种实现形式

阐明:

  • 队列名,音讯等等字符串内容,更举荐定义成变量传入,我文中都是间接写在参数中的,这种魔法值的写法,并不是很柔美。
  • 生产者中应用了 Junit 单元测试,然而消费者中却在 main 函数中编写,这是因为,咱们心愿消费者处于一个继续运行期待的状态,如果应用 Junit 会导致,程序在执行一次后完结掉。

    • 除了在 main 函数中编写,还能够思考应用 sleep 期待或者 while(true) 让程序不要间接终止掉。

4.2.1 简略队列模式(Hello Word)

  • Producer:音讯的生产者(发送音讯的程序)。
  • Queue:音讯队列,了解为一个容器,生产者向它发送音讯,它把音讯存储,期待消费者生产。
  • Consumer:音讯的消费者(接管音讯的程序)。

4.2.1.1 如何了解

由图所示,简略队列模式,一个生产者,通过一个队列,对应一个消费者。能够看做是点对点的一种传输方式,相较与 3.1.3 中的模型图,最次要的特点就是看不到 Exchange(交换机)和 routekey(路由键),正是因为这种模式简略,所以并不会波及到简单的条件散发等等,因而也不须要用户去显式的思考交换机和路由键的问题。

  • 然而要留神,这种模式并不是生产者间接对接队列,而是用了默认的交换机,默认的替换机会把音讯发送到和 routekey 名称雷同的队列中去,这也是咱们在前面代码中在 routekey 地位填写了队列名称的起因

4.2.1.2 代码实现

4.2.1.2.1 生产者代码

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        Channel channel = connection.createChannel();
        // 通道绑定音讯队列
        channel.queueDeclare("queue1",false,false,false,null);
        // 公布音讯
        channel.basicPublish("","queue1",null,"This is rabbitmq message 001 !".getBytes());
        // 通过工具敞开 channel 和开释连贯
        RabbitMqUtil.close(channel,connection);
    }
}
  1. 通过工具类获取连贯
  2. 获取连贯通道:依据 3.1.3 的模型图可知,生产者须要在获取到连贯后,再获取信道,能力去拜访前面的交换机队列等。
  3. 通道绑定音讯队列:绑定队列前,应该绑定交换机,然而此模式中荫蔽了交换机的概念,背地应用了默认的交换机,所以间接绑定队列。

    • queueDeclare 办法解释

      • 参数 1:queue(队列名称),如果队列不存在,则主动创立。
      • 参数 2:durable(队列是否长久化),长久化能够保障服务器重启后此队列依然存在。
      • 参数 3:exclusive(排他队列)即是否独占队列,如果此项为 true,该队列仅对首次申明它的连贯可见,并在连贯断开时主动删除。
      • 参数 4:autoDelete(主动删除),最初一个消费者将音讯生产结束后,主动删除队列。
      • 参数 5:arguments(携带附加属性)。
  4. 公布音讯:此处能够指定音讯队列的发送办法,以及内容等,因为此模式比较简单,所以没有波及到全副参数,前面的模式会有具体的解说

    • basicPublish 办法解释

      • 参数 1:exchange(交换机名称)。
      • 参数 2:routingKey(路由 key),此处填写队列名,可了解为把音讯发送到和 routekey 名称雷同的队列中去。
      • 参数 3:props(音讯的管制状态),能够在此处管制音讯的长久化。

        • 参数为:MessageProperties.PERSISTENT_TEXT_PLAIN
      • 参数 4:body(音讯主体),类型是一个字节数组,要转一下类型。
  5. 通过工具敞开 channel 和开释连贯:先敞开通道,再开释连贯。

4.2.1.2.2 消费者代码

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException{
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        Channel channel = connection.createChannel();
        // 通道绑定音讯队列
        channel.queueDeclare("queue1", false, false, false, null);
        // 生产音讯
        channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("new String(body):" + new String(body));
            }
        });
    }
}
  1. 通过工具类获取连贯
  2. 获取连贯通道
  3. 通道绑定音讯队列
  4. 生产音讯:此处用来指定生产哪个队列的音讯,以及一些机制和回调

    • basicConsume 办法解释

      • 参数 1:queue(队列名称),即生产哪个队列的音讯。
      • 参数 2:autoAck(自动应答)开始音讯的主动确认机制,只有生产了就从队列删除音讯。
      • 参数 3:callback(生产时的回调接口),callback 的类型是 Consumer 这里应用了 DefaultConsumer 就是 Consumer 的一个实现类。其中重写 handleDelivery 办法,就能够获取到生产的数据内容了,这里次要应用了其中的 body,即查看音讯主体,其余三个参数临时还没用到,有趣味能够先打印输出一下,能先有个大略的理解。

4.2.2 工作队列模式(Work Queue)

  • Producer:音讯的生产者(发送音讯的程序)。
  • Queue:音讯队列,了解为一个容器,生产者向它发送音讯,它把音讯存储,期待消费者生产。
  • Consumer:音讯的消费者(接管音讯的程序)。

    • 此处咱们假如 Consumer1、Consumer2、Consumer3 别离为实现工作速度不一样快的消费者,这会引出此模式的一个重点问题。

4.2.2.1 如何了解

工作模式由图能够看出,就是在简略队列模式的根底上,减少了多个消费者,也就是让多个消费者绑定同一个队列,独特去生产,这样能解决简略队列模式中,如果生产速速远大于生产速度,而导致的音讯沉积景象。

  • 因为音讯被生产后就会隐没,所以不用放心工作会反复执行。

4.2.2.2 代码实现

注:工作队列模式有两种

  1. 轮询模式:每个消费者均分音讯
  2. 偏心散发模式(能者多劳):按能力散发,处理速度快的散发的多,处理速度慢的散发的少

咱们首先演示的是轮询模式,依据它的毛病,又能引出偏心散发模式

上面只形容与下面有差别的局部,在简略模式中,这些根本的办法都有介绍过

4.2.2.2.1 轮询模式 - 生产者代码

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        Channel channel = connection.createChannel();
        // 通道绑定音讯队列
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 1; i <= 20; i++) {
            // 公布音讯
            channel.basicPublish("","work", null, (i +" 号音讯 ").getBytes());
        }
        // 通过工具敞开 channel 和开释连贯
        RabbitMqUtil.close(channel, connection);
    }
}

流程和简略队列模式基本一致,有一些小小的改变,生产者中次要就是加了层循环,因为有多个消费者,所以多发送一些音讯,能够看出一些特点和问题。

4.2.2.2.2 轮询模式 - 消费者代码

  • 消费者 1
public class Consumer1 {public static void main(String[] args) throws IOException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        final Channel channel = connection.createChannel();
        // 通道绑定音讯队列
        channel.queueDeclare("work", true, false, false, null);
        // 生产音讯
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {Thread.sleep(2000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println("消费者 1 号:生产 -" + new String(body));
            }
        });
    }
}
  • 消费者 2
public class Consumer2 {public static void main(String[] args) throws IOException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        final Channel channel = connection.createChannel();
        // 通道绑定音讯队列
        channel.queueDeclare("work", true, false, false, null);
        // 生产音讯
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 2 号:生产 -" + new String(body));
            }
        });
    }

上述两个消费者都在 basicConsume 中开启了主动 Ack 应答,这一点上面会详述,同时在消费者 1 中,减少了 sleep 2s 的语句,模仿消费者 1 解决音讯速度慢,而消费者 2 解决音讯速度快的场景。

运行后果:

  • Consumer1
消费者 1 号:生产 - 1 号音讯
消费者 1 号:生产 - 3 号音讯
消费者 1 号:生产 - 5 号音讯
消费者 1 号:生产 - 7 号音讯
消费者 1 号:生产 - 9 号音讯
消费者 1 号:生产 -11 号音讯
消费者 1 号:生产 -13 号音讯
消费者 1 号:生产 -15 号音讯
消费者 1 号:生产 -17 号音讯
消费者 1 号:生产 -19 号音讯
  • Consumer2
消费者 2 号:生产 - 2 号音讯
消费者 2 号:生产 - 4 号音讯
消费者 2 号:生产 - 6 号音讯
消费者 2 号:生产 - 8 号音讯
消费者 2 号:生产 -10 号音讯
消费者 2 号:生产 -12 号音讯
消费者 2 号:生产 -14 号音讯
消费者 2 号:生产 -16 号音讯
消费者 2 号:生产 -18 号音讯
消费者 2 号:生产 -20 号音讯

察看执行过程:发现两个消费者尽管每个人最初都各自解决了一半的音讯,而且是依照一人一条调配的,然而消费者 2 号处理速度快,一下子就全副解决完了,然而消费者 1 号,每一次解决都须要 2s 所以,只能迟缓的解决,而消费者 2 号就处于一个闲暇节约的状况了。

如何切换为偏心散发模式呢?

这就和 basicConsume 中的第二个参数,开启主动确认生产无关了,它默认是 true,也就代表只有一旦拿到队列中分发给这个消费者的音讯,我就会主动返回一个确认生产的标识,队列收到后就会主动删除掉队列中的音讯。

  • 然而这其中有一个很重要的问题,这种形式就是将危险交给了消费者,例如消费者收到了本人须要解决的 10 条音讯,刚生产了 4 个,消费者宕机,挂掉了,前面的 6 个音讯就失落了。

如果想要批改为按能力调配的形式,有两个要点

  1. 设置通道一次只能生产一个音讯
  2. 敞开音讯的主动确认,手动确认音讯

4.2.2.2.3 偏心散发模式 - 生产者代码

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        Channel channel = connection.createChannel();
        // 一次只发送一条音讯
        channel.basicQos(1);
        // 通道绑定音讯队列
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 1; i <= 20; i++) {
            // 公布音讯
            channel.basicPublish("","work", null, (i +" 号音讯 ").getBytes());
        }
        // 通过工具敞开 channel 和开释连贯
        RabbitMqUtil.close(channel, connection);
    }

4.2.2.2.4 偏心散发模式 - 消费者代码

  • 消费者 1
public class Consumer1 {public static void main(String[] args) throws IOException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        final Channel channel = connection.createChannel();
        // 一次只承受一条未确认的音讯
        channel.basicQos(1);
        // 通道绑定音讯队列
        channel.queueDeclare("work", true, false, false, null);
        // 生产音讯
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {Thread.sleep(2000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println("消费者 1 号:生产 -" + new String(body));
                // 返回 deliveryTag 代表队列能够删除此音讯了
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
  • 消费者 2
public class Consumer2 {public static void main(String[] args) throws IOException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        final Channel channel = connection.createChannel();
        // 步骤一: 一次只承受一条未确认的音讯
        channel.basicQos(1);
        // 通道绑定音讯队列
        channel.queueDeclare("work", true, false, false, null);
        // 生产音讯
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 2 号:生产 -" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

运行后果:

  • Consumer1
消费者 1 号:生产 - 1 号音讯
  • Consumer2
消费者 2 号:生产 - 2 号音讯
消费者 2 号:生产 - 3 号音讯
消费者 2 号:生产 - 4 号音讯
消费者 2 号:生产 - 5 号音讯
消费者 2 号:生产 - 6 号音讯
消费者 2 号:生产 - 7 号音讯
消费者 2 号:生产 - 8 号音讯
消费者 2 号:生产 - 9 号音讯
消费者 2 号:生产 -10 号音讯
消费者 2 号:生产 -11 号音讯
消费者 2 号:生产 -12 号音讯
消费者 2 号:生产 -13 号音讯
消费者 2 号:生产 -14 号音讯
消费者 2 号:生产 -15 号音讯
消费者 2 号:生产 -16 号音讯
消费者 2 号:生产 -17 号音讯
消费者 2 号:生产 -18 号音讯
消费者 2 号:生产 -19 号音讯
消费者 2 号:生产 -20 号音讯

4.2.3 公布与订阅模式(Fanout 播送)

  • Producer:音讯的生产者(发送音讯的程序)。
  • Exchange:交换机,负责发送音讯给指定队列。
  • Queue:音讯队列,了解为一个容器,生产者向它发送音讯,它把音讯存储,期待消费者生产。
  • Consumer:音讯的消费者(接管音讯的程序)。

4.2.3.1 如何了解

Fanout 直译为“扇出”然而大家更多的会把它叫做播送或者公布与订阅,它是一种没有路由 key 的模式,生产者将音讯发送给交换机,替换机会把所有音讯复制同步到所有与它绑定过的队列上,而每个队列只能有一个消费者拿到这条音讯,如果在一个消费者连贯中,创立多个通道,则会呈现争抢音讯的后果。

4.2.3.2 代码实现

注:上面只形容与下面有差别的局部,在简略模式中,这些根本的办法都有介绍过

4.2.3.2.1 生产者代码

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        final Channel channel = connection.createChannel();
        // 申明交换机
        channel.exchangeDeclare("order", "fanout");
        for (int i = 1; i <= 20; i++) {
            // 公布音讯
            channel.basicPublish("order", "", null,"fanout!".getBytes());
        }
        // 通过工具敞开 channel 和开释连贯
        RabbitMqUtil.close(channel, connection);
    }
}
  1. 申明交换机

    • exchangeDeclare 办法解释

      • 参数 1:exchange(交换机名称),如果交换机不存在,则主动创立
      • 参数 2:type(类型),此处抉择 fanout 模式
  2. 公布音讯:在 basicPublish 办法的第一个参数中输出上述定义好的交换机的名字,第二个参数,路由键为空

    • 循环 20 条是为了演示消费者

4.2.3.2.2 消费者代码

  • 消费者 1
public class Consumer1 {public static void main(String[] args) throws IOException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 申明交换机
        channel.exchangeDeclare("order", "fanout");
        // 创立长期队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定长期队列和交换机
        channel.queueBind(queue, "order", "");
        // 生产音讯
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 1 号:生产 -" + new String(body));
            }
        });
    }
}
  1. 申明交换机
  2. 创立长期队列
  3. 绑定长期队列和交换机

    • queueBind 办法解释

      • 参数 1:queue(长期队列)
      • 参数 2:exchange(交换机)
      • 参数 3:routingKey(路由 key)
  • 消费者 2:演示了一个连贯中,多个通道的状况
public class Consumer2 {public static void main(String[] args) throws IOException {
       // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        
        // 获取连贯通道
        Channel channel = connection.createChannel();
        Channel channel2 = connection.createChannel();
        
        // 申明交换机
        channel.exchangeDeclare("order", "fanout");
        channel2.exchangeDeclare("order", "fanout");
        
        // 创立长期队列
        String queue = channel.queueDeclare().getQueue();
        System.out.println(queue);
        
        // 绑定长期队列和交换机
        channel.queueBind(queue, "order", "");
        channel2.queueBind(queue, "order", "");
        
        // 生产音讯
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 2 号:生产 -" + new String(body));
            }
        });
        
        // 生产音讯
        channel2.basicConsume(queue, true, new DefaultConsumer(channel2) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 2 - 2 号:生产 -" + new String(body));
            }
        });
    }
}

运行后果:

消费者 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!
消费者 2 - 2 号:生产 -fanout!

4.2.3.2.3 为什么消费者中也申明交换机?

从下面的代码中能够看出,在 Producer 和 Conusmer 中咱们都别离申明了交换机,然而消费者由图可知,并不会与交换机有间接的接触,为什么消费者中也申明交换机呢?

这是为了保障 Producer 或者 Producer 执行的时候,永远不会因为交换机还没被申明而出错,例如你只在 Producer 申明了交换机,那么你就必须先启动 Producer,如果间接执行 Conusmer 此时交换机就还不存在,就会报错。而全副写入申明,则能够保障不管先启动谁,都会申明到交换机。

4.2.4 路由模式(Routing / Direct)

  • Producer:音讯的生产者(发送音讯的程序)。
  • Exchange:交换机,负责发送音讯给指定队列。
  • routingKey:路由 key,即上图的 key1,key2 等,相当于在交换机和队列之间又加了一层限度
  • Queue:音讯队列,了解为一个容器,生产者向它发送音讯,它把音讯存储,期待消费者生产。
  • Consumer:音讯的消费者(接管音讯的程序)。

4.2.4.1 如何了解

路由模式的交换机类型是 direct,与 fanout 模式相比,多了路由 key 这个概念。生产者发送携带指定 routingKey(路由 key)的音讯到交换机,交换机拿着此 routingKey 去找到绑定了这个 routingKey 的队列,而后发送到此队列,一个队列能够绑定多个 routingKey。

4.2.4.2 代码实现

4.2.4.2.1 生产者代码

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        Channel channel = connection.createChannel();
        // 申明交换机
        channel.exchangeDeclare("order_direct", "direct");
        // 指定 routingKey 
        String key = "info";
        // 公布音讯
        channel.basicPublish("order_direct", key, null, ("发送给指定路由" + key + "的音讯").getBytes());
        // 通过工具敞开 channel 和开释连贯
        RabbitMqUtil.close(channel, connection);
    }
}
  1. 指定 routingKey,即在 basicPublish 办法 的第二个参数中,指定 key 的值

4.2.4.2.2 消费者代码

  • 消费者 1
public class Consumer1 {public static void main(String[] args) throws IOException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 申明交换机
        channel.exchangeDeclare("order_direct", "direct");
        // 获取长期队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定长期队列和交换机
        channel.queueBind(queue, "order_direct", "info");
        channel.queueBind(queue, "order_direct", "error");
        channel.queueBind(queue, "order_direct", "warn");
        // 生产音讯
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 1:生产 -" + new String(body));
            }
        });
    }
}
  1. 只是在绑定队列和交换机的时候,减少了 key 这个值
  • 消费者 2
public class Consumer2 {public static void main(String[] args) throws IOException {
         // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 申明交换机
        channel.exchangeDeclare("order_direct", "direct");
        // 获取长期队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定长期队列和交换机
        channel.queueBind(queue, "order_direct", "error");
        // 生产音讯
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 2:生产 -" + new String(body));
            }
        });
    }
}

运行后果:只有消费者 1 收到了音讯

消费者 1:生产 - 发送给指定路由 info 的音讯

4.2.5 通配符匹配模式(Topic)

  • Producer:音讯的生产者(发送音讯的程序)。
  • Exchange:交换机,负责发送音讯给指定队列。
  • routingKey:路由 key,即上图的 key1,key2 等,相当于在交换机和队列之间又加了一层限度

    • 然而 Topic 中的 key 为通配符的模式,这样能够大大的提高效率
  • Queue:音讯队列,了解为一个容器,生产者向它发送音讯,它把音讯存储,期待消费者生产。
  • Consumer:音讯的消费者(接管音讯的程序)。

4.2.5.1 如何了解

通配符匹配模式的交换机类型为 topic,因为它与 Direct 模式很类似,所以大家有时候也会把 Direct 模式和 Topic 独特纳入路由模式下,它们的区别就是,Direct 模式的 routingKey 是一个指定的值,而 Topic 模式的 routingKey 能够应用通配符,而且个别都是由一个或多个单词组成,多个单词之间以”.”宰割,例如:ideal.insert。

  • *:匹配正好一个词,例如:order.* 能够匹配到 order.insert
  • #:匹配一个或者多个词,例如:order.# 能够匹配到 order.insert.common

    • # 就像一个多层的概念,而 * 只是一个单层的概念

4.2.5.2 代码实现

4.2.5.2.1 生产者代码

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("order_topic", "topic");
        // 申明交换机
        String key = "user.query.all";
        // 公布音讯
        channel.basicPublish("order_topic", key, null, ("发送给指定路由" + key + "的音讯").getBytes());
        RabbitMqUtil.close(channel, connection);
    }
}

4.2.5.2.2 消费者代码

  • 消费者 1
public class Consumer1 {public static void main(String[] args) throws IOException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        Channel channel = connection.createChannel();
        // 申明交换机
        channel.exchangeDeclare("order_topic", "topic");
        // 获取长期队列
        String queue = channel.queueDeclare().getQueue();
        // 指定路由 key
        String key = "user.*";
        channel.queueBind(queue, "order_topic", key);
        // 公布音讯
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 1:生产 -" + new String(body));
            }
        });
    }
}
  • 消费者 2
public class Consumer2 {public static void main(String[] args) throws IOException {
        // 通过工具类获取连贯
        Connection connection = RabbitMqUtil.getConnection();
        // 获取连贯通道
        Channel channel = connection.createChannel();
        // 申明交换机
        channel.exchangeDeclare("order_topic", "topic");
        // 获取长期队列
        String queue = channel.queueDeclare().getQueue();
        // 指定路由 key
        String key = "user.#";
        channel.queueBind(queue, "order_topic", key);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者 2:生产 -" + new String(body));
            }
        });
    }
}

运行后果:只有消费者 2 收到了音讯,因为音讯是一个多层的构造,只有 user.# 能匹配到

消费者 2:生产 - 发送给指定路由 user.query.all 的音讯

5. Springboot 实现 RabbitMQ

SpringBoot 提供 Spring For RabbitMQ 的启动器,同时提供了一系列注解以及 RabbitTemplate 供咱们应用,可能极大的简化开发 RabbitMQ 的步骤,上面别离演示了【5.1 基于纯注解】以及【5.2 基于注解 + 配置类】的写法,其应用形式大同小异,只是申明和绑定队列交换机等的地位不同。个别认为后者更好保护治理,任选其一即可。

环境筹备:

  1. 首先创立 SprinBoot 我的项目,而后抉择 RabbitMQ 的启动器,以及单元测试等根本启动器
  2. 编写 yml 配置文件,编写连贯 RabbitMQ 须要的数据

RabbitMQ 依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml 配置文件

spring:
  rabbitmq:
    host: 192.168.122.1 # 服务器地址
    port: 5672 # tcp 端口
    username: admin # 用户名
    password: admin # 用户明码
    virtual-host: /rabbitmq_springboot_01 # 虚拟主机

5.1 基于纯注解

注:此形式没有创立配置类来治理队列以及交换机的申明和绑定等,而是全副通过注解的形式间接在消费者中写入

5.1.1 简略队列模式

所有生产音讯的代码,咱们都放到 Test 中去做

  • 生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleSendMessage() {rabbitTemplate.convertAndSend("simple_queue", "This is a message !");
    }
}
  1. 第一步就是注入 SpringBoot 提供给咱们的 RabbitTemplate
  2. 通过 RabbitTemplate 的 convertAndSend 办法用来发送音讯,他有多种重载形式,明天别离会用到 2 个 和 3 个参数的

    • convertAndSend 办法详解(两个参数)

      • 参数 1:routingKey(路由 key)
      • 参数 2:object(发送的音讯注释)
    • convertAndSend 办法详解(三个参数)

      • 参数 1:exchange(交换机)
      • 参数 2:routingKey(路由 key)
      • 参数 3:object(发送的音讯注释)
  • 消费者
// 注入容器
@Component
// 监听 RabbitMQ
@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue", durable = "true", exclusive = "false", autoDelete = "false"))
public class SimpleConsumer {
    // 主动回调
    @RabbitHandler
    public void receiveMessage(String message) {System.out.println("消费者:" + message);
    }
}
  1. 注入容器
  2. 监听 RabbitMQ,在 @RabbitListener 注解中,能够实现,队列的申明,以及前面交换机与队列的绑定等

    • @Queue 能够有四个参数,因为其各有默认值,所以只给定 value 值,就会依照 长久化,非独占,非主动删除的形式默认创立

      • 参数 1:value(队列名)
      • 参数 2:durable(长久化音讯队列)RabbitMQ 重启后,队列仍存在,默认 true
      • 参数 3:exclusive(是否独占)示意该音讯队列是否只在以后 Connection 失效,默认是 false
      • 参数 4:auto-delete(主动删除)示意音讯队列没有在应用时将被主动删除,默认是 false
  3. 在办法上增加 @RabbitHandler 注解,就可能实现主动回调,这样咱们就能拿到生产者中的音讯了

    • 注:receiveMessage 这个办法的参数类型,取决于你在生产者有发送了什么类型的数据

5.1.2 工作队列模式

5.1.2.1 轮询模式

  • 生产者:没什么好说的,因为工作模式有多个消费者,所以多发送几条音讯
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testWorkSendMessage() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend("work_queue", "This is a message !, 序号:" + i);
        }
    }
}
  • 消费者
@Component
public class WorkConsumer {  
    // 监听 RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    // 消费者 1
    public void receiveMessage1(String message) {System.out.println("消费者 1:" + message);
   
    // 监听 RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue")
    // 消费者 2
    public void receiveMessage2(String message) {System.out.println("消费者 2:" + message);
    }    
}
  1. @RabbitListener 注解,既能够放在类上,也能够放在办法上,例如上述代码,咱们就别离放在了两个办法上,用来指代不同的消费者。

    • 然而如果在类上退出 @RabbitListener 注解,而在上面两个办法中,增加 @RabbitHandler 注解则会报错,须要别离为每个消费者都创立一个类

5.1.2.2 偏心模式(按能力调配)

5.1.2.2.1 批改配置文件的形式

  • 生产者不变
  • 批改配置文件 yml / properties
spring:
  rabbitmq:
    host: 192.168.122.1 # 服务器地址
    port: 5672 # tcp 端口
    username: admin # 用户名
    password: admin # 用户明码
    virtual-host: /rabbitmq_springboot_01 # 虚拟主机
    # 新增局部
    listener:
      simple:
        acknowledge-mode: manual # 开启 ack 手动应答
        prefetch: 1 # 每次只能生产 1 条音讯
  1. acknowledge-mode 选项介绍

    • auto:主动确认,为默认选项
    • manual:手动确认(按能力调配就须要设置为手动确认)
    • none:不确认,发送后主动抛弃
  • 消费者
@Component
public class WorkConsumer {
    // 监听 RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    // 消费者 1
    public void receiveMessage(String body, Message message, Channel channel) throws IOException {
        try {
            // 打印输出音讯主题
            System.out.println("消费者 1:" + body);
            // 返回 deliveryTag 代表队列能够删除此音讯了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {e.printStackTrace();
            // 消费者通知队列信息生产失败
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
    
   // 监听 RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    // 消费者 2
    public void receiveMessage2(String body, Message message, Channel channel) throws IOException{
        try {
            // 提早 2s 代表解决业务慢
            Thread.sleep(2000);
        } catch (InterruptedException e) {e.printStackTrace();
        }

        try {
            // 打印输出音讯主题
            System.out.println("消费者 2:" + body);
            // 返回 deliveryTag 代表队列能够删除此音讯了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {e.printStackTrace();
            // 消费者通知队列信息生产失败
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}
  1. 因为在 yml 配置中开启了手动确认,所以,须要在胜利和失败后别离返回确认音讯
  2. basicAck 办法解释

    • 参数 1:deliveryTag(交付标记,即该音讯的 index),返回即代表确认收到音讯,队列能够删除此音讯了
    • 参数 2:mutiple(是否批量)抉择 true 将一次性回绝所有小于 deliveryTag 的音讯
  3. basicNack 办法解释

    • 参数 1 | 参数 2 同上
    • 参数 3:requeue(被回绝的是否从新进入队列)

运行后果:

消费者 1:This is a message !, 序号:2
消费者 1:This is a message !, 序号:3
消费者 1:This is a message !, 序号:4
消费者 1:This is a message !, 序号:5
消费者 1:This is a message !, 序号:6
消费者 1:This is a message !, 序号:7
消费者 1:This is a message !, 序号:8
消费者 1:This is a message !, 序号:9
消费者 1:This is a message !, 序号:10
消费者 1:This is a message !, 序号:11
消费者 1:This is a message !, 序号:12
消费者 1:This is a message !, 序号:13
消费者 1:This is a message !, 序号:14
消费者 1:This is a message !, 序号:15
消费者 1:This is a message !, 序号:16
消费者 1:This is a message !, 序号:17
消费者 1:This is a message !, 序号:18
消费者 1:This is a message !, 序号:19
消费者 1:This is a message !, 序号:20
    
消费者 2:This is a message !, 序号:1

到当初曾经实现了批改配置文件的形式实现按能力调配,补充几个配置的内容,咱们下面只用了一部分,其余的不便大家参考,yml 和 properties 大家本人抉择即可

# 发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# spring.rabbitmq.publisher-confirms=true(旧版)
# 发送回调
spring.rabbitmq.publisher-returns=true
# 生产手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 并发消费者初始化值
spring.rabbitmq.listener.simple.concurrency=1
# 并发消费者的最大值
spring.rabbitmq.listener.simple.max-concurrency=10
# 每个消费者每次监听时可拉取解决的音讯数量
# 在单个申请中解决的音讯个数,他应该大于等于事务数量(unack 的最大数量)
spring.rabbitmq.listener.simple.prefetch=1
# 是否反对重试
spring.rabbitmq.listener.simple.retry.enabled=true

5.1.2.2.1 配置工厂的形式

/**
 * 设置消费者的确认机制,并达到能者多劳的成果
 *
 * @param connectionFactory 连贯工厂
 * @return
 */
@Bean("workListenerFactory")
public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory containerFactory =
        new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    // 批改为手动确认
    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    // 回绝策略,true 回到队列 false 抛弃,默认是 true
    containerFactory.setDefaultRequeueRejected(true);
    // 默认的 PrefetchCount 是 250 批改为 1
    containerFactory.setPrefetchCount(1);

    return containerFactory;
}
  • 消费者批改
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
// 将下面的监听,减少 containerFactory 属性,而后将配置好的工厂传入
@RabbitListener(queuesToDeclare = @Queue("work_queue"), containerFactory = "workListenerFactory")

5.1.3 公布与订阅模式

  • 生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testFanoutSendMessage() {rabbitTemplate.convertAndSend("order_exchange", "","This is a message !");
    }
}
  1. 因为从这个模式开始,就波及到交换机了,所以用的是三个参数的办法
  • 消费者
@Component
public class FanoutConsumer {
    // 绑定长期队列和交换机
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(), // 长期队列
                exchange = @Exchange(name = "order_exchange", type = "fanout") // 交换机与类型
            )
    })
    public void receiveMessage1(String message) {System.out.println("消费者 1:" + message);
    }

    // 绑定长期队列和交换机
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(), // 长期队列
                exchange = @Exchange(name = "order_exchange", type = "fanout") // 交换机与类型
            )
    })
    public void receiveMessage2(String message) {System.out.println("消费者 2:" + message);
    }
}

5.1.4 路由模式(Direct)

  • 生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testDirectSendMessage() {rabbitTemplate.convertAndSend("direct_exchange", "info", "This is a message !");
    }
}
  • 消费者
@Component
public class DirectConsumer {
    // 绑定长期队列和交换机
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(), // 长期队列
                    exchange = @Exchange(name = "direct_exchange", type = "direct"), // 交换机和类型
                    key = {"info", "warn", "error"} // 路由 key
            )

    })
    public void receiveMessage1(String message) {System.out.println("消费者 1:" + message);
    }

     // 绑定长期队列和交换机
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(), // 长期队列
                    exchange = @Exchange(name = "direct_exchange", type = "direct"), // 交换机和类型
                    key = {"info", "warn", "error"} // 路由 key
            )

    })
    public void receiveMessage2(String message) {System.out.println("消费者 2:" + message);
    }
}

5.1.5 主题模式

  • 生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testTopicSendMessage() {rabbitTemplate.convertAndSend("topic_exchange", "order.insert.common", "This is a message !");
    }
}
  • 消费者
@Component
public class TopicConsumer {
    // 绑定长期队列和交换机
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(), // 长期队列
                    exchange = @Exchange(name = "topic_exchange", type = "topic"), // 交换机和类型
                    key = {"order.*"} // 通配符路由 key
            )

    })
    public void receiveMessage1(String message) {System.out.println("消费者 1:" + message);
    }

    // 绑定长期队列和交换机
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(), // 长期队列
                    exchange = @Exchange(name = "topic_exchange", type = "topic"), // 交换机和类型
                    key = {"order.*"} // 通配符路由 key
            )
    })
    public void receiveMessage2(String message) {System.out.println("消费者 2:" + message);
    }
}

5.2 基于注解 + 配置类

其实这种形式,就是将交换机,队列的申明和绑定都在配置类中进行,一个是消费者中的注解变的简洁了,再有就是对立治理,更加条理,而且生产者和消费者援用的时候也更加不便,日后批改的时候,也不须要对每一处都批改。

因为篇幅过长了,这里演示最简单的 Topic 形式,其余的也是信手拈来。

  • 配置类
@Configuration
public class RabbitMqConfiguration {
    
    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_QUEUE_NAME_2 = "test_topic_queue_2";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";
    public static final String TOPIC_ROUTINGKEY_2 = "test.#";

    @Bean
    public TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {return new Queue(TOPIC_QUEUE_NAME_1);
    }

    @Bean
    public Queue topicQueue2() {return new Queue(TOPIC_QUEUE_NAME_2);
    }

    @Bean
    public Binding bindingTopic1(){return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
    @Bean
    public Binding bindingTopic2(){return BindingBuilder.bind(topicQueue2())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_2);
    }

}
  1. 增加 @Configuration 注解:表明这是一个配置类
  2. 定义常量:将交换机名,队列名,路由 key 等都能够创立为常量,调用,治理和批改都十分不便,还能够创立出一个专门的 RabbitMQ 的常量类。
  3. 定义交换机:因为这个例子是 Topic 所以抉择 TopicExchange 类型
  4. 定义队列:传入队列名常量即可,因为长久化等存在默认值,也能够本人自定长久化,是否独占等参数
  5. 绑定交换机和队列:利用 BindingBuilder 的 bind 办法绑定队列,to 绑定到指定交换机,with 传入路由 key
  • 生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testTopicSendMessage() {rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
    }
}
  • 消费者
@Component
public class TopicConsumer {
    // 绑定队列即可
    @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_1})
    public void receiveMessage1(String message) {System.out.println("消费者 1:" + message);
    }
    
    // 绑定队列即可
    @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_2})
    public void receiveMessage2(String message) {System.out.println("消费者 2:" + message);
    }
}

正文完
 0