关于rabbitmq:RabbitMQ

ribbitmq益处服务解耦防止服务之间耦合度过于严密,只须要降须要解决的音讯发送至音讯队列,单方只须要跟音讯服务器沟通即可 流量销峰为了防止高并发送的音讯须要解决,将产生的须要解决的音讯保留到音讯队列,解决一个拿一个,加重后盾解决音讯的压力 异步调用为了晋升用户的体验,将音讯发送到音讯队列后及时返回信息,如果有必要期待解决完结过后向用户发送处理完毕相应 rabbitmq的长久化1.队列长久化ch.queueDeclare(uuid,false/true为长久化队列, true, true, null);2.音讯长久化ch.basicPublish("logs",//交换机名 "", //陆游建指定 MessageProperties.PERSISTENT_TEXT_PLAIN 其余属性音讯长久化 msg.getBytes()); //转换字节ribbitmq的六大模式简略模式消费者共享队列,音讯队列实现了负载平衡,轮询发送音讯给所有的消费者 工作模式增加手动ack回执音讯并独自拉取音讯机制实现正当调配音讯并在音讯队列失去了缓存 订阅模式应用fanout模式交换机绑定音讯队列,分发送所有绑定的音讯队列 陆游模式应用直连模式的交换机(disrect模式),绑定音讯队列.音讯和队列绑定陆游建互相匹配则发送该音讯 主题模式非凡的陆游模式,动静的匹配陆建来发送音讯 RPC模式

September 30, 2020 · 1 min · jiezi

关于rabbitmq:RabbitMQ-04-订阅模式路由模式

rabbitmq六种工作模式3.公布订阅模式即向多个消费者传递同一条信息 1).Exchanges 交换机RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何音讯间接发送到队列。 相同,生产者只能向交换机(Exchange)发送音讯。交换机是一个非常简单的货色。一边接管来自生产者的音讯,另一边将音讯推送到队列。交换器必须确切地晓得如何解决它接管到的音讯。它应该被增加到一个特定的队列中吗?它应该增加到多个队列中吗?或者它应该被抛弃。这些规定由exchange的类型定义。 有几种可用的替换类型:direct、topic、header和fanout。创立fanout交换机logs: c.exchangeDeclare("logs", "fanout");或c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); fanout交换机非常简单。它只是将接管到的所有音讯播送给它所晓得的所有队列。 2).绑定 Bindings 创立了一个fanout交换机和一个队列。当初咱们须要通知exchange向指定队列发送音讯。exchange和队列之间的关系称为绑定。 //指定的队列,与指定的交换机关联起来//称为绑定 -- binding//第三个参数时 routingKey, 因为是fanout交换机, 这里疏忽 routingKeych.queueBind(queueName, "logs", "");3).整体代码1.生产者最重要的更改是,咱们当初心愿将音讯公布到logs交换机,而不是无名的日志交换机。咱们须要在发送时提供一个routingKey,然而对于fanout交换机类型,该值会被疏忽。 public class Producer { public static void main(String[] args) throws Exception { //建设连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //定义fanout类型交换机:logs //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //向交换机发送信息 while (true){ System.out.println("输出音讯:"); String msg = new Scanner(System.in).nextLine(); c.basicPublish("logs", "", null, msg.getBytes()); } }}2.消费者如果还没有队列绑定到交换器,音讯就会失落,但这对咱们来说没有问题;如果还没有消费者在听,咱们能够平安地抛弃这些信息。 ...

September 28, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ-03

rabbitmq六种工作模式1.简略模式RabbitMQ是一个消息中间件,你能够设想它是一个邮局。当你把函件放到邮箱里时,可能确信邮递员会正确地递送你的函件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。 发送音讯的程序是生产者队列就代表一个邮箱。尽管音讯会流经RbbitMQ和你的应用程序,但音讯只能被存储在队列里。队列存储空间只受服务器内存和磁盘限度,它实质上是一个大的音讯缓冲区。多个生产者能够向同一个队列发送音讯,多个消费者也能够从同一个队列接管音讯.消费者期待从队列接管音讯 1.pom.xml增加 slf4j 依赖, 和 rabbitmq依赖 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tedu</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.8.0-alpha2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.8.0-alpha2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build></project>2.生产者发送音讯 public class Test1 { public static void main(String[] args) throws Exception { //创立连贯工厂,并设置连贯信息 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672);//可选,5672是默认端口 f.setUsername("admin"); f.setPassword("admin"); /* * 与rabbitmq服务器建设连贯, * rabbitmq服务器端应用的是nio,会复用tcp连贯, * 并开拓多个信道与客户端通信 * 以加重服务器端建设连贯的开销 */ Connection c = f.newConnection(); //建设信道 Channel ch = c.createChannel(); /* * 申明队列,会在rabbitmq中创立一个队列 * 如果曾经创立过该队列,就不能再应用其余参数来创立 * * 参数含意: * -queue: 队列名称 * -durable: 队列长久化,true示意RabbitMQ重启后队列仍存在 * -exclusive: 排他,true示意限度仅以后连贯可用 * -autoDelete: 当最初一个消费者断开后,是否删除队列 * -arguments: 其余参数 */ ch.queueDeclare("helloworld", false,false,false,null); /* * 公布音讯 * 这里把音讯向默认交换机发送. * 默认交换机隐含与所有队列绑定,routing key即为队列名称 * * 参数含意: * -exchange: 交换机名称,空串示意默认交换机"(AMQP default)",不能用 null * -routingKey: 对于默认交换机,路由键就是指标队列名称 * -props: 其余参数,例如头信息 * -body: 音讯内容byte[]数组 */ ch.basicPublish("", "helloworld", null, "Hello world!".getBytes()); System.out.println("音讯已发送"); c.close(); }}3.消费者接管队列 ...

September 25, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ-02

RabbitMQ 应用场景服务解耦1.假如有这样一个场景, 服务A产生数据, 而服务B,C,D须要这些数据, 那么咱们能够在A服务中间接调用B,C,D服务,把数据传递到上游服务即可 然而,随着咱们的利用规模不断扩大,会有更多的服务须要A的数据,如果有几十甚至几百个上游服务,而且会一直变更,再加上还要思考上游服务出错的状况,那么A服务中调用代码的保护会极为艰难 这是因为服务之间耦合度过于严密 2.再来思考用RabbitMQ解耦的状况 A服务只须要向音讯服务器发送音讯,而不必思考谁须要这些数据;上游服务如果须要数据,自行从音讯服务器订阅音讯,不再须要数据时则勾销订阅即可 流量削峰1.假如咱们有一个利用,平时访问量是每秒300申请,咱们用一台服务器即可轻松应答 而在高峰期,访问量霎时翻了十倍,达到每秒3000次申请,那么单台服务器必定无奈应答,这时咱们能够思考减少到10台服务器,来扩散拜访压力 但如果这种刹时顶峰的状况每天只呈现一次,每次只有半小时,那么咱们10台服务器在少数工夫都只分担每秒几十次申请,这样就有点浪费资源了 2.这种状况,咱们就能够应用RabbitMQ来进行流量削峰,顶峰状况下,霎时呈现的大量申请数据,先发送到音讯队列服务器,排队期待被解决,而咱们的利用,能够缓缓的从音讯队列接管申请数据进行解决,这样把数据处理工夫拉长,以加重刹时压力 这是音讯队列服务器十分典型的利用场景 异步调用思考定外卖领取胜利的状况 领取后要发送领取胜利的告诉,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程十分耗时,尤其是高峰期,可能要期待几十秒甚至更长 这样就造成整条调用链路响应十分迟缓 而如果咱们引入RabbitMQ音讯队列,订单数据能够发送到音讯队列服务器,那么调用链路也就能够到此结束,订单零碎则能够立刻失去响应,整条链路的响应工夫只有200毫秒左右 寻找外卖小哥的利用能够以异步的形式从音讯队列接管订单音讯,再执行耗时的寻找操作

September 25, 2020 · 1 min · jiezi

关于rabbitmq:RabbitMQ-01

搭建 RabbitMQ 服务器1.克隆 centos-7-1908 :rabbitmq 2.设置ip ./ip-staticip: 192.168.64.140ifconfigping www.baidu.com3.mobaxterm 连贯 140 服务器 4.上传 Rabbitmq 的离线安装文件 --> 解压缩 rabbitmq-install.zip --> 把 rabbitmq-install 文件夹上传到 /root/5.切换到rabbitmq-install目录 cd rabbitmq-install`装置 rpm -ivh *.rpm6.启动rabbitmq服务器 # 设置服务,开机主动启动systemctl enable rabbitmq-server# 启动服务systemctl start rabbitmq-server7.rabbitmq治理界面启用治理界面 # 开启治理界面插件rabbitmq-plugins enable rabbitmq_management# 防火墙关上 15672 治理端口firewall-cmd --zone=public --add-port=15672/tcp --permanentfirewall-cmd --reload8.重启RabbitMQ服务 systemctl restart rabbitmq-server9.拜访拜访服务器的15672端口,例如:http://192.168.64.140:15672 10.增加用户 # 增加用户rabbitmqctl add_user admin admin# 新用户设置用户为超级管理员rabbitmqctl set_user_tags admin administrator11.凋谢客户端连贯端口 # 关上客户端连贯端口firewall-cmd --zone=public --add-port=5672/tcp --permanentfirewall-cmd --reloa次要端口介绍 4369 – erlang发现口5672 – client端通信口15672 – 治理界面ui端口25672 – server间外部通信口

September 25, 2020 · 1 min · jiezi

关于rabbitmq:深入理解RabbitMQ的前世今生

原文:https://sq.163yun.com/blog/ar...作者:Java大蜗牛 对于RabbitMQ出身:诞生于金融行业的音讯队列语言:Erlang协定:AMQP(Advanced Message Queuing Protocol 高级音讯队列协定)关键词:内存队列,高可用,一条音讯队列构造 Producer/Consumer:生产者消费者Exchange:交换器,能够了解为队列的路由逻辑,交换器次要有三种,图中是Direct交换器Queue:队列Binding:绑定关系,理论是交换器上映射队列的规定发送和生产一条音讯在上图的模式下,交换器的类型为Direct,伪代码示意音讯的生产和生产 音讯生产#音讯发送办法#messageBody 音讯体#exchangeName 交换器名称#routingKey 路由键publishMsg(messageBody,exchangeName,routingKey){ ......}#音讯发送publishMsg("This is a warning log","exchange","log.warning"); RoutingKey=log.warning,和队列A与交换器的绑定统一,所以音讯被路由到了队列A上。 音讯生产对于音讯生产而言,消费者间接指定要生产的队列即可,比方指定生产队列A的数据。 须要留神的是,在消费者生产实现数据后,返回给RabbitMq ACK音讯,RabbitMq会删掉队列中的该条信息。 多种音讯路由模式在Exchange这个模块上,RabbitMq次要反对了Direct,Fanout,Topic三种路由模式,RabbitMq在路由模式上下功夫,也阐明了他在设计上想要满足多样化的需要。 Direct和Fanout模式比拟好了解,相似于单播和播送模式,Topic模式比拟有意思,它反对自定义匹配规定,依照规定把所有满足条件的音讯路由到指定队列,可能帮忙开发者灵便应答各类需要。 音讯的存储RabbitMQ的音讯默认是在内存里的,实际上不光是音讯,Exchange路由等信息理论都在内存中。内存的长处是高性能,问题在于故障后无奈复原。所以RabbitMQ也反对长久化的存储,也就是写磁盘。 要在RabbitMQ中长久化音讯,要同时满足三个条件: 音讯投体时应用长久化投递模式指标交换器是配置为长久化的指标队列是配置为长久化的RabbitMQ长久化音讯的形式是常见的写日志形式: 当一条长久化音讯发送到长久化的Exchange上时,RabbitMQ会在音讯提交到日志文件后,才发送响应。一旦这条音讯被生产后,RabbitMQ会将会把日志中该条音讯标记为期待垃圾收集,之后会从日志中革除。如果呈现故障,主动重建Exchange,Bindings和Queue,同时通过重播长久化日志来复原音讯。音讯长久化的优缺点很显著,领有故障恢复能力的同时,也带来了性能的急剧下降。同时,因为RabbitMQ默认状况下是没有冗余的,假如一个长久化节点解体,统一到该节点复原前,音讯和队列都无奈复原。 音讯投递模式1.发后即忘 RabbitMQ默认公布音讯是不会返回任何后果给生产者的,所以存在发送过程中失落数据的危险。 2.AMQP事务 AMQP事务保障RabbitMQ不仅收到了音讯,并胜利将音讯路由到了所有匹配的订阅队列,AMQP事务将使得生产者和RabbitMQ产生同步。 尽管事务使得生产者能够确定音讯曾经达到RabbitMQ中的对应队列,然而却会升高2~10倍的音讯吞吐量。 3.发送方确认 开启发送方确认模式后,音讯会有一个惟一的ID,一旦音讯被投递给所有匹配的队列后,会回调给发送方应用程序(蕴含音讯的惟一ID),使得生产者晓得音讯曾经平安达到队列了。 如果音讯和队列是配置成了长久化,这个确认音讯只会在队列将音讯写入磁盘后才会返回。如果RabbitMQ外部产生了谬误导致这条音讯失落,那么RabbitMQ会发送一条nack音讯,当然我了解这个是不能保障的。 这种模式因为不存在事务回滚,同时整体依然是一个异步过程,所以更加轻量级,对服务器性能的影响很小。 RabbitMQ RPC个别的异步服务间,可能会用两组队列实现两个服务模块之前的异步通信,乏味的是RabbitMQ就内建了这个性能。 RabbitMQ反对音讯应答性能,每个AMQP音讯头中有一个Reply_to字段,通过该字段指定音讯返回到的队列名称(这是一个公有队列)音讯的生产者能够监听该字段对应的队列。 RabbitMQ集群RabbitMQ集群的设计指标: 容许消费者和生产者在RabbitMQ节点解体的状况下持续运行能过通过增加节点来线性扩大音讯通信吞吐量从理论后果看,RabbitMQ实现设计指标上并不非常杰出,次要起因在于默认的模式下,RabbitMQ的队列实例子只存在在一个节点上(尽管后续也反对了镜像队列),既不能保障该节点解体的状况下队列还能够持续运行,也不能线性扩大该队列的吞吐量。 集群构造RabbitMQ外部的元数据次要有: 队列元数据-队列名称和属性交换器元数据-交换器名称,类型和属性绑定元数据-路由信息尽管RabbitMQ的队列理论只会在一个节点上,但元数据能够存在各个节点上。举个例子来说,当创立一个新的交换器时,RabbitMQ会把该信息同步到所有节点上,这个时候客户端不论连贯的那个RabbitMQ节点,都能够拜访到这个新的交换器,也就能找到交换器下的队列。 如上图所示,队列A的实例理论只在一个RabbitMQ节点上,其它节点理论存储的是只想该队列的指针。 为什么RabbitMQ不在各个节点间做复制了,《RabbitMQ实战》给出了两个起因: 存储老本-RabbitMQ作为内存队列,复制对存储空间的影响,毕竟内存是低廉而无限的性能损耗-公布音讯须要将音讯复制到所有节点,特地是对于长久化队列而言,性能的影响会很大我了解老本这个起因并不齐全成立,复制并不一定要复制到所有节点,比方一个队列能够只做两个正本,复制带来的内存老本能够交给应用方来评估,毕竟在内存中没有沉积的状况下,实际上队列是不会占用多大内存的。 还有一点是RabbitMQ自身并没有保障音讯生产的有序性,所以实际上队列被Partition到各个节点上,这样能力真正达到线性扩容的目标(以RabbitMQ的现状来说,单队列理论是无奈扩容的,只有在业务层做切分)。 注:RabbitMQ集群中的节点能够是内存节点也能够是磁盘节点,但要求至多有一个磁盘节点,这样呈现故障时能力复原数据。 镜像队列镜像队列架构RabbitMQ本人也思考到了咱们之前剖析的单节点长时间故障无奈复原的问题,所以RabbitMQ 2.6.0之后它也反对了镜像队列,换个说法也就是正本。 除了发送音讯,所有的操作理论都在主拷贝上,从拷贝理论只是个冷备(默认的状况下所有RabbitMQ节点上都会有镜像队列的拷贝),如果应用音讯确认模式,RabbitMQ会在主拷贝和从拷贝都平安的承受到音讯时才告诉生产者。 从这个构造上来看,如果从拷贝的节点挂了,理论没有任何影响,如果主拷贝挂了,那么会有一个从新选主的过程,这也是镜像队列的长处,除非所有节点都挂了,才会导致音讯失落。从新选主后,RabbitMQ会给消费者一个消费者勾销告诉(Consumer Cancellation),让消费者重连新的主拷贝。 镜像队列原理1.RabbitMQ构造 AMQPQueue:负责AMQP协定相干的音讯解决,包含接管音讯,投递音讯,Confirm音讯等BackingQueue:提供AMQQueue调用的接口,实现音讯的存储和长久化工作BackingQueue由Q1,Q2,Delta,Q3,Q4五个子队列形成,在Backing中,音讯的生命周期有四个状态: Alpha:音讯的内容和音讯索引都在RAM中。(Q1,Q4)Beta:音讯的内容保留在Disk上,音讯索引保留在RAM中。(Q2,Q3)Gamma:音讯的内容保留在Disk上,音讯索引在DISK和RAM上都有。(Q2,Q3)Delta:音讯内容和索引都在Disk上。(Delta)这里以长久化音讯为例(能够看到非长久化音讯的生命周期会简略很多),从Q1到Q4,音讯理论经验了一个RAM->DISK->RAM这样的过程,BackingQueue这么设计的目标有点相似于Linux的Swap,当队列负载很高时,通过将局部音讯放到磁盘上来节俭内存空间,当负载升高时,音讯又从磁盘回到内存中,让整个队列有很好的弹性。因而触发音讯流动的次要因素是:1.音讯被生产;2.内存不足。 RabbitMQ会更具音讯的传输速度来计算以后内存中容许保留的最大音讯数量(Traget_RAM_Count),当:内存中保留的音讯数量+期待ACK的音讯数量>Target_RAM_Count时,RabbitMQ才会把音讯写到磁盘上,所以说尽管实践上音讯会依照Q1->Q2->Delta->Q3->Q4的程序流动,然而并不是每条音讯都会经验所有的子队列以及对应的生命周期。 从RabbitMQ的Backing Queue构造来看,当外部有余时,音讯要经验多个生命周期,在Disk和RAM之间置换,者理论会升高RabbitMQ的解决性能(后续的流控就是关联的解决办法)。 2.镜像队列构造 所有对镜像队列主拷贝的操作,都会通过Guarented Multicasting(GM)同步到各个Salve节点,Coodinator负责组播后果的确认。 GM是一种牢靠的组播通信协议,保障组组内的存活节点都收到音讯。 GM的主播并不是由Master节点来负责告诉所有Slave的(目标是为了防止Master压力过大,同时防止Master生效导致音讯无奈最终Ack),RabbitMQ把一个镜像队列的所有节点组成一个链表,由主拷贝发动,由主拷贝最终确认告诉到了所有的Slave,而两头由Slave接力的形式进行音讯流传。 从这个构造来看,音讯实现整个镜像队列的同步耗时实践上是不低的,然而因为RabbitMQ音讯的音讯确认自身是异步的模式,所以整体的吞吐量并不会受到太大影响。 流控当RabbitMQ呈现内存(默认是0.4)或者磁盘资源达到阈值时,会触发流控机制,阻塞Producer的Connection,让生产者不能持续发送音讯,直到内存或者磁盘资源失去开释。 RabbitMQ基于Erlang/OTP开发,一个音讯的生命周期中,会波及多个过程间的转发,这些Erlang过程之间不共享内存,每个过程都有本人独立的内存空间,如果没有适合的流控机制,可能会导致某个过程占用内存过大,导致OOM。因而,要保障各个过程占用的内容在一个正当的范畴,RabbitMQ的流控采纳了一种信用证机制(Credit),为每个过程保护了四类键值对: ...

September 19, 2020 · 1 min · jiezi

关于rabbitmq:消息中间件RabbitMQ一-初识RabbitMQ

说到RabbitMQ,就不能不提AMQP协定,因为RabbitMQ就是基于AMQP协定来实现的,咱们先来看看AMQP协定到底是什么 AMQP协定 AMQP,即Advanced Message Queuing Protocol,一个提供对立音讯服务的应用层规范高级音讯队列协定,是应用层协定的一个凋谢规范,为面向音讯的中间件设计。基于此协定的客户端与消息中间件可传递音讯,并不受客户端/中间件不同产品,不同的开发语言等条件的限度。更多的对于AMQP的信息,大家能够去百度百科查看RabbitMQ次要组件 1.生产者(Producer):音讯生产者,生产音讯投递到交换机中的客户端 2.消费者(Consumer):音讯消费者,从队列中获取音讯生产的客户端 3.音讯(Message):蕴含有效载荷和标签,有效载荷指要传输的数据,标签形容了有效载荷,并且rabbitmq用它来决定谁取得音讯 4.信道(Channel):多路复用连贯中的一条独立的双向数据流通道 5.交换器(Exchange):服务器中的实体,用来接管生产者发送的音讯并将这些音讯路由给服务器中的队列 6.队列(Queue):一个命名实体,用来保留音讯直到发送给消费者 7.路由键(Routing Key):一个音讯头,交换器能够用这个音讯头决定如何路由某条音讯8.虚拟主机(Virtual Host):一批交换器、音讯队列和相干对象。虚拟主机是共享雷同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,能够抉择一个虚拟主机 RabbitMQ简略架构模型 队列通过路由键绑定到交换器,当生产者生产音讯后,将音讯投递到交换器中,交换器依据绑定的路由键,将音讯路由到对应的队列,而后由订阅该队列的消费者接管音讯并生产 对于RabbitMQ的装置,网上资源很多,这里就不再多说。RabbitMQ的依赖较多,能够应用Dokcer去装置

August 13, 2020 · 1 min · jiezi

关于rabbitmq:RabbitMQ什么是死信队列

一 什么是死信队列当一条音讯在队列中呈现以下三种状况的时候,该音讯就会变成一条死信。 音讯被回绝(basic.reject / basic.nack),并且requeue = false音讯TTL过期队列达到最大长度当音讯在一个队列中变成一个死信之后,如果配置了死信队列,它将被从新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。 二 实现死信队列2.1 原理图 2.2 创立消费者创立一个消费者,绑定生产队列及死信交换机,交换机默认为direct模型,死信交换机也是,arguments绑定死信交换机和key。(注解反对的具体参数文末会附上) public class DirectConsumer { @RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "javatrip",arguments = {@Argument(name="x-dead-letter-exchange",value = "deadExchange"), @Argument(name="x-dead-letter-routing-key",value = "deadKey") }), exchange = @Exchange(value="javatripDirect"), key = {"info","error","warning"} ) })public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("消费者1"+message);}2.3 创立生产者public void publishMessage(String message){ rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatripDirect","info",message);}三 造成死信的三种状况3.1 回绝音讯,并且禁止从新入队设置yml为手动签收模式spring: rabbitmq: listener: simple: acknowledge-mode: manual设置回绝音讯并禁止从新入队Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicNack(deliverTag,false,false);绑定死信队列@RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "javatripDead"), exchange = @Exchange(value = "deadExchange"), key = "deadKey" )})public void receive2(String message){ System.out.println("我是一条死信:"+message);}3.2 音讯TTL过期绑定业务队列的时候,减少音讯的过期时长,当音讯过期后,音讯将被转发到死信队列中。 ...

August 7, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ如何保证消息不被重复消费

一 反复音讯为什么会呈现音讯反复?音讯反复的起因有两个:1.生产时音讯反复,2.生产时音讯反复。 1.1 生产时音讯反复因为生产者发送音讯给MQ,在MQ确认的时候呈现了网络稳定,生产者没有收到确认,实际上MQ曾经接管到了音讯。这时候生产者就会从新发送一遍这条音讯。 生产者中如果音讯未被确认,或确认失败,咱们能够应用定时工作+(redis/db)来进行音讯重试。 @Component@Slf4Jpublic class SendMessage { @Autowired private MessageService messageService; @Autowired private RabbitTemplate rabbitTemplate; // 最大投递次数 private static final int MAX_TRY_COUNT = 3; /** * 每30s拉取投递失败的音讯, 从新投递 */ @Scheduled(cron = "0/30 * * * * ?") public void resend() { log.info("开始执行定时工作(从新投递音讯)"); List<MsgLog> msgLogs = messageService.selectTimeoutMsg(); msgLogs.forEach(msgLog -> { String msgId = msgLog.getMsgId(); if (msgLog.getTryCount() >= MAX_TRY_COUNT) { messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL); log.info("超过最大重试次数, 音讯投递失败, msgId: {}", msgId); } else { messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投递次数+1 CorrelationData correlationData = new CorrelationData(msgId); rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 从新投递 log.info("第 " + (msgLog.getTryCount() + 1) + " 次从新投递音讯"); } }); log.info("定时工作执行完结(从新投递音讯)"); }}1.2生产时音讯反复消费者生产胜利后,再给MQ确认的时候呈现了网络稳定,MQ没有接管到确认,为了保障音讯被生产,MQ就会持续给消费者投递之前的音讯。这时候消费者就接管到了两条一样的音讯。 ...

August 6, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ如何保证消息的可靠性

一条生产胜利被生产经验了生产者->MQ->消费者,因而在这三个步骤中都有可能造成音讯失落。 一 音讯生产者没有把音讯胜利发送到MQ1.1 事务机制AMQP协定提供了事务机制,在投递音讯时开启事务反对,如果音讯投递失败,则回滚事务。 自定义事务管理器 @Configurationpublic class RabbitTranscation { @Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ return new RabbitTemplate(connectionFactory); }}批改yml spring: rabbitmq: # 音讯在未被队列收到的状况下返回 publisher-returns: true开启事务反对 rabbitTemplate.setChannelTransacted(true);音讯未接管时调用ReturnCallback rabbitTemplate.setMandatory(true);生产者投递音讯 @Servicepublic class ProviderTranscation implements RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ // 设置channel开启事务 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("这条音讯发送失败了"+message+",请解决"); } @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager") public void publishMessage(String message) throws Exception { rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); }}然而,很少有人这么干,因为这是同步操作,一条音讯发送之后会使发送端阻塞,以期待RabbitMQ-Server的回应,之后能力持续发送下一条音讯,生产者生产音讯的吞吐量和性能都会大大降低。 ...

August 5, 2020 · 2 min · jiezi

关于rabbitmq:操作rabbitMQ时误删guest账户无法登录

1,以管理员身份关上cmd2,进入rabbitMQ安装文件件sbin目录下3,设置账户和明码rabbitmqctl.bat add_user guest guest4,设置为管理员rabbitmqctl.bat set_user_tags guest administrator5,设置权限rabbitmqctl set_permissions -p /develop guest "." "." ".*"

July 24, 2020 · 1 min · jiezi

关于rabbitmq:深入浅出-RabbitMQ

什么是 RabbitMQ简介(长处)基于 ErLang 语言开发有高可用高并发的长处,适宜集群。开源、稳固、易用、跨平台、反对多种语言、文档齐全。有音讯确认机制和长久化机制,可靠性高。概念生产者和消费者Producer:音讯的生产者Consumer:音讯的消费者Queue音讯队列提供了 FIFO 的解决机制,具备缓存音讯的能力。在 RabbitMQ 中,队列音讯能够设置为长久化,长期或者主动删除。如果是长久化的队列,Queue 中的音讯会在 Server 本地硬盘存储一份,避免零碎 Crash 数据失落。如果是长期的队列,Queue 中的数据在零碎重启之后就会失落。如实是主动删除的队列,当不存在用户连贯到 Server,队列中的数据会被主动删除。ExChangeExChange 相似于数据通信网络中的交换机,提供音讯路由策略。 在 RabbitMQ 中,生产者不是将音讯间接发送给 Queue,而是先发送给 ExChange,ExChange 依据生产者传递的 key 依照特定的路由算法将音讯给指定的 Queue。一个 ExChange 能够绑定多个 Queue。和 Queue 一样,ExChange 也能够设置为长久化、长期或者主动删除。 Binding所谓绑定就是将一个特定的 ExChange 和一个特定的 Queue 绑定起来。ExChange 和 Queue 的绑定能够是多对多的关系。 Virtual Host在 RabbitMQ Server上能够创立多个虚构的 Message Broker(又叫做 Virtual Hosts)。每一个 vhost 实质上是一个迷你的 RabbitMQ Server,别离治理各自的 ExChange 和 binding。生产者和消费者连贯 RabbitMQ Server 须要指定一个 Virtual Host。 应用过程客户端连贯到音讯队列服务器,关上一个 Channel。客户端申明一个 ExChange,并设置相干属性。客户端申明一个 Queue,并设置相干属性。客户端应用 Routing Key,在 ExChange 和 Queue 之间建设好绑定关系。客户端投递音讯到 ExChange。ExChange 接管到音讯后,就依据音讯的 key 和曾经设置的 bingding,进行音讯路由,将音讯投递到一个或多个队列里。部署 RabbitMQ应用 Docker Compose 部署创立 docker-compose.yml ...

July 18, 2020 · 2 min · jiezi

关于rabbitmq:RabbitMQ的开发应用

1.介绍RabbitMQ 是一个由erlang语言编写的、开源的、在AMQP根底上残缺的、可复用的企业音讯零碎。反对多种语言,包含java、Python、ruby、PHP、C/C++等。 1.1.AMQP模型AMQP:advanced message queuing protocol ,一个提供对立音讯服务的应用层规范高级音讯队列协定,是应用层协定的一个凋谢规范,为面向音讯的中间件设计。基于此协定的客户端与消息中间件可传递音讯并不受客户端/中间件不同产品、不同开发语言等条件的限度。 AMQP模型图 1.1.1.工作过程发布者(Publisher)公布音讯(Message),经由交换机(Exchange)。 交换机依据路由规定将收到的音讯分发给与该交换机绑定的队列(Queue)。 最初 AMQP 代理会将音讯投递给订阅了此队列的消费者,或者消费者依照需要自行获取。 1、发布者、交换机、队列、消费者都能够有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,音讯代理 能够别离存在于不同的设施上。 2、发布者公布音讯时能够给音讯指定各种音讯属性(Message Meta-data)。有些属性有可能会被音讯代理(Brokers)应用,然而其余的属性则是齐全不通明的,它们只能被接管音讯的利用所应用。 3、从平安角度思考,网络是不牢靠的,又或是消费者在解决音讯的过程中意外挂掉,这样没有解决胜利的音讯就会失落。基于此起因,AMQP 模块蕴含了一个音讯确认(Message Acknowledgements)机制:当一个音讯从队列中投递给消费者后,不会立刻从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才齐全从队列中删除。 4、在某些状况下,例如当一个音讯无奈被胜利路由时(无奈从交换机散发到队列),音讯或者会被返回给发布者并被抛弃。或者,如果音讯代理执行了延期操作,音讯会被放入一个所谓的死信队列中。此时,音讯发布者能够抉择某些参数来解决这些非凡状况。 1.1.2.Exchange交换机交换机是用来发送音讯的 AMQP 实体。交换机拿到一个音讯之后将它路由给一个或零个队列。它应用哪种路由算法是由交换机类型和绑定(Bindings)规定所决定的。常见的交换机有如下几种: direct 直连交换机:Routing Key==Binding Key,严格匹配。fanout 扇形交换机:把发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中。topic 主题交换机:Routing Key==Binding Key,含糊匹配。headers 头交换机:依据发送的音讯内容中的 headers 属性进行匹配。 具体无关这五种交换机的阐明和用法,后续会有章节具体介绍。1.1.3.Queue队列AMQP 中的队列(queue)跟其余音讯队列或工作队列中的队列是很类似的:它们存储着行将被利用生产掉的音讯。队列跟交换机共享某些属性,然而队列也有一些另外的属性。 Durable(音讯代理重启后,队列仍旧存在)Exclusive(只被一个连贯(connection)应用,而且当连贯敞开后队列即被删除)Auto-delete(当最初一个消费者退订后即被删除)Arguments(一些音讯代理用他来实现相似与 TTL 的某些额定性能)1.2.rabbitmq和kafka比照rabbitmq遵循AMQP协定,用在实时的对可靠性要求比拟高的消息传递上。kafka次要用于解决沉闷的流式数据,大数据量的数据处理上。次要体现在: 1.2.1.架构rabbitmq:RabbitMQ遵循AMQP协定,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了音讯的路由键;客户端Producer通过连贯channel和server进行通信,Consumer从queue获取音讯进行生产(长连贯,queue有音讯会推送到consumer端,consumer循环从输出流读取数据)。rabbitMQ以broker为核心。kafka:kafka听从个别的MQ构造,producer,broker,consumer,以consumer为核心,音讯的生产信息保留的客户端consumer上,consumer依据生产的点,从broker上批量pull数据。1.2.2.音讯确认rabbitmq:有音讯确认机制。kafka:无音讯确认机制。1.2.3.吞吐量rabbitmq:rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ反对对音讯的牢靠的传递,反对事务,不反对批量的操作;基于存储的可靠性的要求存储能够采纳内存或者硬盘。kafka:kafka具备高的吞吐量,外部采纳音讯的批量解决,zero-copy机制,数据的存储和获取是本地磁盘程序批量操作,具备O(1)的复杂度,音讯解决的效率很高。 (备注:kafka零拷贝,通过sendfile形式。(1)一般数据读取:磁盘->内核缓冲区(页缓存 PageCache)->用户缓冲区->内核缓冲区->网卡输入;(2)kafka的数据读取:磁盘->内核缓冲区(页缓存 PageCache)->网卡输入。1.2.4.可用性rabbitmq:(1)一般集群:在多台机器上启动多个rabbitmq实例,每个机器启动一个。然而你创立的queue,只会放在一个rabbtimq实例上,然而每个实例都同步queue的元数据。完了你生产的时候,实际上如果连贯到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过去。(2)镜像集群:跟一般集群模式不一样的是,你创立的queue,无论元数据还是queue里的音讯都会存在于多个实例上,而后每次你写音讯到queue的时候,都会主动把音讯到多个实例的queue里进行音讯同步。这样的话,益处在于,一个机器宕机了,没事儿,别的机器都能够用。害处在于,第一,这个性能开销太大了,音讯同步所有机器,导致网络带宽压力和耗费很重。第二,这么玩儿,就没有扩展性可言了,如果某个queue负载很重,你加机器,新增的机器也蕴含了这个queue的所有数据,并没有方法线性扩大你的queuekafka:kafka是由多个broker组成,每个broker是一个节点;每创立一个topic,这个topic能够划分为多个partition,每个partition能够存在于不同的broker上,每个partition就放一部分数据。这就是人造的分布式音讯队列,就是说一个topic的数据,是扩散放在多个机器上的,每个机器就放一部分数据。每个partition的数据都会同步到其余机器上,造成本人的多个replica正本,而后所有replica会选举一个leader进去,主从构造。1.2.5.集群负载平衡rabbitmq:rabbitMQ的负载平衡须要独自的loadbalancer进行反对,如HAProxy和Keepalived等。kafka:kafka采纳zookeeper对集群中的broker、consumer进行治理,能够注册topic到zookeeper上;通过zookeeper的协调机制,producer保留对应topic的broker信息,能够随机或者轮询发送到broker上;并且producer能够基于语义指定分片,音讯发送到broker的某分片上。2.构造2.1.交换机模式RabbitMQ罕用的Exchange Type有fanout、direct、topic、headers这四种。 2.1.1.Direct Exchangedirect类型的Exchange路由规定很简略,它会把音讯路由到那些binding key与routing key齐全匹配的Queue中。 2.1.2.Topic Exchange后面讲到direct类型的Exchange路由规定是齐全匹配binding key与routing key,但这种严格的匹配形式在很多状况下不能满足理论业务需要。topic类型的Exchange与direct类型的Exchage类似,也是将音讯路由到binding key与routing key相匹配的Queue中,但反对含糊匹配: routing key为一个句点号“. ”分隔的字符串(咱们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”binding key与routing key一样也是句点号“. ”分隔的字符串binding key中能够存在两种特殊字符"*"与“#”,用于做含糊匹配,其中" * "用于匹配一个单词,“#”用于匹配多个单词(能够是零个)2.1.3.Fanout Exchangefanout类型的Exchange路由规定非常简单,它会把所有发送到fanout Exchange的音讯都会被转发到与该Exchange 绑定(Binding)的所有Queue上。 Fanout Exchange 不须要解决RouteKey 。只须要简略的将队列绑定到exchange 上。这样发送到exchange的音讯都会被转发到与该交换机绑定的所有队列上。相似子网播送,每台子网内的主机都取得了一份复制的音讯。所以,Fanout Exchange 转发音讯是最快的。 ...

July 17, 2020 · 5 min · jiezi

复制粘贴就完事的Linux-安装RabbitMQ

1.linux装置四个依赖 yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel httpd python-simplejson2.下载erlang的tar.gz包,如我下载的otp_src_19.3.tar.gz,https://www.erlang.org/downlo... 3.上传到opt目录下,解压文件 tar -zxvf otp_src_19.3.tar.gz4.执行查看命令 cd otp_src_19.3./configure --prefix=/opt/erlang5.执行编译装置命令 make && make install6.增加环境变量 vi /etc/profileERLANG_HOME=/opt/erlangexport PATH=$PATH:$ERLANG_HOME/binexport ERLANG_HOME7.配置文件失效 source /etc/profile8.验证是否装置胜利 erl9.下载rabbitmq的tar.xz包,如我下载的rabbitmq-server-generic-unix-3.6.10.tar.xz,https://www.rabbitmq.com/rele... 10.上传到opt目录下,解压文件 xz -d rabbitmq-server-generic-unix-3.6.10.tar.xztar -xvf rabbitmq-server-generic-unix-3.6.10.tar11.重命名,此步骤可跳过 mv rabbitmq_server-3.6.10/ rabbitmq12.增加环境变量 vi /etc/profileexport PATH=$PATH:/opt/rabbitmq/sbinexport RABBITMQ_HOME=/opt/rabbitmq13.配置文件失效 source /etc/profile14.启动rabbitmq rabbitmq-server -detached

July 13, 2020 · 1 min · jiezi

rabbitMQ

作者太懒了,总感觉做进去就会了,看过也会了,没有做笔记,前面找工夫补上 RabbitMQ基础知识解说RabbitMQ的装置RabbitMQ工作模式 - 发送端和接收端封装 RabbitMQ - 五种工作模式 - 简略模式和work工作模式RabbitMQ - 五种工作模式 - 订阅模式RabbitMQ - 五种工作模式 - 路由模式RabbitMQ - 五种工作模式 - 主题模式 RabbitMQ - 音讯长久化和手动应答RabbitMQ - 死信队列

July 10, 2020 · 1 min · jiezi

RabbitMQ跨机房迁移数据零丢失

一、背景介绍 公司以前大部分服务在私有云上,因使用有一段时间了,机器比较老化再加上运维成本高,计划将整个机房上云,因负责中间件一块,所以最近将RabbitMQ顺利地迁移到云上。 先说下大概部署结构,为 保证高可用,在私有云上部署了3台Broker,应用在配置文件中直接配置3个IP,每次请求时由客户端随机选择1台。 本次迁移的目标是: 1、零数据丢失,但不保证消息不重复消费; 2、不出现整个MQ集群长时间(2分钟以上)不可用; 二、方案分析 关于数据丢失,我们先要知道RabbitMQ中有哪些数据: Exchange、Queue、Message。 关于Exchange和Queue,可以设置在不存在的时候创建,但这样难以管控,所以一般我们都是在后台由管理员创建Exchange和Queue,并设置好相应属性,一般来说都要设置为持久化,即durable为true,这样保证在Broker重启时Exchange和Queue还存在。 Message的持久化则要在发送的时候设置相应的参数,如果用的amqp-client这个包,则代码如下: channel.basicPublish(exchange, routingKey, basicProperties, payload);其中为basicProperties为消息属性,类型为AMQP.BasicProperties public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties { private String contentType; private String contentEncoding; private Map<String,Object> headers; private Integer deliveryMode; private Integer priority; private String correlationId; private String replyTo; private String expiration; private String messageId; private Date timestamp; private String type; private String userId; private String appId; private String clusterId;关注deliveryMode属性,要设为2消息才会持久化。 ...

June 26, 2020 · 1 min · jiezi

RabbitMQ实践经验

虚拟主机可以将一台RabbitMQ服务器服务多个不同的应用,应用间通过不同虚拟主机的划分提供了消息逻辑上的独立。 #创建虚拟主机rabbitmqctl add_vhost test#删除虚拟主机rabbitmqctl delete_vhost test#查询当前RabbitMQ服务器上所有虚拟机rabbitmqctl list_vhosts消息保存RabbitMQ对Queue中消息的保存方式有disk和RAM两种。采用disk方式消息数据会被保存到以.rdq后缀命名的文件中。绝大部分情况下,对消息相关数据保存采用disk方式,如果有其他高可用手段,也可选用RAM。消息持久化涉及Queue、Message、Exchange三部分。1.Queue持久化通过设置durable为true来实现。 com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException;/*** exclusive表示是否为排他队列,为true表示为排他队列* autoDelete表示是否自动删除,为true表示队列会在没有任何订阅消费者时被自动删除。*/com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;void queueDeclareNoWait(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclarePassive(String var1) throws IOException;2.Message持久化如果想要重启后Queue里面没有发送的消息也继续存在,需要设置消息持久化。 /*** BasicProperties可以使用PERSISTENT_TEXT_PLAIN表示发送的是需要持久化的消息,其实也就是将BasicProperties中的deliveryMode设置为2*/void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;void basicPublish(String var1, String var2, boolean var3, BasicProperties var4, byte[] var5) throws IOException;void basicPublish(String var1, String var2, boolean var3, boolean var4, BasicProperties var5, byte[] var6) throws IOException;3.Exchange持久化在声明Exchange时使用支持durable入参的方法,将其设置为true。 ...

June 22, 2020 · 1 min · jiezi

基于RabbitMQ的消息推送

基于RabbitMQ的消息推送H5定义了WebSocket,能实现浏览器和服务器之间全双工通信,服务器可以推送数据给客户端浏览器。针对WebSocket通信,RabbitMQ需要提供Web STOMP插件,这样浏览器就可以使用RabbitMQ。前端需要引入一个stomp.js文件,可到GitHub上下载该文件,https://github.com/jmesnil/stomp-websocket,js文件地址:https://github.com/jmesnil/stomp-websocket/blob/master/lib/stomp.js

June 22, 2020 · 1 min · jiezi

RabbitMQ入门6死信和备份交换机

1. 死信交换机(Dead-Letter-Exchange) 当消息在一个队列中由于过期,被拒绝等原因变成死信(dead message)之后,它能被重新发送到一个交换机中这个交换机就是死信交换机,绑定死信交换机的队列就称之为死信叫交换机2. 判断一个消息是否是死信消息的依据:消息被拒绝 (msg.Reject) 并且设置requeue值设置为false消息过期后,消息过期时间设置主要有两种方式 设置队列的过期时间,这样改队列所有的消息都存在相同的过期时间 在队列申明的时候使用 x-message-ttl参数 单位为:毫秒单独设置某个消息的过期时间,每条消息的过期时间不一样(设置消息属性的 exporation 参数的值,单位为毫秒如果同时使用了两种方式设置过期四件,以两者之间较小的那个数值为准;队列已满,无法再添加消息到mq中 申明队列的时候设置 x-dead-letter-exchange参数备份交换机 未被正确路由的消息将会结果此交换机 申明交换机的时候设置 alternate-exchange 参数

June 20, 2020 · 1 min · jiezi

RabbitMQ功能实现1-红包未领取退回

生产者代码:package mainimport ( uuid "github.com/satori/go.uuid" "github.com/streadway/amqp" "github.com/wonderivan/logger" "rmq/db/rmq" "time")const ( DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机 DeadLettersQueueName = "dlx_queue_packet" // 死信队列 QueueName = "queue_packet" // 目标队列 ExchangeName = "exchange_packet" // 目标交换机)var ( ch *amqp.Channel err error conn *amqp.Connection queue amqp.Queue dlxQueue amqp.Queue)func main() { if conn, err = rmq.GetConn(); err != nil { logger.Error("连接RabbitMQ服务器失败:%s", err.Error()) return } defer conn.Close() if ch, err = conn.Channel(); err != nil { logger.Error("获取Channel失败:%s", err.Error()) return } defer ch.Close() // 声明队列交换机 if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil { logger.Error("声明业务交换机失败:%s", err.Error()) return } // 创建死信交换机 if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil { logger.Error("创建死信交换机:%s", err.Error()) return } // 创建死信队列 if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil { logger.Error("创建死信队列失败:%s", err.Error()) return } // 创建业务队列 if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{ "x-message-ttl": 6000, // 消息过期时间 毫秒 "x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机 // "x-dead-letter-routing-key": "dlxKey", // 死信路由key }); err != nil { logger.Warn("创建业务队列失败:%s", err.Error()) return } // 业务队列绑定交换机 if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil { logger.Error("绑定业务交换机失败:%s", err.Error()) return } // 死信队列绑定死信交换机 if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil { logger.Error("绑定死信交换机失败:%s", err.Error()) } for i := 1; i <= 10; i++ { msg := amqp.Publishing{ MessageId: uuid.NewV4().String(), ContentType: "text/plain", Body: []byte("红包退回"), } // 发布消息 err = ch.Publish( ExchangeName, "", false, false, msg, ) if err != nil { logger.Error("发送失败: %s", err.Error()) return } else { logger.Info("发送成功:%s", msg.MessageId) } }}消费者代码package mainimport ( uuid "github.com/satori/go.uuid" "github.com/streadway/amqp" "github.com/wonderivan/logger" "rmq/db/rmq" "time")const ( DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机 DeadLettersQueueName = "dlx_queue_packet" // 死信队列 QueueName = "queue_packet" // 目标队列 ExchangeName = "exchange_packet" // 目标交换机)var ( ch *amqp.Channel err error conn *amqp.Connection queue amqp.Queue dlxQueue amqp.Queue)func main() { if conn, err = rmq.GetConn(); err != nil { logger.Error("连接RabbitMQ服务器失败:%s", err.Error()) return } defer conn.Close() if ch, err = conn.Channel(); err != nil { logger.Error("获取Channel失败:%s", err.Error()) return } defer ch.Close() // 创建死信交换机 if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil { logger.Error("创建死信交换机:%s", err.Error()) return } // 创建死信队列 if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil { logger.Error("创建死信队列失败:%s", err.Error()) return } // 死信队列绑定死信交换机 if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil { logger.Error("绑定死信交换机失败:%s", err.Error()) } msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil) if err != nil { logger.Error("消费者监听失败:%s", err.Error()) return } for { select { case message, ok := <-msgList: if !ok { continue } go func(msg amqp.Delivery) { logger.Info("messageID: %s", msg.MessageId) logger.Info("messageBody: %s", msg.Body) if err = msg.Ack(false); err != nil { logger.Error("确认消息失败") } }(message) case <-time.After(time.Second): } }}

June 20, 2020 · 3 min · jiezi

RabbitMQ入门4工作模式和交换机类型

工作模式1. 简单队列模式(simple queue)只包含一个生产者和一个消费者 生产者将消息发送到队列中 消费者从队列中接收消息2. 工作队列模式(work Queues)一个生产者对应多个消费者,一条消息只被一个消费者进行消费工作队列有轮询分发和公平分发两种模式2.1平均分配公平分配,每次只给一个消费者分配一个ch.Qos(1, 0, false)3. 发布-订阅模式(Publish/SubScribe)一个生产者,多个消费者每个消费者都有自己的消息队列,分别绑定到不同的队列上生产者没有把消息发送到队列,而是发送到交换机上每个队列都需要绑定到交换机上生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费如果消息发送到没有队列绑定的交换机时,消息将会消失,因为交换机没有存储消息的能力,只有队列才有存储消息的能力;4. 路由模式(routing)生产者将消息发送到direct交换机,它会吧消息路由到那些binding key 与 routing key 完全匹配的queue中这样就能实现消费者有选择的去消费消息5. 主题模式(Tipic)交换机通过模式匹配分配消息的路由键属性将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上,它将路由键和绑定的字符串切分为单词,这些单词之间用点隔开(use.news)同样也会识别两个通配符, "#" 和 "*"# 匹配0个或者多个单词, * 匹配多个单词binding keyrouting keylogger.#logger.errorlogger.infologger.debug#.infologger.info交换机类型参数名类型解释direct直连交换机fanout扇型交换机topic主题交换机headers头交换机直连交换机是根据消息携带的路由键(routing key) 将消息投递到对应的队列,步骤如下:将一个队列绑定到某个交换机上,同时赋予绑定一个路由键(routing key);当一个携带着路由键值为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列扇形交换机将消息路由给绑定到它身上的所有队列,不同于直连交换机,路由键在此类型上不启任务作用如果N个队列绑定到某个扇形交换机上,当有消息发送给此扇形交换机时,交换机会将此消息发送给这所有的N个队列主题交换机队列通过路由键绑定到交换机上,然后交换机根据消息里面的路由值,将消息路由给一个或多个绑定的队列扇形交换机和主题交换机异同对于扇形交换机 路由键是没有意义的,只要有消息,它都发送消息到它所绑定的所有队列上对于主题交换机 路由规则由路由键决定,只有满足路由键的规则,消息才可以路由到对应的队列上头交换机类似主题交换机,但是头交换机所有多个消息属性来替代路由键建立路由规则,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则此交换机有个重要的参数: x-match当x-match为any时: 消息头的任意一个值被匹配就可以满足条件当x-match为all的时候,就需要消息头的所有值都匹配成功

June 20, 2020 · 1 min · jiezi

RabbitMQ入门3api参数

ch.QueueDeclare queue, err = ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments )参数名参数类型解释namestring队列名称durablebool是否持久化,队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库autoDeletebool是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除exclusivebool是否排外的,有两个作用,1:当连接关闭时该队列是否会自动删除;2:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常;一般等于true的话用于一个队列只能有一个消费者来消费的场景no-waitbool是否等待服务器返回argumentsmap[string]interface{}设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。ch.Publishch.Publish( "", // exchange "hello", // routing key false, // mandatory false, // immediate body, // msg) 参数名参数类型解释exchangestring交换机routing keystring路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用mandatorybooltrue:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉immediatebooltrue:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。msg 消息内容ch.Consumech.Consume( "hello", // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args)参数名参数类型解释queuestring consumerstring auto-ackbool是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答exclusivebool no-localbool no-waitbool是否等待服务器返回args ch.ExchangeDeclarech.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments)参数名参数类型解释namestring typestring交换机类型: direct fanout topic headers其中一种durablebool是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息auto-deletedbool是否自动删除,设置为TRUE则表是自动删除,自删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为faseinternalbool是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式no-waitbool是否等待服务器返回arguments 其它一些结构化参数比如alternate-exchange

June 20, 2020 · 1 min · jiezi

RabbitMQ入门1下载安装

安装erlangrabbitMQ是erlang语言开发的,所以安装的的时候需要erlang环境yum -y install erlang 测试erlang安装完毕erl -version下载地址http://erlang.org/download/ 下载到本地wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-generic-unix-3.8.5.tar.xz解压tar -xvf rabbitmq-server-generic-unix-3.8.5.tar.xz移动目录mv rabbitmq_server-3.8.5 /opt/rabbitmq添加环境变量vim ~/.bash_profileexport RABBITMQ=/opt/rabbitmqPATH=$PATH:$HOME/bin:$RABBITMQ/sbin使环境变量生效source ~/.bash_profile查看环境变量echo $RABBITMQ后台启动rabbitmqrabbitmq-server -detached错误Crash dump was written to: erl_crash.dumpinit terminating in do_boot ()该错误是erlang和rabbitmq不符合 只能重新安装erlang对应版本查询: https://www.rabbitmq.com/whic... 编译安装erlang卸载之前安装的erlangyum remove erlangyum list installed | grep erlang-ertsyum remove erlang-erts.x86_64下载wget http://erlang.org/download/otp_src_21.3.tar.gz解压tar -zxvf otp_src_21.3.tar.gz 进入目录cd otp_src_21.3生成makefile./configure --prefix=/opt/erlang编译安装make make install配置环境变量vim ~/.bash_profile# 使环境变量生效source ~/.bash_profile查看是否安装成功erl -version

June 20, 2020 · 1 min · jiezi

RabbitMQ入门2工作原理和基本操作

组成部分:名称解释Broker消息队列服务进程,该进程包含2个部分,Exchange和QueueExchange消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤!Queue存储消息的队列,消息到达队列并转发给消费方Producer消息生产者,即生产方客户端,生产方客户端将消息发送到MQConsumer消息消费者,消费方客户端,接收MQ转发的消息消息发布流程:生产者和Broker建立TCP连接生产者和Broker建立通道.生产者通过通道把消息发送给Broker,由Exchange将消息转发.Exchange将消息发送给指定的Queue(队列)消息接收流程:消费者和Broker建立TCP连接消费者和Broker建立通道消费者监听指定的Queue当有消息到达Queue时Broker默认将消息推给消费者消费者接收到消息.基本操作后台启动rabbitmqrabbitmq-server -detached查看单节点状态rabbitmqctl status查看日志cat $RABBITMQ/var/log/rabbitmq/rabbit@$HOSTNAME.log查看集群状态rabbitmqctl cluster_status新增用户rabbitmqctl add_user lee lee新增授权rabbitmqctl set_permissions -p / lee ".*" ".*" ".*"设置管理者权限rabbitmqctl set_user_tags lee administrator启动web管理插件rabbitmq-plugins enable rabbitmq_management打开 http://ip:15672/ 即可打开web管理界面

June 20, 2020 · 1 min · jiezi

一文带你深入理解消息中间件技术RabbitMQ服务

什么叫消息队列?消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。 为何用消息队列?消息队列是一种应用间的异步协作机制,那什么时候需要使用消息队列呢?像用户下单之后、生成订单、结算,定时给系统注册用户推送活动消息,一些常见的流程类的业务都会用到消息队列服务。 一、RabbitMQ简介RabbitMQ是一个消息的代理器,用于接收和发送消息,你可以这样想,他就是一个邮局,当您把需要寄送的邮件投递到邮筒之时,你可以确定的是邮递员先生肯定会把邮件发送到需要接收邮件的人的手里,不会送错的。在这个比喻中,RabbitMQ就是一个邮箱,也可以理解为邮局和邮递员,他们负责把消息发送出去和用于接收信息。 RabbitMQ和邮局这两者之间的主要区别是它不会处理纸质邮件,取而代之的是接收、存储和发送二进制数据块,也就是我们通常所说的消息。 二 、RabbitMQ基本概念下图是RabbitMQ服务的内部结构 1)Message 消息,它由消息头和消息体两部分组成。消息体是不透明的,但消息头是由一些属性组成的,其中包括:routing-key(路由键)、priority(优先权)、delivery-mode(持久存储)。2)Publisher 生产者,也是消息的生产者,它是向交换器发布消息的应用程序3)Exchange 交换器,用来接收生产者传递过来的消息,然后将这些消息路由至服务器中的队列4)Binding 绑定,用于消息队列与交换器之间的沟通。也是消息路由的规则,相当于一个路由表。5)Queue 消息队列,用来保存消息直到发送给消费者。一个消息可以进入一个或多个队列,除消费者取走消息,否则它一直在消息队列里。6)Connection 网络连接,如:一个TCP连接7)Channel 信道,多路复用连接中一个独立的双向数据传输通道。无论是发布消息、订阅队列、接收消息都是通过信道来完成。复用信道是为了降低系统资源的消耗。8)Consumer 消费者,也就是接收生产者发来的消息的客户端应用。9)Virtual Host 虚拟主机,交换器、消息队列相关的对象。一个VHOST其实可以看成一个rabbitmp服务器,它拥有自己的队列、交换器、绑定与权限机制等。Rabbitmq默认vhost是/。三、RabbitMQ 特点RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。 RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 具体特点包括:1)可靠性(Reliability RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 2)灵活的路由(Flexible Routing) 在消息进入队列之前,通过Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的 Exchange 。 3)消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 4)高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 5)多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 6)多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 7)管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 8)跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 9)插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。参考文章 http://www.rabbitmq.com/四、Rabbitmq的工作过程1)客户端连接到消息队列服务器,开启一个channel2)客户端声明一个exchange、queue,并配置相关属性3)客户端使用routing key,在exchange与queue之间建立好绑定关系4)客户端传递消息到交换器5)交换器接收到消息后,根据预定的KEY与绑定关系,对消息进行路由至消息队列 ...

June 10, 2020 · 2 min · jiezi

RabbitMQ-延迟队列

rabbitmq的延迟队列,我们可以通过死信交换器来实现。生产者发送消息,定义2秒后消息过期,消息就会进入死信交换器,最后到死信队列。 // 定义队列的名称public final static String QUEUE_NAME = "queue.scheduler";// 定义交换器的名称public final static String EXCHANGE_NAME = "exchange.scheduler";// 定义路由的名称public final static String ROUTE_NAME = "route.scheduler";// 定义死信队列的名称public final static String DLX_QUEUE_NAME = "scheduler.queue.name";// 定义死信交换器的名称public final static String DLX_EXCHANGE_NAME = "scheduler.exchange.name";public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 // 创建一个Channel try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 定义交换器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); arguments.put("x-message-ttl", 2000); // 定义队列 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); // 绑定队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME); // 定义死信交换器 channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null); // 定义死信队列 channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null); // 绑定死信队列 channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, ROUTE_NAME); // 发送消息 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true, null, df.format(new Date()).getBytes()); }}消费者,从私信队列获取消息,可以得到延迟后的消息。 ...

June 4, 2020 · 1 min · jiezi

RabbitMQ系列

RabbitMQ - 队列RabbitMQ - 消息确认RabbitMQ - 消息预取RabbitMQ - 消息拒绝RabbitMQ - 交换器RabbitMQ - 备用交换器RabbitMQ - 发送方的可靠性RabbitMQ - 死信队列RabbitMQ - 延迟队列

June 4, 2020 · 1 min · jiezi

RabbitMQ-死信队列

ActiveMQ - 死信队列有死信队列,rabbitmq也有死信队列,以下几种情况会有把消息投递到死信队列: 消息被拒绝,且requeue设置为false。消息过期(队列过期并不会把消息投递给死信队列)由于超过了队列的消息最大数被抛弃消息投递给死信队列的时候,也会经过交换器,这个交换器称之为死信交换器,但是他依然是一个正常的交换器。要设置队列的死信交换,在声明队列时需要指定可选的x-dead-letter-exchange参数。主要是下面的代码: Map<String, Object> args = new HashMap<String, Object>();args.put("x-dead-letter-exchange", "some.exchange.name");示例:// 定义队列的名称public final static String QUEUE_NAME = "queue.dlx";// 定义交换器的名称public final static String EXCHANGE_NAME = "exchange.dlx";// 定义路由的名称public final static String ROUTE_NAME = "route.dlx";// 定义死信队列的名称public final static String DLX_QUEUE_NAME = "some.queue.name";// 定义死信交换器的名称public final static String DLX_EXCHANGE_NAME = "some.exchange.name";// 定义死信路由的名称public final static String DLX_ROUTE_NAME = "some.route.name";public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 // 创建一个Channel try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 定义交换器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "some.exchange.name"); arguments.put("x-message-ttl", 2000); // 定义队列 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); // 绑定队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME); // 定义死信交换器 channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null); // 定义死信队列 channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null); // 绑定死信队列 channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, ROUTE_NAME); // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true, null, "dlx".getBytes()); }}流程是这样的: ...

June 3, 2020 · 2 min · jiezi

RabbitMQ-备用交换器

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换器来实现。大概原理如下,如下图所示,消息发送给交换器,交换器发现没有可路由的队列,于是消息发给备用交换器,备用交换器再发给队列2,由队列2的消费者来处理消息。 示例我们在RabbitMQ - 交换器知道,交换器的定义,需要一个参数,可以通过参数的方式,来指定备用交换器。参数的key是alternate-exchange,value是交换器的名称。通常备用交换器的类型是fanout。生产者中,定义一个交换器和备用交换器,此时并没有响应路由的队列。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 // 创建一个Channel try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 备用交换器 channel.exchangeDeclare("back.exchange", BuiltinExchangeType.FANOUT); Map<String, Object> map = new HashMap<>(); map.put("alternate-exchange", "back.exchange"); // 定义交换器 channel.exchangeDeclare("exchange", BuiltinExchangeType.DIRECT, false, false, map); String[] routingKeys = {"exchange.no.found"}; for (int i = 0; i < routingKeys.length; i++) { // 把消息发送到队列中 channel.basicPublish("exchange", routingKeys[i], null, routingKeys[i].getBytes()); } System.out.println("Sent complete"); }}消费者,订阅的是备用交换器信息 ...

May 30, 2020 · 1 min · jiezi

RabbitMQ-交换器

在Rabbitmq中,消息发送给交换器,交换器根据一定的规则把消息发给队列,broker再把消息发送给消费者,或者发送至主动从队列拉去消息。前面几张讲了队列的相关东西,这篇看看交换器是如何把消息发送给队列的。 交换器交换器接收消息并将其路由到零个或多个队列,它支持四种交换类型:Direct、Fanout、Topic、Headers。还还声明了一些属性,其中最重要的是:交换器的名称、交换器类型、是否持久化、是否自动删除、参数。是否持久化,决定了rabbitmq重启后,交换器是否存在。是否自动删除,决定了当最后一个队列被解除绑定时,交换器是否被删除。 Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;默认的交换器默认的交换器名称为"",即空字符串。当我们没有定义的时候,看起来就像消息直接发送到队列一样。 Direct根据消息路由键将消息传递到队列。主要的步骤如下: 通过路由键K把队列绑定到交换器当带有路由键R的新消息到达交换器时,如果K = R,交换器将其路由到队列生产者代码,通过channel.basicPublish把带有路由键("images.archive", "images.crop", "images.resizer")的消息发送给交换器images。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 // 创建一个Channel try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 定义交换器 channel.exchangeDeclare(Constant.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String[] routingKeys = {"images.archive", "images.crop", "images.resizer"}; for (int i = 0; i < routingKeys.length; i++) { // 把消息发送到队列中 channel.basicPublish(Constant.EXCHANGE_NAME, routingKeys[i], null, routingKeys[i].getBytes()); channel.basicPublish(Constant.EXCHANGE_NAME, routingKeys[i], null, routingKeys[i].getBytes()); } System.out.println("Sent complete"); }}ArchiveRec1消费者,通过channel.queueBind把交换器images、路由键images.archive、队列archive1绑定一起。 ...

May 30, 2020 · 4 min · jiezi

RabbitMQ-消息拒绝

RabbitMQ - 消息确认中提到了当消费者收到消息后,需要对消息进行确认,队列才会把这个消息删除。如果消息处理中发生了异常需要拒绝消息怎么办呢?在这个章节中,我们看到了没确认消息时,如果断开了和rabbitmq的连接,消息会回到待发送那边,等待其他消费者,虽然我们可以通过关闭连接来拒绝消息,但是频繁的频繁的建立连接、关闭连接,会增加rabbitmq的负担。rabbitmq提供了另外一种优雅的方式来拒绝消息,方法如下: void basicReject(long deliveryTag, boolean requeue) throws IOException第一个参数deliveryTag,消息确认中提过,传递标识。第二个参数requeue,为true的话,消息会重新发给下一个消费者,为false的话,就不发给消费者,相当于说,消息我确认了。 重新投递重新投递,就是requeue为true的情况。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 Connection connection = factory.newConnection(); // 创建一个Channel Channel channel = connection.createChannel(); // 通过Channel定义队列 channel.queueDeclare("reject", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("reject message '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true); }; // 接收消息 channel.basicConsume("reject", false, deliverCallback1, consumerTag -> { });}我们通过web控制台发送一条消息,在控制台打印如下,由于当前消费者把消息拒绝了,所以rabbitmq重新投递,又发给这个消费者,消费者又拒绝,所以一直打印,相对于死循环了。从deliveryTag可以看出,这条消息每次重新投递,就会递增。消费者端口连接,web控制有一条消息 ...

May 29, 2020 · 1 min · jiezi

rabbitmq账户的问题

rabbitmq的web管理界面无法使用guest用户登录安装最新版本的rabbitmq(3.3.1),并启用management plugin后,使用默认的账号guest登陆管理控制台,却提示登陆失败。 翻看官方的release文档后,得知由于账号guest具有所有的操作权限,并且又是默认账号,出于安全因素的考虑,guest用户只能通过localhost登陆使用,并建议修改guest用户的密码以及新建其他账号管理使用rabbitmq(该功能是在3.3.0版本引入的)。 虽然可以以比较猥琐的方式:将ebin目录下rabbit.app中loopback_users里的<<"guest">>删除,  并重启rabbitmq,可通过任意IP使用guest账号登陆管理控制台,但始终是违背了设计者的初衷,再加上以前对这一块了解也不多,因此有必要总结一下。 1. 用户管理 用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。 相应的命令 (1) 新增一个用户 rabbitmqctl  add_user  Username  Password (2) 删除一个用户 rabbitmqctl  delete_user  Username (3) 修改用户的密码 rabbitmqctl  change_password  Username  Newpassword (4) 查看当前用户列表 rabbitmqctl  list_users 2. 用户角色 按照个人理解,用户角色可分为五类,超级管理员, 监控者, 策略制定者, 普通管理者以及其他。 (1) 超级管理员(administrator) 可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。 (2) 监控者(monitoring) 可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) (3) 策略制定者(policymaker) 可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 与administrator的对比,administrator能看到这些内容 (4) 普通管理者(management) 仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。 (5) 其他 无法登陆管理控制台,通常就是普通的生产者和消费者。 了解了这些后,就可以根据需要给不同的用户设置不同的角色,以便按需管理。 设置用户角色的命令为: rabbitmqctl  set_user_tags  User  Tag User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。 也可以给同一用户设置多个角色,例如 ...

May 28, 2020 · 1 min · jiezi

RabbitMQ-消息预取

RabbitMQ - 消息确认这篇文章中,提到了消息预取,避免了rabbitmq一直往消费端发送数据,导致消费端出现无限制的缓冲区问题。消息预取定义了信道上或者消费者允许的最大未确认的消息数量。一旦未确认数达到了设置的值,RabbitMQ将停止传递更多消息,除非至少有一条未完成的消息得到确认。使用消息预取的时候,会调用chanel的basicQos方法,prefetchCount是未确认的消息数,global默认值为false,是限制消费者未确认的消息数,设置为true的时候,是限制信道上的未确认消息数。 void basicQos(int prefetchCount, boolean global) throws IOException;消费者限制global设置为false,当每个消费者有2个未确认的消息时,不能再发消息给消费者。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 Connection connection = factory.newConnection(); // 创建一个Channel Channel channel = connection.createChannel(); // 通过Channel定义队列 channel.queueDeclare("qos", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; /* DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); };*/ channel.basicQos(2, false); // 接收消息 channel.basicConsume("qos", false, deliverCallback1, consumerTag -> { }); /*channel.basicConsume("qos", false, deliverCallback2, consumerTag -> { });*/}运行后,往队列发送了4条消息,可以看到,未发送(ready)有2个,未确认2个。控制台确实只收到了两个消息。如果把注释放开,同时有两个消费者,可以看到,未发送(ready)有0个,未确认4个。控制台结果如下,两个都消费了两个。 ...

May 28, 2020 · 2 min · jiezi

RabbitMQ-消息确认

RabbitMQ - 队列中提到,接收消息的时候,有两个方式,一个是consume,一个是get,这两个方法都有一个autoAck的参数。当我们设置为true的时候,说明消费者会通过AMQP显示的向rabbitmq发送一个确认,rabbitmq自动视其确认了消息,然后把消息从队列中删除。下面用consume的方式做些例子来理解autoAck的参数设置。 String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;GetResponse basicGet(String queue, boolean autoAck) throws IOException;不确认先往ack队列发送5条数据,可以看到ready是5,total是5。运行以下代码,autoAck设置为false,且不对消息确认。 public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 Connection connection = factory.newConnection(); // 创建一个Channel Channel channel = connection.createChannel(); // 通过Channel定义队列 channel.queueDeclare("ack", false, false, false, null); // 异步回调处理 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ack Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 接收消息 channel.basicConsume("ack", false, deliverCallback, consumerTag -> { });}运行结果如下,打印了5条消息:在web控制台可以看出,ready是0,unacked是5,即未确认的消息数是5条。把应用停止掉,即关闭消费者和rabbitmq的连接,web控制台如下,unacked的5条数据回到了ready。综上,当autoAck为false时,消息分为两个部分,一个是未投放放消费者的(ready),一个是投放给消费者但未确认的。如果未确认信息的消费者断开了连接,这部分消息会回到ready重新投递给消费者,确保了消息的可靠性。需要注意的是,如果消费者一直没有断开连接,也没有进行确认,那这个消息会一直等待确认中。 ...

May 27, 2020 · 1 min · jiezi

rabbitmq的启动问题

/usr/bin/curl 127.0.0.1:15672 查看本地能否访问1、开启web控制,在web端设置 执行rabbitmq-plugins enable rabbitmq_management 开启web访问 重启服务 service  rabbitmq-server restart 登录 ‘http://localhost:15672’,默认用户名密码为guest:guest,该用户只能本机登录; 添加用户,赋予admin权限,保存用户;再点击用户,赋予所有权限即可 2、命令行操作 rabbitmqctl add_user test passwd    设置该用户为administrator角色: rabbitmqctl set_user_tags test administrator    设置权限: rabbitmqctl set_permissions -p '/' test '.' '.' '.'    重启rabbitmq服务: sudo service rabbitmq-server restart

May 27, 2020 · 1 min · jiezi

yii2-queue

Worker starting control[](https://github.com/yiisoft/yi...Supervisor 是Linux的进程监视器。 它会自动启动您的控制台进程。 安装在Ubuntu上,你需要运行命令: sudo apt-get install supervisor Supervisor 配置文件通常可用 /etc/supervisor/conf.d。 你可以创建任意数量的配置文件。 配置示例: [program:yii-queue-worker]process_name=%(program_name)s_%(process_num)02dcommand=/usr/bin/php /var/www/my_project/yii queue/listen --verbose=1 --color=0autostart=trueautorestart=trueuser=www-datanumprocs=4redirect_stderr=truestdout_logfile=/var/www/my_project/log/yii-queue-worker.log在这种情况下,Supervisor 会启动4个 queue/listen worker。输出将写入相应日志文件。 有关 Supervisor 配置和使用的更多信息,请参阅文档。 以守护进程模式启动的Worker使用 queue/listen 命令支持 File、 Db、 Redis、 RabbitMQ、 Beanstalk、 Gearman 驱动。 有关其他参数,请参阅驱动程序指南。 [](https://github.com/yiisoft/yi...Systemd is an init system used in Linux to bootstrap the user space. To configure workers startup using systemd, create a config file named yii-queue@.service in /etc/systemd/system with the following contents: ...

May 27, 2020 · 2 min · jiezi

RabbitMQ-队列

AMQP消息路由必须包含三部分,交换器、队列、绑定。如下图所示,生产者把消息发送给交换器,交换器再路由到符合条件的队列上,最终被消费者接收。绑定决定了消息如何从路由器路由到相应的队列。这一篇,主要是了解一下队列。 相关概念当新增队列的时候,需要定义一下4中属性,分布是Name、Durability、Auto delete、Arguments。 Name:队列名称,不能用amq.开头命名。Durability:持久化,如果为durable,那broker重启不会丢失,如果为transient,broker重启后会丢失。(win系统仅仅重启rabbitmq是不行的)Auto delete:最后一个消费者退订时被自动删除Arguments:队列的其他参数,如上图,比如消息的TTL等。定义一个队列的方法如下,exclusive的参数,下面的临时队列会说明。 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;命名我们在使用队列之前,需要先声明队列。如果队列不存在,则创建队列。如果已存在则不会创建,但是和已存在的队列属性不一致,则会有406 (PRECONDITION_FAILED)的通道级异常。 参数设置参数的设置有两种,一种是通过分组,一个是一个个队列设置。分组的方式更加灵活、非侵入性,不需要修改和重新部署应用程序,是官方推荐的方式。参数的描述如下: 参数描述x-message-ttl消息的存活时间,单位为毫秒x-expires队列的存活时间,单位为毫秒x-max-length队列的最大消息数x-max-length-bytes消息的最大字节数x-overflow消息达到最大数的策略,drop-head或者reject-publishx-dead-letter-exchange死信队列的交换器x-dead-letter-routing-key死信队列的路由键x-max-priority消息的优先级设置x-queue-mode消息的延迟x-queue-master-locator用于主从临时队列当我们需要一个临时队列的时候,我们可以先定义队列,使用完再删除,或者直接定义Durability的属性为transient,等broker重启的时候就消失,但是感觉没有很方便。特别是使用后删除,如果客户端失败,这个队列就一直存在。我们可以用以下方法来自动删除: Exclusive:独占队列x-expires:设置队列的过期时间Auto delete:队列设置自动删除public static void main(String[] args) throws IOException, TimeoutException { // 声明一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建一个与rabbitmq服务器的连接 Connection connection = factory.newConnection(); // 创建一个Channel Channel channel = connection.createChannel(); // 通过Channel定义队列 channel.queueDeclare("queue1", false, true, false, null); channel.queueDeclare("queue2", false, false, true, null); channel.basicConsume("queue2", true, null, consumerTag -> { }); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-expires",5000); channel.queueDeclare("queue3", false, false, false, arguments);}queue1是独占队列,queue2是自动删除,queue3设置了5秒的过期时间。运行后如下图,五秒后queue3消失,停止程序运行,queue1和queue2消失。需要注意的是,如果把queue2的basicConsume方法调用注释掉,由于没有消费者,队列并不会消失。独占队列只能由其声明连接使用(从声明连接使用、清除、删除等)。其他队列如果想使用独占队列将导致通道级异常RESOURCE_LOCKED,该异常带有一条错误消息,表明无法获得对锁定队列的独占访问。 ...

May 26, 2020 · 2 min · jiezi

Spring-Boot-and-Rabbit-MQ-异常的时候消息的状态

我们有一个处理消息的方法。 在处理消息的时候出现了异常,那出现异常后这个消息会怎么处理呢。 根据我们的实际情况的观察,如果出现了异常。 但是你没有捕获或者处理异常,这个消息会一直存在,并且你的系统会持续报告异常。 所以在你的系统中有要捕获异常,并且进行正确的处理。

October 16, 2019 · 1 min · jiezi

Spring-Boot-十三-Spring-Boot-整合-RabbitMQ

1. 前言RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷、消息分发的作用。 消息队列在比较主要的一个作用是用来做应用服务的解耦,消息从消息的生产者传递到消息队列,消费者从消息队列中获取消息并进行消费,生产者不需要管是谁在消费消息,消费者也无需关注消息是由谁来生产的。在分布式的系统中,消息队列也会被用在其他地方,比如分布式事务的支持,代表如阿里开源的 RocketMQ 。 当然,我们本篇文章的主角还是 RabbitMQ 。 2. RabbitMQ 介绍RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。 AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 3. 概念介绍在普通的消息队列的设计中,一般会有这么几个概念:生产者、消费者和我们的队列。但是在 RabbitMQ 中,中间增加了一层,叫交换机(Exchange),这样,消息的投递就不由生产者来决定投递至哪个队列,而消息是直接投递至交换机的,由交换机根据调度策略来决定这个消息需要投递到哪个队列。如图: 左侧的 P 代表消息的生产者紫色的 X 代表交换机右侧红色的代表队列4. 交换机(Exchange)那么为什么我们需要 Exchange 而不是直接将消息发送至队列呢? AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。 Exchange 收到消息时,他是如何知道需要发送至哪些 Queue 呢?这里就需要了解 Binding 和 RoutingKey 的概念: Binding 表示 Exchange 与 Queue 之间的关系,我们也可以简单的认为队列对该交换机上的消息感兴趣,绑定可以附带一个额外的参数 RoutingKey。Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 Binding 做匹配,如果满足匹配,就往 Exchange 所绑定的 Queue 发送消息,这样就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue。RoutingKey 的意义依赖于交换机的类型。 ...

October 15, 2019 · 3 min · jiezi

该如何选择消息队列

在高并发业务场景下,消息队列在流量削峰、解耦上有不可替代的作用。当前使用较多的消息队列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等。 消息队列这么多,到底该选择哪款消息队列呢? 选择消息队列的基本标准虽然这些消息队列在功能和特性方面各有优劣,但我们在选择的时候要有一个基本标准。 首先,必须是开源的产品。开源意味着,如果有一天你使用的消息队列遇到了一个影响你系统业务的 Bug,至少还有机会通过修改源代码来迅速修复或规避这个 Bug,解决你的系统的问题,而不是等待开发者发布的下一个版本来解决。 其次,这个产品必须是近年来比较流行并且有一定社区活跃度的产品。流行的好处是,只要使用场景不太冷门,遇到 Bug 的概率会非常低,因为大部分遇到的 Bug,其他人早就遇到并且修复了。在使用过程中遇到的一些问题,也比较容易在网上搜索到类似的问题,然后很快的找到解决方案。还有一个优势就是,流行的产品与周边生态系统会有一个比较好的集成和兼容。 最后,作为一款及格的消息队列,必须具备的几个特性包括: 消息的可靠传递:确保不丢消息;Cluster:支持集群,确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;性能:具备足够好的性能,能满足绝大多数场景的性能要求。接下来看一下有哪些符合上面这些条件,可供选择的开源消息队列。 RabbitMQ 首先,我们来看下消息队列 RabbitMQ。RabbitMQ 于 2007 年发布,是使用 Erlang 编程语言编写的,最早是为电信行业系统之间的可靠通信设计的,也是少数几个支持 AMQP 协议的消息队列之一。 RabbitMQ:轻量级、迅捷,它的宣传口号,也很明确地表明了 RabbitMQ 的特点:Messaging that just works,开箱即用的消息队列。也就是说,RabbitMQ 是一个相当轻量级的消息队列,非常容易部署和使用。 RabbitMQ 一个比较有特色的功能是支持非常灵活的路由配置,和其他消息队列不同的是,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块,可以理解为交换机。 Exchange 模块的作用和交换机非常相似,根据配置的路由规则将生产者发出的消息分发到不同的队列中。路由的规则也非常灵活,甚至可以自己来实现路由规则。如果正好需要这个功能,RabbitMQ 是个不错的选择。 RabbitMQ 的客户端支持的编程语言大概是所有消息队列中最多的。 接下来说下 RabbitMQ 的几个问题: RabbitMQ 对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。RabbitMQ 的性能是这几个消息队列中最差的,大概每秒钟可以处理几万到十几万条消息。如果应用对消息队列的性能要求非常高,那不要选择 RabbitMQ。RabbitMQ 使用的编程语言 Erlang,扩展和二次开发成本高。RocketMQ RocketMQ 是阿里巴巴在 2012 年开源的消息队列产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,后来捐赠给 Apache 软件基金会,2017 正式毕业,成为 Apache 的顶级项目。RocketMQ 在阿里内部被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,Binglog 分发等场景。经历过多次双十一考验,它的性能、稳定性和可靠性都是值得信赖的。 RocketMQ 有着不错的性能,稳定性和可靠性,具备一个现代的消息队列应该有的几乎全部功能和特性,并且它还在持续的成长中。 ...

October 15, 2019 · 1 min · jiezi

rabbitmq延迟队列死信队列

主要记录一下开发时用到rabbitmq死信队列的一些笔记 延迟队列的应用场景1、未支付订单定时取消2、定时清理缓存对象、空闲连接等3、下单成功后30分钟内,按不同时间间隔发送通知等(1min、3min、10min发一次)1、设置队列的过期时间$this->channel->queue_declare( $this->retry_queue(), false, true, false, false, false, new AMQPTable( [ # 不设置x-dead-letter-routing-key,使用原先的routing_key,10s过期后自动重回原先的队列里面,那x-dead-letter-exchange交换机就需绑定原先队列 'x-dead-letter-exchange' => $this->retry_exchange(), # 10s 'x-message-ttl' => 10000, ] ) );推送到该队列的所有消息(不设ttl),10s之后都会过期,根据原来的routing_key,进入到指定的exchange,进而进到指定队列。 2、设置消息的过期时间$message = new AMQPMessage( 'msg', array( # 消息持久化 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSITENT, # ttl过期时间 'expiration' => 50000, ));每个消息都设置相同的过期时间,到期后消息就会失效。 3、同时设置队列、消息的过期时间如果同时设置,则消息的过期时间会取决于较小的值,比如队列的‘x-message-ttl’设置为10s,消息的‘expiration’设置为50s,则10s之后这个消息就会失效。 4、后续单单设置队列的ttl,或者单单设置相同的消息过期时间,死信队列是能正常工作的。但是设置不同的消息过期时间,就可能无法正常使用死信队列了。 队列不设ttl $this->channel->queue_declare( $this->retry_queue(), false, true, false, false, false, new AMQPTable( [ # 不设置x-dead-letter-routing-key,使用原先的routing_key,10s过期后自动重回原先的队列里面,那x-dead-letter-exchange交换机就需绑定原先队列 'x-dead-letter-exchange' => $this->retry_exchange(), ] ) );第一个消息设置500s过期,先推进队列 $message = new AMQPMessage( 'msg', array( # 消息持久化 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSITENT, # ttl过期时间 'expiration' => 500000, ));第二个消息设置5s过期,后推进队列 ...

September 9, 2019 · 1 min · jiezi

RabbitMQ-总结

Connection & ChannelConnection 代表一个 TCP 连接,Channel 是建立在 Connection 上的虚拟连接。RabbitMQ 每条指令都是通过 Channel 完成的。 对于 OS 而言,创建和销毁 TCP 连接的代价非常高,在高峰期很容易遇到瓶颈。程序中一般会有多个线程需要与 RabbitMQ建立通信,消费或生产消息,通过 TCP 连接复用来减少性能开销。 Connection 可以创建多个 Channel ,但是 Channel 不是线程安全的所以不能在线程间共享。 Connection 在创建时可以传入一个 ExecutorService ,这个线程池时给该 Connection 上的 Consumer 用的。 Channel.isOpen 以及 Connection.isOpen 方法是同步的,因此如果在发送消息时频繁调用会产生竞争。我们可以认为在 createChannel 方法后 Channel 以及处于开启状态。若在使用过程中 Channel 关闭了,那么只要捕获抛出的 ShutDownSignalException 就可以了,同时建议捕获 IOException 以及 SocketException 防止连接意外关闭。 Exchange & Queue消费者和生产者都可以声明一个已经存在的 Exchange 或者 Queue ,前提是参数完全匹配现有的 Exchange 或者 Queue,否则会抛出异常。 QueueDeclare 参数:exclusive: 排他队列,只有同一个 Connection 的 Channel 可以访问,且在 Connection 关闭或者客户端退出后自动删除,即使 durable 为 true 。 ...

July 15, 2019 · 5 min · jiezi

RabbitMQ入门指南

简介RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理(英语:Message broker)软件(亦称面向消息的中间件(英语:Message-oriented middleware))。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在OTP(Open Telecom Platform)上的。所有主要的编程语言均有与代理接口通讯的客户端函式库。 主要特性可靠性(Reliability)RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 灵活的路由(Flexible Routing)在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 消息集群(Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 高可用(Highly Available Queues)队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 多种协议(Multi-protocol)RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 多语言客户端(Many Clients)RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 管理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 跟踪机制(Tracing)如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。 基本概念(以下使用AMQP-0-9-1版本)先放一张主要结构图 BrokerBroker(可以理解为RabbitMQ)从发布者(发布消息的应用程序,也称为生产者)接收消息,并将其路由到消费者(处理消息的应用程序)。 ExchangeExchange是 AMQP 0-9-1实体,用于发送消息。 Exchange接收消息并将其路由到零个或多个队列。 所使用的路由算法取决于称为绑定的交换类型和规则。 Amqp 0-9-1型经纪商提供四种交易所类型: 类型默认名称Direct exchange空字符串或amq.directFanout exchangeamq.fanoutTopic exchangeamq.topicHeaders exchangeamq.match(以及 RabbitMQ 中的 amq.headers)Default Exchange默认交换是直接交换,没有代理预先声明的名称(空字符串)。 它有一个特殊的属性,这使得它对简单的应用程序非常有用: 创建的每个队列都自动用与队列名相同的路由键绑定到它。 Direct Exchange直接交换器根据邮件路由键将邮件传递到队列。 直接交换对于消息的单播路由是理想的(尽管它们也可以用于广播路由)。 以下是它的工作原理: ...

June 22, 2019 · 1 min · jiezi

多维度对比5款主流分布式MQ消息队列妈妈再也不担心我的技术选型了

1、引言对于即时通讯系统(包括IM、消息推送系统等)来说,MQ消息中件间是非常常见的基础软件,但市面上种类众多、各有所长的MQ消息中件间产品,该怎么去选择?这是个问题! 对于很多经验不足的开发者来说,一个公司内部用的IM聊天系统,总用户量也不过百十来人,动辄就是Kafka、MongoDB,美其名曰为了高性能和可扩展性,真是大炮打蚊子。而对于中大型的即时通讯场景来说,有的开发者确为了贪图使用简单、资料全面,反而使用臃肿不堪的ActiveMQ,这就有点失去章法了。 唧唧歪歪这么多,那什么样的场景到底该用哪种MQ消息中件间产品合适?读完本文您或许就有了答案。 本文将从17个维度综合对比Kafka、RabbitMQ、ZeroMQ、RocketMQ、ActiveMQ这5款当前最主流的MQ消息中间件产品,希望能为您的下一次产品的架构设计和MQ消息中间件选型提供参考依据。 学习交流: 即时通讯/推送技术开发交流4群:101279154[推荐]移动端IM开发入门文章:《新手入门一篇就够:从零开发移动端IM》(本文同步发布于:http://www.52im.net/thread-26...) 2、相关资料官网地址: Kafka:http://kafka.apache.org/ RabbitMQ:https://www.rabbitmq.com/ ZeroMQ:http://zeromq.org/ RocketMQ:http://rocketmq.apache.org/ ActiveMQ:http://activemq.apache.org/ 相关文章: 《IM开发基础知识补课(五):通俗易懂,正确理解并用好MQ消息队列》 《IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?》 3、维度1:资料文档1)Kafka:资料数量中等。有Kafka作者自己写的书,网上资料也有一些。 2)RabbitMQ:资料数量多。有一些不错的书,网上资料多。 3)ZeroMQ:资料数量少。专门写ZeroMQ的书较少,网上的资料多是一些代码的实现和简单介绍。 4)RocketMQ:资料数量少。专门写RocketMQ的书目前有了两本;网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。 5)ActiveMQ:资料数量多。没有专门写ActiveMQ的书,网上资料多。 4、维度2:开发语言1)Kafka:Scala 2)RabbitMQ:Erlang 3)ZeroMQ:C语言 4)RocketMQ:Java 5)ActiveMQ:Java 5、维度3:支持的协议1)Kafka:自己定义的一套…(基于TCP) 2)RabbitMQ:AMQP 3)ZeroMQ:TCP、UDP 4)RocketMQ:自己定义的一套… 5)ActiveMQ:OpenWire、STOMP、REST、XMPP、AMQP 6、维度4:消息存储1)Kafka: 内存、磁盘、数据库。支持大量堆积。 Kafka的最小存储单元是分区,一个topic包含多个分区,Kafka创建主题时,这些分区会被分配在多个服务器上,通常一个broker一台服务器。 分区首领会均匀地分布在不同的服务器上,分区副本也会均匀的分布在不同的服务器上,确保负载均衡和高可用性,当新的broker加入集群的时候,部分副本会被移动到新的broker上。 根据配置文件中的目录清单,Kafka会把新的分区分配给目录清单里分区数最少的目录。 默认情况下,分区器使用轮询算法把消息均衡地分布在同一个主题的不同分区中,对于发送时指定了key的情况,会根据key的hashcode取模后的值存到对应的分区中。 2)RabbitMQ: 内存、磁盘。支持少量堆积。 RabbitMQ的消息分为持久化的消息和非持久化消息,不管是持久化的消息还是非持久化的消息都可以写入到磁盘。 持久化的消息在到达队列时就写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。 非持久化的消息一般只存在于内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存。 引入镜像队列机制,可将重要队列“复制”到集群中的其他broker上,保证这些队列的消息不会丢失。 配置镜像的队列,都包含一个主节点master和多个从节点slave,如果master失效,加入时间最长的slave会被提升为新的master,除发送消息外的所有动作都向master发送,然后由master将命令执行结果广播给各个slave,RabbitMQ会让master均匀地分布在不同的服务器上,而同一个队列的slave也会均匀地分布在不同的服务器上,保证负载均衡和高可用性。 3)ZeroMQ: 消息发送端的内存或者磁盘中。不支持持久化。 4)RocketMQ: 磁盘。支持大量堆积。 commitLog文件存放实际的消息数据,每个commitLog上限是1G,满了之后会自动新建一个commitLog文件保存数据。 ConsumeQueue队列只存放offset、size、tagcode,非常小,分布在多个broker上。 ConsumeQueue相当于CommitLog的索引文件,消费者消费时会从consumeQueue中查找消息在commitLog中的offset,再去commitLog中查找元数据。 ConsumeQueue存储格式的特性,保证了写过程的顺序写盘(写CommitLog文件),大量数据IO都在顺序写同一个commitLog,满1G了再写新的。 加上RocketMQ是累计4K才强制从PageCache中刷到磁盘(缓存),所以高并发写性能突出。 5)ActiveMQ: 内存、磁盘、数据库。支持少量堆积。 7、维度5:消息事务1)Kafka:支持 2)RabbitMQ:支持。客户端将信道设置为事务模式,只有当消息被RabbitMQ接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降 3)ZeroMQ:不支持 4)RocketMQ:支持 5)ActiveMQ:支持 8、维度6:负载均衡8.1 Kafka支持负载均衡。 1)一个broker通常就是一台服务器节点。 对于同一个Topic的不同分区,Kafka会尽力将这些分区分布到不同的Broker服务器上,zookeeper保存了broker、主题和分区的元数据信息。 分区首领会处理来自客户端的生产请求,Kafka分区首领会被分配到不同的broker服务器上,让不同的broker服务器共同分担任务。 每一个broker都缓存了元数据信息,客户端可以从任意一个broker获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。 2)Kafka的消费者组订阅同一个topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。 ...

June 21, 2019 · 3 min · jiezi

RabbitMQ笔记

普通发送: $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); 事务模式:单个发送: $ch->startTransaction();try { $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); $ch->commitTransaction();}catch(AMQPConnectionException $e) { $ch->rollbackTransaction();} 批量发送: $loop_times = 10;$ch->startTransaction();for($i=0;$i<$loop_times;$i++) { try { $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); $ch->commitTransaction(); } catch(AMQPConnectionException $e) { $ch->rollbackTransaction(); }} // to be continued Refer:RabbitMQ专栏:https://blog.csdn.net/u013256...php-amqp测试用例:https://github.com/pdezwart/p...

June 17, 2019 · 1 min · jiezi

Laravel-使用-RabbitMQ

导语RabbitMQ 想必大家都有了解,不做多介绍来。这里实现的是用 RabbitMQ 作为 Larvel 队列的驱动,替代 Redis。下面以 Laradock 中安装示例。 安装切换到 laradock 目录,将 .env 中关于 INSTALL_AMQP 的值修改为 truedocker-compose stop workspace php-fpm php-workerdocker-compose build workspace php-fpm php-worker rabbitmqdocker-compose up -d workspace php-fpm php-worker rabbitmq扩展包安装以及配置进入到 workspace 容器中,在项目目录安装扩展包 composer require vladimir-yuldashev/laravel-queue-rabbitmq接下来在 config/queue.php 文件中 connections 添加 rabbitmq 配置,根据情况自行修改'rabbitmq' => [ 'driver' => 'rabbitmq', /* * Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', 'default'), 'dsn' => env('RABBITMQ_DSN', null), /* * Could be one a class that implements \Interop\Amqp\AmqpConnectionFactory for example: * - \EnqueueAmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext * - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib * - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny */ 'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class, 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'vhost' => env('RABBITMQ_VHOST', '/'), 'login' => env('RABBITMQ_LOGIN', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'queue' => env('RABBITMQ_QUEUE', 'default'), 'options' => [ 'exchange' => [ 'name' => env('RABBITMQ_EXCHANGE_NAME'), /* * Determine if exchange should be created if it does not exist. */ 'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true), /* * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html */ 'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT), 'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false), 'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), 'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false), 'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'), ], 'queue' => [ /* * Determine if queue should be created if it does not exist. */ 'declare' => env('RABBITMQ_QUEUE_DECLARE', true), /* * Determine if queue should be binded to the exchange created. */ 'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true), /* * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html */ 'passive' => env('RABBITMQ_QUEUE_PASSIVE', false), 'durable' => env('RABBITMQ_QUEUE_DURABLE', true), 'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false), 'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false), 'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'), ], ], /* * Determine the number of seconds to sleep if there's an error communicating with rabbitmq * If set to false, it'll throw an exception rather than doing the sleep for X seconds. */ 'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5), /* * Optional SSL params if an SSL connection is used * Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html */ 'ssl_params' => [ 'ssl_on' => env('RABBITMQ_SSL', false), 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ],],在 .env 中修改 QUEUE_CONNECTION 为 rabbitmq,并添加以下值RABBITMQ_WORKER=horizonRABBITMQ_HOST=rabbitmqRABBITMQ_PORT=5672RABBITMQ_LOGIN=guestRABBITMQ_PASSWORD=guestRABBITMQ_QUEUE=default有两个值说明一下,因为是在 Laradock 中,所以 RABBITMQ_HOST 设置为 rabbitmq;如果之前使用了Laravel Horizon,那么 RABBITMQ_WORKER 的设置为 horizon 就可以了。 ...

June 17, 2019 · 2 min · jiezi

ERROR-node-with-name-rabbit-already-running-on-localhost

rabbitmq-server使用rabbitmq-server启动RabbitMQ后,进行start、restart或stop都会提示错误ERROR: node with name "rabbit" already running on "localhost",原因是RabbitMQ已在localhost里运行了,但是停止不了。解决记录如下: zhangguoyedeMacBook-Pro:~ zhangguoye$ rabbitmq-serverERROR: node with name "rabbit" already running on "localhost"zhangguoyedeMacBook-Pro:~ zhangguoye$ ps aux|grep epmdzhangguoye 35736 0.0 0.0 4328468 676 ?? S 四03下午 0:00.08 /usr/local/Cellar/erlang/22.0.2/lib/erlang/erts-10.4.1/bin/epmd -daemonzhangguoye 67752 0.0 0.0 4268036 804 s009 S+ 10:05上午 0:00.00 grep epmdzhangguoyedeMacBook-Pro:~ zhangguoye$ ps aux|grep erlzhangguoye 65864 0.1 0.5 5517096 84836 s009 S 10:00上午 0:09.27 /usr/local/Cellar/erlang/22.0.2/lib/erlang/erts-10.4.1/bin/beam.smp -W w -A 128 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -B i -- -root /usr/local/Cellar/erlang/22.0.2/lib/erlang -progname erl -- -home /Users/zhangguoye -- -pa /usr/local/Cellar/rabbitmq/3.7.15/ebin -noshell -noinput -s rabbit boot -sname rabbit@localhost -boot /usr/local/opt/erlang/lib/erlang/bin/start_clean -kernel inet_default_connect_options [{nodelay,true}] -rabbit tcp_listeners [{"127.0.0.1",5672}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit lager_log_root "/usr/local/var/log/rabbitmq" -rabbit lager_default_file "/usr/local/var/log/rabbitmq/rabbit@localhost.log" -rabbit lager_upgrade_file "/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log" -rabbit enabled_plugins_file "/usr/local/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/local/Cellar/rabbitmq/3.7.15/plugins" -rabbit plugins_expand_dir "/usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672zhangguoye 35736 0.0 0.0 4328468 688 ?? S 四03下午 0:00.08 /usr/local/Cellar/erlang/22.0.2/lib/erlang/erts-10.4.1/bin/epmd -daemonzhangguoye 68953 0.0 0.0 4268036 804 s009 S+ 10:11上午 0:00.00 grep erlzhangguoye 65877 0.0 0.0 4270068 828 ?? Ss 10:00上午 0:00.36 erl_child_setup 7168zhangguoyedeMacBook-Pro:~ zhangguoye$ kill -9 65864zhangguoyedeMacBook-Pro:~ zhangguoye$ rabbitmq-server ## ## ## ## RabbitMQ 3.7.15. Copyright (C) 2007-2019 Pivotal Software, Inc. ########## Licensed under the MPL. See https://www.rabbitmq.com/ ###### ## ########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log Starting broker... completed with 6 plugins.brew管理RabbitMQ使用mac的用户可以直接使用brew进行安装与管理RabbitMQ1. 安装Homebrew安装:https://www.rabbitmq.com/inst... ...

June 11, 2019 · 2 min · jiezi

Ubuntu1604下安装和配置RabbitMQ

1. 安装erlangapt-get install erlang 安装完毕后,输入erl查看erlang版本,出现如下图则表示安装成功。然后退出erlang命令行:按下ctrl+c,然后输入a即可。 2. 安装RabbitMQapt-get install rabbitmq-server 安装完毕后,查看rabbitmq状态:rabbitmqctl status 可以通过以下命令操作rabbitmq 启动rabbitmq: service rabbitmq-server start停止rabbitmq: service rabbitmq-server stop 重启rabbitmq: service rabbitmq-server restart 3. 启动rabbitmq插件rabbitmq-plugins enable rabbitmq_management这个可以启动rabbitmq的web控制台 4. 添加用户rabbitmqctl add_user 用户名 密码 5. 给予用户管理员权限rabbitmqctl set_user_tags 用户名 administrator 6. 为用户设置读写权限rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" 接下来就可以访问web控制台了,输入ip:15672即可,如下图,然后输入用户名和密码,登录web控制台。

June 6, 2019 · 1 min · jiezi

开源一个kafka增强okmq100

本工具的核心思想就是:赌。只有两个基础组件同时死亡,才会受到严重影响。哦,断电除外。mq是个好东西,我们都在用。这也决定了mq应该是高高高可用的。某团就因为这个组件,出了好几次生产事故,呵呵。 大部分业务系统,要求的消息语义都是at least once,即都会有重复消息,但保证不会丢。即使这样,依然有很多问题: 一、mq可用性无法保证。 mq的意外死亡,造成生产端发送失败。很多消息要通过扒取日志进行回放,成本高耗时长。 二、mq阻塞业务正常进行。 mq卡顿或者网络问题,会造成业务线程卡在mq的发送方法上,正常业务进行不下去,造成灾难性的后果。 三、消息延迟。 mq死了就用不着说了,消息还没投胎就已死亡。消息延迟主要是客户端消费能力不强,或者是消费通道单一造成的。 使用组合存储来保证消息的可靠投递,就是okmq。 注意:okmq注重的是可靠性。对于顺序性、事务等其他要素,不予考虑。当然,速度是必须的。设计想法我即使用两套redis来模拟一些mq操作,都会比现有的一些解决方案要强。但这肯定不是我们需要的,因为redis的堆积能力太有限,内存占用率直线上升的感觉并不太好。 但我们可以用redis来作为额外的发送确认机制。这个想法,在《使用多线程增加kafka消费能力》一文中曾经提到过,现在到了实现的时候了。 首先看下使用ApiOkmqKafkaProducer producer = new ProducerBuilder().defaultSerializer().eanbleHa("redis").any("okmq.redis.mode", "single").any("okmq.redis.endpoint", "127.0.0.1:6379").any("okmq.redis.poolConfig.maxTotal", 100).servers("localhost:9092").clientID("okMQProducerTest").build();Packet packet = new Packet();packet.setTopic("okmq-test-topic");packet.setContent("i will send you a msg");producer.sendAsync(packet, null);producer.shutdown();以redis为例我们按照数字标号来介绍: 1、 在消息发送到kafka之前,首先入库redis。由于后续回调需要用到一个唯一表示,我们在packet包里添加了一个uuid。 2、 调用底层的api,进行真正的消息投递。 3、 通过监听kafka的回调,删除redis中对应的key。在这里可以得到某条消息确切的的ack时间。那么长时间没有删除的,就算是投递失败的消息。 4、 后台会有一个线程进行这些失败消息的遍历和重新投递。我们叫做recovery。最复杂的也就是这一部分。对于redis来说,会首先争抢一个持续5min的锁,然后遍历相关hashkey。 所以,对于以上代码,redis发出以下命令: 1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354"1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" ""1559206423.397300 [0 127.0.0.1:62858] "HSET" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{\"content\":\"i will send you a msg104736623015238\",\"topic\":\"okmq-test-topic\",\"identify\":\"2b9b33fd-95fd-4cd6-8815-4c572f13f76e\",\"timestamp\":1559206423318}"1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e"1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000"1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash"1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0"1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354"1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock"1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"以上问题解答所以对于以上的三个问题,回答如下:一、mq可用性无法保证。 ...

June 5, 2019 · 1 min · jiezi

RabbitMQ入门

记得在学习Celery的时候,官方文档就建议我们的broker使用RabbitMQ或Redis,可是当然由于Redis使用比较习惯,并且不用在去另外安装,因此好长时间都没有去学习这个东西.首先RabbitMQ绝对不像大众所认识的那样,是JAVA或者Python,GO写的,而是一种交换机语言ERLang,前段时间这个语言的发明人的离世也是轰动了整个IT界,而作为程序员的我因为时间问题没有学习RabbitMQ,很是后悔呀,所以现在赶紧学习学习,废话不多说,请欣赏... 这是什么东西?RabbitMQ是一个消息队列的中间件,通俗点讲就是饮料生产商(康师傅,达利园等等)生产东西放入超市,然后用户去超市里拿.这里的超市就是我们的RabbitMQ,饮料生产商就是所谓的生产者,用户就是消费者,这里生产者可以好多种消息(茉莉,红茶等等)放入到队列中,消费者也可以根据名称去拿(我只能去买),这就是其功能.点击进入安装教程但是不要以为这样就能开始,当你运行以下代码就会发现报错: 530, "NOT_ALLOWED - access to vhost '/' refused for user 'jim'"这是怎么回事呢?我也给大家解决了,点击查看__生产者__: import pikaclass Produce: def __init__(self, host, port, credentials): # 创建连接参数对象 parameters = pika.ConnectionParameters(host, port, credentials=credentials) # 创建连接对象 self._conn = pika.BlockingConnection(parameters) # 创建一个默认channel self._channel = self._conn.channel() def create_channel(self, num): """ 创建channel """ self._channel = self._conn.channel(num) def create_queue(self, name): """ 创建queue """ self._channel.queue_declare(queue=name) self._key = name def run(self, body): """ 生产消息 """ self._channel.basic_publish(exchange='', routing_key=self._key, body=body, ) print(" [x] Sent Message Successful! ") self._conn.close()if __name__ == '__main__': host = 'localhost' port = 5672 credentials = pika.PlainCredentials('jim', 'adminjim') produce = Produce(host, port, credentials) produce.create_queue('你傻不傻') produce.run('你就是一个垃圾')__消费者__: ...

May 25, 2019 · 1 min · jiezi

RabbitMQ高级特性消费端限流策略实现

应用范围为服务访问量突然剧增,原因可能有多种外部的调用或内部的一些问题导致消息积压,对服务的访问超过服务所能处理的最大峰值,导致系统超时负载从而崩溃。业务场景举一些我们平常生活中的消费场景,例如:火车票、机票、门票等,通常来说这些服务在下单之后,后续的出票结果都是异步通知的,如果服务本身只支持每秒1000访问量,由于外部服务的原因突然访问量增加到每秒2000并发,这个时候服务接收者因为流量的剧增,超过了自己系统本身所能处理的最大峰值,如果没有对消息做限流措施,系统在这段时间内就会造成不可用,在生产环境这是一个很严重的问题,实际应用场景不止于这些,本文通过RabbitMQ来讲解如果对消费端做限流措施。 消费端限流机制RabbitMQ提供了服务质量保证 (QOS) 功能,对channel(通道)预先设置一定的消息数目,每次发送的消息条数都是基于预先设置的数目,如果消费端一旦有未确认的消息,这时服务端将不会再发送新的消费消息,直到消费端将消息进行完全确认,注意:此时消费端不能设置自动签收,否则会无效。 在 RabbitMQ v3.3.0 之后,放宽了限制,除了对channel设置之外,还可以对每个消费者进行设置。 以下为 Node.js 开发语言 amqplib 库对于限流实现提供的接口方法 prefetch export interface Channel extends events.EventEmitter { prefetch(count: number, global?: boolean): Promise<Replies.Empty>; ...}prefetch 参数说明: number:每次推送给消费端 N 条消息数目,如果这 N 条消息没有被ack,生产端将不会再次推送直到这 N 条消息被消费。global:在哪个级别上做限制,ture 为 channel 上做限制,false 为消费端上做限制,默认为 false。建立生产端生产端没什么变化,和正常声明一样,关于源码参见rabbitmq-prefetch(Node.js客户端版Demo) const amqp = require('amqplib');async function producer() { // 1. 创建链接对象 const connection = await amqp.connect('amqp://localhost:5672'); // 2. 获取通道 const channel = await connection.createChannel(); // 3. 声明参数 const exchangeName = 'qosEx'; const routingKey = 'qos.test001'; const msg = 'Producer:'; // 4. 声明交换机 await channel.assertExchange(exchangeName, 'topic', { durable: true }); for (let i=0; i<5; i++) { // 5. 发送消息 await channel.publish(exchangeName, routingKey, Buffer.from(`${msg} 第${i}条消息`)); } await channel.close();}producer();建立消费端const amqp = require('amqplib');async function consumer() { // 1. 创建链接对象 const connection = await amqp.connect('amqp://localhost:5672'); // 2. 获取通道 const channel = await connection.createChannel(); // 3. 声明参数 const exchangeName = 'qosEx'; const queueName = 'qosQueue'; const routingKey = 'qos.#'; // 4. 声明交换机、对列进行绑定 await channel.assertExchange(exchangeName, 'topic', { durable: true }); await channel.assertQueue(queueName); await channel.bindQueue(queueName, exchangeName, routingKey); // 5. 限流参数设置 await channel.prefetch(1, false); // 6. 限流,noAck参数必须设置为false await channel.consume(queueName, msg => { console.log('Consumer:', msg.content.toString()); // channel.ack(msg); }, { noAck: false });}consumer();未确认消息情况测试在 consumer 中我们暂且将 channel.ack(msg) 注释掉,分别启动生产者和消费者,看看是什么情况? ...

May 23, 2019 · 2 min · jiezi

RabbitMQ消息幂等性设计

RabbitMQ消息幂等性设计可能导致消息出现非幂等性的原因: 1 可靠性投递机制:比如消息已经发送出去,mq已经收到了,然后mq在返回confirm的时候网络出现闪断,导致broker未收到应答,导致发送两次。2 MQ Broker服务与消费端传输消息的过程中出现网络抖动。3 消费端故障、异常。

May 22, 2019 · 1 min · jiezi

Rabbitmq基础组件架构设计

Rabbitmq基础组件架构设计基础组件封装设计 - 迅速消息发送支持迅速消息发送模式,在一些日志收集、统计分析等需求下可以保证高性能,高吞吐量。基础组件封装设计 - 确认消息发送基础组件封装设计 - 批量消息发送基础组件封装设计 - 延迟消息发送支持延迟消息模式,消息可以延迟发送,制定延迟时间,用于某些延迟检查,服务限流场景。基础组件封装设计 - 顺序消息发送支持顺序消息,保证消息送达消费端的前后顺序,例如下订单等复合性操作。基础组件封装设计 - 事务消息发送支持事务消息,且100%保障可靠性投递,在金融行业单笔大金额操作时会有此类需求。支持消息补偿,重试,以及快速定位异常、失败的消息。支持集群消息负载均衡,保障消息落到具体SET集群的负载均衡。支持消息路由策略,指定某些消息路由到指定的SET集群。消息幂等性保障 - 消息路由规则架构设计。Rabbitmq组件实现功能点支持消息高性能的序列化转换、异步化发送消息。支持消息生产实例与消费实例的连接池化缓存化,提升性能。支持可靠性投递消息,保障消息的100%不丢失。支持消费端幂等操作,避免消费端重复消费。

May 21, 2019 · 1 min · jiezi

RabbitMQ集群镜像模式

RabbitMQ集群镜像模式Mirror镜像队列,目的是为了保证rabbitmq数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是2-3个节点实现数据同步(对于100%数据可靠性解决方案一般是3节点)集群架构如下

May 21, 2019 · 1 min · jiezi

RabbitMQ集群架构模式

一、主备模式(Warren)主备模式:实现RabbitMQ的高可用集群 ,一般在并发和数据量不高的情况下,这种模式很好用且简单。主备模式也称之为Waren模式。就是一个主/备方案(主节点如果挂了,从节点提供服务而已,主备切换。)二、远程模式(Shovel)三、镜像模式(Mirror)四、多活模式(Federation)多活模式:这种模式也是实现异地数据复制的主流模式,因为Shovel模式配置比较复杂,所以一般来说实现异地集群都是使用这种双活模式或者多活模式实现的。这种模式需要依赖rabbitmq的dederration插件,可以实现持续的可靠的AMQP数据通信,多活模式在实际配置与应用非常简单。Rabbitmq部署架构采用双中心模式(多中心),那么在两套(或者多套)数据中心各部署一套Rabbitmq集群,各中心Rabbitmq服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。多活集群架构如下:

May 21, 2019 · 1 min · jiezi

分享一波-RabbitMQ-面试题有答案

1、什么是rabbitmq2、为什么要使用rabbitmq3、使用rabbitmq的场景4、如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?发送方确认模式 接收方确认机制 接收方消息确认机制 下面罗列几种特殊情况 5.如何避免消息重复投递或重复消费?6、消息基于什么传输?7、消息如何分发?8、消息怎么路由?常用的交换器主要分为一下三种 9、如何确保消息不丢失?10、使用RabbitMQ有什么好处?11、rabbitmq的集群镜像集群模式 12.mq的缺点系统可用性降低 系统复杂性提高 一致性问题 面试题答案关注微信公众号:【搜云库技术团队】 公众号微信ID:souyunku 回复关键字:MQ521 即可获取

May 21, 2019 · 1 min · jiezi

RabbitMQ-之-AMQP-091-模型解释

写在开头 本文翻译RabbitMQ官方文档属于译作(部分)详细链接 点我跳转 如有版权问题联系作者删除,如发现错误请不吝指出 说明本节主要叙述RabbitMQ中的AMQP 0-9-1协议即Advance Message Queuing Protocol - 高级消息队列协议,该协议是理解RabbitMQ消息模型的重要理论基础。AMQP 模型和AMQP 0-9-1协议概览什么是AMQP 0-9-1AMQP 0-9-1是一种高级消息协议,它允许特定的客户端应用程序能够与特定的消息代理中间件(messaging middleware brokers)进行消息传输和交流。消息代理中间件和其扮演的角色中间件接收来自于消息发布者(也叫生产者,产生消息的应用程序组)传递的消息并且把它们路由给接收者(也叫消费者,处理消息的应用程序组)由于AMQP 0-9-1是一种网络协议,所以生产者、消费者、和消息代理都可以运行在网络中不同的机器上。 AMQP 0-9-1 模型简介AMQP 0-9-1模型定义了如下规则消息被发送到交换机exchanges,而后交换机利用绑定规则bindings将消息副本发布到消息队列queue,然后消息队列中的消息,要么被传递给了订阅消息队列的消费者,要么消费者按照需要从消息队列拉取fetch/pull消息。 当一条消息发布的时候,发布者可能会指定一些消息属性message attributes(也叫message meta-data消息元数据),其中有一些消息属性是用于消息中间件处理消息,其余的部分则是用于消息消费者对于消息中间件完全透明。 由于网络的不稳定性,消息在传输过程中可能出现失败的情况,鉴于此AMQP 0-9-1提供了一种消息确认机制message acknowledgements: 当一条消息传递给消费者后该消费者发送一条通知notifies给消息中间件来确认消息,无论是自动的还是开发者自己这样做,当消息确认机制使用时,只有当消息代理收到通知后才会将该条或该组消息从消息队列中移除。 在一些特定情况下,比如当消息不能被路由时,消息可能会被返回给消息发布者,或者删除,或者如果消息代理实现了某种扩展插件,则这些无法被路由的消息可能会被放入一个称之为dead letter queue(死亡标记队列)的队列中,发布者可以通过指定一些确定的消息属性 message attributes来响应出现这种情景时消息应该如何被处理。 exchange、bindings、queue均被称为AMQP实体-AMQP entities AMQP是一种可编程的协议AMQP 0-9-1是一种可编程协议,实体和路由规则应该被实现该协议的应用来定义。协议只是为具体操作提供一种处理的规范,具体实现(exchange和queue如何路由,如何定制绑定规则等)应该有具体应用来完成。这种方式给了开发者很大的自由空间,但是也需要他们能意识到可能潜在的定义冲突,实际上定义冲突很少见,大多都由错误的设定引起。 交换机和交换机的类型交换机exchanges是AMQP 0-9-1协议定义的消息应该被送达的实体。交换机拿到消息并且将它路由到一个或者多个队列中去,具体的路由算法由交换机的种类决定。交换机类型和路由规则被统称为(binding)绑定规则(交换机和消息队列间的绑定规则),AMQP 0-9-1代理提供了四种交换机类型 名称默认预定义名称Direct exchange(Empty string) and amq.directFanout exchangeamq.fanoutTopic exchangeamq.topicHeaders exchangeamq.match (and amq.headers in RabbitMQ)除了交换机类型以外,交换机还定义了一些属性,其中最重要的属性如下: Name 名称Durability 持续性(控制交换机能从在重启后恢复)Auto-delete 交换机在最后一条消息出栈之后是否删除Arguments 参数,可选用于插件或者消息代理的一些特殊用途交换机可以是持续性的也可以是瞬态的,持续性的交换机可以在重启后恢复,而瞬态的交换机则需要重新定义。 Default exchange/默认交换机默认交换机是被消息代理预定义的没有名称的直转交换机Direct exchange也有叫直连交换机他有一个特殊属性使得它非常有用,每一个创建的消息队列都会使用队列的名称绑定路由键routing key来和默认交换机绑定。比如当你定义了一个名为search_index_online的消息队列时,队列会使用search_index_online自动绑定到默认交换机,消息发布者可以使用默认的交换机附带search_index_online的路由键来发送消息到消息队列,看起来没有定义任何交换机,像是直接将消息传递给了队列。 Direct exchange/直转交换机/直连交换机直转交换机基于路由键routing key给消息队列转发消息,直转交换机可用于消息单播(尽管实际上可以用于多播消息)直转交换机模式如下 ...

May 18, 2019 · 1 min · jiezi

rabbitmq

一、rabbitmq关键字Binding:Exchange和Exchange、Queue之间的连接关系Queue:实际存储消息。Durability:是否持久化,Durable:是,Transient:否。Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动删除。Message:服务器和应用程序之间传送的数据。本质上就是一段数据,由Properties和Payload(Body)组成。常用属性:delivery mode、headers(自定义属性)Virtual host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue。一、rabbitmq高级特性消息如何保证100%的投递成功?幂等性概念详解在海量订单产生的业务高峰期,如何避免消息的重复消费问题?Confirm确认消息、Return返回消息

May 14, 2019 · 1 min · jiezi

消息队列RabbitMQ如何处理消息丢失

首先明确一点 一条消息的传送流程:生产者->MQ->消费者 所以有三个地方都会丢失数据: 生产者发送给MQ的途中出现网络问题MQ自己没保管好弄丢了消费者拿到数据后出错了没有最终完成任务依次分析 1)生产者弄丢了数据 生产者将数据发送到rabbitmq的时候,可能因为网络问题导致数据就在半路给搞丢了。 1.使用事务(性能差)可以选择用rabbitmq提供的事务功能,在生产者发送数据之前开启rabbitmq事务(channel.txSelect),然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)。但是问题是,开始rabbitmq事务机制,基本上吞吐量会下来,因为太耗性能。 2.发送回执确认(推荐)可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。 事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息rabbitmq接收了之后会异步回调你一个接口通知你这个消息接收到了。 所以一般在生产者这块避免数据丢失,都是用confirm机制的。 2)RabbitMQ弄丢了数据-开启RabbitMQ的数据持久化 为了防止rabbitmq自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。 设置持久化有两个步骤,第一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。 而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。 若生产者那边的confirm机制未开启的情况下,哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及持久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。 3)消费端弄丢了数据 主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了比如重启了,那么就尴尬了,RabbitMQ认为你都消费了,这数据就丢了。或者消费者拿到数据之后挂了,这时候需要MQ重新指派另一个消费者去执行任务(一块肉,刚用筷子夹起来,发地震抖了一下,肉掉了) 这个时候得用RabbitMQ提供的ack机制,也是一种处理完成发送回执确认的机制。如果MQ等待一段时间后你没有发送过来处理完成 那么RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。

May 9, 2019 · 1 min · jiezi

RabbitMQ的应用场景以及基本原理介绍

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queuing Protocol)的开源实现。 AMQP :高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 一、应用场景异步处理应用解耦流量削峰二、RabbitMQ 特性RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括: - 可靠性(Reliability)RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 - 灵活的路由(Flexible Routing)在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 - 消息集群(Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 - 高可用(Highly Available Queues)队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 - 多种协议(Multi-protocol)RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 - 多语言客户端(Many Clients)RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 - 管理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 - 跟踪机制(Tracing)如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 - 插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。三、RabbitMQ 基本概念 - Message消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 - Publisher消息的生产者,也是一个向交换器发布消息的客户端应用程序。 - Exchange交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 - Routing Key路由关键字,exchange根据这个关键字进行消息投递。 - Binding绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 - Queue消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 - Connection网络连接,比如一个TCP连接。 - Channel信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 - Consumer消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 - Virtual Host虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 - Broker表示消息队列服务器实体。它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, 四、Exchange 类型Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型: ...

May 9, 2019 · 2 min · jiezi

SpringBoot集成RabbitMQ死信队列

介绍死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false2.队列达到最大长度3.消息TTL过期 场景 1.小时进入初始队列,等待30分钟后进入5分钟队列2.消息等待5分钟后进入执行队列3.执行失败后重新回到5分钟队列4.失败5次后,消息进入2小时队列5.消息等待2小时进入执行队列6.失败5次后,将消息丢弃或做其他处理 使用安装MQ使用docker方式安装,选择带mangement的版本 docker pull rabbitmq:managementdocker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management访问 localhost: 15672,默认账号密码guest/guest 项目配置(1)创建springboot项目(2)在application.properties配置文件中配置mq连接信息 spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest(3)队列配置 package com.df.ps.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.beans.factory.annotation.Autowire;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MqConfig { //time @Value("${spring.df.buffered.min:120}") private int springdfBufferedTime; @Value("${spring.df.high-buffered.min:5}") private int springdfHighBufferedTime; @Value("${spring.df.low-buffered.min:120}") private int springdfLowBufferedTime; // 30min Buffered Queue @Value("${spring.df.queue:spring-df-buffered-queue}") private String springdfBufferedQueue; @Value("${spring.df.topic:spring-df-buffered-topic}") private String springdfBufferedTopic; @Value("${spring.df.route:spring-df-buffered-route}") private String springdfBufferedRouteKey; // 5M Buffered Queue @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}") private String springdfHighBufferedQueue; @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}") private String springdfHighBufferedTopic; @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}") private String springdfHighBufferedRouteKey; // High Queue @Value("${spring.df.high.queue:spring-df-high-queue}") private String springdfHighQueue; @Value("${spring.df.high.topic:spring-df-high-topic}") private String springdfHighTopic; @Value("${spring.df.high.route:spring-df-high-route}") private String springdfHighRouteKey; // 2H Low Buffered Queue @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}") private String springdfLowBufferedQueue; @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}") private String springdfLowBufferedTopic; @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}") private String springdfLowBufferedRouteKey; // Low Queue @Value("${spring.df.low.queue:spring-df-low-queue}") private String springdfLowQueue; @Value("${spring.df.low.topic:spring-df-low-topic}") private String springdfLowTopic; @Value("${spring.df.low.route:spring-df-low-route}") private String springdfLowRouteKey; @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue") Queue springdfBufferedQueue() { int bufferedTime = 1000 * 60 * springdfBufferedTime; return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime); } @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue") Queue springdfHighBufferedQueue() { int highBufferedTime = 1000 * 60 * springdfHighBufferedTime; return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime); } @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue") Queue springdfHighQueue() { return new Queue(springdfHighQueue, true); } @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue") Queue springdfLowBufferedQueue() { int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime; return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime); } @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue") Queue springdfLowQueue() { return new Queue(springdfLowQueue, true); } @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic") TopicExchange springdfBufferedTopic() { return new TopicExchange(springdfBufferedTopic); } @Bean Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) { return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey); } @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic") TopicExchange springdfHighBufferedTopic() { return new TopicExchange(springdfHighBufferedTopic); } @Bean Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) { return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey); } @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic") TopicExchange springdfHighTopic() { return new TopicExchange(springdfHighTopic); } @Bean Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) { return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey); } @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic") TopicExchange springdfLowBufferedTopic() { return new TopicExchange(springdfLowBufferedTopic); } @Bean Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) { return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey); } @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic") TopicExchange springdfLowTopic() { return new TopicExchange(springdfLowTopic); } @Bean Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) { return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey); } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(springdfHighQueue, springdfLowQueue); container.setMessageListener(listenerAdapter); return container; } @Bean MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) { MessageListenerAdapter adapter = new MessageListenerAdapter(receiver); adapter.setDefaultListenerMethod("receive"); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive"); queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); return adapter; } private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", topic); args.put("x-dead-letter-routing-key", routeKey); args.put("x-message-ttl", bufferedTime); // 是否持久化 boolean durable = true; // 仅创建者可以使用的私有队列,断开后自动删除 boolean exclusive = false; // 当所有消费客户端连接断开后,是否自动删除队列 boolean autoDelete = false; return new Queue(queueName, durable, exclusive, autoDelete, args); }}消费者配置package com.df.ps.mq;import com.fasterxml.jackson.databind.ObjectMapper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import java.util.Map;public class MqReceiver { private static Logger logger = LoggerFactory.getLogger(MqReceiver.class); @Value("${high-retry:5}") private int highRetry; @Value("${low-retry:5}") private int lowRetry; @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}") private String springdfHighBufferedTopic; @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}") private String springdfHighBufferedRouteKey; @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}") private String springdfLowBufferedTopic; @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}") private String springdfLowBufferedRouteKey; private final RabbitTemplate rabbitTemplate; @Autowired public MqReceiver(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void receive(Object message) { if (logger.isInfoEnabled()) { logger.info("default receiver: " + message); } } /** * 消息从初始队列进入5分钟的高速缓冲队列 * @param message */ public void highReceiver(Object message){ ObjectMapper mapper = new ObjectMapper(); Map msg = mapper.convertValue(message, Map.class); try{ logger.info("这里做消息处理..."); }catch (Exception e){ int times = msg.get("times") == null ? 0 : (int) msg.get("times"); if (times < highRetry) { msg.put("times", times + 1); rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message); } else { msg.put("times", 0); rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message); } } } /** * 消息从5分钟缓冲队列进入2小时缓冲队列 * @param message */ public void lowReceiver(Object message){ ObjectMapper mapper = new ObjectMapper(); Map msg = mapper.convertValue(message, Map.class); try { logger.info("这里做消息处理..."); }catch (Exception e){ int times = msg.get("times") == null ? 0 : (int) msg.get("times"); if (times < lowRetry) { rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message); }else{ logger.info("消息无法被消费..."); } } }}

April 30, 2019 · 3 min · jiezi

ActiveMQrabbitmq

JMS模型Java消息服务应用程序结构支持两种模型: 点对点或队列模型发布/订阅模型在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收 发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息在发布者和订阅者之间存在时间依赖性。发布者需要创建一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者创建了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。 ActiveMQ基本组件:Broker,消息代理,表示消息队列服务器实体,接受客户端连接,提供消息通信的核心服务。Producer,消息生产者,业务的发起方,负责生产消息并传输给 Broker 。Consumer,消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理。Topic,主题,发布订阅模式下的消息统一汇集地,不同生产者向 Topic 发送消息,由 Broker 分发到不同的订阅者,实现消息的广播。Queue,队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理。Message,消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。connector(broker与client,broker与broker)的协议:vm,tcp,xxxx持久消息的存储:KahaDB(基于文件),journaal+JDBC,你内存呢,levelDb(leveldb+zk作为M-S复制方案)集群:M-S:数据单独HA方案共享+锁获取master/slave切换。leveldb+zk/KahaDB+SAN类似分片:networkConnector。对于订阅,一直在brocker中转发,直到消费。对于queue顺序保证比较困难。https://shift-alt-ctrl.iteye.... AMQPhttp://docs.oasis-open.org/am... TYPE - type system and encodingTransport - AMQP transport layer(全双工可靠递交对等,connection=>多channel的session=>links)Messaging - AMQP Messaging Layer (对消息确认,拒绝,持久化等规定)Transactions - AMQP Transactions Layer(事务提交等)Security - AMQP Security Layers rabbitmq 组件Message消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。Publisher消息的生产者,也是一个向交换器发布消息的客户端应用程序。Exchange交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange 类型Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers(基本不用)Binding绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Queue消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。Connection网络连接,比如一个TCP连接。Channel信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。Virtual Host虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。Broker表示消息队列服务器实体。交换机:1.直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:1)将一个队列(一个消费者一个队列)绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R;2)当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)2.生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue3.topic集群与持久化rabbitmq不支持动态扩展,erlang性能高可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的,队列例外,只在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容,RabbitMQ 2.6.0之后提供了镜像队列以避免集群节点故障导致的队列内容不可用)(猜测:消息的持久应该包含交换机上的和队列中的,发给所有队列后消息应该可删除了,消费后队列中的可删除了)磁盘节点+内存节点,要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入火离开集群时,它们必须要将该变更通知到至少一个磁盘节点,如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。 ...

April 29, 2019 · 1 min · jiezi

RabbitMQ安装教程

一、安装官方Unbuntu下安装文档:http://www.rabbitmq.com/insta… 官方CentOS下安装文档:https://www.rabbitmq.com/inst…主要介绍CentOS下的安装1. 安装 ErlangRabbitMq是用Erlang分布式语言开发的,首先要安装Erlang环境,我使用的是RabbitMQ提供的Erlang源:https://github.com/rabbitmq/e…添加yum源:# In /etc/yum.repos.d/rabbitmq-erlang.repo[rabbitmq-erlang]name=rabbitmq-erlangbaseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/20/el/7gpgcheck=1gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.ascrepo_gpgcheck=0enabled=1安装yum install erlang2. 安装 RabbitMQ Server# 下载rpm包wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.14/rabbitmq-server-3.7.14-1.el7.noarch.rpm# 安装yum install rabbitmq-server-3.7.14-1.el7.noarch.rpm二、启动RabbitMQ启动$ systemctl start rabbitmq-server #启动$ systemctl restart rabbitmq-server #启动$ systemctl stop rabbitmq-server #关闭$ systemctl status rabbitmq-serverRabbitMQ用户管理$ rabbitmqctl list_users # 查看rabbitmq用户列表,默认用户名密码都为guest$ rabbitmqctl add_user admin1 123456 # 新增用户 $ rabbitmqctl set_user_tags admin1 administrator # 设置用户admin1为管理员级别 # 其他命令rabbitmqctl add_user <username> <password>rabbitmqctl delete_user <username>rabbitmqctl change_password <username> <newpassword>rabbitmqctl clear_password <username>rabbitmqctl authenticate_user <username> <password>rabbitmqctl set_user_tags <username> <tag> …rabbitmqctl list_users其它命令rabbitmqctl status #查看状态web端可视化操作界面# 开启web可视化界面$ rabbitmq-plugins enable rabbitmq_management# 需要重启rabbitmq$ service rabbitmq-server restart浏览器访问:http://127.0.0.1:15672。用户名和密码都是 guest端口修改默认端口: 4369 – erlang发现口 5672 –client端通信口 15672 – 管理界面ui端口 25672 – server间内部通信口 官方介绍:https://www.rabbitmq.com/netw…将配置文档示例:/usr/share/doc/rabbitmq-server-3.7.14/rabbitmq.config.example拷贝到/etc/rabbitmq/rabbitmq.configcp /usr/share/doc/rabbitmq-server-3.7.14/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config添加配置{ listener,[{port,15672}]}三、允许guest用户远程访问rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问,否则会报错:需要在/etc/rabbitmq/rabbitmq.config文件中需要添加{loopback_users, []}配置:[{rabbit, [{loopback_users, []}]}]. ...

April 19, 2019 · 1 min · jiezi

rabbitMQ安装

安装yum源yum list | grep epel-releaseyum -y install epel-release.noarch安装rabbitmq-server#安装yum -y install rabbitmq-server#启动systemctl start rabbitmq-server.service#查看systemctl status rabbitmq-server.service启用管理插件#查看rabbitmq-plugins list#开启管理后台rabbitmq-plugins enable rabbitmq_management#重启systemctl restart rabbitmq-server.service#查看端口netstat -tunlp |grep 15672#防火墙端口放开-A INPUT -p tcp -m state –state NEW -m tcp –dport 15672 -j ACCEPTiptables-save > /etc/sysconfig/iptables登录管理后台账号:guest 密码:guest

April 13, 2019 · 1 min · jiezi

RabbitMQ二三事

RabbitMQ概览RabbitMQ是一个高性能的分布式消息中间件。它由Erlang编写,这种语言天生支持分布式,而且性能极高(但是比较难上手)。通信概念RabbitMQ简单理解就是一个队列服务,我们的生产者不断地往它投递消息,而消费者不断地从它那里获取消息。但相较于利用redis的List这类简单队列,RabbitMQ的消息投递更灵活一点。首先需要知道一些RabbitMQ中的通信概念:Exchange(交换器)Queue(队列)Producer(生产者)Consumer(消费者)RabbitMQ中Exchange类似于一个路由器,我们的Consumer并不会把消息直接投递给队列,而是投递给Exchange,Exchange根据我们投递时的路由键(routing key)再发送到特定的队列。这样的设计让消息可以灵活选路,发送到某一类的队列中,形成一对多的关系,而不仅仅是一对一。Exchange所以说RabbitMQ中的Exchange很方便,很强大,它有这样几种类型:directfanouttopicheaders(几乎用不到)direct交换器很简单,有时候我们仅仅需要一个很简单的队列(消息投递到其中,然后不断消费它),这时候我们就可以用direct交换器,它的规则是:如果路由键匹配,消息就会被投递到对应的队列。fanout交换器忽略路由键,把消息同时发到一批队列。topic则就是根据不同路由键,把消息发送到某一类队列中。

March 28, 2019 · 1 min · jiezi

Springboot 配置RabbitMQ文档

简介 RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗概念: 生产者 消息的产生方,负责将消息推送到消息队列 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息 队列 消息的寄存器,负责存放生产者发送的消息 交换机 负责根据一定规则分发生产者产生的消息 绑定 完成交换机和队列之间的绑定模式: direct:直连模式,用于实例间的任务分发 topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列 headers:适用规则复杂的分发,用headers里的参数表达规则 fanout:分发给所有绑定到该exchange上的队列,忽略routing keySpringBoot集成RabbitMQ 一、引入maven依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.2.RELEASE</version></dependency>二、配置application.properties# rabbitmqspring.rabbitmq.host = dev-mq.a.pa.comspring.rabbitmq.port = 5672spring.rabbitmq.username = adminspring.rabbitmq.password = adminspring.rabbitmq.virtualHost = /message-test/三、编写AmqpConfiguration配置文件package message.test.configuration;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.autoconfigure.amqp.RabbitProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class AmqpConfiguration {/** * 消息编码 */ public static final String MESSAGE_ENCODING = “UTF-8”; public static final String EXCHANGE_ISSUE = “exchange_message_issue”; public static final String QUEUE_ISSUE_USER = “queue_message_issue_user”; public static final String QUEUE_ISSUE_ALL_USER = “queue_message_issue_all_user”; public static final String QUEUE_ISSUE_ALL_DEVICE = “queue_message_issue_all_device”; public static final String QUEUE_ISSUE_CITY = “queue_message_issue_city”; public static final String ROUTING_KEY_ISSUE_USER = “routing_key_message_issue_user”; public static final String ROUTING_KEY_ISSUE_ALL_USER = “routing_key_message_issue_all_user”; public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = “routing_key_message_issue_all_device”; public static final String ROUTING_KEY_ISSUE_CITY = “routing_key_message_issue_city”; public static final String EXCHANGE_PUSH = “exchange_message_push”; public static final String QUEUE_PUSH_RESULT = “queue_message_push_result”; @Autowired private RabbitProperties rabbitProperties; @Bean public Queue issueUserQueue() { return new Queue(QUEUE_ISSUE_USER); } @Bean public Queue issueAllUserQueue() { return new Queue(QUEUE_ISSUE_ALL_USER); } @Bean public Queue issueAllDeviceQueue() { return new Queue(QUEUE_ISSUE_ALL_DEVICE); } @Bean public Queue issueCityQueue() { return new Queue(QUEUE_ISSUE_CITY); } @Bean public Queue pushResultQueue() { return new Queue(QUEUE_PUSH_RESULT); } @Bean public DirectExchange issueExchange() { return new DirectExchange(EXCHANGE_ISSUE); } @Bean public DirectExchange pushExchange() { // 参数1:队列 // 参数2:是否持久化 // 参数3:是否自动删除 return new DirectExchange(EXCHANGE_PUSH, true, true); } @Bean public Binding issueUserQueueBinding(@Qualifier(“issueUserQueue”) Queue queue, @Qualifier(“issueExchange”) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER); } @Bean public Binding issueAllUserQueueBinding(@Qualifier(“issueAllUserQueue”) Queue queue, @Qualifier(“issueExchange”) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER); } @Bean public Binding issueAllDeviceQueueBinding(@Qualifier(“issueAllDeviceQueue”) Queue queue, @Qualifier(“issueExchange”) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE); } @Bean public Binding issueCityQueueBinding(@Qualifier(“issueCityQueue”) Queue queue, @Qualifier(“issueExchange”) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY); } @Bean public Binding pushResultQueueBinding(@Qualifier(“pushResultQueue”) Queue queue, @Qualifier(“pushExchange”) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).withQueueName(); } @Bean public ConnectionFactory defaultConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(rabbitProperties.getHost()); connectionFactory.setPort(rabbitProperties.getPort()); connectionFactory.setUsername(rabbitProperties.getUsername()); connectionFactory.setPassword(rabbitProperties.getPassword()); connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost()); return connectionFactory; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( @Qualifier(“defaultConnectionFactory”) ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public AmqpTemplate rabbitTemplate(@Qualifier(“defaultConnectionFactory”) ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); }}三、编写生产者body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING); rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE, AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);四、编写消费者@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)public void handlePushResult(@Payload byte[] data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { } ...

March 18, 2019 · 2 min · jiezi

ubuntu16.04下安装rabbitmq

简单记录# 安装erlang$ sudo apt-get install erlang-nox # 查看版本$ erl # 添加公钥$ sudo wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -# 安装 RabbitMQ$ sudo apt-get install rabbitmq-server #安装成功自动启动# 查看 RabbitMq状态$ systemctl status rabbitmq-server# 启动、停止、重启. 到此,rabbitMq已经安装成功。$ sudo service rabbitmq-server start # 启动$ sudo service rabbitmq-server stop # 停止$ sudo service rabbitmq-server restart # 重启 #启用 web端可视化操作界面,我们还需要配置Management Plugin插件 $ rabbitmq-plugins enable rabbitmq_management # 启用插件$ sudo service rabbitmq-server restart # 重启此时,应该可以通过 http://localhost:15672 查看,使用默认账户guest/guest 登录。注意:RabbitMQ 3.3 及后续版本,guest 只能在服务本机登录。# 添加管理员用户$ rabbitmqctl add_user admin yourpassword # 增加普通用户$ rabbitmqctl set_user_tags admin administrator # 给普通用户分配管理员角色 接下来就可以登录了 ...

March 17, 2019 · 1 min · jiezi

PHP+RabbitMQ实现消息队列(代码全篇)

前言先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.php扩展地址: http://pecl.php.net/package/amqp具体以官网为准 http://www.rabbitmq.com/getstarted.html 介绍config.php 配置信息BaseMQ.php MQ基类ProductMQ.php 生产者类ConsumerMQ.php 消费者类Consumer2MQ.php 消费者2(可有多个)config.php <?php return [ //配置 ‘host’ => [ ‘host’ => ‘127.0.0.1’, ‘port’ => ‘5672’, ’login’ => ‘guest’, ‘password’ => ‘guest’, ‘vhost’=>’/’, ], //交换机 ’exchange’=>‘word’, //路由 ‘routes’ => [], ];BaseMQ.php <?php /** * Created by PhpStorm. * User: pc * Date: 2018/12/13 * Time: 14:11 / namespace MyObjSummary\rabbitMQ; /* Member * AMQPChannel * AMQPConnection * AMQPEnvelope * AMQPExchange * AMQPQueue * Class BaseMQ * @package MyObjSummary\rabbitMQ / class BaseMQ { /* MQ Channel * @var \AMQPChannel / public $AMQPChannel ; /* MQ Link * @var \AMQPConnection / public $AMQPConnection ; /* MQ Envelope * @var \AMQPEnvelope / public $AMQPEnvelope ; /* MQ Exchange * @var \AMQPExchange / public $AMQPExchange ; /* MQ Queue * @var \AMQPQueue / public $AMQPQueue ; /* conf * @var / public $conf ; /* exchange * @var / public $exchange ; /* link * BaseMQ constructor. * @throws \AMQPConnectionException / public function __construct() { $conf = require ‘config.php’ ; if(!$conf) throw new \AMQPConnectionException(‘config error!’); $this->conf = $conf[‘host’] ; $this->exchange = $conf[’exchange’] ; $this->AMQPConnection = new \AMQPConnection($this->conf); if (!$this->AMQPConnection->connect()) throw new \AMQPConnectionException(“Cannot connect to the broker!\n”); } /* * close link / public function close() { $this->AMQPConnection->disconnect(); } /* Channel * @return \AMQPChannel * @throws \AMQPConnectionException / public function channel() { if(!$this->AMQPChannel) { $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection); } return $this->AMQPChannel; } /* Exchange * @return \AMQPExchange * @throws \AMQPConnectionException * @throws \AMQPExchangeException / public function exchange() { if(!$this->AMQPExchange) { $this->AMQPExchange = new \AMQPExchange($this->channel()); $this->AMQPExchange->setName($this->exchange); } return $this->AMQPExchange ; } /* queue * @return \AMQPQueue * @throws \AMQPConnectionException * @throws \AMQPQueueException / public function queue() { if(!$this->AMQPQueue) { $this->AMQPQueue = new \AMQPQueue($this->channel()); } return $this->AMQPQueue ; } /* Envelope * @return \AMQPEnvelope / public function envelope() { if(!$this->AMQPEnvelope) { $this->AMQPEnvelope = new \AMQPEnvelope(); } return $this->AMQPEnvelope; } }ProductMQ.php <?php //生产者 P namespace MyObjSummary\rabbitMQ; require ‘BaseMQ.php’; class ProductMQ extends BaseMQ { private $routes = [‘hello’,‘word’]; //路由key /* * ProductMQ constructor. * @throws \AMQPConnectionException / public function __construct() { parent::__construct(); } /* 只控制发送成功 不接受消费者是否收到 * @throws \AMQPChannelException * @throws \AMQPConnectionException * @throws \AMQPExchangeException / public function run() { //频道 $channel = $this->channel(); //创建交换机对象 $ex = $this->exchange(); //消息内容 $message = ‘product message ‘.rand(1,99999); //开始事务 $channel->startTransaction(); $sendEd = true ; foreach ($this->routes as $route) { $sendEd = $ex->publish($message, $route) ; echo “Send Message:”.$sendEd."\n"; } if(!$sendEd) { $channel->rollbackTransaction(); } $channel->commitTransaction(); //提交事务 $this->close(); die ; } } try{ (new ProductMQ())->run(); }catch (\Exception $exception){ var_dump($exception->getMessage()) ; }ConsumerMQ.php <?php //消费者 C namespace MyObjSummary\rabbitMQ; require ‘BaseMQ.php’; class ConsumerMQ extends BaseMQ { private $q_name = ‘hello’; //队列名 private $route = ‘hello’; //路由key /* * ConsumerMQ constructor. * @throws \AMQPConnectionException / public function __construct() { parent::__construct(); } /* 接受消息 如果终止 重连时会有消息 * @throws \AMQPChannelException * @throws \AMQPConnectionException * @throws \AMQPExchangeException * @throws \AMQPQueueException */ public function run() { //创建交换机 $ex = $this->exchange(); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 //echo “Exchange Status:”.$ex->declare()."\n"; //创建队列 $q = $this->queue(); //var_dump($q->declare());exit(); $q->setName($this->q_name); $q->setFlags(AMQP_DURABLE); //持久化 //echo “Message Total:”.$q->declareQueue()."\n"; //绑定交换机与队列,并指定路由键 echo ‘Queue Bind: ‘.$q->bind($this->exchange, $this->route)."\n"; //阻塞模式接收消息 echo “Message:\n”; while(True){ $q->consume(function ($envelope,$queue){ $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }); //$q->consume(‘processMessage’, AMQP_AUTOACK); //自动ACK应答 } $this->close(); } } try{ (new ConsumerMQ)->run(); }catch (\Exception $exception){ var_dump($exception->getMessage()) ; } ...

March 15, 2019 · 3 min · jiezi

Windows RabbitMQ安装

Windows RabbitMQ安装安装ErlangErlang云盘下载地址官网配置环境ERLANG_HOME - D:\dev_tools\erl7.1\erts-7.1path - %ERLANG_HOME%安装RabbitMQRabbitMQ云盘下载地址官网默认端口为15672配置环境激活 RabbitMQ’s Management Plugin可视化打开dos,输入"?D:\dev_tools\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat" enable rabbitmq_management完成提示The following plugins have been enabled: mochiweb webmachine rabbitmq_web_dispatch amqp_client rabbitmq_management_agent rabbitmq_managementApplying plugin configuration to rabbit@DESKTOP-5J52E61… started 6 plugins.管理员身份启动服务net stop RabbitMQ && net start RabbitMQ创建用户,密码,绑定角色进入安装目录下的sbincd /d D:\dev_tools\RabbitMQ Server\rabbitmq_server-3.6.5\sbin查看已有用户及用户的角色:rabbitmqctl.bat list_users新增用户:rabbitmqctl.bat add_user username password再次查看用户列表D:\dev_tools\RabbitMQ Server\rabbitmq_server-3.6.5\sbin>rabbitmqctl.bat list_usersListing users …rabbit []guest [administrator]将用户rabbit授权为超级管理员rabbitmqctl.bat set_user_tags rabbit administrator更改用户密码:rabbitmqctl change_password userName newPassword删除用户:rabbitmqctl.bat delete_user username访问地址:http://localhost:15672/参考地址

March 12, 2019 · 1 min · jiezi

mac 安装php的amqp扩展失败(mkdir:../pecl:no such file or directory)

RabbitMQ 安装mac上使用homebrew安装rabbitmq,会把Erlang的环境一同装上,首先更新homebrewbrew update然后再brew install rabbitmq在此处直接安装rabbitmq的时候会出现homebrew下载失败的情况,反复尝试多次,并且修改了清华的镜像之后也不管用。需要点击小飞机选择复制终端代理命令,然后打开iterm2 或者自带终端,然后粘贴,敲回车。再执行brew install rabbitmq很快装好RabbitMQ-C 安装rabbitmq-c是一个C语言操作的AMQP客户端库,用来与RabbitMQ broker进行交互直接brew install rabbitmq-c即可,记住rabbitmq-c的安装目录我的是/usr/local/Cellar/rabbitmq-c/0.9.0AMQP扩展安装AMQP扩展是php操作符合AMQP协议的消息队列的扩展,包括rabbitmq等实现了AMQP协议的消息队列wget -c https://pecl.php.net/get/amqp-1.9.4.tgztar -zxcv amqp-1.9.4.tgzcd amqp-1.9.4./configure –with-php-config=/usr/local/Cellar/php@7.2/7.2.14/bin/php-config –with-amqp –with-librabbitmq-dir=/usr/local/Cellar/rabbitmq-c/0.9.0在执行上述./configure的时候需要指定对应php-config目录以及之前记录的rabbitmq-c目录。然后make make installmake install的时候就会出现上述问题mkdir:../pecl:no such file or directory我网上查了好久也没发现具体的原因,这地方,需要修改php-config中的extension_dir 将他修改成对应扩展安装的目录即可,我的是/usr/local/Cellar/php@7.2/7.2.14/lib/php/20170718即修改为如下extension_dir="/usr/local/Cellar/php@7.2/7.2.14/lib/php/20170718"再make install就可以了

March 5, 2019 · 1 min · jiezi

RabbitMQ-镜像队列配置相关

设置policy方法例如我们将满足规则“_mirrored_queue”的队列设置为镜像队列./rabbitmqctl set_policy -p wenda-action mirrored_queue_policy_all “._mirrored_queue” ‘{“ha-mode”:“all”}‘之后申请的所有满足“*_mirrored_queue”规则的队列都是镜像队列tips:通过测试表明,使用policy的正则规则,可以设置镜像队列。如果没有设置policy,在申明队列的时候(queue_declare方法)使用x-ha-policy这个参数设置的镜像队列是无效的。设置了policy正则规则,不使用x-ha-policy参数,只要满足正则规则的queue也会自动变成镜像队列,(许多博客上是说明了用队列的x-ha-policy参数来设置镜像队列,却没有指明需要提前创建好policy规则)。rabbitmq文档说明:镜像队列的同步机制当一个队列申明为镜像队列之后,并且ha-mode:all,集群中所有的节点都会有该队列的镜像,当队列已经有一些数据之后,往集群中加入一个新节点,新节点加入集群之后会自动创建该队列的镜像,但是队列之前是有数据的,这些数据是立马同步到新节点还是手动同步,这时候需要镜像队列的同步机制queue_declare()方法中arg参数中有该配置:ha-sync-mode manual 手动同步 | automatic 自动同步tips:一个队列正在同步的时候,改队列的所有操作都会阻塞,因为各种原因可能会阻塞几分钟、几小时甚至几天。(所以队列同步需谨慎)

February 28, 2019 · 1 min · jiezi

Node.js amqplib 连接 Rabbit MQ 最佳实践

客户端设置connection_name在建立连接时,设置connection_name属性,可以在RabbitMQ Managerment 中查看到连接来自那个实例。amqp.connect(rabbitMqAddress, { clientProperties: { connection_name: ‘your host name’ }})队列属性autoDelete durable如无必要,建议将队列设置成自动删除,这个在TCP连接断开后,队列会自动删除。另外也不要使用持久化队列。channel.assertQueue(queueName, { autoDelete: true, durable: false})connnection 和 channel管理connnection的内存消耗。一个connection至少要使用100kb的内存空间,过多的connetion占用将会导致内存溢出,服务崩溃等问题。(我曾遇到过生产系统,RabbitMQ连接过多,系统一直处于崩溃边缘的险境)重用connection或者channel,而不要重复开关,下面给出AMQP各个阶段所耗费的tcp包,重复的开关,将会导致AMQP connections: 7 TCP packagesAMQP channel: 2 TCP packagesAMQP publish: 1 TCP package (more for larger messages)AMQP close channel: 2 TCP packagesAMQP close connection: 2 TCP packagesTotal 14-19 packages (+ Acks)一个进程,一个channel, 一个connection,如果一个pod向RabbitMQ建立了多条tcp连接,你就要警惕了。监控进程与RabbitMQ TCP连接数,将NodeJS的运行信息写入influxDB,并在grafana做监控和告警。其中tcp连接数可以使用shelljs执行命令获取。function getRabbitMqConnnectionCount (params) { shell.exec(netstat -nt | grep ${rabbitmqHost} | wc -l, (code, stdout, stderr) => { try { if (code === 0) { rabbitMqConnnectionCount = parseInt(stdout) || 0 } } catch (error) { log.error(error.message) } })}参考https://www.cloudamqp.com/blo…https://www.cloudamqp.com/blo...https://www.cloudamqp.com/blo… ...

January 24, 2019 · 1 min · jiezi

paascloud开源项目学习(2) -- centos7下安装SpringCloud+Vue环境

前言github 开源项目–paascloud-master:https://github.com/paascloud/…paascloud-master 官方环境搭建:http://blog.paascloud.net/201…基本环境rzyum install lrzszzip 和 unzipyum install -y unzip zipvimyum -y install vim* Java 环境jdk 8tar.gz包安装,参考:https://www.cnblogs.com/chy12…rpm包安装,参考:https://www.cnblogs.com/zengh…mysql 5.7下载mysql yum源 版本为5.7下载地址:https://dev.mysql.com/downloads/file/?id=470281查看yum源安装mysql版本上面下载后,yum localinstall mysql57-community-release-el7-11.noarch.rpmvim /etc/yum.repos.d/mysql-community.repo # 确定使用的版本,enable设为1yum install -y mysql-community-server启动mysqlsystemctl status mysqld.servicesystemctl start mysqld.service查看mysql密码cat /etc/my.cnf# log-error=/var/log/mysqld.log# pid-file=/var/run/mysqld/mysqld.pidcat /var/log/mysqld.log | grep password登录mysql数据库mysql -u root -p 修改密钥复杂度配置mysql> set global validate_password_policy=0;mysql> set global validate_password_length=6;修改密码mysql> alter user ‘root’@’localhost’ identified by ‘123456’;远程访问权限mysql> GRANT ALL PRIVILEGES ON . TO ‘root’@’%’ IDENTIFIED BY ‘123456’ WITH GRANT OPTION;mysql> flush privileges;mysql 备份参考博客:https://blog.csdn.net/SWPU_Li…crontab 命令:https://www.cnblogs.com/kensh…dockerdocker 在线安装非常慢,不推荐。依次执行下面命令yum remove docker docker-common docker-selinux docker-engineyum install -y yum-utils device-mapper-persistent-data lvm2yum-config-manager –add-repo https://download.docker.com/linux/centos/docker-ce.repoyum-config-manager –enable docker-ce-edgeyum-config-manager –enable docker-ce-testyum-config-manager –disable docker-ce-edgeyum makecache fastyum -y install docker-cesystemctl start dockerdocker run hello-worlddocker imagesREPOSITORY TAG IMAGE ID CREATED SIZEhello-world latest 1815c82652c0 2 months ago 1.84kBdocker 本地安装从官方安装包下载:docker-ce-17.06.0.ce-1.el7.centos.x86_64.rpm。安装yum install /usr/local/src/tool/docker-ce-17.06.0.ce-1.el7.centos.x86_64.rpm -y启动systemctl start docker查看docker版本docker -v开机启动# systemctl enable dockerCreated symlink from /etc/systemd/system/multi-user.target.wants/docker.service to /usr/lib/systemd/system/docker.service.docker 卸载查看已安装的docker安装包yum list installed|grep docker删除上面显示的安装包列表yum –y remove docker.x86_64.XXX删除docker镜像rm -rf /var/lib/dockerredis 4.0.2redis 单机tar 包安装下载,解压,编译:wget http://download.redis.io/releases/redis-4.0.2.tar.gztar xzf redis-4.0.2.tar.gzcd redis-4.0.2make二进制文件是编译完成后在 src 目录下,通过下面的命令启动 Redis 服务:src/redis-server使用内置的客户端命令 redis-cli 进行使用:# src/redis-cliredis> set foo barOKredis> get foo"bar"停止服务:# 第一种:杀死进程PID,kill -9 PIDps aux|grep redis# 第二种src/redis-cli shutdownyum 安装安装,启动yum install epel-releaseyum install redissystemctl start redis.serviceredis-server /etc/redis.confsystemctl enable redis常用配置vi /usr/local/redis-4.0.2/redis.confrequirepass paasword #配置密码# bind 127.0.0.1 #允许远程访问daemonize yes #后台启动自定义配置启动src/redis-server ../redis.confsrc/redis-cli -a paaswordredis 集群参考博客:http://blog.paascloud.net/201…docker 下安装 redisdocker run -d -p 6379:6379 redis:4.0.8 –requirepass “123456"nginx 1.14.X下载对应当前系统版本的 nginx 包wget http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm建立 nginx 的 yum 仓库rpm -ivh nginx-release-centos-7-0.el7.ngx.noarch.rpm安装 nginxyum -y install nginx启动 nginxsystemctl start nginx版本号nginx -vnginx version: nginx/1.14.1默认配置文件路径/etc/nginx/nginx.confrocketmq 4.2.X主要是搭建集群环境同步双写(2m-2s-sync)参考博客:http://blog.paascloud.net/201…异步复制(2m-2s-async)参考博客:https://blog.csdn.net/weixin_… 注意:如果 broker 启动失败,可能是 runbroker.sh、runserver.sh 里的内存大小设置默认过大。RocketMQ Web管理界面rocketmq 提供多种管理方式,命令行和界面等,apache 提供一个开源的扩展项目: https://github.com/apache/roc… 里面包含一个子项目 rocketmq-console,配置下,打个包就可以用了。或者可以百度搜索一下rocketmq-console.war。具体安装参考博客:https://www.jianshu.com/p/e5b…rabbitmq 3.7.3参考博客:http://blog.paascloud.net/201…zookeeper 3.4.X单机、集群、伪集群:https://www.cnblogs.com/sundd…paascloue 集群环境:http://blog.paascloud.net/201…命令启动 rabbitmq/etc/init.d/rabbitmq-server start # 或 service rabbitmq-service start 启用 RabbitMQWeb 管理插件用户名/密码:guest/guest启动rabbitmq-plugins enable rabbitmq_management 访问(修改为自己 ip):http://192.168.241.101:15672/启动 zookeeper根据上面参考博客1搭建的伪集群,因为配置文件在一个机器上的 zookeeper 目录下,所以启动时对应不同的配置文件。进入zookeeper的 conf目录下cd /root/software/zookeeper-3.4.9/conf启动# 添加了环境变量zkServer.sh start zoo1.cfgzkServer.sh start zoo2.cfgzkServer.sh start zoo3.cfg查看状态zkServer.sh status zoo1.cfgzkServer.sh status zoo2.cfgzkServer.sh status zoo3.cfg启动 zookeeper 图形化界面zookeeper 图形化的客户端工具–ZooInspector,具体使用参考博客:https://blog.csdn.net/qq_2685…。启动 zookeeper 集群后,运行 ZooInspector jar 包,当 paascloud 项目启动后,出现下面效果启动 rocketmq 集群根据上面 rocketmq集群 目录下的第一个参考博客来启动。2m-2s-sync。启动 NameServer A 192.168.241.101nohup sh /usr/local/rocketmq/bin/mqnamesrv &启动 NameServer A 192.168.241.102nohup sh /usr/local/rocketmq/bin/mqnamesrv &启动 BrokerServer A-master 192.168.241.101nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties&启动 BrokerServer A-slave 192.168.241.101nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties&启动 BrokerServer B-master 192.168.241.102nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties&启动 启动BrokerServer B-slave 192.168.241.102nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties&查看日志netstat -ntlpjpstail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log停止服务sh /usr/local/rocketmq/bin/mqshutdown namesrvsh /usr/local/rocketmq/bin/mqshutdown broker清理数据rm -rf /usr/local/rocketmq/data/masterrm -rf /usr/local/rocketmq/data/slavemkdir -p /usr/local/rocketmq/data/master/store/commitlogmkdir -p /usr/local/rocketmq/data/slave/store/commitlogmkdir -p /usr/local/rocketmq/data/master/store/consumequeuemkdir -p /usr/local/rocketmq/data/slave/store/consumequeuemkdir -p /usr/local/rocketmq/data/master/store/indexmkdir -p /usr/local/rocketmq/data/slave/store/indexrocketmq 集群控制台启动解压在tomcat目录,./tomcat/bin/startup.sh 启动即可。访问地址:http://192.168.0.110:8080/roc… ...

January 23, 2019 · 2 min · jiezi

windows环境重装后报错RabbitMQ service is already present 解决

Windows10 下重装rabbitmq时报错误如下:C:UsersAdministrator>rabbitmq-service installRabbitMQ service is already present - only updating service parametersC:Program Fileserlertsbinerlsrv: Warning, could not set correct interactive mode.Error: 句柄无效。—此行有时显示中文乱码C:Program Fileserlertsbinerlsrv: Warning, could not set correct service description (comment)Error: 句柄无效。—此行有时显示中文乱码测试解决方法如下:1、卸载Erlang 和rabbitmq2、运行输入regedit进入注册表 清除 注册表中HKEY_LOCAL_MACHINESOFTWAREEricsson项目3、删除C:UsersAdministrator、C:Windows 、C:WindowsSystem32configsystemprofile等搜索出的所有.erlang.cookie 文件4、管理员权限安装otp_win64_21.2.exe、和rabbitmq-server-3.7.10.exe5、启动 RabbitMQ Command Prompt (sbin dir) 执行命令开启web管理:rabbitmq-plugins enable rabbitmq_management6、重启服务:依次执行 net stop RabbitMQ net start RabbitMQ7、http://localhost:15672/ 访问rabbit

January 13, 2019 · 1 min · jiezi

Socket Error 104 bug

bug概述技术栈nginxuwsgibottle错误详情报警机器人经常有如下警告:<27>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Socket Error: 104<31>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Removing timeout for next heartbeat interval<28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Socket closed when connection was open<31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Added: {‘callback’: <bound method SelectConnection._on_connection_start of <pika.adapters.select_connection.SelectConnection object at 0x7f74752525d0>>, ‘only’: None, ‘one_shot’: True, ‘arguments’: None, ‘calls’: 1}<28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Disconnected from RabbitMQ at xx_host:5672 (0): Not specified<31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Processing 0:_on_connection_closed<31>1 2018-xx-xxT06:59:03.040Z 660ece0ebaad admin/admin 14 - - Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f74752513f8>> for “0:_on_connection_closed"debug过程确定报错位置有日志就很好办, 首先看日志在哪里打的. 从三个地方入手.我们自己的代码没有.uwsgi的代码root@660ece0ebaad:/# uwsgi –version2.0.14从github上co下来, 没有.python library的代码在容器中执行>>> import sys>>> sys.path[’’, ‘/usr/lib/python2.7’, ‘/usr/lib/python2.7/plat-x86_64-linux-gnu’, ‘/usr/lib/python2.7/lib-tk’, ‘/usr/lib/python2.7/lib-old’, ‘/usr/lib/python2.7/lib-dynload’, ‘/usr/local/lib/python2.7/dist-packages’, ‘/usr/lib/python2.7/dist-packages’, ‘/usr/lib/python2.7/dist-packages/PILcompat’, ‘/usr/lib/python2.7/dist-packages/gtk-2.0’]在这些目录下grep, 在pika中找到root@660ece0ebaad:/usr/local/lib/python2.7# grep “Socket Error” -R .Binary file ./dist-packages/pika/adapters/base_connection.pyc matches./dist-packages/pika/adapters/base_connection.py: LOGGER.error(“Fatal Socket Error: %r”, error_value)./dist-packages/pika/adapters/base_connection.py: LOGGER.error(“Socket Error: %s”, error_code)确定pika版本.>>> import pika>>> pika.version‘0.10.0’确定错误逻辑通过代码可以看到, Socket Error是errno的错误码, 确定错误码含义是对端发送了RST.>>> import errno>>> errno.errorcode[104]‘ECONNRESET’怀疑rabbitmq server地址错误, 一个未listen的端口是会返回RST的, 验证后发现不是.接着怀疑链接超时断开未通知客户端之类. 看rabbitmq server日志, 发现大量:=ERROR REPORT==== 7-Dec-2018::20:43:18 ===closing AMQP connection <0.9753.18> (172.17.0.19:27542 -> 192.168.44.112:5672):missed heartbeats from client, timeout: 60s–=ERROR REPORT==== 7-Dec-2018::20:43:18 ===closing AMQP connection <0.9768.18> (172.17.0.19:27544 -> 192.168.44.112:5672):missed heartbeats from client, timeout: 60s发现rabbitmq server和 admin docker的链接已经全部断开root@xxxxxxx:/home/dingxinglong# netstat -nap | grep 5672 | grep “172.17.0.19"那么, 为什么rabbitmq server会踢掉 pika建立的链接呢? 看pika代码注释: :param int heartbeat_interval: How often to send heartbeats. Min between this value and server’s proposal will be used. Use 0 to deactivate heartbeats and None to accept server’s proposal.我们没有传入心跳间隔, 理论上应该使用服务端默认的60S. 实际上, 客户端从来没有发出过心跳包. 于是继续看代码:通过打印, 确认了HeartbeatChecker对象成功创建, 也成功地创建了timer, 但是timer从来没有回调过.从代码一路跟下去, 我们用的是blocking_connections, 在其add_timeout注释中看到:def add_timeout(self, deadline, callback_method): “““Create a single-shot timer to fire after deadline seconds. Do not confuse with Tornado’s timeout where you pass in the time you want to have your callback called. Only pass in the seconds until it’s to be called. NOTE: the timer callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events and BlockingChannel.start_consuming. :param float deadline: The number of seconds to wait to call callback :param callable callback_method: The callback method with the signature callback_method()timer的触发要靠process_data_events, 而我们没有调用. 所以客户端的heartbeat从来没被触发. 简单地将heartbeat关掉以解决这个问题.具体触发点调用代码如下: 没有跑main_loop, 故, 没处理 rabbitmq_server的FIN包, 无法跟踪链接状态.一路跟踪basic_publish接口的代码.在发送时, 收到RST, 最终跑到 base_connection.py:452, _handle_error函数中打印socket_error.def connect_mq(): mq_conf = xxxxx connection = pika.BlockingConnection( pika.ConnectionParameters(mq_conf[‘host’], int(mq_conf[‘port’]), mq_conf[‘path’], pika.PlainCredentials(mq_conf[‘user’], mq_conf[‘pwd’]), heartbeat_interval=0)) channel = connection.channel() channel.exchange_declare(exchange=xxxxx, type=‘direct’, durable=True) return channelchannel = connect_mq()def notify_xxxxx(): global channel def _publish(product): channel.basic_publish(exchange=xxxxx, routing_key=‘xxxxx’, body=json.dumps({‘msg’: ‘xxxxx’})) ...

January 12, 2019 · 2 min · jiezi

Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)

应用场景我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次。然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送。对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理。为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用。RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍以下如何利用Spring Cloud Stream以及RabbitMQ轻松的处理上述问题。动手试试插件安装关于RabbitMQ延迟消息的插件介绍可以查看官方网站:https://www.rabbitmq.com/blog…安装方式很简单,只需要在这个页面:http://www.rabbitmq.com/commu… 中找到rabbitmq_delayed_message_exchange插件,根据您使用的RabbitMQ版本选择对应的插件版本下载即可。注意:只有RabbitMQ 3.6.x以上才支持在下载好之后,解压得到.ez结尾的插件包,将其复制到RabbitMQ安装目录下的plugins文件夹。然后通过命令行启用该插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange该插件在通过上述命令启用后就可以直接使用,不需要重启。另外,如果您没有启用该插件,您可能为遇到类似这样的错误:ERROR 156 — [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type ‘x-delayed-message’, class-id=40, method-id=10)应用编码下面通过编写一个简单的例子来具体体会一下这个属性的用法:@EnableBinding(TestApplication.TestTopic.class)@SpringBootApplicationpublic class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @Slf4j @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生产接口 * * @param message * @return / @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { log.info(“Send: " + message); testTopic.output().send(MessageBuilder.withPayload(message).setHeader(“x-delay”, 5000).build()); return “ok”; } } /* * 消息消费逻辑 */ @Slf4j @Component static class TestListener { @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info(“Received: " + payload); } } interface TestTopic { String OUTPUT = “example-topic-output”; String INPUT = “example-topic-input”; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); }}内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了一条消息,一条消息的头信息中包含了x-delay字段,该字段用来指定消息延迟的时间,单位为毫秒。所以上述代码发送的消息会在5秒之后被消费。在消息监听类TestListener中,对TestTopic.INPUT通道定义了@StreamListener,这里会对延迟消息做具体的逻辑。由于消息的消费是延迟的,从而变相实现了从消息发送那一刻起开始的定时任务。在启动应用之前,还要需要做一些必要的配置,下面分消息生产端和消费端做说明:消息生产端spring.cloud.stream.bindings.example-topic-output.destination=delay-topicspring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true注意这里的一个新参数spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange,用来开启延迟消息的功能,这样在创建exchange的时候,会将其设置为具有延迟特性的exchange,也就是用到上面我们安装的延迟消息插件的功能。消息消费端spring.cloud.stream.bindings.example-topic-input.destination=delay-topicspring.cloud.stream.bindings.example-topic-input.group=testspring.cloud.stream.rabbit.bindings.example-topic-input.consumer.delayed-exchange=true在消费端也一样,需要设置spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true。如果该参数不设置,将会出现类似下面的错误:ERROR 9340 — [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg ’type’ for exchange ‘delay-topic’ in vhost ‘/’: received ’topic’ but current is ‘‘x-delayed-message’’, class-id=40, method-id=10)完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:2019-01-02 23:28:45.318 INFO 96164 — [ctor-http-nio-3] c.d.s.TestApplication$TestController : Send: hello2019-01-02 23:28:45.328 INFO 96164 — [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]2019-01-02 23:28:45.333 INFO 96164 — [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#5c5f9a03:0/SimpleConnection@3278a728 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 53536]2019-01-02 23:28:50.349 INFO 96164 — [ay-topic.test-1] c.d.stream.TestApplication$TestListener : Received: hello从日志中可以看到,Send: hello和Received: hello两条输出之间间隔了5秒,符合我们上面编码设置的延迟时间。深入思考在代码层面已经完成了定时任务,那么我们如何查看延迟的消息数等信息呢?此时,我们可以打开RabbitMQ的Web控制台,首先可以进入Exchanges页面,看看这个特殊exchange,具体如下:可以看到,这个exchange的Type类型是x-delayed-message。点击该exchange的名称,进入详细页面,就可以看到更多具体信息了:代码示例本文示例读者可以通过查看下面仓库的中的stream-delayed-message项目:GithubGitee如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!以下专题教程也许您会有兴趣Spring Boot基础教程Spring Cloud基础教程本文首发于我的独立博客:http://blog.didispace.com/spr… ...

January 4, 2019 · 1 min · jiezi

reactor-rabbitmq小试牛刀

序本文主要研究一下如何使用reactor-rabbitmqmaven <dependency> <groupId>io.projectreactor.rabbitmq</groupId> <artifactId>reactor-rabbitmq</artifactId> <version>1.0.0.M2</version> </dependency>rabbitmq参考docker搭建rabbitmq集群当前使用的镜像是bijukunjummen/rabbitmq-server:3.7.0,docker-compose文件配置的账号密码为myuser/mypass访问http://192.168.99.100:15672可以查看界面实例 @Test public void testProducer() throws InterruptedException { int count = 100; ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio(); connectionFactory.setUsername(“myuser”); connectionFactory.setPassword(“mypass”); SenderOptions senderOptions = new SenderOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)}, “reactive-sender”)) .resourceCreationScheduler(Schedulers.elastic()); Sender sender = ReactorRabbitMq.createSender(senderOptions); Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count) .map(i -> new OutboundMessage("", QUEUE, (“Message_” + i).getBytes()))); CountDownLatch latch = new CountDownLatch(count); sender.declareQueue(QueueSpecification.queue(QUEUE)) .thenMany(confirmations) .doOnError(e -> LOGGER.error(“Send failed”, e)) .subscribe(r -> { if (r.isAck()) { LOGGER.info(“Message {} sent successfully”, new String(r.getOutboundMessage().getBody())); latch.countDown(); } }); latch.await(10, TimeUnit.SECONDS); sender.close(); } @Test public void testConsumer() throws InterruptedException { int count = 100; CountDownLatch latch = new CountDownLatch(count); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio(); connectionFactory.setUsername(“myuser”); connectionFactory.setPassword(“mypass”); SenderOptions senderOptions = new SenderOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)}, “reactive-sender”)) .resourceCreationScheduler(Schedulers.elastic()); Sender sender = ReactorRabbitMq.createSender(senderOptions); Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE)); ReceiverOptions receiverOptions = new ReceiverOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address(“192.168.99.100”,5672), new Address(“192.168.99.100”,5673), new Address(“192.168.99.100”,5674)}, “reactive-receiver”)) .connectionSubscriptionScheduler(Schedulers.elastic()); Receiver receiver = ReactorRabbitMq.createReceiver(receiverOptions); Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE); Disposable disposable = queueDeclaration.thenMany(messages).subscribe(m -> { LOGGER.info(“Received message {}”, new String(m.getBody())); latch.countDown(); }); latch.await(10, TimeUnit.SECONDS); disposable.dispose(); sender.close(); receiver.close(); }由于设置了账号密码,因而需要在ConnectionFactory那里指定账号密码另外由于使用了rabbitmq集群,因而通过connectionSupplier指定要连接的多个rabbitmq地址这里不管是producer还是consumer,都通过queueDeclaration进行操作小结reactor-rabbitmq对rabbitmq的api进行封装,改造为reactive streams模式,提供了Non-blocking Back-pressure以及End-to-end Reactive Pipeline特性。docreactor-rabbitmq-samplesReactor RabbitMQ Reference Guide ...

October 7, 2018 · 1 min · jiezi

rabbitMq常用创建消息应用的maven demo项目(一)---路由routing

rabbitmq官网上提供了6个demo,分别从是hello world、工作队列、发布/订阅、路由、主题、rpc这六个demo。基本上看完这6哥demo之后,对rabbitmq应该就有了清晰的认识,并且可以达到基本数量应用的程度。下面我挑选最常用的路由和主题这两个demo,为大家翻译下。个人加谷歌翻译,有不合适的地方,欢迎大家批评指正。Routing—路由在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。在本教程中,我们将在他的基础上添加一个功能 - 只订阅一部分消息。例如,我们只将严重错误的消息导入日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。 ...

May 14, 2018 · 2 min · jiezi