关于rabbitmq:RabbitMQ

ribbitmq益处服务解耦防止服务之间耦合度过于严密,只须要降须要解决的音讯发送至音讯队列,单方只须要跟音讯服务器沟通即可 流量销峰为了防止高并发送的音讯须要解决,将产生的须要解决的音讯保留到音讯队列,解决一个拿一个,加重后盾解决音讯的压力 异步调用为了晋升用户的体验,将音讯发送到音讯队列后及时返回信息,如果有必要期待解决完结过后向用户发送处理完毕相应 rabbitmq的长久化1.队列长久化ch.queueDeclare(uuid,false/true为长久化队列, true, true, null);2.音讯长久化ch.basicPublish("logs",//交换机名 "", //陆游建指定 MessageProperties.PERSISTENT_TEXT_PLAIN 其余属性音讯长久化 msg.getBytes()); //转换字节ribbitmq的六大模式简略模式消费者共享队列,音讯队列实现了负载平衡,轮询发送音讯给所有的消费者 工作模式增加手动ack回执音讯并独自拉取音讯机制实现正当调配音讯并在音讯队列失去了缓存 订阅模式应用fanout模式交换机绑定音讯队列,分发送所有绑定的音讯队列 陆游模式应用直连模式的交换机(disrect模式),绑定音讯队列.音讯和队列绑定陆游建互相匹配则发送该音讯 主题模式非凡的陆游模式,动静的匹配陆建来发送音讯 RPC模式

September 30, 2020 · 1 min · jiezi

关于rabbitmq:RabbitMQ-04-订阅模式路由模式

rabbitmq六种工作模式3.公布订阅模式即向多个消费者传递同一条信息 1).Exchanges 交换机RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何音讯间接发送到队列。 相同,生产者只能向交换机(Exchange)发送音讯。交换机是一个非常简单的货色。一边接管来自生产者的音讯,另一边将音讯推送到队列。交换器必须确切地晓得如何解决它接管到的音讯。它应该被增加到一个特定的队列中吗?它应该增加到多个队列中吗?或者它应该被抛弃。这些规定由exchange的类型定义。 有几种可用的替换类型:direct、topic、header和fanout。创立fanout交换机logs: c.exchangeDeclare("logs", "fanout");或c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); fanout交换机非常简单。它只是将接管到的所有音讯播送给它所晓得的所有队列。 2).绑定 Bindings 创立了一个fanout交换机和一个队列。当初咱们须要通知exchange向指定队列发送音讯。exchange和队列之间的关系称为绑定。 //指定的队列,与指定的交换机关联起来//称为绑定 -- binding//第三个参数时 routingKey, 因为是fanout交换机, 这里疏忽 routingKeych.queueBind(queueName, "logs", "");3).整体代码1.生产者最重要的更改是,咱们当初心愿将音讯公布到logs交换机,而不是无名的日志交换机。咱们须要在发送时提供一个routingKey,然而对于fanout交换机类型,该值会被疏忽。 public class Producer { public static void main(String[] args) throws Exception { //建设连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //定义fanout类型交换机:logs //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //向交换机发送信息 while (true){ System.out.println("输出音讯:"); String msg = new Scanner(System.in).nextLine(); c.basicPublish("logs", "", null, msg.getBytes()); } }}2.消费者如果还没有队列绑定到交换器,音讯就会失落,但这对咱们来说没有问题;如果还没有消费者在听,咱们能够平安地抛弃这些信息。 ...

September 28, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ-03

rabbitmq六种工作模式1.简略模式RabbitMQ是一个消息中间件,你能够设想它是一个邮局。当你把函件放到邮箱里时,可能确信邮递员会正确地递送你的函件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。 发送音讯的程序是生产者队列就代表一个邮箱。尽管音讯会流经RbbitMQ和你的应用程序,但音讯只能被存储在队列里。队列存储空间只受服务器内存和磁盘限度,它实质上是一个大的音讯缓冲区。多个生产者能够向同一个队列发送音讯,多个消费者也能够从同一个队列接管音讯.消费者期待从队列接管音讯 1.pom.xml增加 slf4j 依赖, 和 rabbitmq依赖 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tedu</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.8.0-alpha2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.8.0-alpha2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build></project>2.生产者发送音讯 public class Test1 { public static void main(String[] args) throws Exception { //创立连贯工厂,并设置连贯信息 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672);//可选,5672是默认端口 f.setUsername("admin"); f.setPassword("admin"); /* * 与rabbitmq服务器建设连贯, * rabbitmq服务器端应用的是nio,会复用tcp连贯, * 并开拓多个信道与客户端通信 * 以加重服务器端建设连贯的开销 */ Connection c = f.newConnection(); //建设信道 Channel ch = c.createChannel(); /* * 申明队列,会在rabbitmq中创立一个队列 * 如果曾经创立过该队列,就不能再应用其余参数来创立 * * 参数含意: * -queue: 队列名称 * -durable: 队列长久化,true示意RabbitMQ重启后队列仍存在 * -exclusive: 排他,true示意限度仅以后连贯可用 * -autoDelete: 当最初一个消费者断开后,是否删除队列 * -arguments: 其余参数 */ ch.queueDeclare("helloworld", false,false,false,null); /* * 公布音讯 * 这里把音讯向默认交换机发送. * 默认交换机隐含与所有队列绑定,routing key即为队列名称 * * 参数含意: * -exchange: 交换机名称,空串示意默认交换机"(AMQP default)",不能用 null * -routingKey: 对于默认交换机,路由键就是指标队列名称 * -props: 其余参数,例如头信息 * -body: 音讯内容byte[]数组 */ ch.basicPublish("", "helloworld", null, "Hello world!".getBytes()); System.out.println("音讯已发送"); c.close(); }}3.消费者接管队列 ...

September 25, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ-02

RabbitMQ 应用场景服务解耦1.假如有这样一个场景, 服务A产生数据, 而服务B,C,D须要这些数据, 那么咱们能够在A服务中间接调用B,C,D服务,把数据传递到上游服务即可 然而,随着咱们的利用规模不断扩大,会有更多的服务须要A的数据,如果有几十甚至几百个上游服务,而且会一直变更,再加上还要思考上游服务出错的状况,那么A服务中调用代码的保护会极为艰难 这是因为服务之间耦合度过于严密 2.再来思考用RabbitMQ解耦的状况 A服务只须要向音讯服务器发送音讯,而不必思考谁须要这些数据;上游服务如果须要数据,自行从音讯服务器订阅音讯,不再须要数据时则勾销订阅即可 流量削峰1.假如咱们有一个利用,平时访问量是每秒300申请,咱们用一台服务器即可轻松应答 而在高峰期,访问量霎时翻了十倍,达到每秒3000次申请,那么单台服务器必定无奈应答,这时咱们能够思考减少到10台服务器,来扩散拜访压力 但如果这种刹时顶峰的状况每天只呈现一次,每次只有半小时,那么咱们10台服务器在少数工夫都只分担每秒几十次申请,这样就有点浪费资源了 2.这种状况,咱们就能够应用RabbitMQ来进行流量削峰,顶峰状况下,霎时呈现的大量申请数据,先发送到音讯队列服务器,排队期待被解决,而咱们的利用,能够缓缓的从音讯队列接管申请数据进行解决,这样把数据处理工夫拉长,以加重刹时压力 这是音讯队列服务器十分典型的利用场景 异步调用思考定外卖领取胜利的状况 领取后要发送领取胜利的告诉,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程十分耗时,尤其是高峰期,可能要期待几十秒甚至更长 这样就造成整条调用链路响应十分迟缓 而如果咱们引入RabbitMQ音讯队列,订单数据能够发送到音讯队列服务器,那么调用链路也就能够到此结束,订单零碎则能够立刻失去响应,整条链路的响应工夫只有200毫秒左右 寻找外卖小哥的利用能够以异步的形式从音讯队列接管订单音讯,再执行耗时的寻找操作

September 25, 2020 · 1 min · jiezi

关于rabbitmq:RabbitMQ-01

搭建 RabbitMQ 服务器1.克隆 centos-7-1908 :rabbitmq 2.设置ip ./ip-staticip: 192.168.64.140ifconfigping www.baidu.com3.mobaxterm 连贯 140 服务器 4.上传 Rabbitmq 的离线安装文件 --> 解压缩 rabbitmq-install.zip --> 把 rabbitmq-install 文件夹上传到 /root/5.切换到rabbitmq-install目录 cd rabbitmq-install`装置 rpm -ivh *.rpm6.启动rabbitmq服务器 # 设置服务,开机主动启动systemctl enable rabbitmq-server# 启动服务systemctl start rabbitmq-server7.rabbitmq治理界面启用治理界面 # 开启治理界面插件rabbitmq-plugins enable rabbitmq_management# 防火墙关上 15672 治理端口firewall-cmd --zone=public --add-port=15672/tcp --permanentfirewall-cmd --reload8.重启RabbitMQ服务 systemctl restart rabbitmq-server9.拜访拜访服务器的15672端口,例如:http://192.168.64.140:15672 10.增加用户 # 增加用户rabbitmqctl add_user admin admin# 新用户设置用户为超级管理员rabbitmqctl set_user_tags admin administrator11.凋谢客户端连贯端口 # 关上客户端连贯端口firewall-cmd --zone=public --add-port=5672/tcp --permanentfirewall-cmd --reloa次要端口介绍 4369 – erlang发现口5672 – client端通信口15672 – 治理界面ui端口25672 – server间外部通信口

September 25, 2020 · 1 min · jiezi

关于rabbitmq:深入理解RabbitMQ的前世今生

原文:https://sq.163yun.com/blog/ar...作者:Java大蜗牛 对于RabbitMQ出身:诞生于金融行业的音讯队列语言:Erlang协定:AMQP(Advanced Message Queuing Protocol 高级音讯队列协定)关键词:内存队列,高可用,一条音讯队列构造 Producer/Consumer:生产者消费者Exchange:交换器,能够了解为队列的路由逻辑,交换器次要有三种,图中是Direct交换器Queue:队列Binding:绑定关系,理论是交换器上映射队列的规定发送和生产一条音讯在上图的模式下,交换器的类型为Direct,伪代码示意音讯的生产和生产 音讯生产#音讯发送办法#messageBody 音讯体#exchangeName 交换器名称#routingKey 路由键publishMsg(messageBody,exchangeName,routingKey){ ......}#音讯发送publishMsg("This is a warning log","exchange","log.warning"); RoutingKey=log.warning,和队列A与交换器的绑定统一,所以音讯被路由到了队列A上。 音讯生产对于音讯生产而言,消费者间接指定要生产的队列即可,比方指定生产队列A的数据。 须要留神的是,在消费者生产实现数据后,返回给RabbitMq ACK音讯,RabbitMq会删掉队列中的该条信息。 多种音讯路由模式在Exchange这个模块上,RabbitMq次要反对了Direct,Fanout,Topic三种路由模式,RabbitMq在路由模式上下功夫,也阐明了他在设计上想要满足多样化的需要。 Direct和Fanout模式比拟好了解,相似于单播和播送模式,Topic模式比拟有意思,它反对自定义匹配规定,依照规定把所有满足条件的音讯路由到指定队列,可能帮忙开发者灵便应答各类需要。 音讯的存储RabbitMQ的音讯默认是在内存里的,实际上不光是音讯,Exchange路由等信息理论都在内存中。内存的长处是高性能,问题在于故障后无奈复原。所以RabbitMQ也反对长久化的存储,也就是写磁盘。 要在RabbitMQ中长久化音讯,要同时满足三个条件: 音讯投体时应用长久化投递模式指标交换器是配置为长久化的指标队列是配置为长久化的RabbitMQ长久化音讯的形式是常见的写日志形式: 当一条长久化音讯发送到长久化的Exchange上时,RabbitMQ会在音讯提交到日志文件后,才发送响应。一旦这条音讯被生产后,RabbitMQ会将会把日志中该条音讯标记为期待垃圾收集,之后会从日志中革除。如果呈现故障,主动重建Exchange,Bindings和Queue,同时通过重播长久化日志来复原音讯。音讯长久化的优缺点很显著,领有故障恢复能力的同时,也带来了性能的急剧下降。同时,因为RabbitMQ默认状况下是没有冗余的,假如一个长久化节点解体,统一到该节点复原前,音讯和队列都无奈复原。 音讯投递模式1.发后即忘 RabbitMQ默认公布音讯是不会返回任何后果给生产者的,所以存在发送过程中失落数据的危险。 2.AMQP事务 AMQP事务保障RabbitMQ不仅收到了音讯,并胜利将音讯路由到了所有匹配的订阅队列,AMQP事务将使得生产者和RabbitMQ产生同步。 尽管事务使得生产者能够确定音讯曾经达到RabbitMQ中的对应队列,然而却会升高2~10倍的音讯吞吐量。 3.发送方确认 开启发送方确认模式后,音讯会有一个惟一的ID,一旦音讯被投递给所有匹配的队列后,会回调给发送方应用程序(蕴含音讯的惟一ID),使得生产者晓得音讯曾经平安达到队列了。 如果音讯和队列是配置成了长久化,这个确认音讯只会在队列将音讯写入磁盘后才会返回。如果RabbitMQ外部产生了谬误导致这条音讯失落,那么RabbitMQ会发送一条nack音讯,当然我了解这个是不能保障的。 这种模式因为不存在事务回滚,同时整体依然是一个异步过程,所以更加轻量级,对服务器性能的影响很小。 RabbitMQ RPC个别的异步服务间,可能会用两组队列实现两个服务模块之前的异步通信,乏味的是RabbitMQ就内建了这个性能。 RabbitMQ反对音讯应答性能,每个AMQP音讯头中有一个Reply_to字段,通过该字段指定音讯返回到的队列名称(这是一个公有队列)音讯的生产者能够监听该字段对应的队列。 RabbitMQ集群RabbitMQ集群的设计指标: 容许消费者和生产者在RabbitMQ节点解体的状况下持续运行能过通过增加节点来线性扩大音讯通信吞吐量从理论后果看,RabbitMQ实现设计指标上并不非常杰出,次要起因在于默认的模式下,RabbitMQ的队列实例子只存在在一个节点上(尽管后续也反对了镜像队列),既不能保障该节点解体的状况下队列还能够持续运行,也不能线性扩大该队列的吞吐量。 集群构造RabbitMQ外部的元数据次要有: 队列元数据-队列名称和属性交换器元数据-交换器名称,类型和属性绑定元数据-路由信息尽管RabbitMQ的队列理论只会在一个节点上,但元数据能够存在各个节点上。举个例子来说,当创立一个新的交换器时,RabbitMQ会把该信息同步到所有节点上,这个时候客户端不论连贯的那个RabbitMQ节点,都能够拜访到这个新的交换器,也就能找到交换器下的队列。 如上图所示,队列A的实例理论只在一个RabbitMQ节点上,其它节点理论存储的是只想该队列的指针。 为什么RabbitMQ不在各个节点间做复制了,《RabbitMQ实战》给出了两个起因: 存储老本-RabbitMQ作为内存队列,复制对存储空间的影响,毕竟内存是低廉而无限的性能损耗-公布音讯须要将音讯复制到所有节点,特地是对于长久化队列而言,性能的影响会很大我了解老本这个起因并不齐全成立,复制并不一定要复制到所有节点,比方一个队列能够只做两个正本,复制带来的内存老本能够交给应用方来评估,毕竟在内存中没有沉积的状况下,实际上队列是不会占用多大内存的。 还有一点是RabbitMQ自身并没有保障音讯生产的有序性,所以实际上队列被Partition到各个节点上,这样能力真正达到线性扩容的目标(以RabbitMQ的现状来说,单队列理论是无奈扩容的,只有在业务层做切分)。 注:RabbitMQ集群中的节点能够是内存节点也能够是磁盘节点,但要求至多有一个磁盘节点,这样呈现故障时能力复原数据。 镜像队列镜像队列架构RabbitMQ本人也思考到了咱们之前剖析的单节点长时间故障无奈复原的问题,所以RabbitMQ 2.6.0之后它也反对了镜像队列,换个说法也就是正本。 除了发送音讯,所有的操作理论都在主拷贝上,从拷贝理论只是个冷备(默认的状况下所有RabbitMQ节点上都会有镜像队列的拷贝),如果应用音讯确认模式,RabbitMQ会在主拷贝和从拷贝都平安的承受到音讯时才告诉生产者。 从这个构造上来看,如果从拷贝的节点挂了,理论没有任何影响,如果主拷贝挂了,那么会有一个从新选主的过程,这也是镜像队列的长处,除非所有节点都挂了,才会导致音讯失落。从新选主后,RabbitMQ会给消费者一个消费者勾销告诉(Consumer Cancellation),让消费者重连新的主拷贝。 镜像队列原理1.RabbitMQ构造 AMQPQueue:负责AMQP协定相干的音讯解决,包含接管音讯,投递音讯,Confirm音讯等BackingQueue:提供AMQQueue调用的接口,实现音讯的存储和长久化工作BackingQueue由Q1,Q2,Delta,Q3,Q4五个子队列形成,在Backing中,音讯的生命周期有四个状态: Alpha:音讯的内容和音讯索引都在RAM中。(Q1,Q4)Beta:音讯的内容保留在Disk上,音讯索引保留在RAM中。(Q2,Q3)Gamma:音讯的内容保留在Disk上,音讯索引在DISK和RAM上都有。(Q2,Q3)Delta:音讯内容和索引都在Disk上。(Delta)这里以长久化音讯为例(能够看到非长久化音讯的生命周期会简略很多),从Q1到Q4,音讯理论经验了一个RAM->DISK->RAM这样的过程,BackingQueue这么设计的目标有点相似于Linux的Swap,当队列负载很高时,通过将局部音讯放到磁盘上来节俭内存空间,当负载升高时,音讯又从磁盘回到内存中,让整个队列有很好的弹性。因而触发音讯流动的次要因素是:1.音讯被生产;2.内存不足。 RabbitMQ会更具音讯的传输速度来计算以后内存中容许保留的最大音讯数量(Traget_RAM_Count),当:内存中保留的音讯数量+期待ACK的音讯数量>Target_RAM_Count时,RabbitMQ才会把音讯写到磁盘上,所以说尽管实践上音讯会依照Q1->Q2->Delta->Q3->Q4的程序流动,然而并不是每条音讯都会经验所有的子队列以及对应的生命周期。 从RabbitMQ的Backing Queue构造来看,当外部有余时,音讯要经验多个生命周期,在Disk和RAM之间置换,者理论会升高RabbitMQ的解决性能(后续的流控就是关联的解决办法)。 2.镜像队列构造 所有对镜像队列主拷贝的操作,都会通过Guarented Multicasting(GM)同步到各个Salve节点,Coodinator负责组播后果的确认。 GM是一种牢靠的组播通信协议,保障组组内的存活节点都收到音讯。 GM的主播并不是由Master节点来负责告诉所有Slave的(目标是为了防止Master压力过大,同时防止Master生效导致音讯无奈最终Ack),RabbitMQ把一个镜像队列的所有节点组成一个链表,由主拷贝发动,由主拷贝最终确认告诉到了所有的Slave,而两头由Slave接力的形式进行音讯流传。 从这个构造来看,音讯实现整个镜像队列的同步耗时实践上是不低的,然而因为RabbitMQ音讯的音讯确认自身是异步的模式,所以整体的吞吐量并不会受到太大影响。 流控当RabbitMQ呈现内存(默认是0.4)或者磁盘资源达到阈值时,会触发流控机制,阻塞Producer的Connection,让生产者不能持续发送音讯,直到内存或者磁盘资源失去开释。 RabbitMQ基于Erlang/OTP开发,一个音讯的生命周期中,会波及多个过程间的转发,这些Erlang过程之间不共享内存,每个过程都有本人独立的内存空间,如果没有适合的流控机制,可能会导致某个过程占用内存过大,导致OOM。因而,要保障各个过程占用的内容在一个正当的范畴,RabbitMQ的流控采纳了一种信用证机制(Credit),为每个过程保护了四类键值对: ...

September 19, 2020 · 1 min · jiezi

关于rabbitmq:消息中间件RabbitMQ一-初识RabbitMQ

说到RabbitMQ,就不能不提AMQP协定,因为RabbitMQ就是基于AMQP协定来实现的,咱们先来看看AMQP协定到底是什么 AMQP协定 AMQP,即Advanced Message Queuing Protocol,一个提供对立音讯服务的应用层规范高级音讯队列协定,是应用层协定的一个凋谢规范,为面向音讯的中间件设计。基于此协定的客户端与消息中间件可传递音讯,并不受客户端/中间件不同产品,不同的开发语言等条件的限度。更多的对于AMQP的信息,大家能够去百度百科查看RabbitMQ次要组件 1.生产者(Producer):音讯生产者,生产音讯投递到交换机中的客户端 2.消费者(Consumer):音讯消费者,从队列中获取音讯生产的客户端 3.音讯(Message):蕴含有效载荷和标签,有效载荷指要传输的数据,标签形容了有效载荷,并且rabbitmq用它来决定谁取得音讯 4.信道(Channel):多路复用连贯中的一条独立的双向数据流通道 5.交换器(Exchange):服务器中的实体,用来接管生产者发送的音讯并将这些音讯路由给服务器中的队列 6.队列(Queue):一个命名实体,用来保留音讯直到发送给消费者 7.路由键(Routing Key):一个音讯头,交换器能够用这个音讯头决定如何路由某条音讯8.虚拟主机(Virtual Host):一批交换器、音讯队列和相干对象。虚拟主机是共享雷同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,能够抉择一个虚拟主机 RabbitMQ简略架构模型 队列通过路由键绑定到交换器,当生产者生产音讯后,将音讯投递到交换器中,交换器依据绑定的路由键,将音讯路由到对应的队列,而后由订阅该队列的消费者接管音讯并生产 对于RabbitMQ的装置,网上资源很多,这里就不再多说。RabbitMQ的依赖较多,能够应用Dokcer去装置

August 13, 2020 · 1 min · jiezi

关于rabbitmq:RabbitMQ什么是死信队列

一 什么是死信队列当一条音讯在队列中呈现以下三种状况的时候,该音讯就会变成一条死信。 音讯被回绝(basic.reject / basic.nack),并且requeue = false音讯TTL过期队列达到最大长度当音讯在一个队列中变成一个死信之后,如果配置了死信队列,它将被从新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。 二 实现死信队列2.1 原理图 2.2 创立消费者创立一个消费者,绑定生产队列及死信交换机,交换机默认为direct模型,死信交换机也是,arguments绑定死信交换机和key。(注解反对的具体参数文末会附上) public class DirectConsumer { @RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "javatrip",arguments = {@Argument(name="x-dead-letter-exchange",value = "deadExchange"), @Argument(name="x-dead-letter-routing-key",value = "deadKey") }), exchange = @Exchange(value="javatripDirect"), key = {"info","error","warning"} ) })public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("消费者1"+message);}2.3 创立生产者public void publishMessage(String message){ rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatripDirect","info",message);}三 造成死信的三种状况3.1 回绝音讯,并且禁止从新入队设置yml为手动签收模式spring: rabbitmq: listener: simple: acknowledge-mode: manual设置回绝音讯并禁止从新入队Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicNack(deliverTag,false,false);绑定死信队列@RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "javatripDead"), exchange = @Exchange(value = "deadExchange"), key = "deadKey" )})public void receive2(String message){ System.out.println("我是一条死信:"+message);}3.2 音讯TTL过期绑定业务队列的时候,减少音讯的过期时长,当音讯过期后,音讯将被转发到死信队列中。 ...

August 7, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ如何保证消息不被重复消费

一 反复音讯为什么会呈现音讯反复?音讯反复的起因有两个:1.生产时音讯反复,2.生产时音讯反复。 1.1 生产时音讯反复因为生产者发送音讯给MQ,在MQ确认的时候呈现了网络稳定,生产者没有收到确认,实际上MQ曾经接管到了音讯。这时候生产者就会从新发送一遍这条音讯。 生产者中如果音讯未被确认,或确认失败,咱们能够应用定时工作+(redis/db)来进行音讯重试。 @Component@Slf4Jpublic class SendMessage { @Autowired private MessageService messageService; @Autowired private RabbitTemplate rabbitTemplate; // 最大投递次数 private static final int MAX_TRY_COUNT = 3; /** * 每30s拉取投递失败的音讯, 从新投递 */ @Scheduled(cron = "0/30 * * * * ?") public void resend() { log.info("开始执行定时工作(从新投递音讯)"); List<MsgLog> msgLogs = messageService.selectTimeoutMsg(); msgLogs.forEach(msgLog -> { String msgId = msgLog.getMsgId(); if (msgLog.getTryCount() >= MAX_TRY_COUNT) { messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL); log.info("超过最大重试次数, 音讯投递失败, msgId: {}", msgId); } else { messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投递次数+1 CorrelationData correlationData = new CorrelationData(msgId); rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 从新投递 log.info("第 " + (msgLog.getTryCount() + 1) + " 次从新投递音讯"); } }); log.info("定时工作执行完结(从新投递音讯)"); }}1.2生产时音讯反复消费者生产胜利后,再给MQ确认的时候呈现了网络稳定,MQ没有接管到确认,为了保障音讯被生产,MQ就会持续给消费者投递之前的音讯。这时候消费者就接管到了两条一样的音讯。 ...

August 6, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ如何保证消息的可靠性

一条生产胜利被生产经验了生产者->MQ->消费者,因而在这三个步骤中都有可能造成音讯失落。 一 音讯生产者没有把音讯胜利发送到MQ1.1 事务机制AMQP协定提供了事务机制,在投递音讯时开启事务反对,如果音讯投递失败,则回滚事务。 自定义事务管理器 @Configurationpublic class RabbitTranscation { @Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ return new RabbitTemplate(connectionFactory); }}批改yml spring: rabbitmq: # 音讯在未被队列收到的状况下返回 publisher-returns: true开启事务反对 rabbitTemplate.setChannelTransacted(true);音讯未接管时调用ReturnCallback rabbitTemplate.setMandatory(true);生产者投递音讯 @Servicepublic class ProviderTranscation implements RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ // 设置channel开启事务 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("这条音讯发送失败了"+message+",请解决"); } @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager") public void publishMessage(String message) throws Exception { rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); }}然而,很少有人这么干,因为这是同步操作,一条音讯发送之后会使发送端阻塞,以期待RabbitMQ-Server的回应,之后能力持续发送下一条音讯,生产者生产音讯的吞吐量和性能都会大大降低。 ...

August 5, 2020 · 2 min · jiezi

关于rabbitmq:操作rabbitMQ时误删guest账户无法登录

1,以管理员身份关上cmd2,进入rabbitMQ安装文件件sbin目录下3,设置账户和明码rabbitmqctl.bat add_user guest guest4,设置为管理员rabbitmqctl.bat set_user_tags guest administrator5,设置权限rabbitmqctl set_permissions -p /develop guest "." "." ".*"

July 24, 2020 · 1 min · jiezi

关于rabbitmq:深入浅出-RabbitMQ

什么是 RabbitMQ简介(长处)基于 ErLang 语言开发有高可用高并发的长处,适宜集群。开源、稳固、易用、跨平台、反对多种语言、文档齐全。有音讯确认机制和长久化机制,可靠性高。概念生产者和消费者Producer:音讯的生产者Consumer:音讯的消费者Queue音讯队列提供了 FIFO 的解决机制,具备缓存音讯的能力。在 RabbitMQ 中,队列音讯能够设置为长久化,长期或者主动删除。如果是长久化的队列,Queue 中的音讯会在 Server 本地硬盘存储一份,避免零碎 Crash 数据失落。如果是长期的队列,Queue 中的数据在零碎重启之后就会失落。如实是主动删除的队列,当不存在用户连贯到 Server,队列中的数据会被主动删除。ExChangeExChange 相似于数据通信网络中的交换机,提供音讯路由策略。 在 RabbitMQ 中,生产者不是将音讯间接发送给 Queue,而是先发送给 ExChange,ExChange 依据生产者传递的 key 依照特定的路由算法将音讯给指定的 Queue。一个 ExChange 能够绑定多个 Queue。和 Queue 一样,ExChange 也能够设置为长久化、长期或者主动删除。 Binding所谓绑定就是将一个特定的 ExChange 和一个特定的 Queue 绑定起来。ExChange 和 Queue 的绑定能够是多对多的关系。 Virtual Host在 RabbitMQ Server上能够创立多个虚构的 Message Broker(又叫做 Virtual Hosts)。每一个 vhost 实质上是一个迷你的 RabbitMQ Server,别离治理各自的 ExChange 和 binding。生产者和消费者连贯 RabbitMQ Server 须要指定一个 Virtual Host。 应用过程客户端连贯到音讯队列服务器,关上一个 Channel。客户端申明一个 ExChange,并设置相干属性。客户端申明一个 Queue,并设置相干属性。客户端应用 Routing Key,在 ExChange 和 Queue 之间建设好绑定关系。客户端投递音讯到 ExChange。ExChange 接管到音讯后,就依据音讯的 key 和曾经设置的 bingding,进行音讯路由,将音讯投递到一个或多个队列里。部署 RabbitMQ应用 Docker Compose 部署创立 docker-compose.yml ...

July 18, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ的开发应用

1.介绍RabbitMQ 是一个由erlang语言编写的、开源的、在AMQP根底上残缺的、可复用的企业音讯零碎。反对多种语言,包含java、Python、ruby、PHP、C/C++等。 1.1.AMQP模型AMQP:advanced message queuing protocol ,一个提供对立音讯服务的应用层规范高级音讯队列协定,是应用层协定的一个凋谢规范,为面向音讯的中间件设计。基于此协定的客户端与消息中间件可传递音讯并不受客户端/中间件不同产品、不同开发语言等条件的限度。 AMQP模型图 1.1.1.工作过程发布者(Publisher)公布音讯(Message),经由交换机(Exchange)。 交换机依据路由规定将收到的音讯分发给与该交换机绑定的队列(Queue)。 最初 AMQP 代理会将音讯投递给订阅了此队列的消费者,或者消费者依照需要自行获取。 1、发布者、交换机、队列、消费者都能够有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,音讯代理 能够别离存在于不同的设施上。 2、发布者公布音讯时能够给音讯指定各种音讯属性(Message Meta-data)。有些属性有可能会被音讯代理(Brokers)应用,然而其余的属性则是齐全不通明的,它们只能被接管音讯的利用所应用。 3、从平安角度思考,网络是不牢靠的,又或是消费者在解决音讯的过程中意外挂掉,这样没有解决胜利的音讯就会失落。基于此起因,AMQP 模块蕴含了一个音讯确认(Message Acknowledgements)机制:当一个音讯从队列中投递给消费者后,不会立刻从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才齐全从队列中删除。 4、在某些状况下,例如当一个音讯无奈被胜利路由时(无奈从交换机散发到队列),音讯或者会被返回给发布者并被抛弃。或者,如果音讯代理执行了延期操作,音讯会被放入一个所谓的死信队列中。此时,音讯发布者能够抉择某些参数来解决这些非凡状况。 1.1.2.Exchange交换机交换机是用来发送音讯的 AMQP 实体。交换机拿到一个音讯之后将它路由给一个或零个队列。它应用哪种路由算法是由交换机类型和绑定(Bindings)规定所决定的。常见的交换机有如下几种: direct 直连交换机:Routing Key==Binding Key,严格匹配。fanout 扇形交换机:把发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中。topic 主题交换机:Routing Key==Binding Key,含糊匹配。headers 头交换机:依据发送的音讯内容中的 headers 属性进行匹配。 具体无关这五种交换机的阐明和用法,后续会有章节具体介绍。1.1.3.Queue队列AMQP 中的队列(queue)跟其余音讯队列或工作队列中的队列是很类似的:它们存储着行将被利用生产掉的音讯。队列跟交换机共享某些属性,然而队列也有一些另外的属性。 Durable(音讯代理重启后,队列仍旧存在)Exclusive(只被一个连贯(connection)应用,而且当连贯敞开后队列即被删除)Auto-delete(当最初一个消费者退订后即被删除)Arguments(一些音讯代理用他来实现相似与 TTL 的某些额定性能)1.2.rabbitmq和kafka比照rabbitmq遵循AMQP协定,用在实时的对可靠性要求比拟高的消息传递上。kafka次要用于解决沉闷的流式数据,大数据量的数据处理上。次要体现在: 1.2.1.架构rabbitmq:RabbitMQ遵循AMQP协定,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了音讯的路由键;客户端Producer通过连贯channel和server进行通信,Consumer从queue获取音讯进行生产(长连贯,queue有音讯会推送到consumer端,consumer循环从输出流读取数据)。rabbitMQ以broker为核心。kafka:kafka听从个别的MQ构造,producer,broker,consumer,以consumer为核心,音讯的生产信息保留的客户端consumer上,consumer依据生产的点,从broker上批量pull数据。1.2.2.音讯确认rabbitmq:有音讯确认机制。kafka:无音讯确认机制。1.2.3.吞吐量rabbitmq:rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ反对对音讯的牢靠的传递,反对事务,不反对批量的操作;基于存储的可靠性的要求存储能够采纳内存或者硬盘。kafka:kafka具备高的吞吐量,外部采纳音讯的批量解决,zero-copy机制,数据的存储和获取是本地磁盘程序批量操作,具备O(1)的复杂度,音讯解决的效率很高。 (备注:kafka零拷贝,通过sendfile形式。(1)一般数据读取:磁盘->内核缓冲区(页缓存 PageCache)->用户缓冲区->内核缓冲区->网卡输入;(2)kafka的数据读取:磁盘->内核缓冲区(页缓存 PageCache)->网卡输入。1.2.4.可用性rabbitmq:(1)一般集群:在多台机器上启动多个rabbitmq实例,每个机器启动一个。然而你创立的queue,只会放在一个rabbtimq实例上,然而每个实例都同步queue的元数据。完了你生产的时候,实际上如果连贯到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过去。(2)镜像集群:跟一般集群模式不一样的是,你创立的queue,无论元数据还是queue里的音讯都会存在于多个实例上,而后每次你写音讯到queue的时候,都会主动把音讯到多个实例的queue里进行音讯同步。这样的话,益处在于,一个机器宕机了,没事儿,别的机器都能够用。害处在于,第一,这个性能开销太大了,音讯同步所有机器,导致网络带宽压力和耗费很重。第二,这么玩儿,就没有扩展性可言了,如果某个queue负载很重,你加机器,新增的机器也蕴含了这个queue的所有数据,并没有方法线性扩大你的queuekafka:kafka是由多个broker组成,每个broker是一个节点;每创立一个topic,这个topic能够划分为多个partition,每个partition能够存在于不同的broker上,每个partition就放一部分数据。这就是人造的分布式音讯队列,就是说一个topic的数据,是扩散放在多个机器上的,每个机器就放一部分数据。每个partition的数据都会同步到其余机器上,造成本人的多个replica正本,而后所有replica会选举一个leader进去,主从构造。1.2.5.集群负载平衡rabbitmq:rabbitMQ的负载平衡须要独自的loadbalancer进行反对,如HAProxy和Keepalived等。kafka:kafka采纳zookeeper对集群中的broker、consumer进行治理,能够注册topic到zookeeper上;通过zookeeper的协调机制,producer保留对应topic的broker信息,能够随机或者轮询发送到broker上;并且producer能够基于语义指定分片,音讯发送到broker的某分片上。2.构造2.1.交换机模式RabbitMQ罕用的Exchange Type有fanout、direct、topic、headers这四种。 2.1.1.Direct Exchangedirect类型的Exchange路由规定很简略,它会把音讯路由到那些binding key与routing key齐全匹配的Queue中。 2.1.2.Topic Exchange后面讲到direct类型的Exchange路由规定是齐全匹配binding key与routing key,但这种严格的匹配形式在很多状况下不能满足理论业务需要。topic类型的Exchange与direct类型的Exchage类似,也是将音讯路由到binding key与routing key相匹配的Queue中,但反对含糊匹配: routing key为一个句点号“. ”分隔的字符串(咱们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”binding key与routing key一样也是句点号“. ”分隔的字符串binding key中能够存在两种特殊字符"*"与“#”,用于做含糊匹配,其中" * "用于匹配一个单词,“#”用于匹配多个单词(能够是零个)2.1.3.Fanout Exchangefanout类型的Exchange路由规定非常简单,它会把所有发送到fanout Exchange的音讯都会被转发到与该Exchange 绑定(Binding)的所有Queue上。 Fanout Exchange 不须要解决RouteKey 。只须要简略的将队列绑定到exchange 上。这样发送到exchange的音讯都会被转发到与该交换机绑定的所有队列上。相似子网播送,每台子网内的主机都取得了一份复制的音讯。所以,Fanout Exchange 转发音讯是最快的。 ...

July 17, 2020 · 5 min · jiezi

复制粘贴就完事的Linux-安装RabbitMQ

1.linux装置四个依赖 yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel httpd python-simplejson2.下载erlang的tar.gz包,如我下载的otp_src_19.3.tar.gz,https://www.erlang.org/downlo... 3.上传到opt目录下,解压文件 tar -zxvf otp_src_19.3.tar.gz4.执行查看命令 cd otp_src_19.3./configure --prefix=/opt/erlang5.执行编译装置命令 make && make install6.增加环境变量 vi /etc/profileERLANG_HOME=/opt/erlangexport PATH=$PATH:$ERLANG_HOME/binexport ERLANG_HOME7.配置文件失效 source /etc/profile8.验证是否装置胜利 erl9.下载rabbitmq的tar.xz包,如我下载的rabbitmq-server-generic-unix-3.6.10.tar.xz,https://www.rabbitmq.com/rele... 10.上传到opt目录下,解压文件 xz -d rabbitmq-server-generic-unix-3.6.10.tar.xztar -xvf rabbitmq-server-generic-unix-3.6.10.tar11.重命名,此步骤可跳过 mv rabbitmq_server-3.6.10/ rabbitmq12.增加环境变量 vi /etc/profileexport PATH=$PATH:/opt/rabbitmq/sbinexport RABBITMQ_HOME=/opt/rabbitmq13.配置文件失效 source /etc/profile14.启动rabbitmq rabbitmq-server -detached

July 13, 2020 · 1 min · jiezi

rabbitMQ

作者太懒了,总感觉做进去就会了,看过也会了,没有做笔记,前面找工夫补上 RabbitMQ基础知识解说RabbitMQ的装置RabbitMQ工作模式 - 发送端和接收端封装 RabbitMQ - 五种工作模式 - 简略模式和work工作模式RabbitMQ - 五种工作模式 - 订阅模式RabbitMQ - 五种工作模式 - 路由模式RabbitMQ - 五种工作模式 - 主题模式 RabbitMQ - 音讯长久化和手动应答RabbitMQ - 死信队列

July 10, 2020 · 1 min · jiezi

RabbitMQ跨机房迁移数据零丢失

一、背景介绍 公司以前大部分服务在私有云上,因使用有一段时间了,机器比较老化再加上运维成本高,计划将整个机房上云,因负责中间件一块,所以最近将RabbitMQ顺利地迁移到云上。 先说下大概部署结构,为 保证高可用,在私有云上部署了3台Broker,应用在配置文件中直接配置3个IP,每次请求时由客户端随机选择1台。 本次迁移的目标是: 1、零数据丢失,但不保证消息不重复消费; 2、不出现整个MQ集群长时间(2分钟以上)不可用; 二、方案分析 关于数据丢失,我们先要知道RabbitMQ中有哪些数据: Exchange、Queue、Message。 关于Exchange和Queue,可以设置在不存在的时候创建,但这样难以管控,所以一般我们都是在后台由管理员创建Exchange和Queue,并设置好相应属性,一般来说都要设置为持久化,即durable为true,这样保证在Broker重启时Exchange和Queue还存在。 Message的持久化则要在发送的时候设置相应的参数,如果用的amqp-client这个包,则代码如下: channel.basicPublish(exchange, routingKey, basicProperties, payload);其中为basicProperties为消息属性,类型为AMQP.BasicProperties public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties { private String contentType; private String contentEncoding; private Map<String,Object> headers; private Integer deliveryMode; private Integer priority; private String correlationId; private String replyTo; private String expiration; private String messageId; private Date timestamp; private String type; private String userId; private String appId; private String clusterId;关注deliveryMode属性,要设为2消息才会持久化。 ...

June 26, 2020 · 1 min · jiezi

RabbitMQ实践经验

虚拟主机可以将一台RabbitMQ服务器服务多个不同的应用,应用间通过不同虚拟主机的划分提供了消息逻辑上的独立。 #创建虚拟主机rabbitmqctl add_vhost test#删除虚拟主机rabbitmqctl delete_vhost test#查询当前RabbitMQ服务器上所有虚拟机rabbitmqctl list_vhosts消息保存RabbitMQ对Queue中消息的保存方式有disk和RAM两种。采用disk方式消息数据会被保存到以.rdq后缀命名的文件中。绝大部分情况下,对消息相关数据保存采用disk方式,如果有其他高可用手段,也可选用RAM。消息持久化涉及Queue、Message、Exchange三部分。1.Queue持久化通过设置durable为true来实现。 com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException;/*** exclusive表示是否为排他队列,为true表示为排他队列* autoDelete表示是否自动删除,为true表示队列会在没有任何订阅消费者时被自动删除。*/com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;void queueDeclareNoWait(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclarePassive(String var1) throws IOException;2.Message持久化如果想要重启后Queue里面没有发送的消息也继续存在,需要设置消息持久化。 /*** BasicProperties可以使用PERSISTENT_TEXT_PLAIN表示发送的是需要持久化的消息,其实也就是将BasicProperties中的deliveryMode设置为2*/void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;void basicPublish(String var1, String var2, boolean var3, BasicProperties var4, byte[] var5) throws IOException;void basicPublish(String var1, String var2, boolean var3, boolean var4, BasicProperties var5, byte[] var6) throws IOException;3.Exchange持久化在声明Exchange时使用支持durable入参的方法,将其设置为true。 ...

June 22, 2020 · 1 min · jiezi

基于RabbitMQ的消息推送

基于RabbitMQ的消息推送H5定义了WebSocket,能实现浏览器和服务器之间全双工通信,服务器可以推送数据给客户端浏览器。针对WebSocket通信,RabbitMQ需要提供Web STOMP插件,这样浏览器就可以使用RabbitMQ。前端需要引入一个stomp.js文件,可到GitHub上下载该文件,https://github.com/jmesnil/stomp-websocket,js文件地址:https://github.com/jmesnil/stomp-websocket/blob/master/lib/stomp.js

June 22, 2020 · 1 min · jiezi

RabbitMQ入门6死信和备份交换机

1. 死信交换机(Dead-Letter-Exchange) 当消息在一个队列中由于过期,被拒绝等原因变成死信(dead message)之后,它能被重新发送到一个交换机中这个交换机就是死信交换机,绑定死信交换机的队列就称之为死信叫交换机2. 判断一个消息是否是死信消息的依据:消息被拒绝 (msg.Reject) 并且设置requeue值设置为false消息过期后,消息过期时间设置主要有两种方式 设置队列的过期时间,这样改队列所有的消息都存在相同的过期时间 在队列申明的时候使用 x-message-ttl参数 单位为:毫秒单独设置某个消息的过期时间,每条消息的过期时间不一样(设置消息属性的 exporation 参数的值,单位为毫秒如果同时使用了两种方式设置过期四件,以两者之间较小的那个数值为准;队列已满,无法再添加消息到mq中 申明队列的时候设置 x-dead-letter-exchange参数备份交换机 未被正确路由的消息将会结果此交换机 申明交换机的时候设置 alternate-exchange 参数

June 20, 2020 · 1 min · jiezi

RabbitMQ功能实现1-红包未领取退回

生产者代码:package mainimport ( uuid "github.com/satori/go.uuid" "github.com/streadway/amqp" "github.com/wonderivan/logger" "rmq/db/rmq" "time")const ( DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机 DeadLettersQueueName = "dlx_queue_packet" // 死信队列 QueueName = "queue_packet" // 目标队列 ExchangeName = "exchange_packet" // 目标交换机)var ( ch *amqp.Channel err error conn *amqp.Connection queue amqp.Queue dlxQueue amqp.Queue)func main() { if conn, err = rmq.GetConn(); err != nil { logger.Error("连接RabbitMQ服务器失败:%s", err.Error()) return } defer conn.Close() if ch, err = conn.Channel(); err != nil { logger.Error("获取Channel失败:%s", err.Error()) return } defer ch.Close() // 声明队列交换机 if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil { logger.Error("声明业务交换机失败:%s", err.Error()) return } // 创建死信交换机 if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil { logger.Error("创建死信交换机:%s", err.Error()) return } // 创建死信队列 if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil { logger.Error("创建死信队列失败:%s", err.Error()) return } // 创建业务队列 if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{ "x-message-ttl": 6000, // 消息过期时间 毫秒 "x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机 // "x-dead-letter-routing-key": "dlxKey", // 死信路由key }); err != nil { logger.Warn("创建业务队列失败:%s", err.Error()) return } // 业务队列绑定交换机 if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil { logger.Error("绑定业务交换机失败:%s", err.Error()) return } // 死信队列绑定死信交换机 if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil { logger.Error("绑定死信交换机失败:%s", err.Error()) } for i := 1; i <= 10; i++ { msg := amqp.Publishing{ MessageId: uuid.NewV4().String(), ContentType: "text/plain", Body: []byte("红包退回"), } // 发布消息 err = ch.Publish( ExchangeName, "", false, false, msg, ) if err != nil { logger.Error("发送失败: %s", err.Error()) return } else { logger.Info("发送成功:%s", msg.MessageId) } }}消费者代码package mainimport ( uuid "github.com/satori/go.uuid" "github.com/streadway/amqp" "github.com/wonderivan/logger" "rmq/db/rmq" "time")const ( DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机 DeadLettersQueueName = "dlx_queue_packet" // 死信队列 QueueName = "queue_packet" // 目标队列 ExchangeName = "exchange_packet" // 目标交换机)var ( ch *amqp.Channel err error conn *amqp.Connection queue amqp.Queue dlxQueue amqp.Queue)func main() { if conn, err = rmq.GetConn(); err != nil { logger.Error("连接RabbitMQ服务器失败:%s", err.Error()) return } defer conn.Close() if ch, err = conn.Channel(); err != nil { logger.Error("获取Channel失败:%s", err.Error()) return } defer ch.Close() // 创建死信交换机 if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil { logger.Error("创建死信交换机:%s", err.Error()) return } // 创建死信队列 if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil { logger.Error("创建死信队列失败:%s", err.Error()) return } // 死信队列绑定死信交换机 if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil { logger.Error("绑定死信交换机失败:%s", err.Error()) } msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil) if err != nil { logger.Error("消费者监听失败:%s", err.Error()) return } for { select { case message, ok := <-msgList: if !ok { continue } go func(msg amqp.Delivery) { logger.Info("messageID: %s", msg.MessageId) logger.Info("messageBody: %s", msg.Body) if err = msg.Ack(false); err != nil { logger.Error("确认消息失败") } }(message) case <-time.After(time.Second): } }}

June 20, 2020 · 3 min · jiezi

RabbitMQ入门4工作模式和交换机类型

工作模式1. 简单队列模式(simple queue)只包含一个生产者和一个消费者 生产者将消息发送到队列中 消费者从队列中接收消息2. 工作队列模式(work Queues)一个生产者对应多个消费者,一条消息只被一个消费者进行消费工作队列有轮询分发和公平分发两种模式2.1平均分配公平分配,每次只给一个消费者分配一个ch.Qos(1, 0, false)3. 发布-订阅模式(Publish/SubScribe)一个生产者,多个消费者每个消费者都有自己的消息队列,分别绑定到不同的队列上生产者没有把消息发送到队列,而是发送到交换机上每个队列都需要绑定到交换机上生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费如果消息发送到没有队列绑定的交换机时,消息将会消失,因为交换机没有存储消息的能力,只有队列才有存储消息的能力;4. 路由模式(routing)生产者将消息发送到direct交换机,它会吧消息路由到那些binding key 与 routing key 完全匹配的queue中这样就能实现消费者有选择的去消费消息5. 主题模式(Tipic)交换机通过模式匹配分配消息的路由键属性将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上,它将路由键和绑定的字符串切分为单词,这些单词之间用点隔开(use.news)同样也会识别两个通配符, "#" 和 "*"# 匹配0个或者多个单词, * 匹配多个单词binding keyrouting keylogger.#logger.errorlogger.infologger.debug#.infologger.info交换机类型参数名类型解释direct直连交换机fanout扇型交换机topic主题交换机headers头交换机直连交换机是根据消息携带的路由键(routing key) 将消息投递到对应的队列,步骤如下:将一个队列绑定到某个交换机上,同时赋予绑定一个路由键(routing key);当一个携带着路由键值为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列扇形交换机将消息路由给绑定到它身上的所有队列,不同于直连交换机,路由键在此类型上不启任务作用如果N个队列绑定到某个扇形交换机上,当有消息发送给此扇形交换机时,交换机会将此消息发送给这所有的N个队列主题交换机队列通过路由键绑定到交换机上,然后交换机根据消息里面的路由值,将消息路由给一个或多个绑定的队列扇形交换机和主题交换机异同对于扇形交换机 路由键是没有意义的,只要有消息,它都发送消息到它所绑定的所有队列上对于主题交换机 路由规则由路由键决定,只有满足路由键的规则,消息才可以路由到对应的队列上头交换机类似主题交换机,但是头交换机所有多个消息属性来替代路由键建立路由规则,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则此交换机有个重要的参数: x-match当x-match为any时: 消息头的任意一个值被匹配就可以满足条件当x-match为all的时候,就需要消息头的所有值都匹配成功

June 20, 2020 · 1 min · jiezi

RabbitMQ入门3api参数

ch.QueueDeclare queue, err = ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments )参数名参数类型解释namestring队列名称durablebool是否持久化,队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库autoDeletebool是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除exclusivebool是否排外的,有两个作用,1:当连接关闭时该队列是否会自动删除;2:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常;一般等于true的话用于一个队列只能有一个消费者来消费的场景no-waitbool是否等待服务器返回argumentsmap[string]interface{}设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。ch.Publishch.Publish( "", // exchange "hello", // routing key false, // mandatory false, // immediate body, // msg) 参数名参数类型解释exchangestring交换机routing keystring路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用mandatorybooltrue:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉immediatebooltrue:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。msg 消息内容ch.Consumech.Consume( "hello", // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args)参数名参数类型解释queuestring consumerstring auto-ackbool是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答exclusivebool no-localbool no-waitbool是否等待服务器返回args ch.ExchangeDeclarech.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments)参数名参数类型解释namestring typestring交换机类型: direct fanout topic headers其中一种durablebool是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息auto-deletedbool是否自动删除,设置为TRUE则表是自动删除,自删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为faseinternalbool是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式no-waitbool是否等待服务器返回arguments 其它一些结构化参数比如alternate-exchange

June 20, 2020 · 1 min · jiezi

RabbitMQ入门1下载安装

安装erlangrabbitMQ是erlang语言开发的,所以安装的的时候需要erlang环境yum -y install erlang 测试erlang安装完毕erl -version下载地址http://erlang.org/download/ 下载到本地wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-generic-unix-3.8.5.tar.xz解压tar -xvf rabbitmq-server-generic-unix-3.8.5.tar.xz移动目录mv rabbitmq_server-3.8.5 /opt/rabbitmq添加环境变量vim ~/.bash_profileexport RABBITMQ=/opt/rabbitmqPATH=$PATH:$HOME/bin:$RABBITMQ/sbin使环境变量生效source ~/.bash_profile查看环境变量echo $RABBITMQ后台启动rabbitmqrabbitmq-server -detached错误Crash dump was written to: erl_crash.dumpinit terminating in do_boot ()该错误是erlang和rabbitmq不符合 只能重新安装erlang对应版本查询: https://www.rabbitmq.com/whic... 编译安装erlang卸载之前安装的erlangyum remove erlangyum list installed | grep erlang-ertsyum remove erlang-erts.x86_64下载wget http://erlang.org/download/otp_src_21.3.tar.gz解压tar -zxvf otp_src_21.3.tar.gz 进入目录cd otp_src_21.3生成makefile./configure --prefix=/opt/erlang编译安装make make install配置环境变量vim ~/.bash_profile# 使环境变量生效source ~/.bash_profile查看是否安装成功erl -version

June 20, 2020 · 1 min · jiezi

RabbitMQ入门2工作原理和基本操作

组成部分:名称解释Broker消息队列服务进程,该进程包含2个部分,Exchange和QueueExchange消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤!Queue存储消息的队列,消息到达队列并转发给消费方Producer消息生产者,即生产方客户端,生产方客户端将消息发送到MQConsumer消息消费者,消费方客户端,接收MQ转发的消息消息发布流程:生产者和Broker建立TCP连接生产者和Broker建立通道.生产者通过通道把消息发送给Broker,由Exchange将消息转发.Exchange将消息发送给指定的Queue(队列)消息接收流程:消费者和Broker建立TCP连接消费者和Broker建立通道消费者监听指定的Queue当有消息到达Queue时Broker默认将消息推给消费者消费者接收到消息.基本操作后台启动rabbitmqrabbitmq-server -detached查看单节点状态rabbitmqctl status查看日志cat $RABBITMQ/var/log/rabbitmq/rabbit@$HOSTNAME.log查看集群状态rabbitmqctl cluster_status新增用户rabbitmqctl add_user lee lee新增授权rabbitmqctl set_permissions -p / lee ".*" ".*" ".*"设置管理者权限rabbitmqctl set_user_tags lee administrator启动web管理插件rabbitmq-plugins enable rabbitmq_management打开 http://ip:15672/ 即可打开web管理界面

June 20, 2020 · 1 min · jiezi

一文带你深入理解消息中间件技术RabbitMQ服务

什么叫消息队列?消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。 为何用消息队列?消息队列是一种应用间的异步协作机制,那什么时候需要使用消息队列呢?像用户下单之后、生成订单、结算,定时给系统注册用户推送活动消息,一些常见的流程类的业务都会用到消息队列服务。 一、RabbitMQ简介RabbitMQ是一个消息的代理器,用于接收和发送消息,你可以这样想,他就是一个邮局,当您把需要寄送的邮件投递到邮筒之时,你可以确定的是邮递员先生肯定会把邮件发送到需要接收邮件的人的手里,不会送错的。在这个比喻中,RabbitMQ就是一个邮箱,也可以理解为邮局和邮递员,他们负责把消息发送出去和用于接收信息。 RabbitMQ和邮局这两者之间的主要区别是它不会处理纸质邮件,取而代之的是接收、存储和发送二进制数据块,也就是我们通常所说的消息。 二 、RabbitMQ基本概念下图是RabbitMQ服务的内部结构 1)Message 消息,它由消息头和消息体两部分组成。消息体是不透明的,但消息头是由一些属性组成的,其中包括:routing-key(路由键)、priority(优先权)、delivery-mode(持久存储)。2)Publisher 生产者,也是消息的生产者,它是向交换器发布消息的应用程序3)Exchange 交换器,用来接收生产者传递过来的消息,然后将这些消息路由至服务器中的队列4)Binding 绑定,用于消息队列与交换器之间的沟通。也是消息路由的规则,相当于一个路由表。5)Queue 消息队列,用来保存消息直到发送给消费者。一个消息可以进入一个或多个队列,除消费者取走消息,否则它一直在消息队列里。6)Connection 网络连接,如:一个TCP连接7)Channel 信道,多路复用连接中一个独立的双向数据传输通道。无论是发布消息、订阅队列、接收消息都是通过信道来完成。复用信道是为了降低系统资源的消耗。8)Consumer 消费者,也就是接收生产者发来的消息的客户端应用。9)Virtual Host 虚拟主机,交换器、消息队列相关的对象。一个VHOST其实可以看成一个rabbitmp服务器,它拥有自己的队列、交换器、绑定与权限机制等。Rabbitmq默认vhost是/。三、RabbitMQ 特点RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。 RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 具体特点包括:1)可靠性(Reliability RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 2)灵活的路由(Flexible Routing) 在消息进入队列之前,通过Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的 Exchange 。 3)消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 4)高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 5)多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 6)多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 7)管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 8)跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 9)插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。参考文章 http://www.rabbitmq.com/四、Rabbitmq的工作过程1)客户端连接到消息队列服务器,开启一个channel2)客户端声明一个exchange、queue,并配置相关属性3)客户端使用routing key,在exchange与queue之间建立好绑定关系4)客户端传递消息到交换器5)交换器接收到消息后,根据预定的KEY与绑定关系,对消息进行路由至消息队列 ...

June 10, 2020 · 2 min · jiezi

RabbitMQ-延迟队列

rabbitmq的延迟队列,我们可以通过死信交换器来实现。生产者发送消息,定义2秒后消息过期,消息就会进入死信交换器,最后到死信队列。 // 定义队列的名称public final static String QUEUE_NAME = "queue.scheduler";// 定义交换器的名称public final static String EXCHANGE_NAME = "exchange.scheduler";// 定义路由的名称public final static String ROUTE_NAME = "route.scheduler";// 定义死信队列的名称public final static String DLX_QUEUE_NAME = "scheduler.queue.name";// 定义死信交换器的名称public final static String DLX_EXCHANGE_NAME = "scheduler.exchange.name";public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 // 创建一个Channel try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 定义交换器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); arguments.put("x-message-ttl", 2000); // 定义队列 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); // 绑定队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME); // 定义死信交换器 channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null); // 定义死信队列 channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null); // 绑定死信队列 channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, ROUTE_NAME); // 发送消息 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true, null, df.format(new Date()).getBytes()); }}消费者,从私信队列获取消息,可以得到延迟后的消息。 ...

June 4, 2020 · 1 min · jiezi

RabbitMQ系列

RabbitMQ - 队列RabbitMQ - 消息确认RabbitMQ - 消息预取RabbitMQ - 消息拒绝RabbitMQ - 交换器RabbitMQ - 备用交换器RabbitMQ - 发送方的可靠性RabbitMQ - 死信队列RabbitMQ - 延迟队列

June 4, 2020 · 1 min · jiezi

RabbitMQ-死信队列

ActiveMQ - 死信队列有死信队列,rabbitmq也有死信队列,以下几种情况会有把消息投递到死信队列: 消息被拒绝,且requeue设置为false。消息过期(队列过期并不会把消息投递给死信队列)由于超过了队列的消息最大数被抛弃消息投递给死信队列的时候,也会经过交换器,这个交换器称之为死信交换器,但是他依然是一个正常的交换器。要设置队列的死信交换,在声明队列时需要指定可选的x-dead-letter-exchange参数。主要是下面的代码: Map<String, Object> args = new HashMap<String, Object>();args.put("x-dead-letter-exchange", "some.exchange.name");示例:// 定义队列的名称public final static String QUEUE_NAME = "queue.dlx";// 定义交换器的名称public final static String EXCHANGE_NAME = "exchange.dlx";// 定义路由的名称public final static String ROUTE_NAME = "route.dlx";// 定义死信队列的名称public final static String DLX_QUEUE_NAME = "some.queue.name";// 定义死信交换器的名称public final static String DLX_EXCHANGE_NAME = "some.exchange.name";// 定义死信路由的名称public final static String DLX_ROUTE_NAME = "some.route.name";public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 // 创建一个Channel try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 定义交换器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "some.exchange.name"); arguments.put("x-message-ttl", 2000); // 定义队列 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); // 绑定队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME); // 定义死信交换器 channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null); // 定义死信队列 channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null); // 绑定死信队列 channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, ROUTE_NAME); // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true, null, "dlx".getBytes()); }}流程是这样的: ...

June 3, 2020 · 2 min · jiezi

RabbitMQ-备用交换器

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换器来实现。大概原理如下,如下图所示,消息发送给交换器,交换器发现没有可路由的队列,于是消息发给备用交换器,备用交换器再发给队列2,由队列2的消费者来处理消息。 示例我们在RabbitMQ - 交换器知道,交换器的定义,需要一个参数,可以通过参数的方式,来指定备用交换器。参数的key是alternate-exchange,value是交换器的名称。通常备用交换器的类型是fanout。生产者中,定义一个交换器和备用交换器,此时并没有响应路由的队列。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 // 创建一个Channel try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 备用交换器 channel.exchangeDeclare("back.exchange", BuiltinExchangeType.FANOUT); Map<String, Object> map = new HashMap<>(); map.put("alternate-exchange", "back.exchange"); // 定义交换器 channel.exchangeDeclare("exchange", BuiltinExchangeType.DIRECT, false, false, map); String[] routingKeys = {"exchange.no.found"}; for (int i = 0; i < routingKeys.length; i++) { // 把消息发送到队列中 channel.basicPublish("exchange", routingKeys[i], null, routingKeys[i].getBytes()); } System.out.println("Sent complete"); }}消费者,订阅的是备用交换器信息 ...

May 30, 2020 · 1 min · jiezi

RabbitMQ-交换器

在Rabbitmq中,消息发送给交换器,交换器根据一定的规则把消息发给队列,broker再把消息发送给消费者,或者发送至主动从队列拉去消息。前面几张讲了队列的相关东西,这篇看看交换器是如何把消息发送给队列的。 交换器交换器接收消息并将其路由到零个或多个队列,它支持四种交换类型:Direct、Fanout、Topic、Headers。还还声明了一些属性,其中最重要的是:交换器的名称、交换器类型、是否持久化、是否自动删除、参数。是否持久化,决定了rabbitmq重启后,交换器是否存在。是否自动删除,决定了当最后一个队列被解除绑定时,交换器是否被删除。 Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;默认的交换器默认的交换器名称为"",即空字符串。当我们没有定义的时候,看起来就像消息直接发送到队列一样。 Direct根据消息路由键将消息传递到队列。主要的步骤如下: 通过路由键K把队列绑定到交换器当带有路由键R的新消息到达交换器时,如果K = R,交换器将其路由到队列生产者代码,通过channel.basicPublish把带有路由键("images.archive", "images.crop", "images.resizer")的消息发送给交换器images。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 // 创建一个Channel try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 定义交换器 channel.exchangeDeclare(Constant.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String[] routingKeys = {"images.archive", "images.crop", "images.resizer"}; for (int i = 0; i < routingKeys.length; i++) { // 把消息发送到队列中 channel.basicPublish(Constant.EXCHANGE_NAME, routingKeys[i], null, routingKeys[i].getBytes()); channel.basicPublish(Constant.EXCHANGE_NAME, routingKeys[i], null, routingKeys[i].getBytes()); } System.out.println("Sent complete"); }}ArchiveRec1消费者,通过channel.queueBind把交换器images、路由键images.archive、队列archive1绑定一起。 ...

May 30, 2020 · 4 min · jiezi

RabbitMQ-消息拒绝

RabbitMQ - 消息确认中提到了当消费者收到消息后,需要对消息进行确认,队列才会把这个消息删除。如果消息处理中发生了异常需要拒绝消息怎么办呢?在这个章节中,我们看到了没确认消息时,如果断开了和rabbitmq的连接,消息会回到待发送那边,等待其他消费者,虽然我们可以通过关闭连接来拒绝消息,但是频繁的频繁的建立连接、关闭连接,会增加rabbitmq的负担。rabbitmq提供了另外一种优雅的方式来拒绝消息,方法如下: void basicReject(long deliveryTag, boolean requeue) throws IOException第一个参数deliveryTag,消息确认中提过,传递标识。第二个参数requeue,为true的话,消息会重新发给下一个消费者,为false的话,就不发给消费者,相当于说,消息我确认了。 重新投递重新投递,就是requeue为true的情况。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 Connection connection = factory.newConnection(); // 创建一个Channel Channel channel = connection.createChannel(); // 通过Channel定义队列 channel.queueDeclare("reject", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("reject message '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true); }; // 接收消息 channel.basicConsume("reject", false, deliverCallback1, consumerTag -> { });}我们通过web控制台发送一条消息,在控制台打印如下,由于当前消费者把消息拒绝了,所以rabbitmq重新投递,又发给这个消费者,消费者又拒绝,所以一直打印,相对于死循环了。从deliveryTag可以看出,这条消息每次重新投递,就会递增。消费者端口连接,web控制有一条消息 ...

May 29, 2020 · 1 min · jiezi

rabbitmq账户的问题

rabbitmq的web管理界面无法使用guest用户登录安装最新版本的rabbitmq(3.3.1),并启用management plugin后,使用默认的账号guest登陆管理控制台,却提示登陆失败。 翻看官方的release文档后,得知由于账号guest具有所有的操作权限,并且又是默认账号,出于安全因素的考虑,guest用户只能通过localhost登陆使用,并建议修改guest用户的密码以及新建其他账号管理使用rabbitmq(该功能是在3.3.0版本引入的)。 虽然可以以比较猥琐的方式:将ebin目录下rabbit.app中loopback_users里的<<"guest">>删除,  并重启rabbitmq,可通过任意IP使用guest账号登陆管理控制台,但始终是违背了设计者的初衷,再加上以前对这一块了解也不多,因此有必要总结一下。 1. 用户管理 用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。 相应的命令 (1) 新增一个用户 rabbitmqctl  add_user  Username  Password (2) 删除一个用户 rabbitmqctl  delete_user  Username (3) 修改用户的密码 rabbitmqctl  change_password  Username  Newpassword (4) 查看当前用户列表 rabbitmqctl  list_users 2. 用户角色 按照个人理解,用户角色可分为五类,超级管理员, 监控者, 策略制定者, 普通管理者以及其他。 (1) 超级管理员(administrator) 可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。 (2) 监控者(monitoring) 可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) (3) 策略制定者(policymaker) 可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 与administrator的对比,administrator能看到这些内容 (4) 普通管理者(management) 仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。 (5) 其他 无法登陆管理控制台,通常就是普通的生产者和消费者。 了解了这些后,就可以根据需要给不同的用户设置不同的角色,以便按需管理。 设置用户角色的命令为: rabbitmqctl  set_user_tags  User  Tag User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。 也可以给同一用户设置多个角色,例如 ...

May 28, 2020 · 1 min · jiezi

RabbitMQ-消息预取

RabbitMQ - 消息确认这篇文章中,提到了消息预取,避免了rabbitmq一直往消费端发送数据,导致消费端出现无限制的缓冲区问题。消息预取定义了信道上或者消费者允许的最大未确认的消息数量。一旦未确认数达到了设置的值,RabbitMQ将停止传递更多消息,除非至少有一条未完成的消息得到确认。使用消息预取的时候,会调用chanel的basicQos方法,prefetchCount是未确认的消息数,global默认值为false,是限制消费者未确认的消息数,设置为true的时候,是限制信道上的未确认消息数。 void basicQos(int prefetchCount, boolean global) throws IOException;消费者限制global设置为false,当每个消费者有2个未确认的消息时,不能再发消息给消费者。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 Connection connection = factory.newConnection(); // 创建一个Channel Channel channel = connection.createChannel(); // 通过Channel定义队列 channel.queueDeclare("qos", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; /* DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); };*/ channel.basicQos(2, false); // 接收消息 channel.basicConsume("qos", false, deliverCallback1, consumerTag -> { }); /*channel.basicConsume("qos", false, deliverCallback2, consumerTag -> { });*/}运行后,往队列发送了4条消息,可以看到,未发送(ready)有2个,未确认2个。控制台确实只收到了两个消息。如果把注释放开,同时有两个消费者,可以看到,未发送(ready)有0个,未确认4个。控制台结果如下,两个都消费了两个。 ...

May 28, 2020 · 2 min · jiezi

RabbitMQ-消息确认

RabbitMQ - 队列中提到,接收消息的时候,有两个方式,一个是consume,一个是get,这两个方法都有一个autoAck的参数。当我们设置为true的时候,说明消费者会通过AMQP显示的向rabbitmq发送一个确认,rabbitmq自动视其确认了消息,然后把消息从队列中删除。下面用consume的方式做些例子来理解autoAck的参数设置。 String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;GetResponse basicGet(String queue, boolean autoAck) throws IOException;不确认先往ack队列发送5条数据,可以看到ready是5,total是5。运行以下代码,autoAck设置为false,且不对消息确认。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 Connection connection = factory.newConnection(); // 创建一个Channel Channel channel = connection.createChannel(); // 通过Channel定义队列 channel.queueDeclare("ack", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ack Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 接收消息 channel.basicConsume("ack", false, deliverCallback, consumerTag -> { });}运行结果如下,打印了5条消息:在web控制台可以看出,ready是0,unacked是5,即未确认的消息数是5条。把应用停止掉,即关闭消费者和rabbitmq的连接,web控制台如下,unacked的5条数据回到了ready。综上,当autoAck为false时,消息分为两个部分,一个是未投放放消费者的(ready),一个是投放给消费者但未确认的。如果未确认信息的消费者断开了连接,这部分消息会回到ready重新投递给消费者,确保了消息的可靠性。需要注意的是,如果消费者一直没有断开连接,也没有进行确认,那这个消息会一直等待确认中。 ...

May 27, 2020 · 1 min · jiezi

rabbitmq的启动问题

/usr/bin/curl 127.0.0.1:15672 查看本地能否访问1、开启web控制,在web端设置 执行rabbitmq-plugins enable rabbitmq_management 开启web访问 重启服务 service  rabbitmq-server restart 登录 ‘http://localhost:15672’,默认用户名密码为guest:guest,该用户只能本机登录; 添加用户,赋予admin权限,保存用户;再点击用户,赋予所有权限即可 2、命令行操作 rabbitmqctl add_user test passwd    设置该用户为administrator角色: rabbitmqctl set_user_tags test administrator    设置权限: rabbitmqctl set_permissions -p '/' test '.' '.' '.'    重启rabbitmq服务: sudo service rabbitmq-server restart

May 27, 2020 · 1 min · jiezi

yii2-queue

Worker starting control[](https://github.com/yiisoft/yi...Supervisor 是Linux的进程监视器。 它会自动启动您的控制台进程。 安装在Ubuntu上,你需要运行命令: sudo apt-get install supervisor Supervisor 配置文件通常可用 /etc/supervisor/conf.d。 你可以创建任意数量的配置文件。 配置示例: [program:yii-queue-worker]process_name=%(program_name)s_%(process_num)02dcommand=/usr/bin/php /var/www/my_project/yii queue/listen --verbose=1 --color=0autostart=trueautorestart=trueuser=www-datanumprocs=4redirect_stderr=truestdout_logfile=/var/www/my_project/log/yii-queue-worker.log在这种情况下,Supervisor 会启动4个 queue/listen worker。输出将写入相应日志文件。 有关 Supervisor 配置和使用的更多信息,请参阅文档。 以守护进程模式启动的Worker使用 queue/listen 命令支持 File、 Db、 Redis、 RabbitMQ、 Beanstalk、 Gearman 驱动。 有关其他参数,请参阅驱动程序指南。 [](https://github.com/yiisoft/yi...Systemd is an init system used in Linux to bootstrap the user space. To configure workers startup using systemd, create a config file named yii-queue@.service in /etc/systemd/system with the following contents: ...

May 27, 2020 · 2 min · jiezi

RabbitMQ-队列

AMQP消息路由必须包含三部分,交换器、队列、绑定。如下图所示,生产者把消息发送给交换器,交换器再路由到符合条件的队列上,最终被消费者接收。绑定决定了消息如何从路由器路由到相应的队列。这一篇,主要是了解一下队列。 相关概念当新增队列的时候,需要定义一下4中属性,分布是Name、Durability、Auto delete、Arguments。 Name:队列名称,不能用amq.开头命名。Durability:持久化,如果为durable,那broker重启不会丢失,如果为transient,broker重启后会丢失。(win系统仅仅重启rabbitmq是不行的)Auto delete:最后一个消费者退订时被自动删除Arguments:队列的其他参数,如上图,比如消息的TTL等。定义一个队列的方法如下,exclusive的参数,下面的临时队列会说明。 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;命名我们在使用队列之前,需要先声明队列。如果队列不存在,则创建队列。如果已存在则不会创建,但是和已存在的队列属性不一致,则会有406 (PRECONDITION_FAILED)的通道级异常。 参数设置参数的设置有两种,一种是通过分组,一个是一个个队列设置。分组的方式更加灵活、非侵入性,不需要修改和重新部署应用程序,是官方推荐的方式。参数的描述如下: 参数描述x-message-ttl消息的存活时间,单位为毫秒x-expires队列的存活时间,单位为毫秒x-max-length队列的最大消息数x-max-length-bytes消息的最大字节数x-overflow消息达到最大数的策略,drop-head或者reject-publishx-dead-letter-exchange死信队列的交换器x-dead-letter-routing-key死信队列的路由键x-max-priority消息的优先级设置x-queue-mode消息的延迟x-queue-master-locator用于主从临时队列当我们需要一个临时队列的时候,我们可以先定义队列,使用完再删除,或者直接定义Durability的属性为transient,等broker重启的时候就消失,但是感觉没有很方便。特别是使用后删除,如果客户端失败,这个队列就一直存在。我们可以用以下方法来自动删除: Exclusive:独占队列x-expires:设置队列的过期时间Auto delete:队列设置自动删除public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 Connection connection = factory.newConnection(); // 创建一个Channel Channel channel = connection.createChannel(); // 通过Channel定义队列 channel.queueDeclare("queue1", false, true, false, null); channel.queueDeclare("queue2", false, false, true, null); channel.basicConsume("queue2", true, null, consumerTag -> { }); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-expires",5000); channel.queueDeclare("queue3", false, false, false, arguments);}queue1是独占队列,queue2是自动删除,queue3设置了5秒的过期时间。运行后如下图,五秒后queue3消失,停止程序运行,queue1和queue2消失。需要注意的是,如果把queue2的basicConsume方法调用注释掉,由于没有消费者,队列并不会消失。独占队列只能由其声明连接使用(从声明连接使用、清除、删除等)。其他队列如果想使用独占队列将导致通道级异常RESOURCE_LOCKED,该异常带有一条错误消息,表明无法获得对锁定队列的独占访问。 ...

May 26, 2020 · 2 min · jiezi

rabbitMq常用创建消息应用的maven demo项目(一)---路由routing

rabbitmq官网上提供了6个demo,分别从是hello world、工作队列、发布/订阅、路由、主题、rpc这六个demo。基本上看完这6哥demo之后,对rabbitmq应该就有了清晰的认识,并且可以达到基本数量应用的程度。下面我挑选最常用的路由和主题这两个demo,为大家翻译下。个人加谷歌翻译,有不合适的地方,欢迎大家批评指正。Routing—路由在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。在本教程中,我们将在他的基础上添加一个功能 - 只订阅一部分消息。例如,我们只将严重错误的消息导入日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。 ...

May 14, 2018 · 2 min · jiezi