关于mq:阿里二面MQ-消息积压了增加消费者有用吗

大家好,我是不才陈某~ 明天分享一道有意思的面试题。 面试官:RocketMQ 音讯积压了,减少消费者有用吗? 我:这个要看具体的场景,不同的场景下状况是不一样的。 面试官:能够具体说一下吗? 我:如果消费者的数量小于 MessageQueue 的数量,减少消费者能够放慢音讯生产速度,缩小音讯积压。比方一个 Topic 有 4 个 MessageQueue,2 个消费者进行生产,如果减少一个消费者,明细能够放慢拉取音讯的频率。如下图: 关注公众号:码猿技术专栏,回复关键词:1111 获取阿里外部Java性能调优手册! 如果消费者的数量大于等于 MessageQueue 的数量,减少消费者是没有用的。比方一个 Topic 有 4 个 MessageQueue,并且有 4 个消费者进行生产。如下图 面试官:你说的第一种状况,减少消费者肯定能放慢音讯生产的速度吗? 我:这...,个别状况下是能够的。 面试官:有非凡的状况吗? 我:当然有。消费者音讯拉取的速度也取决于本地音讯的生产速度,如果本地音讯生产的慢,就会提早一段时间后再去拉取。 面试官:在什么状况下消费者会提早一段时间后后再去拉取呢? 我:消费者拉取的音讯存在 ProcessQueue,消费者是有流量管制的,如果呈现上面三种状况,就不会被动去拉取: ProcessQueue 保留的音讯数量超过阈值(默认 1000,能够配置);ProcessQueue 保留的音讯大小超过阈值(默认 100M,能够配置);对于非程序生产的场景,ProcessQueue 中保留的最初一条和第一条音讯偏移量之差超过阈值(默认 2000,能够配置)。这部分源码请参考类:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl。面试官:还有其余状况吗? 我:对于程序生产的场景,ProcessQueue 加锁失败,也会提早拉取,这个延迟时间是 3s。 面试官:消费者提早拉取音讯,个别可能是什么起因导致的呢? 我:其实提早拉取的实质就是消费者生产慢,导致下次去拉取的时候 ProcessQueue 中积压的音讯超过阈值。以上面这张架构图为例: 消费者生产慢,可是能上面的起因: 消费者解决的业务逻辑简单,耗时很长;消费者有慢查问,或者数据库负载高导致响应慢;缓存等中间件响应慢,比方 Redis 响应慢;调用内部服务接口响应慢。面试官:对于内部接口响应慢的状况,有什么应答措施吗? 我:这个要分状况探讨。 如果调用内部零碎只是一个告诉,或者调用内部接口的后果并不解决,能够采纳异步的形式,异步逻辑里采纳重试的形式保障接口调胜利。 如果内部接口返回后果必须要解决,能够思考接口返回的后果是否能够缓存默认值(要思考业务可行),在调用失败后采纳疾速降级的形式,应用默认值代替返回接口返回值。 如果这个接口返回后果必须要解决,并且不能缓存,能够把拉取到的音讯存入本地而后给 Broker 间接返回 CONSUME_SUCCESS。等内部零碎恢复正常后再从本地取出来进行解决。 面试官:如果消费者数小于 MessageQueue 数量,并且内部零碎响应失常,为了疾速生产积压音讯而减少消费者,有什么须要思考的吗? 我:内部零碎尽管响应失常,然而减少多个消费者后,内部零碎的接口调用量会突增,如果达到吞吐量下限,内部零碎会响应变慢,甚至被打挂。 同时也要思考本地数据库、缓存的压力,如果数据库响应变慢,解决音讯的速度就会变慢,起不到缓解音讯积压的作用。 面试官:新减少了消费者后,怎么给它调配 MessageQueue 呢? ...

June 13, 2023 · 2 min · jiezi

关于mq:消息中间件基础知识

by zhimaxingzhe from 消息中间件基础知识 欢送分享链接,转载请注明出处,尊重版权,若急用请分割受权。 https://zhimaxingzhe.github.io前言本文梳理笔者的MQ常识,从消息中间件的基础知识讲起,在有了基础知识后,对市面上各支流的消息中间件进行具体的解析,包含 RabbitMQ、RocketMQ、Kafka、Pulsar,最初再横向比照这几款支流的消息中间件。 介绍MQ的文章网上千千万,最好的学习路径还是官网文档,文中介绍的这几款MQ都在致力推广本人,所以文档在权威性、全面性、专业性、时效性都是无人能及其左右,当初的官网文档甚至本人做竞品比对,比方RocketMQ就本人放了比对表格在首页。所以要学好哪一款MQ,就去看它的官网吧,地址放在文末参考资料中了。 最好的学习办法是带着问题去寻找答案,以费曼学习法为规范,产出可教学的材料,所以本文多是集体的所学梳理和所想记录,集体只是无限,不免有所疏漏,文中有谬误和疏漏请不吝赐教,感激!若有帮忙,请一键三连吧。文中许多图片内容是随笔摘抄,若有触犯,侵删。 消息中间件的倒退曾经有近40年历史,早在上个世纪80年代就诞生了第一款音讯队列 The Information Bus。 到90年代 IBM、Oracle、Microsoft 纷纷推出自家的MQ,但都是免费且闭源的产品,次要面向高端的企业用户,这些MQ个别都采纳高端硬件,软硬件一体机交付,须要洽购专门的保护服务,MQ自身的架构是单机的架构,用户的自主性较差。 进入新世纪后,随着技术成熟,人们开始探讨MQ的协定,诞生了JMS、AMPQ 两大协定规范,随之别离有 ActiveMQ、RabbitMQ的具体实现,并且是开源共建的,这使得这两款MQ在过后迅速风行开来,MQ的应用门槛也随之升高,越来越多零碎融入了MQ作为根底能力。 再起初PC互联网、挪动互联网的爆发式倒退,因为传统的音讯队列无奈接受亿级用户的拜访流量和海量数据传输,诞生了互联网消息中间件,外围能力是全面采纳分布式架构、具备很强的横向扩大能力,开源典型代表有 Kafka、RocketMQ、Pulsar。Kafka 的诞生还将消息中间件从Messaging畛域延长到了 Streaming 畛域,从分布式应用的异步解耦场景延长到大数据畛域的流存储和流计算场景。Pulsar 更是在 Kafka 之后集大家之成,在企业级利用上做得更好,存储和计算拆散的设计使得拓展更加轻松。 现在,IoT、云计算、云原生引领了新的技术趋势。面向IoT的场景,音讯队列开始从云内服务端利用通信,延长到边缘机房和物联网终端设备,反对MQTT等物联网标准协议也成了各大音讯队列的标配,咱们看到Pulsar、Kafka、RocketMQ 都在致力追随时代步调,拓展本人在各种应用场景下的能力。 1、消息中间件的定义在早些年MQ始终被叫做音讯队列,就能够定义为传递音讯的容器,随着时代的倒退,MQ 都在致力拓展进去越来越多的性能,越来越多需要加在MQ纸上,消息中间件的能力越来越强,利用的场景也越来越多,如果非要用一个定义来概括只能是形象进去一些概念,概括为跨服务之间传递信息的软件。 2、用处异步解决能够把接口申请依据业务的时效性水平,将不紧急的解决逻辑生成音讯、事件放到MQ当中,再由专门的零碎解决该音讯、事件;如日志上报、归档事件、数据推送、数据分析、触发策略、变更举荐、增加积分、发送告诉音讯等。 削峰填谷作为零碎外部的一个音讯池,抵制洪峰,对后端服务起到爱护作用。流量洪峰进来的时候,会转换为音讯落到MQ当中,后端服务能够依据本人的解决能力来,流量不会间接冲击到后端服务,特地是落库、IO等操作。 服务解耦缩小零碎、模块之间间接对接带来的耦合,交互对立按MQ中音讯的协定,按需生产和生产,耦合水平大大降低。 公布订阅零碎产生的行为不须要通过接口等形式来告诉到相干服务,只须要公布一次音讯,订阅者都能生产到音讯,执行服务本身的本职工作。 当然,所有收益都是有代价的,对于零碎架构自身来说,会引入新组件,带来零碎复杂度的晋升,整体零碎的可靠性也会是挑战,减少消息中间件的运维老本,还会带来整体零碎一致性的问题。所以须要衡量本身零碎是否有必要引入MQ,能解决什么痛点,投入产出是否让组织称心,对于自身流量不大的零碎来说,放弃简略架构是大快人心的事件,毕竟,越简略越稳固,越耐用。 3、音讯模型队列模型 一种是音讯队列,生产者往队列写音讯,消费者从这个队列生产音讯,当然生产者能够是多个,消费者也能够是多个,然而一条音讯只能被生产一次,具体怎么做的,这就波及到具体的应用需要和每一款消息中间件的实现了,前面第二局部的时候会波及到。这是最早的音讯模型,这也是为什么音讯队列MQ这个名字也始终有人在用吧。 订阅模型 起初上个世纪80年代有人提出公布订阅模式,就是topic模式,生产者公布的音讯,消息中间件会把音讯投递给每一个订阅者,这个投递的过程有可能是推也可能是拉,反对哪一种也要看每一款的具体实现。 4、音讯协定从音讯的生命周期来看,音讯的生产、存储、生产的整个过程来标准步骤要达到的规范。比方金融级别的音讯协定会要求保障音讯生产过程中不丢、不反复,存储过程中也能有持久性、一致性的要求,在生产过程中保障音讯正确被生产,如不反复、不错位等。 常见的音讯协定: 接下来举例 AMPQ 协定的生产、生产过程规范。 AMQP 协定高级音讯队列协定(Advanced Message Queuing Protocol),一个提供对立音讯服务的应用层规范高级音讯队列协定,是应用层协定的一个凋谢规范,为面向音讯的中间件设计。基于此协定的客户端与消息中间件可传递音讯,并不受客户端/中间件不同产品,不同的开发语言等条件的限度。 生产音讯生产音讯 MQTT 协定MQTT(音讯队列遥测传输)是ISO 规范(ISO/IEC PRF 20922)下基于公布/订阅范式的音讯协定。它工作在 TCP/IP协定族上,是为硬件性能低下的近程设施以及网络情况蹩脚的状况下而设计的公布/订阅型音讯协定。 MQTT协定是轻量、简略、凋谢和易于实现的,这些特点使它适用范围十分宽泛。在很多状况下,包含受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶然拨号的医疗设施、智能家居、及一些小型化设施中已宽泛应用。 其余协定另外还有STOMP、OpenMessaging等,这里不做开展。以后市面上支流的消息中间件多是有自定义的协定倒退起来的,如Kafka 在最开始并不算是一个消息中间件,而是用于日志记录零碎的一部分,所以并不是基于某种中间件音讯协定来做的,而是基于TCP/IP,依据自定义的音讯格局,来传递日志音讯,为满足对于音讯失落是有肯定容忍度的;在起初逐渐倒退到能够反对正好一次(Exactly Once)语义,实际上是通过 At Least Once + 幂等性 = Exactly Once 。 ...

February 1, 2023 · 3 min · jiezi

关于mq:关于mq重复消息

producer反复发送, RocketMQ的broker是不论的, Kafka开启幂等后, 每个音讯都会带一个"过程pid+topic+自增序列号"的惟一标识, 重试的音讯的惟一标识是雷同的.consumer反复生产, 能够用bitmap来做去重.前提是mq必须有自定义路由, 让同一个音讯发到同一个队列, 反复的音讯会在同一个队列. 一个consumer可能对应多个队列(或分区),那么就须要用一个map或list来别离保留各个分区的bitmap,因为bitmap是数组下标来判断是否曾经生产, 所以音讯的惟一id要是数字, 如果是字符+数字, 也得是枚举类型的字符, 这样就只能用map来对枚举字段进行维度划分. 这样搞有个问题, 就是rebalance的时候, 这种计划可能还是会产生反复生产, 解决方案就是监听rebalance, rebalance之后, 就把本人的bitmap播送进来(能够用zk,redis,也能够发一个mq), consumer会取到rebalance之后本人负责的队列的bitmap

August 25, 2022 · 1 min · jiezi

关于mq:简单易用的任务队列beanstalkd

更多技术文章,请关注我的集体博客 www.immaxfang.com 和小公众号 Max的学习札记。概述beanstalkd 是一个简略疾速的分布式工作队列零碎,协定基于 ASCII 编码运行在 TCP 上。其最后设计的目标是通过后盾异步执行耗时工作的形式升高高容量 Web 利用的页面延时。其具备简略、轻量、易用等特点,也反对对工作优先级、延时/超时重发等管制,同时还有泛滥语言版本的客户端反对,这些长处使得它成为各种须要队列零碎场景的一种常见抉择。 beanstalkd 长处如他官网的介绍,simple&fast,应用非常简单,适宜须要引入音讯队列又不想引入 kafka 这类重型的 mq,保护成本低;同时,它的性能十分高,大部分场景下都能够 cover 住。反对长久化反对音讯优先级,topic,延时音讯,音讯重试等支流语言客户端都反对,还能够依据 beanstalkd 协定自行实现。beanstalkd 有余无最大内存管制,当业务音讯极多时,服务可能会不稳固。官网没有提供集群故障切换计划(主从或哨兵等),须要本人解决。beanstalkd 重点概念job工作,队列中的根本单元,每个 job 都会有 id 和优先级。有点相似其余音讯队列中的 message 的概念。但 job 有各种状态,下文介绍生命周期局部会重点介绍。job 寄存在 tube 中。 tube管道,用来存储同一类型的 job。有点相似其余音讯队列中的 topic 的概念。beanstalkd 通过 tube 来实现多任务队列,beanstalkd 中能够有多个管道,每个管道有本人的 producer 和 consumer,管道之间相互不影响。 producerjob 生产者。通过 put 命令将一个 job 放入到一个 tube 中。 consumerjob 消费者。通过 reserve 来获取 job,通过 delete、release、bury 来扭转 job 的状态。 beanstalkd 生命周期上文介绍到,beanstalkd 中 job 有状态辨别,在整个生命周期中,job 可能有四种状态:READY, RESERVED, DELAYED, BURIED。只有处于READY状态的 job 能力被生产。下图介绍了各状态之间的流转状况。 ...

July 21, 2022 · 3 min · jiezi

关于mq:python3调用rocket-mq

installhttps://github.com/apache/roc... wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm Installationpip install rocketmq-client-pythonProducerfrom rocketmq.client import Producer, Messageproducer = Producer('test-litx')producer.set_name_server_address('rxxrocketmq-namesrv.sit.dexxxxm:9876')producer.start()msg = Message('itworkspace')msg.set_keys('status')msg.set_tags('json')msg.set_body('{"alertname": "test666666666","building": "t11111tttt"}')ret = producer.send_sync(msg)print(ret.status, ret.msg_id, ret.offset)producer.shutdown() PushConsumerimport timefrom rocketmq.client import PushConsumer, ConsumeStatusdef callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESSconsumer = PushConsumer('itworkxxxce-consumer')consumer.set_name_server_address('red-rocketmq-namesrv.sit.devops.xixxxu.com:9876')consumer.subscribe('itwxxce', callback)consumer.start()time.sleep(3)consumer.shutdown()

July 8, 2022 · 1 min · jiezi

关于mq:系统学习消息队列RabbitMQ的消息发布确认

1.MQ发送信息的时候产生的问题2.MQ的公布确认原理3.MQ的公布确认策略 1.MQ发送信息的时候产生的问题 咱们在前一篇博客零碎学习音讯队列——RabbitMQ的音讯应答和长久化中学过,当消费者挂掉的时候,有音讯重发,当队列挂掉的时候,有音讯长久化,然而咱们却无奈保障生产者发送到队列的音讯是否确定发送胜利,这个时候就有了音讯的公布确认。 2.MQ的公布确认原理当咱们的信道被设置成公布确认(confirm)模式,那么所有在该信道下面公布的音讯都会被指派一个惟一的ID,一旦音讯胜利投递,broker就会发送一个确认给生产者,生产者此时就晓得音讯曾经投递胜利,生产者就会把这条音讯进行删除。 confirm模式能够是同步的,也能够是异步的,同步的状况下是发送之后马上进行确认,异步的话生产者能够无需期待确认只管发送音讯,如果某些音讯失去确认,生产者将就能够通过回调办法来确认音讯。 3.MQ的公布确认策略 3.1)开启确认公布公布确认模式默认是没有开启的,咱们须要调用办法将它关上。 Channel channel = connection.createChannel(); //开启公布确认 channel.confirmSelect();3.2)单个确认公布 这是一种简略的同步确认形式,发送一条音讯,确认一条音讯,后续的音讯能力持续发送。 长处:简略易懂。毛病:公布速度过慢,如果后面的音讯没有失去确认,前面的音讯就不得发送,容易阻塞。 public class ProducerSingle { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开 try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个队列 * 1.队列名称 * 2.队列外面的音讯是否长久化 默认音讯存储在内存中 * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产 * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除 * 5.其余参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //开始工夫 long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message = i + ""; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //确认是否发送胜利,服务端返回 false 或超时工夫内未返回,生产者能够音讯重发 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("音讯发送胜利"); } } //发送完结工夫 long end = System.currentTimeMillis(); System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms"); } }}3.3)批量确认公布单个确认公布的速度十分慢,其实咱们能够先发送一批,而后确认一批,再公布一批。长处:比单个确认公布速度快,吞吐量大。毛病:当其中一个音讯出问题的时候,不晓得是哪个音讯呈现了问题,咱们必须将整个批处理音讯保留在内存里,以记录重要的音讯后从新公布音讯。这种办法也是阻塞的,一样阻塞音讯的公布。 ...

June 15, 2022 · 3 min · jiezi

关于mq:系统学习消息队列RabbitMQ的消息应答和持久化

1.MQ的Hellow World程序2.MQ的音讯应答3.MQ的长久化 1.MQ的Hellow World程序依据上一篇文章零碎学习音讯队列——RabbitMQ的根底概念,咱们学习了mq的基础理论,实践诚然重要,实际也尤其重要,咱们先来编写一个RabbitMq的Hello World程序,来感受一下队列。 在这里咱们创立一个音讯生产者,两个音讯消费者,轮询进行散发音讯。 pom: <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>连贯工具类: import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author sulingfeng * @title: RabbitMqUtils * @date 2022/6/14 9:33 */public class RabbitMqUtils { //失去一个连贯的工具类 public static Channel getChannel() throws Exception { //创立一个连贯工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; }}生产者: import com.rabbitmq.client.Channel;import java.util.Scanner;/** * @author sulingfeng * @title: Producer * @date 2022/6/13 19:55 */public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开 try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个队列 * 1.队列名称 * 2.队列外面的音讯是否长久化 默认音讯存储在内存中 * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产 * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除 * 5.其余参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); //从控制台当中承受信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); /** * 发送一个音讯 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其余的参数信息 * 4.发送音讯的音讯体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送音讯实现:"+message); } } }}消费者: ...

June 14, 2022 · 3 min · jiezi

关于mq:系统学习消息队列RabbitMQ的基础概念

1.什么是MQ2.为什么要应用MQ3.MQ的分类和抉择4.RabbitMQ的根底概念5.RabbitMQ的装置 1.什么是MQMQ(message queue),全名为音讯队列,接下来咱们从数据结构、操作系统和日常开发的角度来解释一下这个概念。从数据结构的角度上来看:MQ是一个队列,领有先进先出个性的一种数据结构。 从操作系统的角度上来看:操作系统上有一个概念,叫做管道通信。所谓管道,就是连贯一个读线程和一个写线程,以实现它们之间的通信的一个共享文件。有了管道,就意味着两个线程能够一边写,一边读,既不必放心读写管制的问题,也能够防止写过程忙碌/读过程闲暇或者写过程闲暇/读过程忙碌造成的阻塞,使两个过程有一个很好的解耦。 从日常开发的角度上来看:咱们在日常开发的时候,常常会须要有排队、异步、限流等操作,有了mq当前,音讯只须要发送给mq,不必依赖其它服务。 2.为什么要应用MQ咱们应用mq个别有三个目标:削峰填谷,利用解耦,异步解决,咱们来挨个解释一下这三个概念。 2.1)削峰填谷假如当初有一个电商零碎,一秒钟能解决一万个申请,在失常的时间段,零碎的生产能力齐全能够消化。然而在秒杀/流动/热品的顶峰期间,如果一瞬间有两万个申请,那么这个时候操作系统是解决不了的,当初有了MQ,咱们就能够把所有的音讯放在MQ里,而后让零碎能生产的速度进行解决,就达到了流量的一个削峰。在服务器应用顶峰后,往往会有一段低谷,就把生产不过去的那一部分订单丢到零碎应用低谷期间进行解决,这就是削峰填谷。 2.2)利用解耦咱们假如还是一个电商零碎,其中会有领取,订单,库存,物流等零碎,当用户下单时,上游零碎挨个调用上游零碎,然而任何一个子系统呈现了故障,下单都会失败,用户体验不佳。当零碎调用形式变为音讯队列的时候,咱们只须要把音讯发送给其它零碎,就算其它零碎出了故障,也能够保障本人的零碎胜利,在故障复原之后,因为音讯还能够保留在mq当中,能够持续生产,两头用户感知不到某些零碎的异样,晋升零碎可用性。 2.3)异步解决比方咱们当初有一个注册性能,有输出邀请码和不输出邀请码两个选项,如果输出邀请码,邀请者会取得处分。此时,咱们就能够让音讯队列来进行实现,把处分的逻辑放在队列的消费者外面,音讯生产者只须要实现注册逻辑,再发送一条音讯给队列,就能够大大减少逻辑的解决工夫,无需阻塞。 3.MQ的分类和抉择 3.1)ActiveMQ 长处:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,音讯可靠性较低的概率失落数据 毛病:官网社区当初对 ActiveMQ 5.x 保护越来越少,高吞吐量场景较少应用。 3.2)Kafka 长处: 性能卓越,单机写入 TPS 约在百万条/秒,最大的长处,就是吞吐量高。时效性 ms 级可用性十分高,kafka 是分布式的,一个数据多个正本,多数机器宕机,不会失落数据,不会导致不可用,消费者采纳 Pull 形式获取音讯, 音讯有序, 通过管制可能保障所有音讯被生产且仅被生产一次;有优良的第三方KafkaWeb 治理界面 Kafka-Manager;在日志畛域比拟成熟,被多家公司和多个开源我的项目应用;性能反对: 性能较为简单,次要反对简略的 MQ 性能,在大数据畛域的实时计算以及日志采集被大规模应用 毛病:Kafka 单机超过 64 个队列/分区,Load 会产生显著的飙高景象,队列越多,load 越高,发送音讯响应工夫变长,应用短轮询形式,实时性取决于轮询间隔时间,生产失败不反对重试;反对音讯程序,然而一台代理宕机后,就会产生音讯乱序,社区更新较慢; 3.3)RocketMQ 长处:单机吞吐量十万级,可用性十分高,分布式架构,音讯能够做到 0 失落,MQ 性能较为欠缺,还是分布式的,扩展性好,反对 10 亿级别的音讯沉积,不会因为沉积导致性能降落,源码是 java 咱们能够本人浏览源码,定制本人公司的 MQ 毛病:反对的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度个别,没有在MQ外围中去实现 JMS 等接口,有些零碎要迁徙须要批改大量代码 3.4)RabbitMQ 长处:因为 erlang 语言的高并发个性,性能较好;吞吐量到万级,MQ 性能比拟齐备,强壮、稳固、易用、跨平台、反对多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,反对 AJAX 文档齐全;开源提供的治理界面十分棒,用起来很好用,社区活跃度高;更新频率相当高 ...

June 13, 2022 · 1 min · jiezi

关于mq:mq从零开始实现-mq11消费者消息回执添加分组信息-pull-message-ack-groupName

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 【mq】从零开始实现 mq-08-配置优化 fluent 【mq】从零开始实现 mq-09-消费者拉取音讯 pull message 【mq】从零开始实现 mq-10-消费者拉取音讯回执 pull message ack 【mq】从零开始实现 mq-11-消费者音讯回执增加分组信息 pull message ack groupName 状态回执上一节咱们实现了音讯的回执,然而存在一个问题。 同一个音讯,能够被不同的 groupName 进行生产,所以回执是须要依据 groupName 进行离开的,这个上一节中脱漏了。 Broker 推送音讯的调整以前推送音讯是间接推送,然而短少 groupName 信息。 订阅列表获取获取订阅列表的实现调整如下: public List<ChannelGroupNameDto> getPushSubscribeList(MqMessage mqMessage) { final String topicName = mqMessage.getTopic(); Set<ConsumerSubscribeBo> set = pushSubscribeMap.get(topicName); if(CollectionUtil.isEmpty(set)) { return Collections.emptyList(); } //2. 获取匹配的 tag 列表 final List<String> tagNameList = mqMessage.getTags(); Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>(); for(ConsumerSubscribeBo bo : set) { String tagRegex = bo.getTagRegex(); if(RegexUtil.hasMatch(tagNameList, tagRegex)) { String groupName = bo.getGroupName(); MapUtil.putToListMap(groupMap, groupName, bo); } } //3. 依照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 抉择 final String shardingKey = mqMessage.getShardingKey(); List<ChannelGroupNameDto> channelGroupNameList = new ArrayList<>(); for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) { List<ConsumerSubscribeBo> list = entry.getValue(); ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey); final String channelId = bo.getChannelId(); BrokerServiceEntryChannel entryChannel = registerMap.get(channelId); if(entryChannel == null) { log.warn("channelId: {} 对应的通道信息为空", channelId); continue; } final String groupName = entry.getKey(); ChannelGroupNameDto channelGroupNameDto = ChannelGroupNameDto.of(groupName, entryChannel.getChannel()); channelGroupNameList.add(channelGroupNameDto); } return channelGroupNameList;}ChannelGroupNameDto 的定义如下: ...

May 17, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq09消费者拉取消息-pull-message

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 【mq】从零开始实现 mq-08-配置优化 fluent 【mq】从零开始实现 mq-09-消费者拉取音讯 pull message 音讯的推与拉大家好,我是老马。 这一节咱们来一起看一下 MQ 音讯中的推和拉两种模式。 推音讯由 broker 间接推送给消费者,实时性比拟好。 毛病是如果消费者解决不过去,就会造成大量问题。 拉音讯由消费者定时从 broker 拉取,长处是实现简略,能够依据消费者本人的解决能力来生产。 毛病是实时性绝对较差。 理论业务中,须要联合具体的场景,抉择适合的策略。 拉取策略实现push 策略咱们首先看一下 push 策略的简化外围实现: package com.github.houbb.mq.consumer.core;/** * 推送生产策略 * * @author binbin.hou * @since 1.0.0 */public class MqConsumerPush extends Thread implements IMqConsumer { @Override public void run() { // 启动服务端 log.info("MQ 消费者开始启动服务端 groupName: {}, brokerAddress: {}", groupName, brokerAddress); //1. 参数校验 this.paramCheck(); try { //0. 配置信息 //1. 初始化 //2. 连贯到服务端 //3. 标识为可用 //4. 增加钩子函数 //5. 启动实现当前的事件 this.afterInit(); log.info("MQ 消费者启动实现"); } catch (Exception e) { log.error("MQ 消费者启动异样", e); throw new MqException(ConsumerRespCode.RPC_INIT_FAILED); } } /** * 初始化实现当前 */ protected void afterInit() { } // 其余办法 /** * 获取生产策略类型 * @return 类型 * @since 0.0.9 */ protected String getConsumerType() { return ConsumerTypeConst.PUSH; }}咱们在 push 中预留了一个 afterInit 办法,便于子类重载。 ...

May 11, 2022 · 3 min · jiezi

关于mq:mq从零开始实现-mq08配置优化-fluent

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 【mq】从零开始实现 mq-08-配置优化 fluent fluent大家好,我是老马。 fluent 的配置形式,是我集体十分喜爱的一种配置形式。 传统的 java 应用 get/set 办法进行属性设置。 相似这种: MqBroker mqBroker = new MqBroker();mqBroker.setPort(9999);mqBroker.setAddress("127.0.0.1");fluent 写法能够让咱们写起来代码更加晦涩: MqBroker.newInstance().port(9999).address("127.0.0.1")写起来更加丝滑晦涩。 Broker 配置属性/** * 端口号 */private int port = BrokerConst.DEFAULT_PORT;/** * 调用治理类 * * @since 1.0.0 */private final IInvokeService invokeService = new InvokeService();/** * 消费者治理 * * @since 0.0.3 */private IBrokerConsumerService registerConsumerService = new LocalBrokerConsumerService();/** * 生产者治理 * * @since 0.0.3 */private IBrokerProducerService registerProducerService = new LocalBrokerProducerService();/** * 长久化类 * * @since 0.0.3 */private IMqBrokerPersist mqBrokerPersist = new LocalMqBrokerPersist();/** * 推送服务 * * @since 0.0.3 */private IBrokerPushService brokerPushService = new BrokerPushService();/** * 获取响应超时工夫 * @since 0.0.3 */private long respTimeoutMills = 5000;/** * 负载平衡 * @since 0.0.7 */private ILoadBalance<ConsumerSubscribeBo> loadBalance = LoadBalances.weightRoundRobbin();/** * 推送最大尝试次数 * @since 0.0.8 */private int pushMaxAttempt = 3;flent 配置public MqBroker port(int port) { this.port = port; return this;}public MqBroker registerConsumerService(IBrokerConsumerService registerConsumerService) { this.registerConsumerService = registerConsumerService; return this;}public MqBroker registerProducerService(IBrokerProducerService registerProducerService) { this.registerProducerService = registerProducerService; return this;}public MqBroker mqBrokerPersist(IMqBrokerPersist mqBrokerPersist) { this.mqBrokerPersist = mqBrokerPersist; return this;}public MqBroker brokerPushService(IBrokerPushService brokerPushService) { this.brokerPushService = brokerPushService; return this;}public MqBroker respTimeoutMills(long respTimeoutMills) { this.respTimeoutMills = respTimeoutMills; return this;}public MqBroker loadBalance(ILoadBalance<ConsumerSubscribeBo> loadBalance) { this.loadBalance = loadBalance; return this;}Producer 配置属性/** * 分组名称 */private String groupName = ProducerConst.DEFAULT_GROUP_NAME;/** * 中间人地址 */private String brokerAddress = "127.0.0.1:9999";/** * 获取响应超时工夫 * @since 0.0.2 */private long respTimeoutMills = 5000;/** * 检测 broker 可用性 * @since 0.0.4 */private volatile boolean check = true;/** * 调用治理服务 * @since 0.0.2 */private final IInvokeService invokeService = new InvokeService();/** * 状态治理类 * @since 0.0.5 */private final IStatusManager statusManager = new StatusManager();/** * 生产者-两头服务端服务类 * @since 0.0.5 */private final IProducerBrokerService producerBrokerService = new ProducerBrokerService();/** * 为残余的申请等待时间 * @since 0.0.5 */private long waitMillsForRemainRequest = 60 * 1000;/** * 负载平衡策略 * @since 0.0.7 */private ILoadBalance<RpcChannelFuture> loadBalance = LoadBalances.weightRoundRobbin();/** * 音讯发送最大尝试次数 * @since 0.0.8 */private int maxAttempt = 3;fluent 配置public MqProducer groupName(String groupName) { this.groupName = groupName; return this;}public MqProducer brokerAddress(String brokerAddress) { this.brokerAddress = brokerAddress; return this;}public MqProducer respTimeoutMills(long respTimeoutMills) { this.respTimeoutMills = respTimeoutMills; return this;}public MqProducer check(boolean check) { this.check = check; return this;}public MqProducer waitMillsForRemainRequest(long waitMillsForRemainRequest) { this.waitMillsForRemainRequest = waitMillsForRemainRequest; return this;}public MqProducer loadBalance(ILoadBalance<RpcChannelFuture> loadBalance) { this.loadBalance = loadBalance; return this;}public MqProducer maxAttempt(int maxAttempt) { this.maxAttempt = maxAttempt; return this;}Consuemr 配置属性/** * 组名称 */private String groupName = ConsumerConst.DEFAULT_GROUP_NAME;/** * 中间人地址 */private String brokerAddress = "127.0.0.1:9999";/** * 获取响应超时工夫 * @since 0.0.2 */private long respTimeoutMills = 5000;/** * 检测 broker 可用性 * @since 0.0.4 */private volatile boolean check = true;/** * 为残余的申请等待时间 * @since 0.0.5 */private long waitMillsForRemainRequest = 60 * 1000;/** * 调用治理类 * * @since 1.0.0 */private final IInvokeService invokeService = new InvokeService();/** * 音讯监听服务类 * @since 0.0.5 */private final IMqListenerService mqListenerService = new MqListenerService();/** * 状态治理类 * @since 0.0.5 */private final IStatusManager statusManager = new StatusManager();/** * 生产者-两头服务端服务类 * @since 0.0.5 */private final IConsumerBrokerService consumerBrokerService = new ConsumerBrokerService();/** * 负载平衡策略 * @since 0.0.7 */private ILoadBalance<RpcChannelFuture> loadBalance = LoadBalances.weightRoundRobbin();/** * 订阅最大尝试次数 * @since 0.0.8 */private int subscribeMaxAttempt = 3;/** * 勾销订阅最大尝试次数 * @since 0.0.8 */private int unSubscribeMaxAttempt = 3;fluent 配置public MqConsumerPush subscribeMaxAttempt(int subscribeMaxAttempt) { this.subscribeMaxAttempt = subscribeMaxAttempt; return this;}public MqConsumerPush unSubscribeMaxAttempt(int unSubscribeMaxAttempt) { this.unSubscribeMaxAttempt = unSubscribeMaxAttempt; return this;}public MqConsumerPush groupName(String groupName) { this.groupName = groupName; return this;}public MqConsumerPush brokerAddress(String brokerAddress) { this.brokerAddress = brokerAddress; return this;}public MqConsumerPush respTimeoutMills(long respTimeoutMills) { this.respTimeoutMills = respTimeoutMills; return this;}public MqConsumerPush check(boolean check) { this.check = check; return this;}public MqConsumerPush waitMillsForRemainRequest(long waitMillsForRemainRequest) { this.waitMillsForRemainRequest = waitMillsForRemainRequest; return this;}public MqConsumerPush loadBalance(ILoadBalance<RpcChannelFuture> loadBalance) { this.loadBalance = loadBalance; return this;}小结这一节的实现非常简单,能够说是没有啥技术难度。 ...

May 10, 2022 · 3 min · jiezi

关于mq:mq从零开始实现-mq07负载均衡-load-balance

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 为什么须要负载平衡大家好,我是老马。 这一节让咱们看一下如何实现 MQ 的负载平衡。 为什么须要负载平衡呢? 作用负载平衡最外围的作用: (1)能够防止单点故障 (2)能够让申请均分的扩散到每一个节点 实现思路负载平衡实现的形式比拟多,最简略的就是随机抉择一个。 拓展浏览: 从零手写实现负载平衡 http://houbb.github.io/2020/0... MQ 中用到负载平衡的中央生产者发送生产者发送音讯时,能够发送给任一 broker。 broker 推送给消费者broker 接管到音讯当前,在推送给消费者时,也能够任一抉择一个。 消费者的生产 ACK消费者生产完,状态回执给 broker,能够抉择任一一个。 音讯黏连有些音讯比拟非凡,比方须要保障生产的有序性,能够通过 shardingKey 的形式,在负载的时候固定到指定的片区。 代码实现生产者发送对立调整获取 channel 的办法。 @Overridepublic Channel getChannel(String key) { // 期待启动实现 while (!statusManager.status()) { log.debug("期待初始化实现..."); DateUtil.sleep(100); } RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance, channelFutureList, key); return rpcChannelFuture.getChannelFuture().channel();}工具类实现为外围实现: /** * 负载平衡 * * @param list 列表 * @param key 分片键 * @return 后果 * @since 0.0.7 */public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance, final List<T> list, String key) { if(CollectionUtil.isEmpty(list)) { return null; } if(StringUtil.isEmpty(key)) { LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance() .servers(list); return loadBalance.select(loadBalanceContext); } // 获取 code int hashCode = Objects.hash(key); int index = hashCode % list.size(); return list.get(index);}如果指定了 shardingKey,那么依据 shadringKey 进行 hash 判断。 ...

May 8, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq06消费者心跳检测-heartbeat

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 为什么须要心跳?心跳(heartbeat ),顾名思义就是心脏的跳动。 医学上个别通过心跳是否跳动,来判断一个人是否活着。 那么,分布式服务中如何判断一个服务是否还活着呢? 实现思路比方 mq 中,broker 须要把音讯实时推送给在线的消费者。 那么如何判断一个消费者是否活着呢? 咱们能够让消费者定时,比方每 5 秒钟给 broker 发送一个心跳包,思考到网络提早等,如果间断 1min 都没有收到心跳,咱们则移除这个消费者,认为服务曾经挂了。 消费者实现上代码! 心跳实现心跳能够是一个很简略的音讯体。 @Overridepublic void heartbeat() { final MqHeartBeatReq req = new MqHeartBeatReq(); final String traceId = IdHelper.uuid32(); req.setTraceId(traceId); req.setMethodType(MethodType.C_HEARTBEAT); req.setAddress(NetUtil.getLocalHost()); req.setPort(0); req.setTime(System.currentTimeMillis()); log.debug("[HEARTBEAT] 往服务端发送心跳包 {}", JSON.toJSON(req)); // 告诉全副 for(RpcChannelFuture channelFuture : channelFutureList) { try { Channel channel = channelFuture.getChannelFuture().channel(); callServer(channel, req, null); } catch (Exception exception) { log.error("[HEARTBEAT] 往服务端解决异样", exception); } }}消费者把心跳告诉所有的 broker. ...

May 6, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq05实现优雅停机

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 为什么须要优雅敞开?我记得多年前,那个时候 rpc 框架支流用的还是 dubbo,每次都是中午还是上线,上线上完根本都是凌晨 2-3 点。 为什么要中午上线呢? 因为这个时候个别业务流量最低。 还有就是上线公布,每次都要人工期待一段几分钟。 因为 rpc 调用入口曾经敞开了,然而自身可能还没有解决完。 那么有没有办法能够让服务的敞开更加优雅,而不是人工期待呢? 实现思路人工期待几分钟的形式个别能够解决问题,然而大部分状况是无用功,还比拟浪费时间。 比拟天然的一种形式是引入钩子函数。 当利用筹备敞开时,首先判断是否存在解决中的申请,不存在则间接敞开;存在,则期待申请实现再敞开。 实现生产者和消费者是相似的,咱们以生产者为例。 启动实现的调整@Overridepublic synchronized void run() { this.paramCheck(); // 启动服务端 log.info("MQ 生产者开始启动客户端 GROUP: {} brokerAddress: {}", groupName, brokerAddress); try { //0. 配置信息 ProducerBrokerConfig config = ProducerBrokerConfig.newInstance() .groupName(groupName) .brokerAddress(brokerAddress) .check(check) .respTimeoutMills(respTimeoutMills) .invokeService(invokeService) .statusManager(statusManager); //1. 初始化 this.producerBrokerService.initChannelFutureList(config); //2. 连贯到服务端 this.producerBrokerService.registerToBroker(); //3. 标识为可用 statusManager.status(true); //4. 增加钩子函数 final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook(); rpcShutdownHook.setStatusManager(statusManager); rpcShutdownHook.setInvokeService(invokeService); rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest); rpcShutdownHook.setDestroyable(this.producerBrokerService); ShutdownHooks.rpcShutdownHook(rpcShutdownHook); log.info("MQ 生产者启动实现"); } catch (Exception e) { log.error("MQ 生产者启动遇到异样", e); throw new MqException(ProducerRespCode.RPC_INIT_FAILED); }}状态治理类这里咱们引入 statusManager 治理整体的状态。 ...

May 5, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq04启动检测与实现优化

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 上一节咱们引入了中间人 broker,让音讯的生产者和消费者解耦。 这一节咱们对初始化代码进行优化,便于前期拓展保护。 生产者启动优化启动实现整体实现调整如下: @Overridepublic synchronized void run() { this.paramCheck(); // 启动服务端 log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}", groupName, port, brokerAddress); try { //channel future this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress, initChannelHandler(), check); // register to broker this.registerToBroker(); // 标识为可用 enableFlag = true; log.info("MQ 生产者启动实现"); } catch (Exception e) { log.error("MQ 生产者启动遇到异样", e); throw new MqException(ProducerRespCode.RPC_INIT_FAILED); }}看起来是不是比起原来清新很多呢? 然而复杂性只会转移,不会隐没。 答案就是封装到 initChannelFutureList 中去了。 ...

May 3, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq03引入-broker-中间人

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 上一节咱们学习了如何实现生产者给消费者发送音讯,然而是通过直连的形式。 那么如何能力达到解耦的成果呢? 答案就是引入 broker,音讯的中间人。 MqBroker 实现外围启动类相似咱们后面 consumer 的启动实现: package com.github.houbb.mq.broker.core;/** * @author binbin.hou * @since 1.0.0 */public class MqBroker extends Thread implements IMqBroker { // 省略 private ChannelHandler initChannelHandler() { MqBrokerHandler handler = new MqBrokerHandler(); handler.setInvokeService(invokeService); handler.setRegisterConsumerService(registerConsumerService); handler.setRegisterProducerService(registerProducerService); handler.setMqBrokerPersist(mqBrokerPersist); handler.setBrokerPushService(brokerPushService); handler.setRespTimeoutMills(respTimeoutMills); return handler; } @Override public void run() { // 启动服务端 log.info("MQ 中间人开始启动服务端 port: {}", port); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(workerGroup, bossGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf)) .addLast(initChannelHandler()); } }) // 这个参数影响的是还没有被accept 取出的连贯 .option(ChannelOption.SO_BACKLOG, 128) // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 .childOption(ChannelOption.SO_KEEPALIVE, true); // 绑定端口,开始接管进来的链接 ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly(); log.info("MQ 中间人启动实现,监听【" + port + "】端口"); channelFuture.channel().closeFuture().syncUninterruptibly(); log.info("MQ 中间人敞开实现"); } catch (Exception e) { log.error("MQ 中间人启动异样", e); throw new MqException(BrokerRespCode.RPC_INIT_FAILED); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }}initChannelHandler 中有不少新面孔,咱们前面会具体介绍。 ...

April 30, 2022 · 10 min · jiezi

关于mq:MQjava-从零开始实现消息队列-mq02如何实现生产者调用消费者

前景回顾上一节咱们学习了如何实现基于 netty 客服端和服务端的启动。 【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】java 从零开始实现音讯队列 mq-02-如何实现生产者调用消费者? 那么客户端如何调用服务端呢? 咱们本节就来一起实现一下。 消费者实现启动类的调整ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf)) .addLast(new MqConsumerHandler(invokeService)); } }) // 这个参数影响的是还没有被accept 取出的连贯 .option(ChannelOption.SO_BACKLOG, 128) // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 .childOption(ChannelOption.SO_KEEPALIVE, true);这里咱们通过指定分隔符解决 netty 粘包问题。 解决 netty 粘包问题MqConsumerHandler 解决类MqConsumerHandler 的实现如下,增加对应的业务解决逻辑。 package com.github.houbb.mq.consumer.handler;/** * @author binbin.hou * @since 1.0.0 */public class MqConsumerHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(MqConsumerHandler.class); /** * 调用治理类 * @since 1.0.0 */ private final IInvokeService invokeService; public MqConsumerHandler(IInvokeService invokeService) { this.invokeService = invokeService; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); RpcMessageDto rpcMessageDto = null; try { rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class); } catch (Exception exception) { log.error("RpcMessageDto json 格局转换异样 {}", new String(bytes)); return; } if (rpcMessageDto.isRequest()) { MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx); if(commonResp == null) { log.debug("以后音讯为 null,疏忽解决。"); return; } writeResponse(rpcMessageDto, commonResp, ctx); } else { final String traceId = rpcMessageDto.getTraceId(); // 抛弃掉 traceId 为空的信息 if(StringUtil.isBlank(traceId)) { log.debug("[Server Response] response traceId 为空,间接抛弃", JSON.toJSON(rpcMessageDto)); return; } // 增加音讯 invokeService.addResponse(traceId, rpcMessageDto); } }}rpc 音讯体定义为了统一标准,咱们的 rpc 音讯体 RpcMessageDto 定义如下: ...

April 23, 2022 · 8 min · jiezi

关于mq:mq从零开始实现-mq01生产者消费者启动

MQ 是什么?MQ(Message Queue)音讯队列,是根底数据结构中“先进先出”的一种数据结构。 指把要传输的数据(音讯)放在队列中,用队列机制来实现消息传递——生产者产生音讯并把音讯放入队列,而后由消费者去解决。 消费者能够到指定队列拉取音讯,或者订阅相应的队列,由MQ服务端给其推送音讯。 MQ 的作用?音讯队列中间件是分布式系统中重要的组件,次要解决利用解耦,异步音讯,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。 解耦:一个业务须要多个模块独特实现,或者一条音讯有多个零碎须要对应解决,只须要主业务实现当前,发送一条MQ,其余模块生产MQ音讯,即可实现业务,升高模块之间的耦合。 异步:主业务执行完结后隶属业务通过MQ,异步执行,减低业务的响应工夫,进步用户体验。 削峰:高并发状况下,业务异步解决,提供高峰期业务解决能力,防止零碎瘫痪。 ps: 以上内容摘选自百科。 实现 mq 的筹备工作maven 引入<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.42.Final</version></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version></dependency>模块划分The message queue in java. 作为 mq 的从零开始的学习我的项目,目前已开源。 我的项目的模块如下: 模块阐明mq-common公共代码mq-broker注册核心mq-producer音讯生产者mq-consumer音讯消费者音讯消费者接口定义package com.github.houbb.mq.consumer.api;/** * @author binbin.hou * @since 1.0.0 */public interface IMqConsumer { /** * 订阅 * @param topicName topic 名称 * @param tagRegex 标签正则 */ void subscribe(String topicName, String tagRegex); /** * 注册监听器 * @param listener 监听器 */ void registerListener(final IMqConsumerListener listener);}IMqConsumerListener 作为音讯监听类的接口,定义如下: ...

April 21, 2022 · 3 min · jiezi

关于mq:MQ面试问题整理

我的项目中为什么应用MQ长处解耦<br/>同一块业务的相干能力被很多个我的项目须要,后续也有极大可能会被其余我的项目须要。将该局部能力解耦,通过MQ进行音讯的对立解决。若有其余我的项目或业务须要相干性能,则对此进行订阅,解决相干业务。缩小我的项目间接依赖。<br/>异步<br/>大型项目相干合作人员,参加团队越来越多,我的项目之间的关联越来越深。造成整个链路特地长,且问题排查不容易。<br/>削峰<br/>对于局部业务,在某些工夫点,可能因为刹时申请过大,造成所有申请间接拜访至数据库对数据库造成压力,从而影响我的项目原有业务。个别MySQL数据库的解决能力在2k次/s,MQ的顶峰解决能力1w+。能无效的解决<br/><!-- more -->毛病零碎可用性升高因为新减少了MQ,因为MQ的可用性、可靠性会造成零碎可用性的额外负担。<br/> 可用性问题如何解决<br/>不同的MQ对此的解决方案不一样。ActiveMQZookeeper及多个activeMQ,批改长久化为性能更好的LevelDB替换掉默认的KahaDB.<br/>长处:简略实现了可用性。<br/>毛病:每次理论可用的MQ只有一个,一旦有一个MQ呈现问题,相干的音讯就会失落。<br/>RocketMQ 单master模式,一旦呈现问题,就会整个服务不可用。个别不会间接生产应用,适宜集体学习;多master模式,多个master节点组成集群。单个master呈现问题不影响。长处:所有模式中性能最高的。 毛病:单个master节点宕机期间,未被生产的音讯在节点复原之前不可用,音讯实时性就受到影响。留神:应用同步刷盘可保障音讯不失落,同时topic绝对应的queue应该散布在集权中各个master节点,而不是只在某一个master节点上;多master多slave异步复制模式,在多master模式下,每个master节点至多都有一个对应的slave;<br/>master节点可读可写,slave只可读 不可写。长处:在master宕机时,消费者能够间接从slave读取音讯,音讯实时性不会受到影响,性能简直与多master一样。毛病:应用异步复制可能造成音讯失落问题。多master多slave同步双写,同上一种模式相似,区别在于master与slave之间的数据同步形式。长处:同步双写的同步模式能保证数据不失落。毛病:发送单个音讯时长会比拟长,性能会比拟差;同步刷盘,异步刷盘的区别:在于多master之间的数据同步写入磁盘模式。 同步复制异步复制概念即等 Master 和 Slave 均写胜利后才反馈给客户端写胜利状态只有 Master 写胜利,就反馈客户端写胜利状态可靠性可靠性高,若 Master 呈现故障,Slave 上有全副的备份数据,容易复原若 Master 呈现故障,可能存在一些数据还没来得及写入 Slave,可能会失落效率因为是同步复制,会减少数据写入提早,升高零碎吞吐量因为只有写入 Master 即可,故数据写入提早较低,吞吐量较高理论利用中的举荐把Master和Slave设置成ASYNC_FLUSH的异步刷盘形式,主从之间配置成SYNC_MASTER的同步复制形式,这样即便有一台机器出故障,依然能够保证数据不丢。可靠性问题如何解决<br/>引入MQ造成的数据可靠性问题,或者如何解决音讯失落问题?整个MQ的链路中,可能存在音讯失落的地位仅有三个,一个是生产端,一个是MQ本身,最初一个就是上游的消费者。<br/>不同的MQ对此解决方案均不一样,需别离剖析。ActiveMQ生产端程序在每次发往MQ的音讯后,可接管到MQ对其的Ack响应, 零碎复杂度进步减少了MQ,引入了音讯是否会反复生产,音讯失落问题。<br/> 数据一致性引入MQ之后,繁多事务会被分拆为多个事务。引入了分布式事务一致性的问题。<br/> MQ如何解决反复生产问题幂等校验上游不能反复下发<br/>程序本人保障不反复创立音讯,应用业务事务进行管制。<br/>上游不能反复生产<br/>将音讯信息入库,应用数据库、Redis进行音讯存储。有音讯过去应用惟一主键进行存储,每次进行校验是否存在,不存在持续解决,否则依据业务进行疏忽或更新;<br/>将音讯信息入库,依据业务进行惟一主键束缚,若有反复数据,则会报惟一主键抵触,也插入不了;如果仅是redis的set操作,则无需解决。即时反复也不影响;MQ如何解决音讯失落问题http://yannis.club

February 22, 2022 · 1 min · jiezi

关于mq:RabbitMQ-Exchange-Types-四种类型

交换器RabbitMQ 消息传递模型的核心思想是生产者从不间接向队列发送任何音讯。生产者只将音讯发送到 Exchange 交换器中,并不知道音讯是否会被传送到队列。交换器负责接管生产者生产的音讯,并通过肯定路由规定将音讯发送到指定的队列,起到一个传递的作用 类型介绍RabbitMQ 罕用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP标准里还提到两种 Exchange Type,别离为 system 与自定义,这里不予以形容)。 fanoutfanout 类型的 Exchange 路由规定十分。它会把所有发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中。这种模式在 RabboitMQ 官网介绍中称之为:公布/订阅 图 1 fanout Exchange 来看下官网给出的代码示例: emit_log.php <?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');$channel = $connection->channel();$exchange_name = 'exchange_name';$channel->exchange_declare($exchange_name, 'fanout', false, false, false);$data = implode(' ', array_slice($argv, 1));if (empty($data)) { $data = "info: Hello World!";}$msg = new AMQPMessage($data);$channel->basic_publish($msg, 'logs');echo ' [x] Sent ', $data, "\n";$channel->close();$connection->close();receive_logs.php ...

July 16, 2021 · 3 min · jiezi

关于mq:MQ-面试要点

为什么要应用MQ当面试官问出 “为什么要应用MQ” 时,其实也就是在问 MQ 有哪些作用? 1. 解耦一个我的项目随着业务迭代会一直减少上游调用方,依照传统形式,那么每减少一个上游,就须要批改一次以后我的项目的代码,减少新的上游交互逻辑,工作量不堪称不大。 而如果在这里思考接入 MQ 作为两头代理,那么只须要将数据写入到 MQ,而不必思考简单的交互通信。并且,上游减少时,根本也不必批改上游代码,只须要让上游监听对应的通道即可。甚至,配合上动静配置,能够实现上游零批改地减少上游服务。 2. 异步如果须要进步高延时接口的响应速度。那么,就能够思考应用 MQ 来实现异步执行,只须要将申请信息写入 MQ 就能够立刻返回,实现高响应速度,至于残余的工作交给 MQ 即可。 3. 流量削峰如果业务中有高流量的场景,为了防止高流量一下子打到数据库层,造成数据库瘫痪,能够思考将申请写入 MQ,即 MQ 作为一个缓冲区,而后在零碎反对的生产速度下,通过上游服务一直地生产 MQ 中累积的音讯。当高峰期过后,MQ 中的积压音讯将缓缓被生产掉。 MQ长处和毛病长处如上所述: 解耦异步流量削峰毛病升高了零碎的可用性。原先只有上下游,现在两头多了个 MQ。一旦 MQ 故障,整个零碎将不可用。进步了零碎的复杂性。多了 MQ 之后,开发过程中可能须要思考 MQ 反复生产,音讯失落,如果保障音讯程序的问题。一致性问题。MQ 作为两头代理,多个上游执行过程中,某个或多个执行失败,那么势必造成数据的不统一。不同的MQ区别 总结而言,如果是技术挑战不高,举荐应用 RabbitMQ(Erlang语言妨碍Java工程师深入研究把握它);如果研发能力强的,能够思考 RocketMQ;如果是用于大数据畛域,那么举荐 Kafka。 不同MQ架构剖析RabbitMQRocketMQKafka如何保障音讯不反复生产(幂等)如果将数据写入到 Mysql,须要依据表主键判断行是否存在进行新增或更新即可;如果将数据写入到 redis,间接写入即可。不过,须要一个业务惟一ID作为 redis 的 key;如果没有应用到 Mysql 或 redis,那么还是须要应用其中一个来存储曾经生产过的音讯,解决办法同上。如何保障音讯不失落(以RabbitMQ为例)1. 保障生产者不丢数据一是能够思考开启 RabbitMQ 提供的事务操作。当产生音讯失落异样时,能够回滚事务,而后从新发送。这种形式下会极大地升高 MQ 吞吐量; 二是开启 channel 的 confirm 模式。生产者发送音讯给 MQ 后,MQ 会回传一个 ack。如果生产者没有收到,就能够重试。该监听办法是异步的,不会阻塞,能够间接进行下一条的发送。 2. 保障 MQ 不丢数据这里须要开启 MQ 的长久化。RabbitMQ 开启长久化分两步,一是开启 queue 的长久化,这样 MQ 会长久化 queue 的元数据;二是设置发送音讯时的音讯选项,是否长久化。 ...

May 16, 2021 · 1 min · jiezi

关于mq:常见消息队列

April 4, 2021 · 0 min · jiezi

关于mq:网关交易同步优化

现有架构:对于mysql的操作: Req:一次插入Rsp:一次查问,一次批改交易同步:一次查问,一次删除Mysql表构造为交易全副信息,作用为匹配交易 定时工作: 同步失常交易同步开放平台已超时但又有响应的交易长期表剩下只有申请但没有响应的交易重放的交易只会记录一次,因为长期表依据traceNo,appId,timestamp匹配 批改后架构: 流程: 申请插入req的队列响应蕴含残缺交易,插入rsp,监听rsp队列将交易插入es,延时异步删除req定时工作将只有申请的交易标记为“无响应”插入rsp,再同步至esFlink监听rsp进行交易告警对于mysql的操作: Req:一次插入Rsp:延时删除定时工作:一次查问劣势: 流程简略,没有简单的匹配规定可记录所有的交易,蕴含重放交易Mysql表构造为交易申请,作用为记录申请,如果须要增加交易字段,不必批改表构造。即扩展性更好实时性更好,对mysql压力更小告警可用flink进行解决,可配置更简单的规定非凡的交易状况: 失常状况下每个申请在开放平台都有响应,不论是超时还是重放的交易,响应应该在mysq中找到申请信息。申请无任何响应(gateway承受申请后忽然挂掉):定时工作查看1分钟(gateway超时工夫)前在mysql的申请,标记为未响应(在es中查问?)插入rsp队列响应在mysql未找到申请(响应比申请先到mysql,网关超时又来了业务响应):延时删除申请标记为网关超时又来了业务响应(mq超时但响应又来,业务响应在mysql中找不req):Es记录两条流水号雷同的交易,一条为网关超时,一条没有申请信息反复的交易,即雷同的申请参数反复发(req插入mysql时可能会报错雷同ID,rsp删除mysql时找不到req):es全副记录,不论是否反复响应来了的删除策略: Req来了后插入mysql,收到rsp后删除如果响应查不到申请,过五百毫秒再查,防止响应比申请先入库的状况;如果没有,应该为业务已超时但响应又回来的交易,再在es查;如果还没有,标记为mysql找不到记录。

February 20, 2021 · 1 min · jiezi

关于mq:ZMQ-指南第二章-ZeroMQ进阶

第二章 ZeroMQ进阶第一章咱们简略试用了ZMQ的若干通信模式:申请-应答模式、公布-订阅模式、管道模式。这一章咱们将学习更多在理论开发中会应用到的货色: 本章波及的内容有: 创立和应用ZMQ套接字应用套接字发送和接管音讯应用ZMQ提供的异步I/O套接字构建你的应用程序在繁多线程中应用多个套接字失当地解决致命和非致命谬误解决诸如Ctrl-C的中断信号正确地敞开ZMQ应用程序查看ZMQ应用程序的内存泄露发送和接管多帧音讯在网络中转发音讯建设简略的音讯队列代理应用ZMQ编写多线程应用程序应用ZMQ在线程间传递信号应用ZMQ协调网络中的节点应用标识创立长久化套接字在公布-订阅模式中创立和应用音讯信封如何让长久化的订阅者可能从解体中复原应用阈值(HWM)避免内存溢出零的哲学ØMQ一词中的Ø让咱们纠结了很久。一方面,这个特殊字符会升高ZMQ在谷歌和推特中的收录量;另一方面,这会惹恼某些丹麦语种的民族,他们会嚷道Ø并不是一个奇怪的0。 一开始ZMQ代表零中间件、零提早,同时,它又有了新的含意:零治理、零老本、零节约。总的来说,零示意最小、最简,这是贯通于该项目标哲理。咱们致力于缩小复杂程度,进步易用性。 套接字API说实话,ZMQ有些移花接木的嫌疑。不过咱们并不会为此赔罪,因为这种概念上的切换相对不会有害处。ZMQ提供了一套相似于BSD套接字的API,但将很多音讯解决机制的细节暗藏了起来,你会逐步适应这种变动,并乐于用它进行编程。 套接字事实上是用于网络编程的标准接口,ZMQ之所那么吸引人眼球,起因之一就是它是建设在规范套接字API之上。因而,ZMQ的套接字操作非常容易了解,其生命周期次要蕴含四个局部: 创立和销毁套接字:zmq_socket(), zmq_close()配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()为套接字建设连贯:zmq_bind(), zmq_connect()发送和接管音讯:zmq_send(), zmq_recv()如以下C代码: void *mousetrap;// Create socket for catching micemousetrap = zmq_socket (context, ZMQ_PULL);// Configure the socketint64_t jawsize = 10000;zmq_setsockopt (mousetrap, ZMQ_HWM, &jawsize, sizeof jawsize);// Plug socket into mouse holezmq_connect (mousetrap, "tcp://192.168.55.221:5001");// Wait for juicy mouse to arrivezmq_msg_t mouse;zmq_msg_init (&mouse);zmq_recv (mousetrap, &mouse, 0);// Destroy the mousezmq_msg_close (&mouse);// Destroy the socketzmq_close (mousetrap);请留神,套接字永远是空指针类型的,而音讯则是一个数据结构(咱们下文会讲述)。所以,在C语言中你通过变量传递套接字,而用援用传递音讯。记住一点,在ZMQ中所有的套接字都是由ZMQ治理的,只有音讯是由程序员治理的。 创立、销毁、以及配置套接字的工作和解决一个对象差不多,但请记住ZMQ是异步的,伸缩性很强,因而在将其利用到网络结构中时,可能会须要多一些工夫来了解。 应用套接字构建拓扑构造在连贯两个节点时,其中一个须要应用zmq_bind(),另一个则应用zmq_connect()。通常来讲,应用zmq_bind()连贯的节点称之为服务端,它有着一个较为固定的网络地址;应用zmq_connect()连贯的节点称为客户端,其地址不固定。咱们会有这样的说法:绑定套接字至端点;连贯套接字至端点。端点指的是某个广为周知网络地址。 ZMQ连贯和传统的TCP连贯是有区别的,次要有: 应用多种协定,inproc(过程内)、ipc(过程间)、tcp、pgm(播送)、epgm;当客户端应用zmq_connect()时连贯就曾经建设了,并不要求该端点已有某个服务应用zmq_bind()进行了绑定;连贯是异步的,并由一组音讯队列做缓冲;连贯会体现出某种音讯模式,这是由创立连贯的套接字类型决定的;一个套接字能够有多个输出和输入连贯;ZMQ没有提供相似zmq_accept()的函数,因为当套接字绑定至端点时它就主动开始承受连贯了;应用程序无奈间接和这些连贯打交道,因为它们是被封装在ZMQ底层的。在很多架构中都应用了相似于C/S的架构。服务端组件式比较稳定的,而客户端组件则较为动静,来去自如。所以说,服务端地址对客户端而言往往是可见的,反之则不然。这样一来,架构中应该将哪些组件作为服务端(应用zmq_bind()),哪些作为客户端(应用zmq_connect()),就很显著了。同时,这须要和你应用的套接字类型相分割起来,咱们下文会具体讲述。 让咱们试想一下,如果先关上了客户端,后关上服务端,会产生什么?传统网络连接中,咱们关上客户端时肯定会收到零碎的报错信息,但ZMQ让咱们可能自在地启动架构中的组件。当客户端应用zmq_connect()连贯至某个端点时,它就曾经可能应用该套接字发送音讯了。如果这时,服务端启动起来了,并应用zmq_bind()绑定至该端点,ZMQ将主动开始转发音讯。 服务端节点能够仅应用一个套接字就能绑定至多个端点。也就是说,它可能应用不同的协定来建设连贯: zmq_bind (socket, "tcp://*:5555");zmq_bind (socket, "tcp://*:9999");zmq_bind (socket, "ipc://myserver.ipc");当然,你不能屡次绑定至同一端点,这样是会报错的。 ...

January 22, 2021 · 13 min · jiezi

关于mq:ZMQ-指南第一章-ZeroMQ基础

ZMQ 指南作者: Pieter Hintjens <ph@imatix.com>, CEO iMatix Corporation. 翻译: 张吉 <jizhang@anjuke.com>, 安居客团体 好租网工程师 With thanks to Bill Desmarais, Brian Dorsey, CAF, Daniel Lin, Eric Desgranges, Gonzalo Diethelm, Guido Goldstein, Hunter Ford, Kamil Shakirov, Martin Sustrik, Mike Castleman, Naveen Chawla, Nicola Peduzzi, Oliver Smith, Olivier Chamoux, Peter Alexander, Pierre Rouleau, Randy Dryburgh, John Unwin, Alex Thomas, Mihail Minkov, Jeremy Avnet, Michael Compton, Kamil Kisiel, Mark Kharitonov, Guillaume Aubert, Ian Barber, Mike Sheridan, Faruk Akgul, Oleg Sidorov, Lev Givon, Allister MacLeod, Alexander D'Archangel, Andreas Hoelzlwimmer, Han Holl, Robert G. Jakabosky, Felipe Cruz, Marcus McCurdy, Mikhail Kulemin, Dr. Gergő Érdi, Pavel Zhukov, Alexander Else, Giovanni Ruggiero, Rick "Technoweenie", Daniel Lundin, Dave Hoover, Simon Jefford, Benjamin Peterson, Justin Case, Devon Weller, Richard Smith, Alexander Morland, Wadim Grasza, Michael Jakl, and Zed Shaw for their contributions, and to Stathis Sideris for Ditaa. ...

January 22, 2021 · 8 min · jiezi

关于mq:RabbitMQ安装

https://www.erlang.org/downlo... 装置:tar -zxvf otp_src_19.3.tar.gzcd otp_src_19.3/./configure --prefix=/opt/erlang yum install ncurses-devel openssl-devel unixODBC-devel gcc-c++ 装置wxWidgetshttp://www.wxwidgets.org/down...https://github.com/wxWidgets/... yum install gtk3.i686yum install gtk3-develyum install mesa*yum install freeglut* ./configure --with-opengl --enable-debug --enable-unicode

November 9, 2020 · 1 min · jiezi

关于mq:中间件MQ如何保证消息的可靠性

分布式事务的一种计划:音讯可靠性 + 最终一致性, 那么 如何保障音讯的可靠性? 1. 音讯失落(最重要)音讯失落:如何保障音讯不失落? 1.1. 网络起因发不出-> 记录日志+定时工作解决比方网络起因公布进来:怎么弄? catch异样从新while(true)发几次?NoNoNo! 如果是网络起因, 很可能发不进来, 正确做法: 数据库建一个mq_message的表, 每发一条mq音讯都记录到库;定时工作扫, 状态是 "未发送胜利" 的记录, 从新发送;1.2 Broker长久化失败->生产者发送确认+失败重发咱们的音讯都要设置长久化保留, 咱们胜利将音讯发送到mq的Broker,然而broker须要将音讯长久化, 而后能力进入到queue里, 这个两头mq挂了, 怎么保障音讯不失落? Producer-->(Broker(Exchange)-->(Queue))-->Consumer 咱们应用生产者发送者的确认机制: 只有发送到broker, broker长久化实现音讯到达送给队列当前, broker才会给发送者确认;这样就能保障发送胜利 1.3 Consumer生产失落->敞开主动确认, 应用手动ACKMQ生产时宕机或生产失败?敞开生产时主动确认, 应用 手动manual ACK, 生产完确认! 2. 音讯反复2.1 音讯反复的2种情景:比方消费者生产音讯, 收到了两遍 consumer生产失败, 手动basicReject回绝了, 这种是容许的;consumer生产实现,手动ACK之前忽然宕机, 此时没给broker确认音讯生产实现;这样, 就有可能会再发给其余的消费者,造成音讯反复; 生产胜利,ack宕机,音讯由 unack变为ready, 而后broker会从新发送主旨: 设计生产时, 做到 幂等生产 2.2 幂等生产2.2.1 生产做状态判断业务设置状态, 生产时做状态判断, 解决过就疏忽 2.2.2 防重表:惟一标识防重表:发送的每一个音讯都有惟一标识, 解决过就疏忽 3. 音讯积压音讯积压会带来MQ性能的降落, 所以肯定要解决音讯积压的问题 3.1 音讯积压的3种起因3.1.1 消费者宕机3.1.2 消费者自身生产能力有余3.1.3 发送者流量太大3.2 解决音讯积压的计划3.2.1 限度业务流量+限度发送流量(上策)3.2.2 上线更多的消费者, 加大生产规模3.2.2.3 若生产逻辑简单,能够设置:先纯生产-记录数据库-离线解决能够解决mq压力, 转移mq压力 ...

September 9, 2020 · 1 min · jiezi

关于mq:常用的MQ中间件技术选型比较

参考:https://www.cnblogs.com/imstudy/p/11064589.html

August 20, 2020 · 1 min · jiezi

消息队列-MQ-1-3分钟让你快速了解消息队列

1 什么是音讯队列队列是一个根底的“先进先出”的数据结构,音讯队列,就是一个用来寄存音讯的队列。一个根本的音讯队列的模型如下图,生产者负责往音讯队列里写入音讯,消费者通过订阅生产者公布的音讯就能够生产音讯。 2 为什么要音讯队列次要的性能有利用解藕,流量消峰,音讯散发,除此之前还有放弃一致性和不便动静扩容等性能。 1.利用解耦比方电商零碎,用户创立订单,会波及到,“订单零碎”,”库存零碎“,“物流零碎” 和 “领取零碎”,如果这些是耦合的,则任何一个零碎的故障都会导致订单创立失败。 可是如果应用音讯队列进行利用解耦,则如下图,比方“物流零碎”产生故障,则物流零碎须要解决的内容被缓存在音讯队列里,用户的下单操作仍然能够实现,等到几分钟后物流零碎复原,这时再补充解决缓存在音讯队列里的订单音讯即可,终端用户是感知不到这几分钟的系统故障的。 2.流量消峰当高并发的时候,利用零碎的压力增大,此时通过音讯队列把大量的申请暂存起来,而后扩散到绝对长的一段时间内解决,大大缓解利用零碎的压力。 仍然以电商零碎为例,比方双11秒杀流动,假如“订单零碎” 每秒能解决10000次申请,可是此时流入的是20000次申请。要解决这样的状况,要么: 减少服务器性能,让他能反对20000次申请,可是这种形式不够经济,因为大多数状况下10000次申请的解决能力曾经入不敷出。订单超过10000次后就不容许用户下单,这个想也不太可能应用音讯队列,把1秒内下单订单扩散成一段时间来解决。这时尽管有些用户能在下单后几十秒能力收到下单胜利的状态。可是总比不能下单的体验好。 3.音讯散发数据产生方只须要把数据放到音讯队列,数据生产方依据本人的须要订阅感兴趣的数据,不同数据生产方能够反复也能够不反复,互不烦扰,也不须要和数据产生方关联。 如下图各个子系统将日志数据不停的写入音讯队列, 数据生产方比方“日志解决1” 还能够把本人解决后的数据从新放入音讯队列供其余数据生产方应用,能够防止反复计算。 应用场景,比方对于用户数据进行用户画像,精准推送等。 4.异步A零碎接管一个申请后,须要: 在本人本地写入数据 3ms在B零碎写入数据 300ms在C零碎写入数据 450ms在D零碎写入数据 200ms则总共须要 3+300+450+200=980ms 可是如果应用MQ,A零碎间断发送3条申请到MQ 5ms,则此时A零碎从承受一个申请到返回响应给客户,总共 3+5=8ms, 比之前的980ms快了很多。 3 音讯队列的两种模式点对点与公布式订阅. 3.1 PTP点对点音讯从一个生产者(producer/sender)通过一个队列传给另一个消费者(consumer/receiver)。能够有多个消费者监听一个队列,然而如果有一个消费者生产了音讯,则其余消费者则获取不到音讯了。所以叫做点到点模式。发送者和接收者之间在工夫上没有依赖性,队列保留着音讯,能够放在内存 中也能够长久化,直到他们被生产或超时。 3.2 公布订阅 Publish/Subscribe发布者把音讯公布到一个topic,多个订阅者能够订阅同一个topic,公布的音讯能够被所有的订阅者生产。所以叫做公布订阅模式。发布者和订阅者之间有工夫上的依赖性。如果发布者公布音讯的时候,订阅者处于某种原因没有接管到,这个音讯就失落了。 4 音讯队列的毛病零碎可用性升高:零碎引入的内部依赖越多则越容易挂掉,以下面的电商为例子,原本订单零碎,调用领取零碎,库存零碎,物流零碎的接口就好了,当初引入MQ进入,万一MQ挂了,整套零碎就奔溃了,所以升高零碎可用性。 零碎复杂性减少:退出MQ后,须要思考,如何保障一致性,如何保障音讯不被反复生产,如何报纸额音讯的传递程序,如何保障音讯牢靠传输等等。 一致性问题:比方下面的电商零碎,订单零碎解决完,间接返回胜利,人都认为你胜利了,然而问题是,物流零碎,领取零碎和库存零碎,只有其中一个挂了,那就会呈现数据不统一的问题。 音讯反复生产的问题:比方kafka里,kafka会给每条数据调配一个offset,消费者依照offset的编号程序去生产,生产后提交offset。消费者不是一生产好一条数据就立即去提交offset,是定期提交一次offset,如果在消费者还没提交的时候,消费者的过程重启了,那么此时曾经生产过的音讯的offset就没有提交,这次kafka会认为你没有生产这条数据,而后从新给你发送。导致反复生产。 数据失落:生产者在写音讯的时候,在网络传输过程就失落了生产者把音讯存到MQ,可是MQ本人出错了,没有保留胜利MQ收到音讯,并且暂存在内存里,可是在消费者还没来得及生产之前MQ挂了,导致暂存在内存的数据失落消费者生产到这个音讯,可是还没来得及解决本人就坏了,导致MQ认为他曾经解决完了,不再给他发消息。 5 常见的音讯队列 此外还有Amazon SQS/SNS 和 Aliyun MQTT.下一章节会着重介绍下 Aliyun MQTT和 RocketMQ的区别和应用场景。 参考:《RocketMQ实战与原理解析》https://my.oschina.net/u/4383...https://www.jianshu.com/p/fdd...https://www.erlang-solutions....

July 13, 2020 · 1 min · jiezi

聊聊rocketmqclientgo的QueueSelector

序本文主要研究一下rocketmq-client-go的QueueSelector QueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go type QueueSelector interface { Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue}QueueSelector接口,定义了Select方法manualQueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go type manualQueueSelector struct{}func NewManualQueueSelector() QueueSelector { return new(manualQueueSelector)}func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { return message.Queue}manualQueueSelector的select方法直接返回message.QueueNewRandomQueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go type randomQueueSelector struct { rander *rand.Rand}func NewRandomQueueSelector() QueueSelector { s := new(randomQueueSelector) s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) return s}func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { i := r.rander.Intn(len(queues)) return queues[i]}NewRandomQueueSelector方法先创建randomQueueSelector,然后设置其rander;Select方法通过r.rander.Intn(len(queues))随机选择index,然后从queue取值roundRobinQueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go type roundRobinQueueSelector struct { sync.Locker indexer map[string]*int32}func NewRoundRobinQueueSelector() QueueSelector { s := &roundRobinQueueSelector{ Locker: new(sync.Mutex), indexer: map[string]*int32{}, } return s}func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { t := message.Topic if _, exist := r.indexer[t]; !exist { r.Lock() if _, exist := r.indexer[t]; !exist { var v = int32(0) r.indexer[t] = &v } r.Unlock() } index := r.indexer[t] i := atomic.AddInt32(index, 1) if i < 0 { i = -i atomic.StoreInt32(index, 0) } qIndex := int(i) % len(queues) return queues[qIndex]}roundRobinQueueSelector的qIndex为int(i) % len(queues)hashQueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go ...

July 4, 2020 · 2 min · jiezi

activemq-延时消息

在mq 的安装目录下 的conf/activemq.xml在配置文件的40 行里加上schedulerSupport="true"<broker xmlns\="http://activemq.apache.org/schema/core" brokerName\="localhost" dataDirectory\="${activemq.data}" schedulerSupport\="true"\> 将修改的文件保存,服务重启,mq 的延时功能就可以正常执行了

June 30, 2020 · 1 min · jiezi