关于消息队列:最详解消息队列以及RabbbitMQ之HelloWorld

41次阅读

共计 11260 个字符,预计需要花费 29 分钟才能阅读完成。

1、音讯队列

1、MQ的相干概念

1、什么是 MQ

MQ(message queue),从字面意思上看,实质是个队列,FIFO 先入先出,只不过队列中寄存的内容是 message 而已,还是一种 跨过程 的通信机制,用于上下游传递音讯。

在互联网架构中,MQ 是一种十分常见的上下游“逻辑解耦 + 物了解 耦”的音讯通信服务。应用了 MQ 之后,音讯发送上游只须要依赖 MQ,不必依赖其余服务。

2、为什么要应用 MQ

1、流量消峰

举个例子:如果订单零碎最多能解决一万次订单,这个解决能力应酬失常时段的下单时入不敷出,失常时段咱们下繁多秒后就能返回后果。然而在高峰期,如果有两万次下单操作系统是解决不了的,只能限度订单超过一万后不容许用户下单。应用音讯队列做缓冲,咱们能够勾销这个限度,把一秒内下的订单扩散成一段时间来解决,这时有些用户可能在下单十几秒后能力收到下单胜利的操作,然而比不能下单的体验要好。

2、利用解耦

以电商利用为例,利用中有订单零碎、库存零碎、物流零碎、领取零碎。用户创立订单后,如果耦合调用库存零碎、物流零碎、领取零碎,任何一个子系统出了故障,都会造成下单操作异样。当转变成基于音讯队列的形式后,零碎间调用的问题会缩小很多,比方物流零碎因为产生故障,须要几分钟来修复。在这几分钟的工夫里,物流零碎要解决的内存被缓存在音讯队列中,用户的下单操作能够失常实现。当物流零碎复原后,持续解决订单信息即可,中单用户感触不到物流零碎的故障,晋升零碎的可用性。

3、异步解决

有些服务间调用是异步的,例如 A 调用 B,B 须要破费很长时间执行,然而 A 须要晓得 B 什么时候能够执行完,以前个别有两种形式:

  1. A 过一段时间去调用 B 的查问 api 查问
  2. A 提供一个 callback api,B 执行完之后调用 api 告诉 A 服务。

这两种形式都不是很优雅,应用音讯总线,能够很不便解决这个问题,A 调用 B 服务后,只须要监听 B 解决实现的音讯,当 B 解决实现后,会发送一条音讯给 MQ,MQ 会将此音讯转发给 A 服务。这样 A 服务既不必循环调用 B 的查问 api,也不必提供 callback api。同样 B 服务也不必做这些操作。A 服务还能及时的失去异步解决胜利的音讯。

3、MQ 的分类

1、ActiveMQ

长处:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,音讯可靠性较低的概率失落数据

毛病:官网社区当初对 ActiveMQ 5.x 保护越来越少,高吞吐量场景较少应用。

2、Kafka

大数据的杀手锏,谈到大数据畛域内的音讯传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据畛域的宠儿,在数据采集、传输、存储的过程中施展着无足轻重的作用。目前曾经被 LinkedIn,Uber, Twitter, Netflix 等大公司所驳回。

长处:性能卓越,单机写入 TPS 约在百万条 / 秒,最大的长处,就是 吞吐量高。时效性 ms 级可用性十分高,kafka 是分布式的,一个数据多个正本,多数机器宕机,不会失落数据,不会导致不可用, 消费者采纳 Pull 形式获取音讯, 音讯有序, 通过管制可能保障所有音讯被生产且仅被生产一次; 有优良的第三方 Kafka Web 治理界面 Kafka-Manager;在日志畛域比拟成熟,被多家公司和多个开源我的项目应用;性能反对:性能较为简单,次要反对简略的 MQ 性能,在大数据畛域的实时计算以及日志采集被大规模应用.

毛病:Kafka 单机超过 64 个队列 / 分区,Load 会产生显著的飙高景象,队列越多,load 越高,发送音讯响应工夫变长,应用短轮询形式,实时性取决于轮询间隔时间,生产失败不反对重试;反对音讯程序,然而一台代理宕机后,就会产生音讯乱序,社区更新较慢

3、RocketMQ

RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了本人的一些改良。被阿里巴巴广泛应用在订单,交易,充值,流计算,音讯推送,日志流式解决,binglog 散发等场景。

长处: 单机吞吐量十万级, 可用性十分高,分布式架构, 音讯能够做到 0 失落,MQ 性能较为欠缺,还是分布式的,扩展性好, 反对 10 亿级别的音讯沉积,不会因为沉积导致性能降落, 源码是 java 咱们能够本人浏览源码,定制本人公司的 MQ

毛病:反对的客户端语言不多,目前是 java 及 c++,其中 c++ 不成熟;社区活跃度个别, 没有在 MQ 外围中去实现 JMS 等接口, 有些零碎要迁徙须要批改大量代码

4、RabbitMQ

2007 年公布,是一个在 AMQP(高级音讯队列协定)根底上实现的,可复用的企业音讯零碎,是以后最支流的消息中间件之一。

长处:因为 erlang 语言的高并发个性,性能较好;吞吐量到万级,MQ 性能比拟齐备, 强壮、稳固、易用、跨平台、反对多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,反对 AJAX 文档齐全;开源提供的治理界面十分棒,用起来很好用, 社区活跃度高;更新频率相当高

毛病:商业版须要免费, 学习老本较高

比照
个性 ActiveMQ Kafka RocketMQ RabbitMQ
单机吞吐量 单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级 10 万级,高吞吐,个别配合大数据类的零碎来进行实时数据计算、日志采集等场景 10 万级,撑持高吞吐 吞吐量到万级
topic 数量对吞吐量的影响 topic 从几十到几百个时候,吞吐量会大幅度降落,在等同机器下,Kafka 尽量保障 topic 数量不要过多,如果要撑持大规模的 topic,须要减少更多的机器资源 topic 能够达到几百 / 几千的级别,吞吐量会有较小幅度的降落,这是 RocketMQ 的一大劣势,在等同机器下,能够撑持大量的 topic
时效性 ms 级 提早在 ms 级以内 ms 级 微秒级,这是 RabbitMQ 的一大特点,提早最低
可用性 高,基于主从架构实现高可用 十分高,分布式,一个数据多个正本,多数机器宕机,不会失落数据,不会导致不可用 十分高,分布式架构 高,基于主从架构实现高可用
音讯可靠性 有较低的概率失落数据 通过参数优化配置,能够做到 0 失落 通过参数优化配置,能够做到 0 失落 根本不丢
性能反对 MQ 畛域的性能极其齐备 性能较为简单,次要反对简略的 MQ 性能,在大数据畛域的实时计算以及日志采集被大规模应用 MQ 性能较为欠缺,还是分布式的,扩展性好 基于 erlang 开发,并发能力很强,性能极好,延时很低

4、MQ 的抉择

对音讯队列进行技术选型时,须要通过以下指标掂量你所抉择的音讯队列,是否能够满足你的需要:

  • 音讯程序:发送到队列的音讯,生产时是否能够保障生产的程序,比方 A 先下单,B 后下单,应该是 A 先去扣库存,B 再去扣,程序不能反。
  • 音讯路由:依据路由规定,只订阅匹配路由规定的音讯,比方有 A / B 两者规定的音讯,消费者能够只订阅 A 音讯,B 音讯不会生产。
  • 音讯可靠性:是否会存在丢音讯的状况,比方有 A / B 两个音讯,最初只有 B 音讯能生产,A 音讯失落。
  • 音讯时序:次要包含“音讯存活工夫”和“提早 / 预约的音讯”,“音讯存活工夫”示意生产者能够对音讯设置 TTL,如果超过该 TTL,音讯会主动隐没;“提早 / 预约的音讯”指的是能够提早或者预订生产音讯,比方延时 5 分钟,那么音讯会 5 分钟后能力让消费者生产,工夫未到的话,是不能生产的。
  • 音讯留存:音讯生产胜利后,是否还会持续保留在音讯队列。
  • 容错性:当一条音讯生产失败后,是否有一些机制,保障这条音讯是一种能胜利,比方异步第三方退款音讯,须要保障这条音讯生产掉,能力确定给用户退款胜利,所以必须保障这条音讯生产胜利的准确性。
  • 伸缩:当音讯队列性能有问题,比方生产太慢,是否能够疾速反对库容;当生产队列过多,节约系统资源,是否能够反对缩容。
  • 吞吐量:反对的最高并发数
1、kafka

Kafka 次要特点是基于 Pull 的模式来解决音讯生产,谋求高吞吐量,一开始的目标就是用于日志收集和传输,适宜产生大量数据的互联网服务的数据收集业务。大型公司倡议能够选用,如果有日志采集、实时计算性能,必定是首选 kafka。

2、RocketMQ

天生为金融互联网畛域而生,对于可靠性要求很高的场景,尤其是电商外面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无奈及时处理的状况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 曾经经验了屡次考验,如果你的业务有上述并发场景,倡议能够抉择 RocketMQ。

3、RabbitMQ

== 联合 erlang== 语言自身的并发劣势,性能好时效性微秒级,社区活跃度也比拟高,治理界面用起来非常不便,如果你的数据量没有那么大,中小型公司优先选择性能比拟齐备的 RabbitMQ。

2、音讯队列模式

1、点对点模式

一个具体的音讯只能由一个消费者生产,多个生产者能够向同一个音讯队列发送音讯,然而一个音讯在被一个音讯者解决的时候,这个音讯在队列上会被锁住或者被移除并且其余消费者无奈解决该音讯。

须要额定留神的是,如果消费者解决一个音讯失败了,音讯零碎个别会把这个音讯放回队列,这样其余消费者能够持续解决。

2、公布 / 订阅模式

单个音讯能够被多个订阅者并发的获取和解决。一般来说,订阅有两种类型:

  • 长期(ephemeral)订阅:这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出,相应的订阅以及尚未解决的音讯就会失落。
  • 长久(durable)订阅:这种订阅会始终存在,除非被动去删除。消费者退出后,音讯零碎会持续保护该订阅,并且后续音讯能够被持续解决。

3、RabbitMQ

1、什么是RabbitMQ

RabbitMQ是一个消息中间件:它承受并转发音讯。你能够把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,依照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的次要区别在于,它不解决快件而是接管,存储和转发音讯数据。

2、四大外围概念

1、生产者

产生数据发送音讯的程序是生产者

2、交换机

交换机是 RabbitMQ 十分重要的一个部件,一方面它接管来自生产者的音讯,另一方面它将音讯推送到队列中。交换机必须确切晓得如何解决它接管到的音讯,是将这些音讯推送到特定队列还是推送到多个队列,亦或者是把音讯抛弃,这个得有交换机类型决定。

3、队列

队列是 RabbitMQ 外部应用的一种数据结构,只管音讯流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限度的束缚,实质上是一个大的音讯缓冲区。许多生产者能够将音讯发送到一个队列,许多消费者能够尝试从一个队列接收数据。这就是咱们应用队列的形式

4、消费者

生产与接管具备类似的含意。消费者大多时候是一个期待接管音讯的程序。请留神生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既能够是生产者又是能够是消费者。

3、基本概念

提到 RabbitMQ,就不得不提AMQP 协定。AMQP协定是具备古代特色的二进制协定。是一个提供对立音讯服务的应用层规范高级音讯队列协定,是应用层协定的一个凋谢规范,为面向音讯的中间件设计。

先理解一下 AMQP 协定两头的几个重要概念:

  • Server:接管客户端的连贯,实现 AMQP 实体服务。
  • Connection:连贯,应用程序与 Server 的网络连接,TCP 连贯。
  • Channel:信道,音讯读写等操作在信道中进行。客户端能够建设多个信道,每个信道代表一个会话工作。如果每一次拜访 RabbitMQ 都建设一个 Connection,在音讯量大的时候建设 TCP Connection 的开销将是微小的,效率也较低。Channel 是在 connection 外部建设的逻辑连贯,如果应用程序反对多线程,通常每个 thread 创立独自的 channel 进行通信,AMQP method 蕴含了 channel id 帮忙客户端和 message broker 辨认 channel,所以 channel 之间是齐全隔离的。Channel 作为轻量级的

    Connection 极大缩小了操作系统建设 TCP connection 的开销

  • Message:音讯,应用程序和服务器之间传送的数据,音讯能够非常简单,也能够很简单。由 Properties 和 Body 组成。Properties 为外包装,能够对音讯进行润饰,比方音讯的优先级、提早等高级个性;Body 就是音讯体内容。
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机外面能够有若干个 Exchange 和 Queue,同一个虚拟主机外面不能有雷同名称的 Exchange 或 Queue。
  • Exchange:交换器,接管音讯,依照路由规定将音讯路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者间接抛弃。RabbitMQ罕用的交换器罕用类型有 direct、topic、fanout、headers 四种,前面具体介绍。
  • Binding:绑定,交换器和音讯队列之间的虚构连贯,绑定中能够蕴含一个或者多个RoutingKey,Binding 信息被保留到 exchange 中的查问表中,用于 message 的散发根据
  • RoutingKey:路由键,生产者将音讯发送给交换器的时候,会发送一个RoutingKey,用来指定路由规定,这样交换器就晓得把音讯发送到哪个队列。路由键通常为一个“.”宰割的字符串,例如“com.rabbitmq”
  • Queue:音讯队列,用来保留音讯,供消费者生产。

4、工作原理

不得不看一下经典的图了,如下

AMQP 协定模型由三局部组成:生产者、消费者和服务端,执行流程如下:

  1. 生产者是连贯到 Server,建设一个连贯,开启一个信道。
  2. 生产者申明交换器和队列,设置相干属性,并通过路由键将交换器和队列进行绑定。
  3. 消费者也须要进行建设连贯,开启信道等操作,便于接管音讯。
  4. 生产者发送音讯,发送到服务端中的虚拟主机。
  5. 虚拟主机中的交换器依据路由键抉择路由规定,发送到不同的音讯队列中。
  6. 订阅了音讯队列的消费者就能够获取到音讯,进行生产。

5、环境搭建

我应用的 Linux 搭建,搭建流程可参考官网:https://www.rabbitmq.com/inst…

查看零碎版本号,Rabbitmq 对 Erlang 有版本要求,不能应用太旧的 Erlang 版本:https://www.rabbitmq.com/whic…

Erlang 下载地址:https://github.com/rabbitmq/e…

1、下载 rpm 包
  • erlang-23.3.4.8-1.el7.x86_64.rpm
  • rabbitmq-server-3.10.5-1.el8.noarch.rpm
2、上传至服务器
# 创立寄存目录
mkdir -p /usr/rabbitmq
3、安装文件
[root@xiaobear rabbitmq]# rpm -Uvih erlang-25.0.2-1.el8.x86_64.rpm 
正告:erlang-25.0.2-1.el8.x86_64.rpm: 头 V4 RSA/SHA256 Signature, 密钥 ID cc4bbe5b: NOKEY
谬误:依赖检测失败:libcrypto.so.1.1()(64bit) 被 erlang-25.0.2-1.el8.x86_64 须要
    libcrypto.so.1.1(OPENSSL_1_1_0)(64bit) 被 erlang-25.0.2-1.el8.x86_64 须要
    libcrypto.so.1.1(OPENSSL_1_1_1)(64bit) 被 erlang-25.0.2-1.el8.x86_64 须要
    libstdc++.so.6(CXXABI_1.3.9)(64bit) 被 erlang-25.0.2-1.el8.x86_64 须要
    libtinfo.so.6()(64bit) 被 erlang-25.0.2-1.el8.x86_64 须要
    libz.so.1(ZLIB_1.2.7.1)(64bit) 被 erlang-25.0.2-1.el8.x86_64 须要

PS:这是因为下载版本的问题,el8 下载的是 8 的版本,改回下载 7 的就能够了

我下载的版本:erlang-23.3.4.8-1.el7.x86_64.rpm

[root@xiaobear rabbitmq]# rpm -Uivh erlang-23.3.4.8-1.el7.x86_64.rpm 
正告:erlang-23.3.4.8-1.el7.x86_64.rpm: 头 V4 RSA/SHA256 Signature, 密钥 ID cc4bbe5b: NOKEY
筹备中...                          ################################# [100%]
正在降级 / 装置...
   1:erlang-23.3.4.8-1.el7            ################################# [100%]
[root@xiaobear rabbitmq]# yum install socat -y

查看版本

# 测试
erl -version

rabbitmq 在装置过程中须要依赖 socat 这个插件,须要先装置

[root@xiaobear rabbitmq]# rpm -ivh rabbitmq-server-3.10.5-1.el8.noarch.rpm 
正告:rabbitmq-server-3.10.5-1.el8.noarch.rpm: 头 V4 RSA/SHA512 Signature, 密钥 ID 6026dfca: NOKEY
筹备中...                          ################################# [100%]
正在降级 / 装置...
   1:rabbitmq-server-3.10.5-1.el8     ################################# [100%]
[root@xiaobear rabbitmq]# 
4、常用命令
# 开机启动 chkconfig rabbitmq-server on 会转发到上面命令
systemctl enable rabbitmq-server.service
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态,running 示意启动胜利
systemctl status rabbitmq-server.service
# 开机自启动
systemctl enable rabbitmq-server
# 进行服务
systemctl stop rabbitmq-server
5、装置 Web 治理插件
rabbitmq-plugins enable rabbitmq_management
#装置实现后,重启服务
systemctl restart rabbitmq-server
6、拜访 web 页面

拜访地址:服务器 IP+ 端口号(默认 15672), 若没有反馈,请凋谢端口,执行上面命令

# 防火墙凋谢 15672 端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload

留神:

  • 在对应服务器(阿里云,腾讯云等)的平安组中凋谢 15672 端口(rabbitmq 默认端口号),5672 端口后续程序须要应用也要凋谢
  • rabbitmq 有一个默认账号和明码都是:guest默认状况只能在 localhost 本计下拜访,所以须要增加一个近程登录的用户
7、增加用户
  • 创立账号

    rabbitmqctl add_user admin admin123
  • 调配角色

    rabbitmqctl set_user_tags admin administrator

    用户操作权限分四种级别:

    1. administrator:能够登录控制台、查看所有信息、能够对 rabbitmq 进行治理
    2. monitoring:监控者 登录控制台,查看所有信息
    3. policymaker:策略制定者 登录控制台,指定策略
    4. managment 一般管理员 登录控制台
  • 设置权限

    set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

    用户 user_admin 具备 /vhost1 这个 virtual host 中所有资源的配置、写、读权限以后用户和角色

    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
  • 再次拜访登录,即可胜利
8、重置命令
# 敞开利用的命令
rabbitmqctl stop_app
#革除的命令
rabbitmqctl reset
#重新启动命令
rabbitmqctl start_app
9、其余命令
# 增加账号、明码
rabbitmqctl add_user
# 设置账号为管理员
rabbitmqctl set_user_tags 账号 administrator
# 批改账号密码
rabbitmqctl change_password Username Newpassword
# 查看用户清单
rabbitmqctl list_users
# 增加账号查看资源的权限
rabbitmqctl set_permissions -p / 用户名 ".*"".*"".*"

2、Hello World

官网 Hello World 地址:https://www.rabbitmq.com/tuto…

1、新建 maven 我的项目

增加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>xiaobear-RabbitMQ</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.1</version>
        </dependency>
    </dependencies>

</project>

2、建生产者类

public class Product {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 创立连贯
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.130.134");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        //channel 实现了主动 close 接口 主动敞开 不须要显示敞开
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            /**
             * 生成一个队列
             * 1. 队列名称
             * 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
             * 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
             * 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
             * 5. 其余参数
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            /**
             * 发送一个音讯
             * 1. 发送到那个交换机
             * 2. 路由的 key 是哪个
             * 3. 其余的参数信息
             * 4. 发送音讯的音讯体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent'" + message + "'");
        }
    }
}

执行后,关上治理面板,会发现,存在一条音讯待生产

3、建消费者类

public class RabbitMQConfig {

    /**
     * rabbitmq 连贯信息
     * @return
     */
    public static ConnectionFactory connectRabbitMq(){
        // 创立连贯
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.130.134");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        return factory;
    }
}
public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 创立连贯诶信息
        ConnectionFactory factory = RabbitMQConfig.connectRabbitMq();

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("[*] Waiting for messages. To exit press CTRL+C");
        // 推送的音讯如何进行生产的接口回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Received'" + message + "'");
        };
        /**
         * 消费者生产音讯
         * 1. 生产哪个队列
         * 2. 生产胜利之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3. 消费者未胜利生产的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

执行消费者后,控制台打印出生产的音讯,此时治理面板待生产的音讯为 0

4、遇到的问题

1、Connection timed out: connect

查看 5672 端口是否凋谢

2、connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

Virtual Host 为【/】的 set permission 给用户

正文完
 0