关于消息队列:消息队列
理解三者有什么区别? 各自的利用场景? KafkaRabbit MqRocket Mq
理解三者有什么区别? 各自的利用场景? KafkaRabbit MqRocket Mq
本文作者:李伟,社区里大家叫小伟,Apache RocketMQ Committer,RocketMQ Python客户端我的项目Owner ,Apache Doris Contributor,腾讯云RocketMQ开发工程师。 01 传输架构图 Namesrv:5.1.0 Broker:5.1.0 Dashboard:1.0.1-SNAPSHOT 02 筹备Namesrv、Broker、Client的ca证书、密钥 以下全副操作在的目录在:/etc/rocketmq, 并且Namesrv、Broker、Dashboard在同一个机器上 实际操作时, dashboard或者客户端能够是其余的机器 生成ca签名证书填写与反复填写ca证书明码。理论填写的时候是输出的字符是看不见的。 openssl req -newkey rsa:2048 -keyout ca_rsa_private.pem -x509 -days 365 -out ca.pem填写其余信息, 不填的话应用 “.” 生成ca签名证书 生成公私密钥。提供给客户端-服务端加密传输应用openssl req -newkey rsa:2048 -keyout server_rsa.key -out server.csrGenerating a 2048 bit RSA private key 生成加密密钥对 生成Namesrv、Broker加密密钥对,并且签发Namesrv、Broker证书openssl req -newkey rsa:2048 -keyout server_rsa.key -out server.csrGenerating a 2048 bit RSA private key 生成Namesrv、Broker密钥,签发证书 打包并加密Namesrv、Broker私钥 增加Namesrv、Broker应用的tls配置文件tls-broker.propertiestls.test.mode.enable=falsetls.server.need.client.auth=nonetls.server.keyPath=/etc/rocketmq/server.keytls.server.keyPassword=123456tls.server.certPath=/etc/rocketmq/server.pemtls.client.authServer=falsetls.client.trustCertPath=/etc/rocketmq/ca.pemtls-namesrv.propertiestls.test.mode.enable=falsetls.server.need.client.auth=nonetls.server.keyPath=/etc/rocketmq/server.keytls.server.keyPassword=123456tls.server.certPath=/etc/rocketmq/server.pemtls-client.propertiestls.client.trustCertPath=/etc/rocketmq/ca.pem至此,咱们失去了全副的tls配置文件: 全副配置文件 ...
音讯队列中间件是分布式系统中重要的组件,次要解决利用耦合,异步音讯,流量削锋等问题 实现高性能,高可用,可伸缩和最终一致性架构。最全面的Java面试网站 应用较多的音讯队列有 RocketMQ,RabbitMQ,Kafka,ZeroMQ,MetaMQ 以下介绍音讯队列在理论利用中罕用的应用场景。 异步解决,利用解耦,流量削锋、日志解决和音讯通信五个场景。 场景 1:异步解决场景阐明:用户注册后,须要发注册邮件和注册短信。传统的做法有两种 1.串行的形式;2.并行形式 本文曾经收录到Github仓库,该仓库蕴含计算机根底、Java根底、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等外围知识点,欢送star~ Github地址 如果拜访不了Github,能够拜访gitee地址。 gitee地址 (1)串行形式:将注册信息写入数据库胜利后,发送注册邮件,再发送注册短信。以上三个工作全副实现后,返回给客户端 (2)并行形式:将注册信息写入数据库胜利后,发送注册邮件的同时,发送注册短信。以上三个工作实现后,返回给客户端。与串行的差异是,并行的形式能够进步解决的工夫 假如三个业务节点每个应用 50 毫秒钟,不思考网络等其余开销,则串行形式的工夫是 150 毫秒,并行的工夫可能是 100 毫秒。 因为 CPU 在单位工夫内解决的申请数是肯定的,假如 CPU1 秒内吞吐量是 100 次。则串行形式 1 秒内 CPU 可解决的申请量是 7 次(1000/150)。并行形式解决的申请量是 10 次(1000/100) 小结:如以上案例形容,传统的形式零碎的性能(并发量,吞吐量,响应工夫)会有瓶颈。如何解决这个问题呢?引入音讯队列,将不是必须的业务逻辑,异步解决。革新后的架构如下: 依照以上约定,用户的响应工夫相当于是注册信息写入数据库的工夫,也就是 50 毫秒。注册邮件,发送短信写入音讯队列后,间接返回,因而写入音讯队列的速度很快,根本能够疏忽,因而用户的响应工夫可能是 50 毫秒。因而架构扭转后,零碎的吞吐量进步到每秒 20 QPS。比串行进步了 3 倍,比并行进步了两倍 场景 2:利用解耦场景阐明:用户下单后,订单零碎须要告诉库存零碎。传统的做法是,订单零碎调用库存零碎的接口。如下图 传统模式的毛病: 如果库存零碎无法访问,则订单减库存将失败,从而导致订单失败订单零碎与库存零碎耦合如何解决以上问题呢?引入利用音讯队列后的计划,如下图: 订单零碎:用户下单后,订单零碎实现长久化解决,将音讯写入音讯队列,返回用户订单下单胜利库存零碎:订阅下单的音讯,采纳拉/推的形式,获取下单信息,库存零碎依据下单信息,进行库存操作如果:在下单时库存零碎不能失常应用。也不影响失常下单,因为下单后,订单零碎写入音讯队列就不再关怀其余的后续操作了。实现订单零碎与库存零碎的利用解耦场景 3:流量削锋流量削锋也是音讯队列中的罕用场景,个别在秒杀或团抢流动中应用宽泛 利用场景:秒杀流动,个别会因为流量过大,导致流量暴增,利用挂掉。为解决这个问题,个别须要在利用前端退出音讯队列。 能够管制流动的人数能够缓解短时间内高流量压垮利用 用户的申请,服务器接管后,首先写入音讯队列。如果音讯队列长度超过最大数量,则间接摈弃用户申请或跳转到谬误页面秒杀业务依据音讯队列中的申请信息,再做后续解决场景 4:日志解决日志解决是指将音讯队列用在日志解决中,比方 Kafka 的利用,解决大量日志传输的问题。架构简化如下 日志采集客户端,负责日志数据采集,定时写入 Kafka 队列Kafka 音讯队列,负责日志数据的接管,存储和转发日志解决利用:订阅并生产 kafka 队列中的日志数据以下是新浪 kafka 日志解决利用案例 ...
引言在探索 Kafka 外围常识之前,咱们先思考一个问题:什么场景会促使咱们应用Kafka? 说到这里,咱们头脑中或多或少会蹦出异步解耦和削峰填谷等字样,是的,这就是 Kafka 最重要的落地场景。 异步解耦:同步调用转换成异步音讯告诉,实现生产者和消费者的解耦。设想一个场景,在商品交易时,在订单创立实现之后,须要触发一系列其余的操作,比方进行用户订单数据的统计、给用户发送短信、给用户发送邮件等等。如果所有操作都采纳同步形式实现,将重大影响零碎性能。针对此场景,咱们能够利用消息中间件解耦订单创立操作和其余后续行为。削峰填谷:利用 Broker 缓冲上游生产者刹时突发的流量,使消费者生产流量整体平滑。对于发送能力很强的上游零碎,如果没有消息中间件的爱护,上游零碎可能会间接被压垮导致全链路服务雪崩。设想秒杀业务场景,上游业务发动下单申请,上游业务执行秒杀业务(库存查看,库存解冻,余额解冻,生成订单等等),上游业务解决的逻辑是相当简单的,并发能力无限,如果上游服务不做限流策略,刹时可能把上游服务压垮。针对此场景,咱们能够利用 MQ 来做削峰填谷,让顶峰流量填充低谷闲暇资源,达到系统资源的正当利用。通过上述例子能够发现交易、领取等场景常须要异步解耦和削峰填谷性能解决问题,而交易、领取等场景对性能、可靠性要求特地高。那么,咱们本文的配角Kafka是否满足相应要求呢?上面咱们来探讨下。 Kafka 宏观认知在探索 Kafka 的高性能、高可靠性之前,咱们从宏观上来看下 Kafka 的零碎架构 如上图所示,Kafka 由 Producer、Broker、Consumer 以及负责集群治理的 ZooKeeper 组成,各局部性能如下: Producer: 生产者,负责音讯的创立并通过肯定的路由策略发送音讯到适合的 Broker;Broker:服务实例,负责音讯的长久化、直达等性能;Consumer :消费者,负责从 Broker 中拉取(Pull)订阅的音讯并进行生产,通常多个消费者形成一个分组,音讯只能被同组中的一个消费者生产;ZooKeeper:负责 Broker、Consumer 集群元数据的治理等;(留神:Producer 端间接连贯 Broker,不在 ZK 上存任何数据,只是通过 ZK 监听 Broker 和 Topic 等信息)上图音讯流转过程中,还有几个特地重要的概念—主题(Topic)、分区(Partition)、分段(Segment)、位移(Offset)。 topic:音讯主题。Kafka 按 Topic 对音讯进行分类,咱们在收发音讯时只需指定 Topic。partition:分区。为了晋升零碎的吞吐,一个 Topic 下通常有多个 Partition,Partition 散布在不同的 Broker 上,用于存储 Topic 的音讯,这使 Kafka 能够在多台机器上解决、存储音讯,给 Kafka 提供给了并行的音讯解决能力和横向扩容能力。另外,为了晋升零碎的可靠性,Partition 通常会分组,且每组有一个主 Partition、多个正本 Partition,且散布在不同的 Broker上,从而起到容灾的作用。Segment:分段。宏观上看,一个 Partition 对应一个日志(Log)。因为生产者生产的音讯会一直追加到 Log 文件开端,为避免 Log 文件过大导致数据检索效率低下,Kafka 采取了分段和索引机制,将每个 Partition 分为多个 Segment,同时也便于音讯的保护和清理。每个 Segment 蕴含一个.log日志文件、两个索引(.index、timeindex)文件以及其余可能的文件。每个 Segment 的数据文件以该段中最小的 Offset 为文件名,当查找 Offset 的 Message 的时候,通过二分查找快找到 Message 所处于的 Segment 中。Offset:音讯在日志中的地位,音讯在被追加到分区日志文件的时候都会调配一个特定的偏移量。Offset 是音讯在分区中的惟一标识,是一个枯燥递增且不变的值。Kafka 通过它来保障音讯在分区内的程序性,不过 Offset 并不逾越分区,也就是说,Kafka 保障的是分区有序而不是主题有序。Kafka 高可靠性、高性能探索在对 Kafka 的整体零碎框架及相干概念简略理解后,上面咱们来进一步深入探讨下高可靠性、高性能实现原理。 ...
本文曾经收录到Github仓库,该仓库蕴含计算机根底、Java根底、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等外围知识点,欢送star~ Github地址:https://github.com/Tyson0314/Java-learning 为什么要应用音讯队列?总结一下,次要三点起因:解耦、异步、削峰。 1、解耦。比方,用户下单后,订单零碎须要告诉库存零碎,如果库存零碎无法访问,则订单减库存将失败,从而导致订单操作失败。订单零碎与库存零碎耦合,这个时候如果应用音讯队列,能够返回给用户胜利,先把音讯长久化,等库存零碎复原后,就能够失常生产减去库存了。 2、异步。将音讯写入音讯队列,非必要的业务逻辑以异步的形式运行,不影响主流程业务。 3、削峰。生产端缓缓的依照数据库能解决的并发量,从音讯队列中缓缓拉取音讯。在生产中,这个短暂的高峰期积压是容许的。比方秒杀流动,个别会因为流量过大,从而导致流量暴增,利用挂掉。这个时候加上音讯队列,服务器接管到用户的申请后,首先写入音讯队列,如果音讯队列长度超过最大数量,则间接摈弃用户申请或跳转到谬误页面。 应用了音讯队列会有什么毛病零碎可用性升高。引入音讯队列之后,如果音讯队列挂了,可能会影响到业务零碎的可用性。零碎复杂性减少。退出了音讯队列,要多思考很多方面的问题,比方:一致性问题、如何保障音讯不被反复生产、如何保障音讯可靠性传输等。常见的音讯队列比照比照方向概要吞吐量万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比 十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。可用性都能够实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 kafka 也是分布式的,一个数据多个正本,多数机器宕机,不会失落数据,不会导致不可用时效性RabbitMQ 基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。其余三个都是 ms 级。性能反对除了 Kafka,其余三个性能都较为齐备。 Kafka 性能较为简单,次要反对简略的 MQ 性能,在大数据畛域的实时计算以及日志采集被大规模应用,是事实上的规范音讯失落ActiveMQ 和 RabbitMQ 失落的可能性非常低, RocketMQ 和 Kafka 实践上不会失落。总结: ActiveMQ 的社区算是比拟成熟,然而较目前来说,ActiveMQ 的性能比拟差,而且版本迭代很慢,不举荐应用。RabbitMQ 在吞吐量方面尽管稍逊于 Kafka 和 RocketMQ ,然而因为它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。然而也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做 erlang 源码级别的钻研和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种音讯队列中,RabbitMQ 肯定是你的首选。如果是大数据畛域的实时计算、日志采集等场景,用 Kafka 是业内规范的,相对没问题,社区活跃度很高,相对不会黄,何况简直是全世界这个畛域的事实性标准。RocketMQ 阿里出品,Java 系开源我的项目,源代码咱们能够间接浏览,而后能够定制本人公司的 MQ,并且 RocketMQ 有阿里巴巴的理论业务场景的实战考验。RocketMQ 社区活跃度绝对较为个别,不过也还能够,文档相对来说简略一些,而后接口这块不是依照规范 JMS 标准走的有些零碎要迁徙须要批改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被摈弃,社区黄掉的危险,那如果你们公司有技术实力我感觉用 RocketMQ 挺好的Kafka 的特点其实很显著,就是仅仅提供较少的外围性能,然而提供超高的吞吐量,ms 级的提早,极高的可用性以及可靠性,而且分布式能够任意扩大。同时 kafka 最好是撑持较少的 topic 数量即可,保障其超高吞吐量。kafka 惟一的一点劣势是有可能音讯反复生产,那么对数据准确性会造成极其轻微的影响,在大数据畛域中以及日志采集中,这点轻微影响能够疏忽这个个性人造适宜大数据实时计算以及日志收集。如何保障音讯队列的高可用?RabbitMQ:镜像集群模式 ...
本文作者:倪泽,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer 01 背景 RocketMQ Streams 1.1.0版本已于近期公布,绝对之前版本有以下改良和优化: 1、API层面反对泛型,可自定义输入输出数据; 2、去掉冗余逻辑,简化代码,重写拓扑图构建和数据处理过程; 本文章承接上篇:RocketMQ Streams 1.1.0: 轻量级流解决再登程,从实现原理上介绍RocketMQ Streams是如何实现流计算拓扑图构建的以及探讨了数据流转过程和流转过程中的状态变动。 02 流解决拓扑构建过程 public class example { public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("wordCount"); builder.source("sourceTopic", total -> { String value = new String(total, StandardCharsets.UTF_8); return new Pair<>(null, value); }) .flatMap((ValueMapperAction<String, List<String>>) value -> { String[] splits = value.toLowerCase().split("\W+"); return Arrays.asList(splits); }) .keyBy(value -> value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") { @Override public void run() { rocketMQStream.stop(); } }); rocketMQStream.start(); }}在使用者书写上述及连表达式时,产生第一次构建,即逻辑节点的增加,前后算子具备父子关系,构建后造成逻辑节点,多个逻辑节点造成链表。 ...
本文曾经收录到Github仓库,该仓库蕴含计算机根底、Java根底、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等外围知识点,欢送star~ Github地址:https://github.com/Tyson0314/... 如果让你来设计一个 MQ,该如何下手?须要思考哪些问题?又有哪些技术挑战? 对于 MQ 来说,不论是 RocketMQ、Kafka 还是其余音讯队列,它们的实质都是:一发一存一生产。上面咱们以这个实质作为根,一起由浅入深地聊聊 MQ。 从 MQ 的实质说起将 MQ 掰开了揉碎了来看,都是「一发一存一生产」,再直白点就是一个「转发器」。 生产者先将音讯投递一个叫做「队列」的容器中,而后再从这个容器中取出音讯,最初再转发给消费者,仅此而已。 下面这个图便是音讯队列最原始的模型,它蕴含了两个关键词:音讯和队列。 1、音讯:就是要传输的数据,能够是最简略的文本字符串,也能够是自定义的简单格局(只有能按预约格局解析进去即可)。 2、队列:大家应该再相熟不过了,是一种先进先出数据结构。它是寄存音讯的容器,音讯从队尾入队,从队头出队,入队即发消息的过程,出队即收音讯的过程。 原始模型的进化再看明天咱们最罕用的音讯队列产品(RocketMQ、Kafka 等等),你会发现:它们都在最原始的音讯模型上做了扩大,同时提出了一些新名词,比方:主题(topic)、分区(partition)、队列(queue)等等。 要彻底了解这些形形色色的新概念,咱们化繁为简,先从音讯模型的演进说起(情理好比:架构从来不是设计进去的,而是演进而来的) 2.1 队列模型最后的音讯队列就是上一节讲的原始模型,它是一个严格意义上的队列(Queue)。音讯依照什么程序写进去,就依照什么程序读出来。不过,队列没有 “读” 这个操作,读就是出队,从队头中 “删除” 这个音讯。 这便是队列模型:它容许多个生产者往同一个队列发送音讯。然而,如果有多个消费者,实际上是竞争的关系,也就是一条音讯只能被其中一个消费者接管到,读完即被删除。 2.2 公布-订阅模型如果须要将一份音讯数据分发给多个消费者,并且每个消费者都要求收到全量的音讯。很显然,队列模型无奈满足这个需要。 一个可行的计划是:为每个消费者创立一个独自的队列,让生产者发送多份。这种做法比拟笨,而且同一份数据会被复制多份,也很节约空间。 为了解决这个问题,就演化出了另外一种音讯模型:公布-订阅模型。 在公布-订阅模型中,寄存音讯的容器变成了 “主题”,订阅者在接管音讯之前须要先 “订阅主题”。最终,每个订阅者都能够收到同一个主题的全量音讯。 认真比照下它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。惟一的不同点在于:一份音讯数据是否能够被屡次生产。 2.3 小结最初做个小结,下面两种模型说白了就是:单播和播送的区别。而且,当公布-订阅模型中只有 1 个订阅者时,它和队列模型就一样了,因而在性能上是齐全兼容队列模型的。 这也解释了为什么古代支流的 RocketMQ、Kafka 都是间接基于公布-订阅模型实现的?此外,RabbitMQ 中之所以有一个 Exchange 模块?其实也是为了解决音讯的投递问题,能够变相实现公布-订阅模型。 包含大家接触到的 “生产组”、“集群生产”、“播送生产” 这些概念,都和下面这两种模型相干,以及在利用层面大家最常见的情景:组间播送、组内单播,也属于此领域。 所以,先把握一些共性的实践,对于大家再去学习各个消息中间件的具体实现原理时,其实能更好地抓住实质,分清概念。 透过模型看 MQ 的利用场景目前,MQ 的利用场景十分多,大家能滚瓜烂熟的是:零碎解耦、异步通信和流量削峰。除此之外,还有提早告诉、最终一致性保障、程序音讯、流式解决等等。 那到底是先有音讯模型,还是先有利用场景呢?答案必定是:先有利用场景(也就是先有问题),再有音讯模型,因为音讯模型只是解决方案的形象而已。 MQ 通过 30 多年的倒退,能从最原始的队列模型倒退到明天百花齐放的各种消息中间件(平台级的解决方案),我感觉万变不离其宗,还是得益于:音讯模型的适配性很广。 咱们试着从新了解下音讯队列的模型。它其实解决的是:生产者和消费者的通信问题。那它比照 RPC 有什么分割和区别呢? ...
12月动静音讯队列 CKafka 版【商业化】国内站专业版反对按小时后付费。 音讯队列 RocketMQ 版【商业化】虚构集群正式商业化:TDMQ RocketMQ 共享版(虚构集群)于 2022年12月28日完结公测,正式商业化开始计费,计费形式为按量付费(后付费)。 【新性能】虚构集群收发音讯调用API 限流:为了保障虚构集群的稳定性,TDMQ RocketMQ版会以集群和 Topic 为维度进行限流,您能够在集群监控或者 Topic 监控页面查看对应的限流监控指标。 【新性能】虚构集群反对元数据迁徙:您能够应用元数据迁徙工具将开源版 RocketMQ 的元数据迁徙至腾讯云TDMQ RocketMQ 共享版(虚构集群)。 【新性能】新增反对 HTTP 协定接入,用户能够在集群详情页查看 HTTP 接入地址进行接入。 【新性能】音讯查问性能反对批量或者单条重发死信音讯,死信音讯在被从新发送后,会被投递到原队列的重试队列,但不会在死信队列中被立刻删除,在达到音讯生命周期(3天)后才会被删除。 【新性能】专享集群公网带宽反对平安组,开明公网带宽计费后,反对依据理论须要设置公网的IP白名单。 音讯队列 RabbitMQ 版【新性能】专享集群控制台集成vhost、exchange、queu、路由关系、用户和权限、策略管理等开源控制台能力,灵便适配社区应用体验。 【新性能】数据流、管控流公网地址提供可选能力。 音讯队列 Pulsar 版【白名单性能】Pulsar 专业版商业化广州公布,多租户物理隔离,提供丰盛的规格供选择,适宜于对稳定性和资源隔离性要求高、业务流量大的生产环境。 2023年 1月预报音讯队列 RocketMQ 版【新性能】音讯查问页面新增“查问近100条音讯”选项,查问后果确保严格的工夫先后顺序,以解决查问后果分页之间没有严格依照工夫程序的问题。 【新性能】管控台反对批量删除冗余的topic和group资源。 【稳定性优化】为了保障稳定性,防止元数据冗余,敞开group主动创立性能。 音讯队列 RabbitMQ 版【新性能】反对插件治理。 【新性能】反对设置limit。 【新性能】反对TLS传输加密。 【新性能】反对3AZ高可用部署。 音讯队列 Pulsar 版【新性能】虚构集群到业余集群的平滑迁徙反对。 【新性能】主动创立重试/死信队列的命名规定优化。 【新性能】反对订阅下提早音讯数量告警。
导语2022腾讯寰球数字生态大会已圆满闭幕,大会以“数实翻新、产业共进”为主题,聚焦数实交融,摸索以全真互联的数字技术助力实体经济高质量倒退。大会设有29个产品技术主题专场、18个行业主题专场和6个生态主题专场,各业务负责人与客户、合作伙伴独特总结经验、凝固共识,推动数实交融新倒退。 本次大会设立了微服务与中间件专场,本专场从产品研发、运维等最佳落地实际登程,具体论述云原生时代,企业在开发微服务和构建云原生中间件过程中应该怎么少走弯路,聚焦业务需要,助力企业倒退翻新。 随着大数据时代的到来,企业在生产和经营流动中产生的各类数据正以前所未有的速度增长,通过对实时及历史数据的交融剖析,及时开掘业务洞察和辅助决策,已成为企业的广泛口头。在云原生的浪潮下,企业须要聚焦业务,迫切需要简单易行,零代码地配置搭建起本人的能够达到将本增效成果的数据链路零碎。 本篇文章将从以下几个方面对围绕着音讯队列如何疾速搭建数据链路的落地实际进行分享。 数据链路构建的挑战技术架构体系的建设客户实际和落地案例视频:https://www.zhihu.com/zvideo/... 数据链路构建的挑战与开源生态数据链路构建的挑战如下图所示,这是一张经典的数据链路的架构图,从左到右顺次能够分为数据源、数据接入层、数据缓冲层、数据处理层和左边的数据指标。在这样一个典型的数据链路里,技术组件十分多,导致整个图非常复杂,这会减少运维老本。 图1 接下来看另一张图,如果把两头局部全副屏蔽掉,这个数据链路变为一款SaaS化的数据接入组件,那它就会十分轻量。 图2 所以在开源生态中,多样的数据源和数据指标,泛滥开源组件的学习老本,数据链路的搭建和运维是整个数据链路零碎次要面对的问题。 企业须要聚焦业务,因而数据链路零碎须要:SAAS 化、低代码化、简略易用、稳固牢靠、高性能、按量付费。以达到整体上的的降本增效。 咱们再回到图1,能够看到,它的缓冲层在业界次要都是 Kafka,而后围绕 Kafka 生态,具备丰盛的上下游,那复杂度、学习老本、保护老本这些问题要如何解决呢?持续往下看。 数据链路性能矩阵 图3 图4 如上图3所示,数据链路由数据源、数据库两局部组成。 数据源文本日志、CVM、容器、平安等 数据库数据库数据、被动上报数据等 这些数据须要解决上报而后发到上游,在业界更多的是 Filebeat、Flink、Logstash 等社区组件。想要达到图3这张图的成果,就须要图4这一堆组件,这就波及到下面提到过的问题。所以就衍生出了一个 SaaS化 的数据链路的计划。 Saas化的数据链路计划CKafka 连接器是腾讯云上 SaaS 化的数据接入和解决解决方案,一站式提供对数据的接入、解决和散发性能。 提供基于 HTTP/TCP 协定的 SDK 帮助客户实现数据上报;基于 CDC 机制订阅、存储多款数据库变更信息;简略可配置的数据荡涤 (ETL) 能力;丰盛的数据散发渠道;买通了混合云/跨云的丰盛的数据源(MQ, 数据库,事件等)数据接入。 帮助客户低成本搭建数据流转链路,构建数据源和数据处理系统间的桥梁。 利用场景数据链路构建在失常业务当中,用户须要将多种数据源的数据通过客户单采集,实时处理缓冲,传到上游的搜寻,这时就能够通过这套链路间接把数据一条链路齐全买通,间接把数据源打到上游的存储,这就十分便当了。 在理论业务过程中,用户常常须要将多个数据源的数据汇总到音讯队列中,比方业务客户端数据、业务 DB 数据、业务的运行日志数据汇总到音讯队列中进行剖析解决。失常状况下,须要先将这些数据进行荡涤格式化后,再做对立的转储、剖析或解决。 CKafka 连接器反对将不同环境(腾讯私有云、用户自建 IDC、跨云、混合云等)的不同数据源(数据库、中间件、日志、利用零碎等)的数据集成到私有云的音讯队列服务中,以便进行数据的解决和散发。提供了数据聚合、存储、解决、转储的能力,即 数据集成 的能力,将不同的数据源连贯到上游的数据指标中。 数据接入散发另外三个场景别离是数据上报、数据库订阅和数据的清理和散发。 客户、业务端或者运维端可能有很多数据须要上报,须要本人搭建一个上报的 Server,但如果应用 Sass 化数据接入产品,它就能够很轻量化的实现数据上报。 数据库订阅和数据的清理散发等性能是一样的原理,须要做的就是把数据从各种数据源很 Saas 化的接进来,而后简略轻量的荡涤进来。 数据上报 数据库数据订阅 ...
10月动静音讯队列 RocketMQ 版【商业化】音讯队列 RocketMQ 版专享集群正式商业化。基于开源RocketMQ打造,兼容社区SDK,具备低提早、高性能、高牢靠、万亿级音讯吞吐等特点。专享版于 5 月开始在内部客户侧进行白名单凋谢和打磨,曾经在多个内部客户落地,波及教育、出行、游戏等多个行业。 购买指南: https://cloud.tencent.com/doc... 【新性能】专享集群反对死信音讯查问,在音讯查问页面,用户能够依据专享集群的 group 或对应的 messsage ID 查看相应的死信音讯。 共享集群(虚构集群)的死信查问性能将在后续版本反对。 【新性能】新增音讯验证能力,查问到特定音讯后,用户能够将指定音讯推送给指定的在线客户端,以检测客户端生产逻辑和后果是否合乎预期等。 音讯队列 RabbitMQ 版【商业化】10月9日起,音讯队列 RabbitMQ 版专享集群正式商业化。基于开源 RabbitMQ 音讯队列引擎,提供稳固牢靠、高扩展性、易用免运维的音讯队列服务。AMQP 协定的标杆,提供灵便的路由适应各类业务的音讯投递规定。目前曾经落地教育、出行、游戏、金融等多个行业的客户。 购买指南: https://cloud.tencent.com/doc... 【新性能】反对自定义配置社区管控台拜访IP白名单,无需再提单加白。 音讯队列 CKafka 版【新性能】反对2021年09月09日前购买的存量标准版实例降级到专业版。 【新性能】反对下载音讯,下载内容蕴含 header,key 和 value。 【新性能】反对在控制台展现所有 VIP 列表,便于在平安组配置放通所有端口。 【新性能】反对实例级别设置默认最大音讯大小,作为新建 Topic 的默认初始值,能够前期独自针对 Topic 进行批改。 【新性能】当 Consumer Group 状态为 Empty 时,反对删除关联的某个 Topic 的订阅关系。 【新性能】反对流出数据到时序数据库(CTSDB)和剖析型数据库 Doris。 【新性能】MariaDB数据订阅、TDSQL-C MYSQL数据订阅反对订阅多库多表,并散发到不同topic。 11月预报音讯队列 RocketMQ 版控制台新增集群、存储和消费者组等维度的监控展现,以及对应指标对接云监控的告警。专享版反对对接公网,并且为公网拜访设置平安规定。group页面展现生产客户端优化,反对查看音讯的生产状态、订阅关系一致性和过滤表达式等。控制台反对发送测试音讯。音讯队列 RabbitMQ 版专享集群反对升配。专享集群反对跨可用区部署,可能让您的实例在单个可用区不可用状况下仍能失常提供服务。反对敞开公网拜访地址。腾讯云控制台集成更多社区管控台能力。音讯队列 CKafka 版上线弹性按量topic,您无需关怀底层资源,按需应用,按量付费。CKafka实例反对后付费,您能够按需抉择流量规格以及磁盘容量等配置项,计费以小时为单位。提供询价接口。更多功能,敬请期待。
你好,我是程序员Alan,很快乐遇见你。 一、RocketMQ目录构造在正式开始搭建调试环境之前,咱们先理解一下RockeMQ源码的整体架构。 这是因为把握了整体架构,能够让咱们迅速理解各个方面的个性,并且能够不便咱们后续疾速定位功能模块对应的代码文件。话不多说,咱们开始看RocketMQ目录构造。 acl: 权限管制。能够给话题指定权限,只有领有权限的消费者才能够进行生产。 broker: RocketMQ 的 Broker 相干代码,用来启动 Broker 过程。Broker 就是用来收客户端发的音讯、存储音讯传、递音讯给生产端的组件。 client:RocketMQ 的 Producer、Consumer 这些客户端的代码,用来生产音讯、生产音讯。 common:公共模块。 distribution:用来部署 RocketMQ 的,比方 bin 目录 ,conf 目录。 example: RocketMQ 的用例。 filter:RocketMQ 过滤器。 namesvr:NameServer 的源码。NameServer 就是所有 Broker 都须要注册的中央,注册核心。 remoting:RocketMQ 的近程网络通信模块。 srvutil:工具类。 store:音讯存储。 style:代码查看。 tools:命令行监控工具相干。 二、获取RocketMQ源码源码地址:https://github.com/apache/roc... 我下载的是5.0.0版本,你也能够在github下载其余版本。 如果下载遇到困难,能够留言或者私信我。 三、导入源码代码下载解压之后,应用IDEA工具导入。 四、下载依赖先确认Maven目录地址,再刷新,期待依赖下载实现。 五、启动RocketMQ的NameServer5.1 配置NameServer启动参数5.1.1 Edit Configurations,配置 ROCKETMQ_HOME 环境变量 5.2 拷贝配置文件Value 的文件夹是用来部署 RocketMQ 的,外面包含 bin 目录 ,conf 目录,store目录。 咱们首先创立一个文件夹,并创立三个子文件夹,别离是 bin ,conf ,store。 ...
《数据密集型利用零碎设计》音讯代理引言音讯代理其实指的就是音讯队列,然而我认为作者这里的代理,是给予零碎架构地位考量的,因为消息中间件的实质就是作为不同服务之间交换的一种媒介。 介绍音讯代理能够看作是 解决数据流进行优化的数据库。 音讯代理通常部署在独立的服务器当中,无论是生产者还是消费者,都有可能来自于不同的服务。整个流程通常为生产者生产数据通过音讯代理当中,消费者连贯音讯代理承受生产者数据进行生产。 音讯存在在两头代理有一个显著的益处是能够屏蔽频繁变动的生产者端和消费者端,将无关音讯代理外部的个性转移到代理中。 比方是否长久化问题,有的音讯代理解决音讯形式是无论是否生产都会存盘(Rocket MQ),保障音讯不会随服务器的宕机而失落音讯数据。 当然也有比拟粗犷的音讯代理解决形式,把音讯放在内存中,一旦敞开立马开释音讯,然而这样也会导致音讯失落。 音讯代理的劣势和劣势都在异步解决,生产者只须要确保生产数据正确发送并且正确存储到音讯代理中,这些步骤解决实现之后,生产者能够接着解决其余业务。 而消费者则不同,消费者能够配置定期获取音讯代理音讯并且查看音讯内容是否属于本人生产领域,这保障了消费者能够及时处理,然而解决生产时效性是不确定的(几分钟、几小时、甚至几天),如果消费者的生产能力或者解决效率过低,就会呈现音讯挤积压的问题。 大部分状况下能够应用有限队列积压音讯和切换生产的者的形式看待生产慢的消费者。 音讯代理比照数据库当初一部分音讯代理设计能够应用两阶段提交,看起来仿佛越来越和数据库进行聚拢。 音讯代理尽管能够看作是优化数据流的数据库,然而音讯队列和数据库是存在实质差距的,次要的差距如下: 数据库须要保障数据的长久化,删除须要指定的命令实现,否则不能擅自失落数据。传统的音讯代理更多设计为音讯胜利传递立马删除音讯,这样被生产过的音讯就不会沉积,也不会有反复生产问题。(然而有局部音讯代理存在特例)音讯代理删除音讯,少数音讯的工作区间十分音讯,队列也比拟短,通常这些内容都能很快的在内存中"转接",然而一旦音讯沉积,音讯无奈在内存中堆放,就须要长期序列化长久存储到磁盘中,期待内存有了足够空间之后再加载内容。这一步操作须要耗费零碎的CPU和IO资源。数据库通常会应用多级索引放慢数据的搜寻,而音讯代理通常反对某种音讯模型来反对特定主题主题发送模式,以及利用日志程序读写放慢数据搜寻。数据库查问数据通常基于数据的工夫点快照,为了保证数据的ACID个性,数据库通常须要保障前后两个线程之间的数据可见性失常,前者在不反复查问的前提下,不应该看到后者改变的数据内容(当然也能够做到齐全看不到,比方单线程化)。音讯代理则侧重于在数据改变之后告诉客户端,对于音讯查问的能力反对较弱(或罗唆没有)。多个消费者读取生产者端的数据处理 通常比较简单,音讯代理的关注重点再看待消费者的“消费行为”上, 目前音讯代理有两种次要的生产模式:负载平衡和扇出式。 负载平衡 负载平衡代表了每一个音讯只能传给一个消费者,消费者能够共享解决音讯的动作,代理也能够调配给任意的消费者,音讯解决的代价十分高的时候,这种负载平衡的模式比拟受欢迎,通常咱们会心愿增加消费者并行处理音讯。 扇出式 扇出式指的是音讯会发给所有的消费者,应用的实现形式是让独立的消费者独特“监听”相通的生产信息同时不相互进行干预,也能够看作是一个流被复制到不同的批次外面进行工作(实现形式通过JMS和AMQP进行替换绑定)。 扇出式和负载平衡形式能够组合实现,比方能够通过负载平衡的多个分组而生产组内每一个消费者都能够承受音讯。 消息传递确认 消息传递过程具备不确定性,消费者承受到音讯有可能呈现不会解决,或者无奈解决的状况。为了确保音讯不会失落,客户端再进行音讯解决之后必须通知音讯代理,而后音讯代理能力确认是否真的被生产过。 如果连贯超时或者客户端解决超时,音讯代理没有收到音讯代理解决实现的申请,则须要把音讯从新传递给另一个消费者,通常会把音讯重传屡次,直到胜利为止,否则就认为是存在生产失败的状况。 还有一种状况是有的时候可能接管到音讯的客户端曾经把音讯解决过了,然而在告诉音讯代理之前解体了,这时候就波及分布式事务问题。 反复生产 消息传递的最初一个问题是反复生产问题: 以下面的图为例,如消费者2在生产m3的时候忽然产生解体,此时消费者1,刚好生产m4生产结束,然而下一个生产的确意料之外的音讯m3,最初才是生产m5。呈现这样的状况因为消费者2生产m3的时候没有对音讯代理进行回应。 解决负载平衡和反复生产问题,通常的解决方案是应用独自队列的形式解决,然而如果音讯和音讯之间没有交加,则齐全能够释怀重排序问题。 分区日志传统的音讯队列通常不具备和数据库雷同的性能,消费者生产完音讯之后,音讯代理会把音讯间接抛弃,晚期音讯代理是单纯为了刹时数据处理而呈现的。 数据库的设计思路则要求数据的长久化存储,只有用户存在操作,应该是永恒储存到音讯代理当中。 传统音讯队列如果无奈找到消费者,则通常会间接把音讯失落并且无奈复原,然而古代音讯随着量级收缩会呈现音讯阻塞的问题。既然数据库善于存储,而音讯代理善于数据的刹时传递,那么必定是能够混合应用的。 日志音讯存储后续受到音讯队列欢送。日志存储队列日志存储构造的要害是 程序读写 和 追加,在[[《数据密集型型零碎设计》LSM-Tree VS BTree]]中介绍了无关日志存储构造的特点。 日志存储和音讯代理联合之后的工作形式是生产者推送到音讯队列应用追加日志,而消费者则读取最新日志进行接管解决,如果读取到开端则立即进入阻塞期待的状态期待生产申请。 为了实现这样的期待生产机制,音讯代理通常会设计序号或者生产进度(偏移量)来实现每个消费者的生产进度监控。 在Unix零碎当中tail -f 以雷同的思路进行工作,默认状况下会监听某个文件的开端地位监听扭转。应用序号递增是因为日志是只追加不批改的,序号能够保障音讯的发送和生产程序,然而如果应用分区并发发送,仍然没法保障程序生产。 上面的图就是音讯队列生产的分区以及日志存储联合。 从下面的图能够看到,生产者依照程序追加到分区前面,消费者保护偏移量记录生产地位,然而分区的理论生产程序是无奈保障的, 只能保障单个分区的生产程序 。 目前支流的音讯队列都会联合日志音讯存储实现高可用,高可用是古代架构的根本要求,应用分区以及音讯的程序读写能够根本达到媲美内存的操作速度,在实现百万音讯吞吐量的同时通过日志存储放弃高可用以及保障音讯的容错性。 日志和传统音讯比照基于日志的音讯代理应用日志的形式能够很好的反对扇出构造。日志音讯代理反对负载平衡,能够把整个分区交给消费者进行生产,不须要将单个消费者给消费者客户端。负载平衡的状况下每个客户端调配分区中的所有音讯,之后将会通过单线程程序生产的形式对于分区进行生产解决。然而这样会带来一个问题,那就是一个主题最终只能有一个分区进行解决,并且只能有一个消费者进行生产,这样会带来两个方面的问题。 主题数量将会等于分区的数量,负载平衡变为单节点,音讯队列的所有长处被屏蔽了。如果消费者无奈及时生产,将会呈现音讯沉积问题。通过下面几点咱们能够理解到,针对不同的利用场景,抉择音讯的解决形式也不一样: 如果音讯的解决代价十分高但音讯的排序不是十分重要,能够并行处理保障音讯的失常生产。这时候应用传统的 JMS/AMQP 类型的音讯代理进行解决。(举例:日志)如果生产程序十分重要,音讯程序也十分重要,应用日志形式解决也能够很好工作。(举例:扣款结算)生产偏移量程序读取的形式能够容易实现偏移量解决的需要,通过退出音讯偏移量,小于以后消费者偏移量的音讯都能够认为曾经被生产,而更大的偏移量此时消费者并没有发现,在这种工作模式下,消费者只须要定期进行偏移量查看,而后进行生产和挪动偏移量即可。 这种形式有点相似流水线工作上下游定期查看上游推送的工作。偏移量解决在主从复制的数据库解决形式中的日志序列号比拟常见,比方Mysql 的Binlog日志文件复制,主从复制反对从节点从新连贯主节点之后,从断开节点的日志序列号开始从新进行同步,在不跳过任何写入的状况下复原节点复制。 如果音讯生产失败,通常由另一个节点负责接管工作,同时记录偏移量最初的应用记录。然而生产偏移量也会呈现反复生产的状况,那就是消费者曾经把音讯生产解决实现了,然而挪动偏移量的时候没有记录就解体了,那么这条音讯在音讯队列复原之后会认为被生产!这种状况下会便呈现了反复生产的状况。 磁盘空间应用日志音讯存储的关键问题是磁盘空间会随着日志记录被耗尽,为了更好的治理消息日志, 日志设计通常会采纳分区分段的形式,定期将旧段进行归档或者删除。 日志通常能够看作是十分大的缓冲区,缓存区一旦满了通常须要删除掉最早的数据,缓冲区通常会以环形缓冲或者被叫做循环缓冲的形式存在。如果音讯的生产速度跟不上音讯的生产速度,有可能呈现消费者未进行生产,然而待删除标记指向未生产片段的状况。 古代磁盘的存储效率根本能够维持海量数据的日志保留几天甚至几周的工夫,不论音讯的留存工夫多长,能够确定的是音讯肯定会被存储在磁盘造成的缓冲区当中,所以整个日志的吞吐量根本恒定不变。 磁盘空间利用的另一种形式是当队列过大的时候才把存在内存的音讯写入导磁盘当中,和固定放在磁盘缓冲区的日志音讯存储相比,应用这种形式在内存的写入是相当快的,然而磁盘读写的时候会显著降速,这会导致队列不能保障吞吐量的稳固(当然也取决于队列中的音讯数量)。 消费者跟不上生产者下面根本探讨了消费者跟不上生产者的的一系列问题,咱们能够总结为三种解决形式: 抛弃音讯音讯缓冲利用抗压应用比拟多的形式是日志作为音讯缓冲,因为缓冲要求具备较大并且固定大小的缓冲。(受到磁盘制约) 然而消费者落后太多,所需的信息如果比磁盘上的信息还要旧,那么可能无奈读者这些信息。所以代理抛弃的音讯是缓冲区容量不能包容的旧音讯,实现的形式是监控消费者落后日志头部的间隔,落后十分多的状况下须要进行报警,避免音讯积压。 消费者所生产的队列“爆满”之前,报警机制以及一些监控工具能提前判断出消费者异样,缓冲区承载量通常足够抗到解决问题修复实现,呈现这些问题更多的状况是因为长事务或者长业务解决导致的阻塞问题,这些问题须要在音讯隐没之前及时处理修复实现。 那如果真的没来得及解决并且开始失落音讯怎么办? 通常也是消费者呈现问题,消费者通常为集群部署的(须要强一致性和程序生产例外),如果单消费者呈现故障,能够通过异样之后立刻下线的形式把音讯转移给其余消费者解决,同时古代音讯队列通常有心跳检测和负载平衡,能够在某个节点故障的时候,主动切换到其余消费者进行生产,切换和异样解决有许多策略能够抉择。 ...
提到音讯队列,大家肯定不生疏,无论是在校、面试还是工作,咱们波及得都很多。 有需要就有供给,市面上音讯队列当然十分多,算得上支流的也有5、6个,而大家可能因为对某一种队列比拟相熟,就始终用它,大略就是亚瑟打野、亚瑟上路、亚瑟中路、亚瑟辅助。 但你有思考过吗引入的队列是否是最合适的吗? 它与其它队列相比有什么劣势吗? 能够说,选型是开发工作中最重要,也是最体现能力的一环。明天牛牛就来给大家介绍下音讯队列如何抉择。 音讯队列是什么?音讯队列,顾名思义就是传递音讯的队列,有着先入先出的个性,既然是队列,天然遵循先入先出的准则,同时,音讯队列具备可靠性、高性能等特点。 音讯队列是大型分布式系统不可短少的中间件,个别用于异步流程、音讯散发、流量削锋等问题,能够通过音讯队列实现高性能、高可用、高扩大的架构。 业界比拟闻名的音讯队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。 那大家就会问了,这么多音讯队列,我选谁好呢? 别急,咱们先来深刻理一理音讯队列的利用场景,以及每个队列的特色,晓得这些信息,才好做出决策。 音讯队列的利用场景异步如果一个接口,解决工夫很长,而且不能通过程度扩容来解决,就须要异步,那么,什么状况下不能通过程度扩容解决呢? 有很多,比方视频解决,波及到视频下载,那受限于网络带宽等因素,扩容无用;比方区块链这种共识场景,只有单机能力出块,扩容也没有用;还有更常见的,比方一个业务流程,过了10多个微服务,单个兴许不长,加起来就很难承受。 以上的状况下,用户很难通过同步接口长时间期待后果,那就应该做成异步,先扔进音讯队列,后续再进行生产,响应工夫能够从10s,升高到10ms。 2.音讯散发 假如一个外围服务A,是用来公布某种信号的,公布之后,须要告诉到上游服务B、C,这种模式在只有B、C两兄弟的时候,没啥问题。 但随着业务须要,可能会有D、E、F等更多的打工人呈现,这时候A服务就须要更改代码,将音讯也传递给这些新退出的兄弟,每次减少打工人,就须要更改一次代码。 而引入音讯队列,则能够解决这个问题,实现能力复用,业务解耦。 3.削峰 业务有个概念叫峰值流量,也就说流量在一段时间远大于平时,从视图上看就像一个山峰。比方天猫双十一秒杀日,特地是0点的时候,流量起码是平时的千倍以上,如果不做好解决,一波就能彻底将服务打死。 如果常备1000倍流量的设施,那是极大的老本节约,如果长期调配,进行服务扩容,很多时候又不如设想得简略。 之前有和Shopee的同学聊过,一到流动日他们就得紧锣密鼓去做扩容,扩容之后还得熬更守夜地做压测,不光劳命伤财、还容易出事变。 音讯队列能够优雅地解决这个问题,将双十一的申请扔入音讯队列,期待后续服务缓缓解决,如下图所示。 其实在架构上,削峰和下面的异步场景是雷同的架构,都是将申请扔入队列中,再缓缓生产。 但要留神异步场景是单个申请,自身解决工夫很长。削峰针对是单个申请ok,然而流量突发的场景。 音讯队列能力比照下面有提到几个出名的音讯队列,其中ZeroMQ太过轻量,次要用于学习,理论是不会利用到生产,咱们就Kafka、ActiveMQ、RabbitMQ、RocketMQ来进行不同维度比照。 音讯队列选型选型的时候,咱们须要依据业务场景,联合上述个性来进行选型。 比方你要反对天猫双十一类超大型的秒杀流动,这种一锤子买卖,那治理界面、音讯回溯啥的不重要。 咱们须要看什么?看吞吐量! 所以优先选Kafka和RocketMQ这种更高吞吐的。 比方做一个公司的中台,对外提供能力,那可能会有很多主题接入,这时候主题个数又是很重要的考量,像Kafka这样百级的,就不太符合要求,能够依据状况思考千级的RocketMQ,甚至百万级的RabbitMQ。 又比方是一个金融类业务,那么重点思考的就是稳定性、安全性,分布式部署的Kafka和Rocket就更有劣势。 特地说一下时效性,RabbitMQ以微秒的时效作为招牌,但实际上毫秒和微秒,在绝大多数状况下,都没有感知的区别,加上网络带来的稳定,这一点在生产过程中,反而不会作为重要的考量。 其它的个性,如音讯确认、音讯回溯,也常常作为考量的场景,治理界面的话试公司而定了,反正牛牛呆过的中央,都不看重这个,毕竟都有本人的运维体系。 最初本次,牛牛给大家分享了音讯队列是什么、解决什么问题、每种队列的个性,以及怎么联合场景和个性做剖析。 面试时能够针对每个场景的不同就地取材选取队列,这样不仅能够展示常识的全面性还能够体现出本人剖析问题的能力。 而在理论的工作中,咱们须要思考的则更多,比方团队的技术栈、经济老本等状况进行综合剖析。只有相熟每个音讯队列的优劣,能力好中取优,选出适宜的计划。
音讯队列曾经逐步成为分布式应用场景、外部通信、以及秒杀等高并发业务场景的外围伎俩,它具备低耦合、牢靠投递、播送、流量管制、最终一致性 等一系列性能。 无论是 RabbitMQ、RocketMQ、ActiveMQ、Kafka还是其它等,都有的一些基本原理、术语、机制等,总结分享进去,心愿大家在应用音讯队列技术的时候可能疾速了解@mikechen 1. 音讯生产者、音讯者、队列 音讯生产者Producer:发送音讯到音讯队列。 音讯消费者Consumer:从音讯队列接管音讯。 Broker:概念来自与Apache ActiveMQ,指MQ的服务端,帮你把音讯从发送端传送到接收端。 音讯队列Queue:一个先进先出的音讯存储区域。音讯依照程序发送接管,一旦音讯被生产解决,该音讯将从队列中删除。2.设计Broker次要思考 1)音讯的转储:在更适合的工夫点投递,或者通过一系列伎俩辅助音讯最终能送达消费机。 2)标准一种范式和通用的模式,以满足解耦、最终一致性、错峰等需要。 3)其实简略了解就是一个音讯转发器,把一次RPC做成两次RPC。发送者把音讯投递到broker,broker再将音讯转发一手到接收端。 总结起来就是两次RPC加一次转储,如果要做生产确认,则是三次RPC。 3. 点对点音讯队列模型 点对点模型 用于 音讯生产者 和 音讯消费者 之间 点到点 的通信。 点对点模式蕴含三个角色: 音讯队列(Queue) 发送者(Sender) 接收者(Receiver)每个音讯都被发送到一个特定的队列,接收者从队列中获取音讯。队列保留着音讯,能够放在 内存 中也能够 长久化,直到他们被生产或超时。 特点 每个音讯只有一个消费者(Consumer)(即一旦被生产,音讯就不再在音讯队列中) 发送者和接收者之间在工夫上没有依赖性 接收者在胜利接管音讯之后需向队列应答胜利4. 公布订阅音讯模型Topic 公布订阅模型蕴含三个角色: 主题(Topic) 发布者(Publisher) 订阅者(Subscriber)多个发布者将音讯发送到Topic,零碎将这些消息传递给多个订阅者。 特点 每个音讯能够有多个消费者:和点对点形式不同,公布音讯能够被所有订阅者生产 发布者和订阅者之间有工夫上的依赖性。 针对某个主题(Topic)的订阅者,它必须创立一个订阅者之后,能力生产发布者的音讯。 为了生产音讯,订阅者必须放弃运行的状态。5.点对点和公布订阅的区别 生产者发送一条音讯到队列queue,只有一个消费者能收到。 发布者发送到topic的音讯,只有订阅了topic的订阅者才会收到音讯。 6. 音讯的程序性保障 基于Queue音讯模型,利用FIFO先进先出的个性,能够保障音讯的程序性。 7. 音讯的ACK机制 即音讯的Ackownledge确认机制, 为了保障音讯不失落,音讯队列提供了音讯Acknowledge机制,即ACK机制,当Consumer确认音讯曾经被生产解决,发送一个ACK给音讯队列,此时音讯队列便能够删除这个消 息了。如果Consumer宕机/敞开,没有发送ACK,音讯队列将认为这个音讯没有被解决,会将这个音讯从新发送给其余的Consumer从新生产解决。 8.最终一致性的设计思路 次要是用“记录”和“弥补”的形式。 本地事务保护业务变动和告诉音讯,一起落地,而后RPC达到broker,在broker胜利落地后,RPC返回胜利,本地音讯能够删除。否则本地音讯始终靠定时工作轮询一直重发,这样就保障了音讯牢靠落地broker。 broker往consumer发送音讯的过程相似,始终发送音讯,直到consumer发送生产胜利确认。 咱们先不理睬反复音讯的问题,通过两次音讯落地加弥补,上游是肯定能够收到音讯的。而后依赖状态机版本号等形式做判重,更新本人的业务,就实现了最终一致性。 如果呈现生产方解决过慢生产不过去,要容许生产方被动ack error,并能够与broker约定下次投递的工夫。 对于broker投递到consumer的音讯,因为不确定失落是在业务处理过程中还是音讯发送失落的状况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,音讯解决胜利了吗?如果询问无果,再重发。 事务:本地事务,本地落地,弥补发送。本地事务做的,是业务落地和音讯落地的事务,而不是业务落地和RPC胜利的事务。音讯只有胜利落地,很大水平上就没有失落的危险。 9. 音讯的事务反对 音讯的收发解决反对事务,例如:在工作核心场景中,一次解决可能波及多个音讯的接管、解决,这应该处于同一个事务范畴内,如果一个音讯解决失败,事务回滚,音讯从新回到队列中。 10. 音讯的长久化 音讯的长久化,对于一些要害的外围业务来说是十分重要的,启用音讯长久化后,音讯队列宕机重启后,音讯能够从长久化存储复原,音讯不失落,能够持续生产解决。 11. 音讯队列的高可用性 在理论生产环境中,应用单个实例的音讯队列服务,如果遇到宕机、重启等零碎问题,音讯队列就无奈提供服务了,因而很多场景下,咱们心愿音讯队列有高可用性反对,例如 RabbitMQ的镜像集群模式的高可用性计划,ActiveMQ也有基于LevelDB+ZooKeeper的高可用性计划,以及Kafka的Replication机制等。 12.音讯队列的选型和利用场景 具体请参考:高并发架构系列:分布式之音讯队列的特点、选型、及利用场景详解 以上 作者简介陈睿|mikechen,10年+大厂架构教训,《BAT架构技术500期》系列文章作者,分享十余年架构教训以及面试心得! ...
1、音讯队列1、MQ的相干概念1、什么是MQMQ(message queue),从字面意思上看,实质是个队列,FIFO 先入先出,只不过队列中寄存的内容是message 而已,还是一种跨过程的通信机制,用于上下游传递音讯。 在互联网架构中,MQ 是一种十分常见的上下游“逻辑解耦+物了解耦”的音讯通信服务。应用了 MQ 之后,音讯发送上游只须要依赖 MQ,不必依赖其余服务。 2、为什么要应用MQ1、流量消峰举个例子:如果订单零碎最多能解决一万次订单,这个解决能力应酬失常时段的下单时入不敷出,失常时段咱们下繁多秒后就能返回后果。然而在高峰期,如果有两万次下单操作系统是解决不了的,只能限度订单超过一万后不容许用户下单。应用音讯队列做缓冲,咱们能够勾销这个限度,把一秒内下的订单扩散成一段时间来解决,这时有些用户可能在下单十几秒后能力收到下单胜利的操作,然而比不能下单的体验要好。2、利用解耦以电商利用为例,利用中有订单零碎、库存零碎、物流零碎、领取零碎。用户创立订单后,如果耦合调用库存零碎、物流零碎、领取零碎,任何一个子系统出了故障,都会造成下单操作异样。当转变成基于音讯队列的形式后,零碎间调用的问题会缩小很多,比方物流零碎因为产生故障,须要几分钟来修复。在这几分钟的工夫里,物流零碎要解决的内存被缓存在音讯队列中,用户的下单操作能够失常实现。当物流零碎复原后,持续解决订单信息即可,中单用户感触不到物流零碎的故障,晋升零碎的可用性。 3、异步解决有些服务间调用是异步的,例如 A 调用 B,B 须要破费很长时间执行,然而 A 须要晓得 B 什么时候能够执行完,以前个别有两种形式: A 过一段时间去调用 B 的查问 api 查问A 提供一个 callback api, B 执行完之后调用 api 告诉 A 服务。这两种形式都不是很优雅,应用音讯总线,能够很不便解决这个问题,A 调用 B 服务后,只须要监听 B 解决实现的音讯,当 B 解决实现后,会发送一条音讯给 MQ,MQ 会将此音讯转发给 A 服务。这样 A 服务既不必循环调用 B 的查问 api,也不必提供 callback api。同样 B 服务也不必做这些操作。A 服务还能及时的失去异步解决胜利的音讯。 3、MQ的分类1、ActiveMQ长处:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,音讯可靠性较低的概率失落数据 毛病:官网社区当初对 ActiveMQ 5.x 保护越来越少,高吞吐量场景较少应用。 2、Kafka大数据的杀手锏,谈到大数据畛域内的音讯传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据畛域的宠儿,在数据采集、传输、存储的过程中施展着无足轻重的作用。目前曾经被 LinkedIn,Uber, Twitter, Netflix 等大公司所驳回。 ...
导语 | 本文次要介绍Kafka、RabbitMQ、Pulsar、RocketMQ相干的基本原理和选型比照,心愿对此方向感兴趣的读者提供肯定教训和帮忙。一、音讯队列(一)音讯队列应用场景音讯队列中间件是分布式系统中重要的组件,次要解决利用耦合,异步音讯,削峰填谷等问题。实现高性能、高可用、可伸缩和最终一致性架构。 解耦:多个服务监听、解决同一条音讯,防止屡次rpc调用。 异步音讯:音讯发布者不必期待音讯解决的的后果。削峰填谷:较大流量、写入场景,为上游I/O服务抗流量。当然大流量下就须要应用其余计划了。 音讯驱动框架:在事件总线中,服务通过监听事件音讯驱动服务实现相应动作。 (二)音讯队列模式点对点模式,不可反复生产 多个生产者能够向同一个音讯队列发送音讯,一个音讯在被一个音讯者生产胜利后,这条音讯会被移除,其余消费者无奈解决该音讯。如果消费者解决一个音讯失败了,那么这条音讯会从新被生产。 公布/订阅模式 公布订阅模式须要进行注册、订阅,依据注册生产对应的音讯。多个生产者能够将音讯写到同一个Topic中,多种音讯能够被同一个消费者生产。一个生产者生产的音讯,同样也能够被多个消费者生产,只有他们进行过音讯订阅。 二、选型参考音讯程序:发送到队列的音讯,生产时是否能够保障生产的程序。 伸缩:当音讯队列性能有问题,比方生产太慢,是否能够疾速反对库容;当生产队列过多,节约系统资源,是否能够反对缩容。 音讯留存:音讯生产胜利后,是否还会持续保留在音讯队列。 容错性:当一条音讯生产失败后,是否有一些机制,保障这条音讯是一种能胜利,比方异步第三方退款音讯,须要保障这条音讯生产掉,能力确定给用户退款胜利,所以必须保障这条音讯生产胜利的准确性。 音讯可靠性:是否会存在丢音讯的状况,比方有A/B两个音讯,最初只有B音讯能生产,A音讯失落。 音讯时序:次要包含“音讯存活工夫”和“提早音讯”。 吞吐量:反对的最高并发数。 音讯路由:依据路由规定,只订阅匹配路由规定的音讯,比方有A/B两者规定的音讯,消费者能够只订阅A音讯,B音讯不会生产。 (一)KafkaKafka是由Apache软件基金会开发的一个开源流解决平台,由Scala和Java编写。该项目标指标是为解决实时数据提供一个对立、高吞吐、低提早的平台。其长久化层实质上是一个“依照分布式事务日志架构的大规模公布/订阅音讯队列”,这使它作为企业级基础设施来解决流式数据十分有价值。(维基百科) 根本术语 Producer:音讯生产者。个别状况下,一条音讯会被发送到特定的主题上。通常状况下,写入的音讯会通过轮询将音讯写入各分区。生产者也能够通过设定音讯key值将音讯写入指定分区。写入分区的数据越平均Kafka的性能能力更好施展。 Topic:Topic是个形象的虚构概念,一个集群能够有多个Topic,作为一类音讯的标识。一个生产者将音讯发送到topic,消费者通过订阅Topic获取分区音讯。 Partition:Partition是个物理概念,一个Topic对应一个或多个Partition。新音讯会以追加的形式写入分区里,在同一个Partition里音讯是有序的。Kafka通过分区,实现音讯的冗余和伸缩性,以及反对物理上的并发读、写,大大提高了吞吐量。 Replicas:一个Partition有多个Replicas正本。这些正本保留在broker,每个broker存储着成千盈百个不同主题和分区的正本,存储的内容分为两种:master正本,每个Partition都有一个master正本,所有内容的写入和生产都会通过master正本;follower正本不解决任何客户端的申请,只同步master的内容进行复制。如果master产生了异样,很快会有一个follower成为新的master。 Consumer:音讯读取者。消费者订阅主题,并依照肯定程序读取音讯。Kafka保障每个分区只能被一个消费者应用。 Offset:偏移量是一种元数据,是一直递增的整数。在音讯写入时Kafka会把它增加到音讯里。在分区内偏移量是惟一的。生产过程中,会将最初读取的偏移量存储在Kafka中,消费者敞开偏移量不会失落,重启会持续从上次地位开始生产。 Broker:独立的Kafka服务器。一个Topic有N个Partition,一个集群有N个Broker,那么每个Broker都会存储一个这个Topic的Partition。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在理论生产环境中,尽量避免这种状况的产生,这种状况容易导致Kafka集群数据不平衡。 零碎框架 第一个topic有两个生产,新音讯被写入到partition 1或者partition 2,两个分区在broker1、broker2都有备份。有新音讯写入后,两个follower分区会从两个master分区同步变更。对应的consumer会从两个master分区依据当初offset获取音讯,并更新offset。第二个topic只有一个生产者,同样对应两个partition,扩散在Kafka集群的两个broker上。有新音讯写入,两个follower分区会同步master变更。两个Consumer别离从不同的master分区获取音讯。 长处 高吞吐量、低提早:kafka每秒能够解决几十万条音讯,它的提早最低只有几毫秒; 可扩展性:kafka集群反对热扩大; 持久性、可靠性:音讯被长久化到本地磁盘,并且反对数据备份避免数据失落; 容错性:容许集群中节点故障,一个数据多个正本,多数机器宕机,不会失落数据; 高并发:反对数千个客户端同时读写。 毛病 分区有序:仅在同一分区内保障有序,无奈实现全局有序; 无延时音讯:生产程序是依照写入时的程序,不反对延时音讯; 反复生产:生产零碎宕机、重启导致offset未提交; Rebalance:Rebalance的过程中consumer group下的所有消费者实例都会进行工作,期待Rebalance过程实现。 应用场景 日志收集:大量的日志音讯先写入kafka,数据服务通过生产kafka音讯将数据落地。 音讯零碎:解耦生产者和消费者、缓存音讯等。 用户流动跟踪:kafka常常被用来记录web用户或者app用户的各种流动,如浏览网页、搜寻、点击等流动,这些流动信息被各个服务器公布到kafka的topic中,而后消费者通过订阅这些topic来做实时的监控剖析,亦可保留到数据库。 经营指标:记录经营、监控数据,包含收集各种分布式应用的数据,生产各种操作的集中反馈,比方报警和报告。流式解决:比方spark streaming。 (二)RabbitMQRabbitMQ是实现了高级音讯队列协定(AMQP)的开源音讯代理软件(亦称面向音讯的中间件(英语:Message-oriented middleware))。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在凋谢电信平台框架上的。所有次要的编程语言均有与代理接口通信的客户端函式库。(维基百科) 根本术语 Broker:接管客户端链接实体,实现AMQP音讯队列和路由性能。 Virtual Host:是一个虚构概念,权限管制的最小单位。一个Virtual Host里蕴含多个Exchange和Queue。 Exchange:接管音讯生产者的音讯并将音讯转发到队列。发送音讯时依据不同ExchangeType的决定路由规定,ExchangeType罕用的有:direct、fanout和topic三种。 Message Queue:音讯队列,存储为被生产的音讯。 Message:由Header和Body组成,Header是生产者增加的各种属性,蕴含Message是否长久化、哪个MessageQueue接管、优先级。Body是具体的音讯内容。 Binding:Binding连贯起了Exchange和Message Queue。在服务器运行时,会生成一张路由表,这张路由表上记录着MessageQueue的条件和BindingKey值。当Exchange收到音讯后,会解析音讯中的Header失去BindingKey,并依据路由表和ExchangeType将音讯发送到对应的MessageQueue。最终的匹配模式是由ExchangeType决定。 Connection:在Broker和客户端之间的TCP连贯。 Channel:信道。Broker和客户端只有tcp连贯是不能发送音讯的,必须创立信道。AMQP协定规定只有通过Channel能力执行AMQP命令。一个Connection能够蕴含多个Channel。之所以须要建设Channel,是因为每个TCP连贯都是很贵重的。如果每个客户端、每个线程都须要和Broker交互,都须要保护一个TCP连贯的话是机器消耗资源的,个别倡议共享Connection。RabbitMQ不倡议客户端线程之前共享Channel,至多保障同一Channel发小音讯是穿行的。 ...
作者:vivo 互联网中间件团队- Liu Runyun大量业务应用消息中间件进行零碎间的解耦、异步化、削峰填谷设计实现。公司外部后期基于RabbitMQ实现了一套高可用的消息中间件平台。随着业务的持续增长,音讯体量随之增大,对消息中间件平台提出了更高的要求,此外在运维过程中也遇到了高可用难以保障,性能个性有余等诸多问题。基于遇到的这些问题,决定引入RocketMQ进行替换。本文将介绍基于RocketMQ建设消息中间件平台并实现在线业务无感知的平滑迁徙。 一、背景阐明vivo互联网中间件团队于2016年开始基于开源RabbitMQ向业务提供高可用消息中间件平台服务。 为解决好业务流量快速增长的问题,咱们通过正当的业务集群拆分和动静调整,较好的交付了业务对消息中间件平台的平台能力需要。 然而随着业务长周期的迅猛发展,音讯体量也越来越大,在高并发、大流量场景下RabbitMQ的零碎架构设计存在着肯定的限度,次要有以下问题: 1.1 高可用能力有余架构设计存在脑裂危险,并且默认脑裂后无奈主动复原,人工染指复原存在数据失落的危险。 为解决脑裂问题,能够抉择将网络异样后的解决调整为pause_minority模式,然而也带来了可能渺小的网络抖动也会导致集群故障无奈复原的问题。 1.2. 性能有余业务音讯发送后通过exchange路由到对应的queue中,每一个queue由集群中的某个节点理论承载流量,高流量下集群中的某个节点可能会成为瓶颈。 queue由某个节点承载流量后无奈疾速迁徙,强制迁徙到其它低负载节点可能会导致queue不可用,这也导致了向集群中增加节点并无奈疾速晋升集群的流量承载能力。 集群性能较低,经测试应用三台机器组成集群,可承载大略数万tps左右,并且因为queue是由集群中某个节点理论承载的,也无奈持续晋升某个queue的性能,这样就无奈撑持大流量业务。 音讯沉积到千万或更多后会导致集群性能降落,甚至海量沉积后如果生产申请tps特地高,可能会因为磁盘的性能损耗导致发送性能降落,并且在音讯沉积太多时复原工夫长甚至无奈复原。 1.3 性能个性有余RabbitMQ 默认状况下生产异样会执行立刻从新投递,大量的异样音讯也可能导致业务无奈生产后续音讯。 性能个性上未反对事务音讯、程序音讯性能。 虽可自行实现音讯轨迹逻辑,然而会对集群产生十分大的性能损耗,在正式环境中理论无奈基于RabbitMQ原生的能力实现音讯轨迹性能。 二、消息中间件平台的我的项目指标基于以上问题,中间件团队于2020年Q4开始进行了下一代消息中间件平台计划的调研,为保障下一代消息中间件平台合乎业务新的需要,咱们首先明确了消息中间件平台的建设指标,次要蕴含两局部: 业务需要平台需要2.1 业务需要剖析高性能:可撑持极高的tps,并且反对程度扩大,可疾速满足业务的流量增长需要,消息中间件不应成为业务申请链路性能晋升的瓶颈点。 高可用:极高的平台可用性(>99.99%),极高的数据可靠性(>99.99999999%)。 丰盛的性能个性:反对集群、播送生产;反对事务音讯、程序音讯、延时音讯、死信音讯;反对音讯轨迹。 2.2 平台运维需要剖析可运维:业务应用权限校验;业务生产生产流量限度;业务流量隔离与疾速迁徙能力。可观测:丰盛的性能指标察看集群的运行状况。可把握:可基于开源组件疾速进行二次开发,丰盛平台性能个性和进行相干问题修复。云原生:后续可基于容器化提供云原生消息中间件,提供更高的弹性和可伸缩能力。总结:须要建设高性能、高牢靠的下一代消息中间件,具备极高的数据可靠性,丰盛的性能个性,并且须要完满兼容以后的RabbitMQ平台,帮忙业务疾速迁徙到新消息中间件平台,缩小业务迁徙老本。三、开源组件选型调研基于以后RabbitMQ平台的问题和对下一代消息中间件平台的我的项目需要,咱们发展了针对以后较风行的两款消息中间件:RocketMQ、Pulsar的调研。 调研过程中次要针对以下两方面进行比照: 3.1 高可用能力剖析比照 3.1.1 高可用架构与负载平衡能力比照 Pulsar部署架构(起源:Pulsar社区) RocketMQ部署架构(起源:RocketMQ社区) Pulsar:采纳计算与存储拆散架构设计,能够实现海量数据存储,并且反对冷热数据拆散存储。基于ZK和Manager节点管制Broker的故障切换以实现高可用。Zookeeper采纳分层分片存储设计,人造反对负载平衡。RocketMQ:采纳存算一体架构设计,主从模式部署,master节点异样不影响音讯读取,Topic采纳分片设计。须要二次开发反对主从切换实现高可用。未实现Broker的主动负载平衡,能够将top n流量Topic散布到不同的Broker中实现简略的负载平衡。 3.1.2 扩缩容与故障复原比照PulsarBroker与BooKeeper独立扩缩容,并且扩缩容后会实现主动负载平衡。Broker节点无状态,故障后承载Topic会主动转移到其它Broker节点,实现故障秒级复原。BooKeeper由主动复原服务进行ledger数据对齐,并复原到设置的QW份。故障期间已ack音讯不会失落,未ack音讯须要客户端重发。RocketMQBroker扩缩容后须要人工染指实现Topic流量平衡,可开发主动负载平衡组件联合Topic的读写权限管制自动化实现扩缩容后的负载平衡。基于主从切换实现高可用,因为客户端定期30秒从NameSrv更新路由,因而故障复原工夫在30~60秒,能够联合客户端降级策略让客户端被动剔除异样Broker节点,实现更快故障复原。采纳同步复制异步刷盘部署架构,在极其状况下会造成大量音讯失落,采纳同步复制同步刷盘,已写入音讯不会失落。 3.1.3 性能比照Pulsar可撑持百万Topic数量,理论受到ZK存储元数据限度。依据外部压测1KB音讯可撑持TPS达数十万。RocketMQ逻辑上可撑持百万Topic,理论在达到数万时Broker与NameSrv传输心跳包可能超时,倡议单集群不超过5万。依据压测可撑持1KB音讯体TPS达10万+。3.2 性能个性比照 3.3 总结从高可用架构剖析,Pulsar基于Bookeeper组件实现了架构的计算与存储拆散,能够实现故障的疾速复原;RocketMQ采纳了主从复制的架构,故障复原依赖主从切换。 从性能个性剖析,Pulsar反对了丰盛的过期策略,反对了音讯去重,能够反对实时计算中音讯只生产一次的语义;RocketMQ在事务音讯、音讯轨迹、生产模式等个性对在线业务有更好的反对。 从这两方面比照,最终抉择了RocketMQ构建咱们下一代的消息中间件平台。 四、平滑迁徙建设通过技术调研,确定了基于RocketMQ建设下一代消息中间件平台。 为了实现业务从RabbitMQ平滑迁徙到RocketMQ,就须要建设音讯网关实现音讯从AMQP协定转换到RocketMQ;RabbitMQ与RocketMQ的元数据语义与存储存在差别,须要实现元数据语义的映射与元数据的独立存储。 次要有以下四个事项须要实现: 4.1 音讯网关独立部署与嵌入式部署差别比照 4.2 元数据定义映射与保护 4.3 互不烦扰的高性能音讯推送RabbitMQ采纳推模式进行音讯生产,尽管RocketMQ也反对音讯推送生产,然而因为AMQP协定中通过prefetch参数限度了客户端缓存音讯数量以保障不会因缓存太多音讯导致客户端内存异样,因而在音讯网关实现音讯推送时也须要满足AMQP协定的语义。 同时每个音讯网关都须要数千甚至数万的queue的音讯推送,每个queue音讯生产速率存在差别,并且每个队列可能随时有音讯须要推送到客户端进行生产,要保障不同queue之间的推送互不烦扰且及时。 为了实现高效的、互不烦扰的音讯推送,有以下策略: 每个queue采纳独立的线程,保障互不烦扰和时效性,毛病是无奈撑持海量queue的音讯推送。基于信号量、阻塞队列等,在感知到有可推送音讯和可生产服务端时按需进行音讯的推送,这样可应用大量的线程即可实现高效的音讯推送。最终抉择了第2种计划,数据流转图如下图所示: 一个音讯生产过程:客户端在启动连贯到音讯网关后,在音讯网关中会构建RocketMQ推送生产客户端实例,并且注入自定义的ConsumeMessageService实例,同时应用一个信号量保留客户端容许推送的音讯数量。 当音讯从集群侧推送到音讯网关时,将音讯依照推送的批次封装为一个工作保留在ConsumeMessageService实例的BlockingQueue中,同时推送线程会轮询所有的ConsumeMessageService实例,如果发现本地缓存有待生产的音讯并且有可生产音讯的业务客户端,将工作提交到线程池中实现音讯的推送。 为了保障不会因为大量生产速率特地高的queue导致其它queue的音讯推送时效性升高,会限度每一个ConsumeMessageService只容许推送肯定数量的音讯即转到推送其它queue的音讯,以此即可保障所有queue的音讯推送的互不烦扰和时效性。 在客户端生产ack/uack后再次通过信号量告诉下一次推送,这样也保障了应用大量的线程资源即可实现海量音讯的推送需要。 4.4 生产启停与生产限流能力实现基于音讯网关,能够在音讯推送逻辑中减少生产启停和生产限流逻辑。 生产启停能够帮忙业务疾速实现生产的暂停或是局部异样节点进行音讯生产。 生产限流能够帮忙业务管制音讯生产速率,防止对底层依赖产生太大压力。 4.5 平台架构 最终造成了以上的平台架构。新建设了一个AMQP-proxy音讯网关服务实现AMQP音讯转换到RocketMQ,反对业务的音讯生产生产。建设了mq-meta服务保护集群的元数据信息。通过mq-controller管制集群的主从切换,实现集群的高可用,同时减少了集群监控,负载平衡模块保障集群的高可用。五、平台建设停顿与迁徙收益5.1 业务应用收益 5.1.1 更高、更稳固的音讯发送性能 原生RabbitMQ集群业务压测性能 ...
用户下完订单到领取实现期间,须要锁定库存避免超卖,如何不依赖数据库,实现较高负载呢? 常见的计划是频繁的读取数据库中的订单,统计总库存占用,查看是否付款超时。如果零碎的负载量较高,这种计划很快将数据库的CPU占用达到极限,零碎响应速度迅速下隆。 我在一个我的项目中利用了音讯队列(RabbitMQ)的计划实现的这个需要,分享一下。 简化流程: 下单时,第一步要操作的,就是遍历订单里的商品列表,将商品数量累加到 Redis 库存占用上,避免超卖。 在商品详情页,展现的 商品可用库存 = 商品库存 - 库存占用 第二步操作,将订单写入MQ队列,因为订单是按工夫程序写入队列的,所以最先生效的订单肯定是队首的订单。因而,打算工作只须要循环查看队首的订单 如果订单付款工夫未超时,如果还有 50 秒,则休眠 50 秒后持续解决。如果订单已达到超时工夫,则检醒订单原始状态如果订单原始状态已非未付款(如已付款,已勾销),则将订单移出队列,解决下一单如果订单原始状态仍为未付款,则开释库存占用 留神,勾销订单操作须要被动开释库存占用。 打算工作内流程: 在咱们的我的项目中,有一种非凡 VIP 的用户,他们的未付款超时工夫长达12小时(普通用户半小时),他们罕用这种形式收费锁定库存,咱们暂不关注这种需要的合理性,零碎如何实现呢? 在MQ中加一条队列即可,半小时超时的一个队列,12小时超时的一个队列 本文原始网址:https://www.liu12.com/article...,转载请保留出处
前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 【mq】从零开始实现 mq-08-配置优化 fluent 【mq】从零开始实现 mq-09-消费者拉取音讯 pull message 【mq】从零开始实现 mq-10-消费者拉取音讯回执 pull message ack 【mq】从零开始实现 mq-11-消费者音讯回执增加分组信息 pull message ack groupName 【mq】从零开始实现 mq-12-音讯的批量发送与回执 批量音讯对于音讯的发送,有时候可能须要一次发送多个,比方日志音讯等。 批量操作能够晋升性能。 本节老马就和大家一起增加一点批量个性。 音讯的批量发送生产者实现接口定义/** * 同步发送音讯-批量 * @param mqMessageList 音讯类型 * @return 后果 * @since 0.1.3 */SendBatchResult sendBatch(final List<MqMessage> mqMessageList);/** * 单向发送音讯-批量 * @param mqMessageList 音讯类型 * @return 后果 * @since 0.1.3 */SendBatchResult sendOneWayBatch(final List<MqMessage> mqMessageList);一次反对发送多个音讯。 ...
前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 【mq】从零开始实现 mq-08-配置优化 fluent 【mq】从零开始实现 mq-09-消费者拉取音讯 pull message 【mq】从零开始实现 mq-10-消费者拉取音讯回执 pull message ack 状态回执大家好,我是老马。 上一节咱们只实现了拉取音讯的实现,然而短少了生产状态回执。 这一节咱们一起来学习下如何实现状态回执。 代码实现回执状态的设计咱们规定如下几种回执状态: package com.github.houbb.mq.common.constant;/** * @author binbin.hou * @since 0.0.3 */public final class MessageStatusConst { private MessageStatusConst(){} /** * 待生产 * ps: 生产者推送到 broker 的初始化状态 */ public static final String WAIT_CONSUMER = "W"; /** * 推送给生产端解决中 * ps: broker 筹备推送时,首先将状态更新为 P,期待推送后果 * @since 0.1.0 */ public static final String TO_CONSUMER_PROCESS = "TCP"; /** * 推送给生产端胜利 * @since 0.1.0 */ public static final String TO_CONSUMER_SUCCESS = "TCS"; /** * 推送给生产端失败 * @since 0.1.0 */ public static final String TO_CONSUMER_FAILED = "TCF"; /** * 生产实现 */ public static final String CONSUMER_SUCCESS = "CS"; /** * 生产失败 */ public static final String CONSUMER_FAILED = "CF"; /** * 稍后生产 * @since 0.1.0 */ public static final String CONSUMER_LATER = "CL";}消费者状态回执咱们在生产之后,增加状态回执: ...
ActiveMQ系列RabbitMQ系列 Kafka系列:Kafka -- 生产者客户端发送音讯的大抵流程Kafka -- 元数据的拉取机会Kafka -- 元数据的拉取流程kafka -- 分区是怎么指定的Kafka -- 音讯写入音讯累加器流程Kafka -- 缓冲区里的数据什么时候发送Kafka -- 网络申请Kafka -- 网络响应解决Kafka -- 音讯发送存储流程kafka-- 过期文件的删除Kakfa -- 消费者启动流程 RocketMQ系列:RocketMQ -- 元数据的拉取RocketMQ -- MessageQueue是怎么指定的RocketMQ -- 音讯发送存储流程RocketMQ -- 刷盘机制RocketMQ -- 音讯生产队列与索引文件RocketMQ -- 文件不统一的解决方案RocketMQ -- 过期文件的删除RocketMQ -- 消费者启动流程RocketMQ -- 写在音讯拉取前RocketMQ -- 音讯拉取RocketMQ -- 音讯生产过程
音讯队列的概念、原理和场景 在高并发的时候,程序往往无奈做到及时的解决。咱们引入一个两头的零碎,来进行分流和减压。 所以从实质上讲:音讯队列就是一个队列构造的中间件。也就是说,你把音讯和内容放入这个容器之后就能够间接返回,不必等它前期解决的后果。另外会有一个程序,读取这些数据并依照程序解决。 1、队列构造的中间件 2、音讯放入后,不用立刻解决 3、由订阅者/消费者按程序解决 也就是说:当遇到一个比拟大或者耗时比拟长的环节的时候,而同时你的业务又不须要立刻晓得这个环节的后果,应用音讯队列是好的抉择。 常识付费的拼团性能应用的就是音讯队列性能;把每个拼团订单都贮存在音讯队列中,拼团实现或拼团完结就能够主动解决这个订单。 application\index\controller\PushJob /*** 一个应用了队列的 action*/public static function actionWithDoPinkJob(array $data,string $name=''){ try{ // 1.当前任务将由哪个类来负责解决。 $jobHandlerClassName = 'app\index\job\PullDoPink'; // 2.当前任务归属的队列名称,如果为新队列,会主动创立 $jobQueueName = Config::get('queue_name', '') ? Config::get('queue_name', '') : 'doPinkJobQueue'; // 3.当前任务所需的业务数据 . 不能为 resource 类型,其余类型最终将转化为json模式的字符串 if($name){ $jobData = [ 'pinkInfo' => $data, 'time' => date('Y-m-d H:i:s'),'doName'=>$name]; $isPushed = Queue::push($jobHandlerClassName , $jobData , $jobQueueName ); } else{ $jobData = [ 'pinkInfo' => $data, 'time' => date('Y-m-d H:i:s')]; if (!isset($data['pink_time']) || !$data['pink_time']) return true; $timewait = $data['pink_time'] + 300; // 4.将该工作推送到音讯队列,期待对应的消费者去执行 $isPushed = Queue::later($timewait, $jobHandlerClassName , $jobData , $jobQueueName ); } if( $isPushed !== false ){ return 1; }else{ return 1; } }catch (ErrorException $e){ echo $e->getMessage(); }}application\index\job\PullDoPink ...
引言由MQ(1)音讯队列文章咱们晓得Kafka采纳公布/订阅队列,区别于RabbitMQ的队列模式,队列模型每条音讯只能被一个消费者生产,而公布/订阅模型就是为让一条音讯能够被多个消费者生产而生的,当然队列模型也能够通过音讯全量存储至多个队列来解决一条音讯被多个消费者生产问题,然而会有数据的冗余。接下来的内容都基于公布/订阅模型Kafka。 Kafka个别咱们称发送音讯方为生产者 Producer,承受生产音讯方为消费者Consumer,音讯队列服务端为Broker。音讯从Producer发往Broker,Broker将音讯存储至本地,而后Consumer从Broker拉取音讯,或者Broker推送音讯至Consumer,最初生产。1. BrokerKafka 集群蕴含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。刚好散布平均。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在理论生产环境中,尽量避免这种状况的产生,这种状况容易导致Kafka集群数据不平衡。2. topic每条公布到Kafka集群的音讯都有一个类别,这个类别被称为Topic。3. Partition(分区)为了进步并发度,Kafka引入了分区Partition的概念,在RocketMQ中也叫队列,实质一样。即音讯是发往一个主题下的某个分区中。例如某个主题下有 5 个分区,那么这个主题的并发度就进步为 5 ,同时能够有 5 个消费者并行生产该主题的音讯,每个topic至多有一个partition。。个别能够采纳轮询或者 key hash 取余等策略来将同一个主题的音讯调配到不同的队列中。每个partition中的数据应用多个segment文件存储。partition中的数据是有序的,不同partition间的数据失落了数据的程序。如果topic有多个partition,生产数据时就不能保证数据的程序。在须要严格保障音讯的生产程序的场景下,须要将partition数目设为1。 3. Consumer Group与之对应的消费者个别都有组的概念 Consumer Group, 即消费者都是属于某个生产组的。一条音讯会发往多个订阅了这个主题的生产组。假如当初有两个生产组别离是Group 1 和 Group 2,它们都订阅了Topic-a。此时有一条音讯发往Topic-a,那么这两个生产组都能接管到这条音讯。而后这条音讯理论是写入Topic某个分区中,生产组中的某个消费者对应生产一个分区的音讯。在物理上除了正本拷贝之外,一条音讯在Broker中只会有一份,每个生产组会有本人的offset即生产点位来标识生产到的地位。在生产点位之前的音讯表明曾经生产过了。当然这个offset是队列级别的。每个生产组都会保护订阅的Topic下的每个队列的offset。5. Producer(生产者)生产者即数据的发布者,该角色将音讯公布到Kafka的topic中。broker接管到生产者发送的音讯后,broker将该音讯追加到以后用于追加数据的segment文件中。生产者发送的音讯,存储到一个partition中,生产者也能够指定数据存储的partition。6. Leader and Follower每个partition有多个正本,其中有且仅有一个作为Leader,Leader是以后负责数据的读写的partition。Follower追随Leader,所有写申请都通过Leader路由,数据变更会播送给所有Follower,Follower与Leader保持数据同步。如果Leader生效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,从新创立一个Follower。 1.Kafka 的分区策略有哪些?所谓分区策略就是决定⽣产者将音讯发送到哪个分区的算法。 轮询策略:默认的分区策略,⾮常优良的负载平衡体现,它总是能保障音讯最⼤限度地被平均分配到所有分区上;随机策略:实现随机策略版的 partition ⽅法;按音讯键保序策略:也称 Key-Ordering 策略,能够保障同⼀个 Key 的所有音讯都进⼊到雷同的分区⾥,因为每个分区下的音讯解决是有程序的,所以称之为音讯键保序策略;2.音讯队列中如何保证数据音讯不失落?音讯失落是上游零碎没收到上游零碎发送的音讯,造成零碎间数据不统一。比方,订单零碎没有把胜利状态的订单音讯胜利发送到音讯队列里,造成上游的统计零碎没有收到下单胜利订单的音讯,于是造成零碎间数据的不统一,从而引起用户查看集体订单列表时跟理论不相符的问题。首先剖析音讯队列的流程,音讯失落的状况别离可能产生在生产端,Kafka服务端,生产端。 (1)生产端须要保障不少生产音讯应用带有回调办法的 API 时,咱们能够依据回调函数得悉音讯是否发送胜利,如果发送失败了,咱们要进行异样解决,比方把失败音讯存储到本地硬盘或近程数据库,等利用失常了再发送,这样能力保障音讯不失落。设置参数 acks=-1。acks 这个参数是指有多少分区正本收到音讯后,生产者才认为音讯发送胜利了,可选的参数值有 0、1 和 -1。 acks=0,示意生产者不期待任何服务器节点的响应,只有发送音讯就认为胜利。 acks=1,示意生产者收到 leader 分区的响应就认为发送胜利。 acks=-1,示意只有当 ISR中的正本全副收到音讯时,生产者才会认为音讯生产胜利了。这种配置是最平安的,因为如果 leader 正本挂了,当 follower 正本被选为 leader 正本时,音讯也不会失落。然而零碎吞吐量会升高,因为生产者要期待所有正本都收到音讯后能力再次发送音讯。第三个,设置参数 retries=3。参数 retries 示意生产者生产音讯的重试次数。这里 retries=3 是一个倡议值,个别状况下能满足足够的重试次数就能重试胜利。然而如果重试失败了,对异样解决时就能够把音讯保留到其余牢靠的中央,如磁盘、数据库、近程缓存等,而后等到服务失常了再持续发送音讯。第四个,设置参数 retry.backoff.ms=300。retry.backoff.ms 指音讯生产超时或失败后重试的间隔时间,单位是毫秒。如果重试工夫太短,会呈现零碎还没复原就开始重试的状况,进而导致再次失败。联合我集体教训来说,300 毫秒还是比拟适合的。 只有下面这四个要点配置对了,就能够保障生产端的生产者不少生产音讯了。(2)服务端保障不丢音讯第一个,设置 replication.factor >1。replication.factor 这个参数示意分区正本的个数,这里咱们要将其设置为大于 1 的数,这样当 leader 正本挂了,follower 正本还能被选为 leader 正本持续接管音讯。第二个,设置 min.insync.replicas >1。min.insync.replicas 指的是 ISR 起码的正本数量,原理同上,也须要大于 1 的正本数量来保障音讯不失落。 这里我简略介绍下 ISR。ISR 是一个分区正本的汇合,每个分区都有本人的一个 ISR 汇合。但不是所有的正本都会在这个汇合里,首先 leader 正本是在 ISR 汇合里的,如果一个 follower 正本的音讯没落后 leader 正本太长时间,这个 follower 正本也在 ISR 汇合里;可是如果有一个 follower 正本落后 leader 正本太长时间,就会从 ISR 汇合里被淘汰进来。也就是说,ISR 里的正本数量是小于或等于分区的正本数量的。第三个,设置 unclean.leader.election.enable = false。unclean.leader.election.enable 指是否能把非 ISR 汇合中的正本选举为 leader 正本。unclean.leader.election.enable = true,也就是说容许非 ISR 汇合中的 follower 正本成为 leader 正本。如果设置成这样会有什么问题呢?假如 ISR 汇合内的 follower1 正本和 ISR 汇合外的 follower2 正本向 leader 正本拉取音讯(如下图 1),也就是说这时 ISR 汇合中就有两个正本,一个是 leader 正本,另一个是 follower1 正本,而 follower2 正本因为网络或本身机器的起因曾经落后 leader 正本很长时间,曾经被踢出 ISR 汇合。忽然 leader 和 follower1 这两个正本挂了,因为 unclean.leader.election.enable = true,而当初分区的副本能失常工作的仅仅剩下 follower2 正本,所以 follower2 最终会被选为新的 leader 正本并持续接管生产者发送的音讯,咱们能够看到它接管了一个新的音讯 5。如果这时 follower1 正本的服务复原,又会产生什么状况呢?因为 follower 正本要拉取 leader 正本同步数据,首先要获取 leader 正本的信息,并感知到当初的 leader 正本的 LEO 比本人的还小,于是做了截断操作,这时 4 这个音讯就丢了,这就造成了音讯的失落。因而,咱们肯定要把 unclean.leader.election.enable 设置为 false,只有这样非 ISR 汇合的正本才不会被选为分区的 leader 正本。然而这样做也升高了可用性,因为这个分区的正本没有 leader,就无奈收发音讯了,然而音讯会发送到别的分区 leader 正本,也就是说分区的数量实际上缩小了。 ...
1.音讯队列在你我的项目中起到什么作用?从以前的单体架构到当初的微服务架构,成千盈百的服务之间互相调用和依赖。从互联网初期一个服务器上有 100 个在线用户曾经很了不得,到当初坐拥10亿日活的微信。咱们须要有一个「货色」来解耦服务之间的关系、管制资源正当合时的应用以及缓冲流量洪峰等等。它罕用来实现:异步解决、服务解耦、流量管制。 异步解决随着公司的倒退你可能会发现你我的项目的申请链路越来越长,例如刚开始的电商我的项目,能够就是粗犷的扣库存、下单。缓缓地又加上积分服务、短信服务等。这一路同步调用下来客户可能等急了,这时候就是音讯队列退场的好时机。调用链路长、响应就慢了,并且绝对于扣库存和下单,积分和短信没必要这么的 "及时"。因而只须要在下单完结那个流程,扔个音讯到音讯队列中就能够间接返回响应了。而且积分服务和短信服务能够并行的生产这条音讯。能够看出音讯队列能够缩小申请的期待,还能让服务异步并发解决,晋升零碎总体性能。 服务解耦除了加积分服务和短信服务,这时候可能又要来个营销服务,或者要丢掉某些服务。为了投合这些上游零碎订单服务须要常常地批改,任何一个上游零碎接口的变更可能都会影响到订单服务。这样的话订单零碎的保护老本就⾮常的⾼,要时时刻刻思考其余关联系统如果呈现故障该怎么办?A 零碎是发还是先把音讯保存起来呢?选用音讯队列来解决零碎之间耦合的问题,订单服务把订单相干音讯塞到音讯队列中,只负责⽣产数据,不须要思考音讯被哪个零碎来生产。 流量管制A 零碎调⽤ B 零碎解决数据,每天 0 点到 12 点,A 零碎⻛平浪静,每秒并发申请数量就 100 个。后果每次⼀到12 点 ~ 13 点,每秒并发申请数量忽然会暴增到 1 万条。然而 B 零碎最⼤的解决能⼒就只能是每秒钟解决 1000 个申请,这样零碎很容易就会崩掉。这种状况能够引⼊音讯队列,把申请数据先存⼊音讯队列中,生产零碎再依据⾃⼰的生产能⼒拉取生产。 2.常见音讯队列的模式?RabbitMQ 采纳队列模型,RocketMQ和Kafka 采纳公布/订阅模型。 1.队列模型生产者往某个队列外面发送音讯,一个队列能够存储多个生产者的音讯,一个队列也能够有多个消费者, 然而消费者之间是竞争关系,即每条音讯只能被一个消费者生产。2.公布/订阅模型公布/订阅模型是为了解决一条音讯能被多个消费者生产的问题。该模型是将音讯发往一个Topic即主题中,所有订阅了这个 Topic 的订阅者都能生产这条音讯。其实能够这么了解,公布/订阅模型等于咱们都退出了一个群聊中,我发一条音讯,退出了这个群聊的人都能收到这条音讯。那么队列模型就是一对一聊天,我发给你的音讯,只能在你的聊天窗口弹出,是不可能弹出到他人的聊天窗口中的。讲到这有人说,那我一对一聊天对每个人都发同样的音讯不就也实现了一条音讯被多个人消费了嘛。是的,通过多队列全量存储雷同的音讯,即数据的冗余能够实现一条音讯被多个消费者生产。RabbitMQ 就是采纳队列模型,通过 Exchange 模块来将音讯发送至多个队列,解决一条音讯须要被多个消费者生产问题。这里还能看到假如群聊里除我之外只有一个人,那么此时的公布/订阅模型和队列模型其实就一样了。 小结一下队列模型每条音讯只能被一个消费者生产,而公布/订阅模型就是为让一条音讯能够被多个消费者生产而生的,当然队列模型也能够通过音讯全量存储至多个队列来解决一条音讯被多个消费者生产问题,然而会有数据的冗余。
一、简介MQ全称为Message Queue-音讯队列,是一种应用程序对应用程序的音讯通信,一端只管往队列一直公布信息,另一端只管往队列中读取音讯,发布者不须要关怀读取音讯的谁,读取音讯者不须要关怀公布音讯的是谁,各干各的互不烦扰。 市场上当初罕用的音讯队列有:RabbitMQ、RocketMQ、Kafka,ActiveMQ 二、MQ的劣势(1) 解耦应用音讯MQ后,只须要保障音讯格局不变,不须要关怀发布者及消费者之间的关系,这两者不须要彼此分割 (2) 异步在一些不须要即时(同步)的返回后果操作,通过音讯队列来实现异步。 (3) 削峰在大量申请时(秒杀场景),应用音讯队列做缓冲解决,减弱峰值流量,避免零碎在短时间内被峰值流量冲垮。 场景:在大量流量涌入顶峰,如数据库只能抗住2000的并发流量,能够应用MQ管制2000到数据库中 (4) 日志解决日志存储在音讯队列中,用来解决日志,比方kafka。 三、MQ的劣势零碎的可用性升高在还未引进MQ之前,零碎只须要关系生产端与生产端的接口一致性就能够了,当初引进后,零碎须要关注生产端、MQ与生产端三者的稳定性,这减少零碎的累赘,零碎运维成本增加。 零碎的复杂性进步引入了MQ,须要思考的问题就减少了,如何保障音讯的一致性,生产不被反复生产等问题, 一致性问题A零碎发送完音讯间接返回胜利,然而BCD零碎之中若有零碎写库失败,则会产生数据不统一的问题。 四、常见问题(1) 怎么保障音讯没有反复生产?应用音讯队列如何保障幂等性幂等性:就是用户对于同一操作发动的一次申请或者屡次申请的后果是统一的,不会因为屡次点击而产生了副作用 问题呈现起因 咱们先来理解一下产生音讯反复生产的起因,对于MQ的应用,有三个角色:生产者、MQ、消费者,那么音讯的反复这三者会呈现: 生产者:生产者可能会推送反复的数据到MQ中,有可能controller接口反复提交了两次,也可能是重试机制导致的MQ:假如网络呈现了稳定,消费者生产完一条音讯后,发送ack时,MQ还没来得及承受,忽然挂了,导致MQ认为消费者还未生产该条音讯,MQ回复后会再次推送了这条音讯,导致呈现反复生产。消费者:消费者接管到音讯后,正筹备发送ack到MQ,忽然消费者挂了,还没得及发送ack,这时MQ认为消费者还没生产该音讯,消费者重启后,MQ再次推送该条音讯。解决方案 在失常状况下,生产者是客户,咱们很难避免出现用户反复点击的状况,而MQ是容许存在多条一样的音讯,但消费者是不容许呈现生产两条一样的数据,所以幂等性个别是在生产端实现的: 状态判断:消费者把生产音讯记录到redis中,再次生产时先到redis判断是否存在该数据,存在则示意生产过,间接抛弃业务判断:生产完数据后,都是须要插入到数据库中,应用数据库的惟一束缚避免反复生产。插入数据库前先查问是否存在该数据,存在则间接抛弃音讯,这种形式是比较简单粗犷地解决问题(2) 音讯失落的状况 (3) 音讯的传输程序性解决思路 在生产端公布音讯时,每次法公布音讯都把上一条音讯的ID记录到音讯体中,消费者接管到音讯时,做如下操作 先依据上一条Id去查看是否存在上一条音讯还没被生产,如果不存在(生产后去掉id),则失常进行,如果失常操作如果存在,则依据id到数据库查看是否被生产,如果被生产,则失常操作如果还没被生产,则休眠肯定工夫(比方30ms),再从新查看,如被生产,则失常操作如果还没被生产,则抛出异样
一、序言提早工作利用宽泛,提早工作典型利用场景有订单超时主动勾销;领取回调重试。其中订单超时勾销具备幂等性属性,无需思考反复生产问题;领取回调重试须要思考反复生产问题。 提早工作具备如下特点:在将来的某个工夫点执行;个别仅执行一次。 1、实现原理生产者将带有提早信息的音讯发送到RabbitMQ交换机中,期待延迟时间完结方将音讯转发到绑定的队列中,消费者通过监听队列生产音讯。提早工作的要害在音讯在交换机中停留。 不言而喻,基于RabbitMQ实现提早工作对服务器的可靠性要求极高,交换机内部消息无长久化机制,比方单机模式服务重启,未开始的提早工作均失落。 2、组件选型 二、方案设计(一)服务器RabbitMQ服务须要装置x-delayed-message插件以解决提早音讯。 (二)生产者提早工作的实现对生产者的要求是将音讯牢靠的投递到交换机,因而应用confirm确认机制即可。 订单生成之后,先入库,而后以订单ID为key将订单详情存入Redis中(长久化),向RabbitMQ发送异步confirm确定申请。如果收到失常投递返回,则删除Redis中订单ID为key的数据,回收内存,否则以订单ID为key,从Redis中查问出订单数据,从新发送。 (三)消费者提早工作的实现对消费者的要求是以信息不失落的形式生产音讯,具体表现在:手动确认音讯的生产,避免音讯失落;生产端继续稳固,避免音讯沉积;音讯生产失败有重试机制。 思考到订单提早勾销属于幂等性操作,因而无需思考音讯的反复生产问题。 三、SpringBoot实现实现局部仅贴一部分外围源码,残缺我的项目请拜访GitHub。 (一)生产者思考到下单是极为重要的操作,因而首先将订单落库、存盘,而后进行后续操作。 for (long i = 1; i <= 10; i++) { /* 1.模仿生成订单 */ BuOrder order = createOrder(i); /* 2.订单入库 */ orderService.removeById(order); orderService.saveOrUpdate(order); /* 3.将订单存入信息Redis */ RedisUtils.setObject(RabbitTemplateConfig.ORDER_PREFIX + i, order); /* 4.向RabbitMQ异步投递音讯 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));}生产者牢靠投递音讯 public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData == null) { return; } String key = ORDER_PREFIX + correlationData.getId(); if (ack) { /* 如果音讯投递胜利,则删除Redis中订单数据,回收内存 */ RedisUtils.deleteObject(key); } else { /* 从Redis中读取订单数据,从新投递 */ BuOrder order = RedisUtils.getObject(key, BuOrder.class); /* 从新投递音讯 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId())); }}(二)消费者消费者端手动确认,防止音讯失落;失败主动重试。 ...
技术栈OS: Ubuntu 20.04 LTSdocker: 20.10.12docker-compose: 1.25.0Elasticsearch: 7.16.3Logstash: 7.16.3kafka: 2.13-2.8.1Python: 3.8.2kafka-python: 2.0.2用 docker 搭建 logstash官网文档docker 镜像拉取:https://www.elastic.co/guide/...docker 镜像配置:https://www.elastic.co/guide/...docker 镜像目录构造:https://www.elastic.co/guide/...配置步骤拉取镜像docker pull docker.elastic.co/logstash/logstash:7.16.3logstash 配置文件 /home/qbit/logstash/settings/logstash.ymlhttp.host: "0.0.0.0"xpack.monitoring.elasticsearch.hosts: [ "http://192.168.1.46:9200" ]管道配置文件 /home/qbit/logstash/pipeline/:/usr/share/logstash/pipeline/es-pipeline.confinput { kafka { codec => json bootstrap_servers => "192.168.1.46:9092" topics => ["coder_topic"] }}filter { mutate { add_field => { "timestamp" => "%{@timestamp}" } remove_field => ["@version"] } date { match => [ "timestamp", "ISO8601" ] # 这里用 @timestamp 解析会出错 target => "time0" } ruby { code => " time1 = event.get('@timestamp').time.getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08') time2 = Time.parse(event.get('timestamp')).getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08') time3 = Time.now.getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08') event.set('time1', time1) event.set('time2', time2) event.set('time3', time3) " }}output { stdout { codec => json_lines } elasticsearch { hosts => ["192.168.1.46:9200"] index => "coder_index" document_id => "%{id}" }}创立容器docker run --rm -it --name logstash \-v /home/qbit/logstash/pipeline/:/usr/share/logstash/pipeline/ \-v /home/qbit/logstash/settings/logstash.yml:/usr/share/logstash/config/logstash.yml \docker.elastic.co/logstash/logstash:7.16.3用 Python 发送音讯producer.py# encoding: utf-8# author: qbit# date: 2022-01-28# summary: 向 kafka 发送音讯import jsonfrom kafka import KafkaProducerdef producer(): producer = KafkaProducer( bootstrap_servers="192.168.1.46:9092", key_serializer=lambda k: json.dumps(k).encode('utf8'), value_serializer=lambda v: json.dumps(v).encode('utf8'), ) id = 'qbit' dic = {'id': f"{id}", 'age': '23'} producer.send(topic="coder_topic", key=id, value=dic) print(f"send key: {id}, value: {dic}")if __name__ == "__main__": producer()运行后果# python3 producer.pysend key: qbit, value: {'id': 'qbit', 'age': '23'}用 Kibana 查看 ES 中数据GET coder_index/_search{ "_index": "coder_index", "_type": "_doc", "_id": "qbit", "_score": 1.0, "_source": { "id": "qbit", "age": "23", "@timestamp": "2022-01-28T01:03:40.733Z", // logstash event 工夫戳 "timestamp": "2022-01-28T01:03:40.733Z", "time0": "2022-01-28T01:03:40.733Z", "time1": "2022-01-28T09:03:40+08", "time2": "2022-01-28T09:03:40+08", "time3": "2022-01-28T09:03:40+08" // filter 中 ruby 代码生成的工夫戳 }}本文出自 qbit snap
作者:文婷、不周 引言: 本篇文章次要介绍 RocketMQ 的可观测性工具在线上生产环境的最佳实际。RocketMQ的可观测性能力当先业界同类产品,RocketMQ 的 Dashboard 和音讯轨迹等性能为业务外围链路保驾护航,有效应对线上大规模生产应用过程中遇到的容量布局、音讯收发问题排查以及自定义监控等场景。音讯队列简介进入主题之前,首先简要介绍下什么是阿里云的音讯队列? 阿里云提供了丰盛的音讯产品家族,音讯产品矩阵涵盖了互联网、大数据、物联网等各个业务场景的畛域,为云上客户提供了多维度可选的音讯解决方案。无论哪一款音讯队列产品,外围都是帮忙用户解决业务和零碎的异步、解耦以及应答流量洪峰时的削峰填谷,同时具备分布式、高吞吐、低提早、高可扩大等个性。 然而不同的音讯产品在面向客户业务的利用中也有不同的偏重。简略来做,音讯队列 RocketMQ 是业务畛域的首选音讯通道;Kafka 是大数据畛域不可或缺的音讯产品;MQTT 是物联网畛域的音讯解决方案;RabbitMQ 侧重于传统业务音讯畛域;云原生的产品集成和事件流通道是通过音讯队列 MNS 来实现;最初事件总线 EventBridge 是一个阿里云上的一个事件枢纽,对立构建事件核心。 本篇次要讲的是业务畛域的音讯首选通道:音讯队列 RocketMQ。RocketMQ 诞生于阿里的电商零碎,具备高性能、低提早、削峰填谷等能力,并且提供了丰盛的在业务和音讯场景上应答刹时流量洪峰的性能,被集成在用户的外围业务链路上。 作为一个外围业务链路上的音讯,就要求 RocketMQ 具备十分高的可观测性能力,用户能通过可观测性能力及时的监控定位异样稳定,同时对具体的业务数据问题进行排查。由此,可观测性能力逐渐成为音讯队列 RocketMQ 的外围能力之一。 那么什么是可观测能力呢?上面简略对可观测能力进行介绍。 可观测能力提到可观测能力,大家可能最先想到的是可观测的三要素:Metrics(指标)、Tracing(追踪)和 Logging(日志)。 联合音讯队列的了解,可观测能力三要素的细化解释如下: Metrics:Dashborad 大盘1)指标涵盖丰盛: 蕴含音讯量、沉积量、各个阶段耗时等指标,每个指标从实例、Topic、生产 GroupID 多维度做聚合和展现; 2)音讯团队最佳实际模板: 为用户提供最佳模板,特地是在简单的生产音讯场景,提供了丰盛的指标帮忙疾速定位问题,并继续迭代更新; 3)Prometheus + Grafana: Prometheus规范数据格式、利用Grafana展现,除了模板,用户也能够自定义展现大盘。 Tracing:音讯轨迹1)OpenTelemetry tracing规范: RocketMQ tracing 规范曾经合并到 OpenTelemetry 开源规范,标准和丰盛 messaging tracing 场景定义; 2)音讯畛域定制化展现: 依照音讯维度从新组织形象的申请 span 数据,展现一对多的生产,屡次生产信息,直观、不便了解; 3)可连接 tracing链路上下游: 音讯的 tracing 可继承调用上下文,补充到残缺调用链路中,音讯链路信息串联了异步链路的上游和上游链路信息。 Logging:客户端日志标准化1)Error Code标准化: 不同的谬误有惟一的 error code; ...
原创不易,转载请注明出处前言延时音讯(定时音讯)指的在分布式异步音讯场景下,生产端发送一条音讯,心愿在指定延时或者指定工夫点被生产端生产到,而不是立即被生产。 延时音讯实用的业务场景十分的宽泛,在分布式系统环境下,延时音讯的性能个别会在下沉到中间件层,通常是 MQ 中内置这个性能或者内聚成一个公共根底服务。 本文旨在探讨常见延时音讯的实现计划以及方案设计的优缺点。 实现计划1. 基于内部存储实现的计划这里探讨的内部存储指的是在 MQ 自身自带的存储以外又引入的其余的存储系统。基于内部存储的计划实质上都是一个套路,将 MQ 和 延时模块 辨别开来,延时音讯模块是一个独立的服务/过程。延时音讯先保留到其余存储介质中,而后在音讯到期时再投递到 MQ。当然还有一些细节性的设计,比方音讯进入的延时音讯模块时曾经到期则间接投递这类的逻辑,这里不展开讨论。 下述计划不同的是,采纳了不同的存储系统。 基于 数据库(如MySQL)基于关系型数据库(如MySQL)延时音讯表的形式来实现。 CREATE TABLE `delay_msg` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT, `delivery_time` DATETIME NOT NULL COMMENT '投递工夫', `payloads` blob COMMENT '音讯内容', PRIMARY KEY (`id`), KEY `time_index` (`delivery_time`))通过定时线程定时扫描到期的音讯,而后进行投递。定时线程的扫描距离实践上就是你延时音讯的最小工夫精度。 长处: 实现简略;毛病: B+Tree索引不适宜音讯场景的大量写入;基于 RocksDBRocksDB 的计划其实就是在上述计划上抉择了比拟适合的存储介质。 RocksDB 在笔者之前的文章中有聊过,LSM 树更适宜大量写入的场景。滴滴开源的DDMQ中的延时音讯模块 Chronos 就是采纳了这个计划。 DDMQ 这个我的项目简略来说就是在 RocketMQ 里面加了一层对立的代理层,在这个代理层就能够做一些性能维度的扩大。延时音讯的逻辑就是代理层实现了对延时音讯的转发,如果是延时音讯,会先投递到 RocketMQ 中 Chronos 专用的 topic 中。延时音讯模块 Chronos 生产失去延时音讯转储到 RocksDB,前面就是相似的逻辑了,定时扫描到期的音讯,而后往 RocketMQ 中投递。 ...
前言: 小编工夫隔了一年多之久未更新了,忙碌的工作和生存阶段性告一段落了,从新拾起笔杆子码字啦。尽管也没几个粉丝,然而仍然干货满满,也算是一个阶段性的总结,也是从新拾起技术的笔记,也是不便记录知识点好之后查看。码字不易,欢送拍砖。1、Redisredis 作为音讯队列来应用,在很多我的项目中都有使用。最重要的个性就是内存型的音讯队列。那有些人就要说了,redis 是一个缓存两头键,哪里有什么音讯队列。如果理解过数据结构与算法的话,就很容易了解。音讯队列是一种队列数据结构,具备先进先出的个性,是能够通过代码来实现的。 redis 6.0版本之前都是单线程的。所有的操作的是原子性的(要么全副执行胜利,要么全副失败)这样都人造的反对高并发的业务场景。来实现队列的性能也绝对简略。 退出到队列 $cacheKey = 'key'; $data = array($key=>$value);$ret = $redis->rPush($cacheKey , $data);取出队列 $ret = $redis->rPop($cacheKey);总结 redis 做音讯队列的长处:内存操作性能高。毛病也很显著,因为是内存,受限于内存容量大小的限度,不易扩大。至于队列执行失败,数据失落问题也能够在执行队列RPOPLPUSH 这个办法进行补充。具体队里的链接地址,能够自行查看Redis队列命令2、rabbitmq说起rabbitmq,就要从底层编程语言,erlang 编程语言说起,语言层面原生反对并发编程。不必放心并发产生的问题。rabbitmq 个性是文件型的音讯队列。具体具体能够参考 RabbitMq 官网超具体的RabbitMQ入门,看这篇就够了! 3、kafka目前用的比拟多的音讯队。这些都是比拟成熟的的两头键,开箱即用。具体知识点 Kafka 入门常识 总结 市面上的MQ队列产品有很多。最罕用的应用场景 redis、rabbitmq 、kafka。 redis 能够实现基于内存模式的音讯队列 rabbitmq 、kafka 是基于文件类型分布式音讯队列。 其中kafka 目前在最受欢迎的音讯队列产品之一。应用宽泛 本文大体上介绍,市面上的罕用的音讯队列。具体的细节知识点,都有链接提供参考。
作者:凯易&耘田审核校对:白玙编辑&排版:雯燕 前言:随着 RocketMQ 5.0 preview 的公布,5.0 的重大个性逐渐与大家见面。POP Consumer 作为 5.0 的一大个性,POP 生产模式展示了一种全新的生产模式。其具备的轻量级,无状态,无队列独占等特点,对于音讯积压场景,Streaming 生产场景等都十分敌对。在介绍 POP Consumer 之前,咱们先回顾一下目前应用较多的 Push Consumer。 Push Consumer相熟 RocketMQ 的同学对 Push Consumer 必定不会生疏,客户端生产个别都会应用这种生产模式,应用这种生产模式也比较简单。咱们只需简略设置,并在回调办法 ConsumeMessage 中写好业务逻辑即可,启动客户端利用就能够失常生产音讯了。 public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("test_topic", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }}那么 Push Consumer 是如何生产音讯的呢? ...
作者:不周审核校对:岁月、明锻编辑&排版:雯燕 “ 4982 亿,58.3 万笔/秒 ”的背地在新冠肺炎疫情催化下,数字化生存形式渐成新常态。“4982 亿,58.3 万笔/秒”是 2020 天猫双 11 寰球狂欢节(简称:天猫双 11 )对数字经济的先发劣势和微小潜能的直观体现。 面对千万级并发、万亿级的流量洪峰,背地无力撑持的便是双十一交易外围链路的官网指定产品:音讯队列 RocketMQ 。 双十一交易场景业务痛点随着双十一的逐年升温,保障交易场景的稳定性已成为各企业在双十一业务中的要害,每年双十一流动的凌晨,是“万民狂欢”的日子,同时也是各企业交易系统备受考验的时候,保障外围交易系统的业务解决能力、有效应对每秒数十万笔的交易订单成为重中之重,若不能进行流量缓冲将间接引发这些零碎的解体。防止零碎解体的外围“秘诀”便是音讯队列 RocketMQ。 音讯队列 RocketMQ 是如何帮忙各企业交易系统扛住霎时千万级 TPS、万亿级流量洪峰的冲击,并放弃各个利用之间的音讯通顺的呢?上面为您介绍音讯队列 RocketMQ 应答双十一流量洪峰的“六大武器”。 音讯队列 RocketMQ 的“六大武器”双十一的流量洪峰到底会给用户和商家零碎业务带来哪些问题?音讯队列 RocketMQ 的“六大武器”是如何解决这些问题的呢?小编带您初探一二: 武器一:“异步解耦”背景:双十一的夜晚,当用户在手机上“指点江山”时,可曾想,一个小小的购物 APP 背地其实是一个个宏大的零碎,从用户选购商品的那一刻起,就要和成百个业务零碎打交道,每一笔交易订单数据都会有几百个上游业务零碎的关联,包含物流、购物车、积分、直充、流计算剖析等等,整个零碎宏大而且简单,架构设计稍有不合理,将间接影响主站业务的连续性。 面对如此简单且宏大的零碎,防止零碎业务之间互相耦合影响,便要用到音讯队列 RocketMQ 的“异步解耦”性能,通过音讯队列 RocketMQ 实现上、上游业务零碎松耦合,松耦合能够升高零碎的复杂度,缩短用户申请的响应工夫(将原多个步骤的所需工夫之和压缩到只需一条音讯的工夫),保障上游某个子系统的故障不影响整个链路。 武器二:“削峰填谷”背景:在解决完交易业务背地宏大的零碎所带来的耦合性问题后,从用户视角登程来看,双十一期间 0 点这个工夫有成千盈百万的用户在同时点击着购买页面,因为用户海量申请,导致流量激增,面对如此大量的拜访流量,上游的告诉零碎可能无奈承载海量的调用量,甚至会导致系统解体等问题而产生漏告诉的状况。 为解决这些问题,就要用到音讯队列 RocketMQ 的“削峰填谷”性能,可在利用和上游告诉零碎之间退出音讯队列 RocketMQ,RocketMQ 反对高并发的音讯低提早写入,以及有限的沉积能力,能够防止超高流量的冲击,确保上游业务在平安水位内平滑稳固的运行。 武器三:“分布式事务音讯”背景:通过后面的介绍理解到,通过音讯的异步解耦,可实现音讯的分布式解决,在传统的分布式事务处理形式中,用户创立了一条新的订单信息,伴着这条订单信息的变更,在整个业务链条中的购物车、用户表、积分等都须要变更,零碎须要借助分布式事务协调组件来保障多个业务调用的事务一致性。传统的分布式事务组件谋求强一致性,性能吞吐低,零碎简单。那如何能力既实现分布式事务,同时又不使零碎过于简单? 这个时候音讯队列 RocketMQ 的“分布式事务音讯”的性能便起到了关键作用,通过原创的轻量级订单流转事务协调能力,只需发送一条音讯,就能够实现音讯最终一致性的分布式事务,同时确保订单状态长久化和上游调用统一。 武器四:“音讯过滤”背景:通过以上介绍会发现从客户下单到客户收到商品这一过程会生产一系列音讯,按音讯品种能够分为交易音讯、物流音讯、购物车音讯等,如何保障各个品种的音讯进行无效投递并被精确生产? 这时候就要用到音讯队列 RocketMQ 的“音讯过滤”性能,能够通过 Tag 给不同品种的音讯定义不同的属性,依据音讯属性设置过滤条件对音讯进行过滤,只有合乎过滤条件的音讯才会被投递到生产端进行生产。比方给物流音讯定义地区属性,依照地区分为杭州和上海: 订单音讯物流音讯物流音讯且地区为杭州物流音讯且地区为上海 武器五:“定时音讯”背景:除了以上零碎级别中可能呈现的问题外,用户本人在购物过程中可能都遇到过一些小细节,比方在点击了购买按钮后,会呈现“请您在 30 分钟内实现领取”的提醒,如果超过 30 分钟未领取,订单就会主动敞开。 这个业务用到的是音讯队列 RocketMQ 的“定时音讯”性能,音讯队列 RocketMQ 能够实现自定义秒级精度距离的定时音讯,通过音讯触发一些定时工作,比方在某一固定工夫点向用户发送揭示音讯,最终实现海量订单状态变更超时的核心调度。 ...
装置应用地址:http://www.rabbitmq.com/须要依据不同的版本抉择不同的erlang装置erlang 装置RabbitMQ首先须要装置erlang环境,依据GitHub上erlang中README配置yum源 将下面的内容写入红线标注的文件中保留退出,而后装置erlang yum update -yyum install -y erlang-23.3.4装置RabbitMQ下载实现RabbitMQ的安装包后执行上面的命令装置 yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm装置实现后能够看到有6个相干的可执行程序 其中外围是rabbitmq-server RabbitMQ的应用启动rabbitmq-server start & # 后盾启动启动RabbitMQ图形化治理界面RabbitMQ的图形化治理界面模式是没有关上的,须要咱们手动去关上,rabbitmq-plugins就是治理各种插件的工具 rabbitmq-plugins list # 查看所有插件rabbitmq-plugins enable rabbitmq_management # 关上图形化界面rabbitmq-plugins disable rabbitmq_management # 敞开图形化界面拜访图形化治理界面关上图形化治理界面插件后就能够通过本机浏览器拜访RabbitMQ了:http://127.0.0.1:15672/ 用户名和明码都是guest 如果要在其余机器上拜访该界面则须要配置用户名和明码,权限等 rabbitmqctl add_user Guest Guest # 增加用户rabbitmqctl set_user_tags Guest administrator # 赋予该用户管理员角色rabbitmqctl set_permissions -p / Guest ".*" ".*" ".*" # 给该用户赋予权限这样就能够在其余机器上拜访RabbitMQ的治理界面了 上面通过Go语言实现一个简略的音讯队列的利用 consumer.go package mainimport ( "fmt" "github.com/streadway/amqp")//消费者func main() { //1.建设连贯 connection, err := amqp.Dial("amqp://Guest:Guest@192.168.27.139:5672") if err != nil { panic(err) } defer connection.Close() //2.设置通道 ch, err := connection.Channel() if err != nil { panic(err) } //3.定义exchange //err = ch.ExchangeDeclare("exchange-name", "direct", false, false, false, false, nil) err = ch.ExchangeDeclare("exchange-name-topic", "topic", false, false, false, false, nil) if err != nil { panic(err) } //4.定义队列 queue queue, err := ch.QueueDeclare("", false, false,false,false, nil) if err != nil { panic(err) } //5.绑定 bind //err = ch.QueueBind(queue.Name, "direct_key", "exchange-name", false, nil) //topic 含糊匹配 err = ch.QueueBind(queue.Name, "topic.#", "exchange-name-topic", false, nil) if err != nil { panic(err) } //6.接管音讯 consume_msg, err := ch.Consume(queue.Name, "", false, false, false, false, nil) if err != nil { panic(err) } //7.打印消息 //msg := <-consume_msg //fmt.Printf("received msg: %s\n", msg.Body) for msg := range consume_msg { fmt.Printf("received msg: %s\n", msg.Body) }}producer.go ...
音讯队列つ退场大家好,我是老三,是一个电商公司的程序员,负责订单零碎的开发。 掉了不少头发之后,我实现了用户订单领取的开发。 订单领取的业务是这样的。用户领取实现之后,我须要更新订单状态,这一部分是在本零碎实现的。接下来,我要调用库存零碎,减库存,好了,剩下的就是库存零碎的事件了。 开发、联调、测试、上线,我的小日子变得安闲起来,每天就是在群里吹牛打屁。 可是没过两天,产品妹子,找过去了,她说,她想加个性能,用户实现订单领取当前,要减少用户的积分。 没问题,so easy,噼里啪啦,我两天就做完了,无非是调用一下会员零碎。 这天,正和沙雕群友斗图的时候,产品妹子过去,他说要接入音讯零碎,好,搞! 又过两天,她说要增加营销零碎,行吧,干! 又过两天,她说要搞举荐零碎,嗯……,来吧! 又过两天…… 于是零碎就变成了这个样子: 就这样,我过上了暗无天日的日子,我要保护和若干个零碎的对接,每次他们公布新版本,我都要跟着值班。 我要迭代,也要改和几个零碎的对接代码。 周一,营销零碎; 周二,库存零碎; …… 这天,眼圈发黑的我正在和上游服务撕巴的时候,忽然忍不住两腿战战,她来了,产品妹(女)子(王)来了——她是我不能回绝的女人。 软弱的眼泪流了满面,我的猿生一片灰暗…… 没想到,代救星呈现了,我的好敌人傲天过去了,拿鼻孔看着我。 “你个Loser,竟不晓得用音讯队列,怪不得天天被人欺侮,哼!” 一语惊醒梦中人,为什么不必音讯队列啊? 于是我引入音讯队列,对系统进行了重构。 这下好了,我只管更新订单状态,剩下的丢给音讯队列,你们这些上游本人去音讯队列生产音讯就好了,别来缠着我了。 …… 引入音讯队列之后,又是一个安逸的下午。 我没有在群聊里扯扯,因为我退群了。 前几天,我受到了前所未有的挫伤—— 我在群里讥嘲一个老哥,技术真菜,连音讯队列都不会! 老哥反手就收回他和女朋友的合照,“独身狗,技术好又怎么样,连个女朋友都没有!” 我霎时san值狂掉! “程序员独身,不算独身……new个对象的事,能算独身么?”接连着便是什么难懂的话,什么“没有妹子”,什么“哲学”之类,引得众人都哄笑起来,群里充斥了快活的空气。 于是,这个下午我盯着空洞无物的需要单发愣,公司真的没有妹子么?…… 好了,简短的前奏完结了,接下来该进入注释了。音讯队列つ用处在下面的前言中,咱们曾经理解了音讯队列最重要的一个用处: 解耦通过音讯队列,升高零碎间的耦合,防止过多的调用。 就如同公司的行政小姐姐要告诉一件事件,她通常会是在群里发一个布告,而后咱们扣1就行了。要是一个个告诉,她要告诉几十上百次。 异步还是下面咱们提到的订单领取,领取之后,咱们要扣减库存、减少积分、发送音讯等等,这样一来这个链路就长了,链路一长,响应工夫就变长了。引入音讯队列,除了更新订单状态,其它的都能够异步去做,这样一来就来,就能更快地响应咱们的上游。 为什么不必多线程之类的形式做异步呢?—— 嗨,只用多线程做异步,不是还得写代码去调那一堆上游吗,所以这又回到理解耦这个问题上。 削峰音讯队列同样能够用来削峰。 用一个比喻,一条河流,如果它的上游能包容的水量是无限的,为了避免洪水冲垮堤坝,咱们应该怎么办呢? 咱们能够在上游建筑一个水库,洪峰来的时候,咱们先把水给蓄起来,闸口里只放出上游能接受地住的水量。 放在咱们的零碎,也是一个情理。比方秒杀零碎,平时流量很低,然而要做秒杀流动,秒杀的时候流量疯狂怼进来,你的服务器,Redis,MySQL各自的承受能力都不一样,间接全副流量照单全收必定有问题啊,重大点可能间接打挂了。 所以一样,咱们能够把申请放到队列外面,只放出咱们上游服务能解决的流量,这样就能抗住短时间的大流量了。 除了这三大用处之外,还能够利用队列自身的程序性,来满足音讯必须按程序投递的场景;利用队列 + 定时工作来实现音讯的延时生产 …… 音讯队列つ实质过气老北鼻马老师有三招——接、化、发。 音讯队列外围有三板斧:发、存、生产。 生产者先将音讯投递一个叫做「队列」的容器中,而后再从这个容器中取出音讯,最初再转发给消费者[1]。 下面这个图便是音讯队列最原始的模型,它蕴含了音讯队列中的一两个关键词音讯和队列队列: 音讯:就是要传输的数据,能够是最简略的文本字符串,也能够是自定义的简单格局。队列:大家应该再相熟不过了,是一种先进先出数据结构。它是寄存音讯的容器,音讯从队尾入队,从队头出队,入队即发消息的过程,出队即收音讯的过程。所以音讯队列的实质就是把要传输的数据放在队列中。 围绕着这个实质: 把数据放到音讯队列的角色就是生产者把数据从队列中取出的就是消费者音讯队列つ模型[1]咱们下面曾经理解了音讯队列模型的实质,随着利用场景的变动,音讯队列的模型逐步产生了一些演进。 就如同咱们的文字通信,最开始单对单地发消息,起初能够群发,再起初,能够拉一个群聊。 队列模型最后的音讯队列就是上一节讲的原始模型,它是一个严格意义上的队列(Queue)。音讯依照什么程序写进去,就依照什么程序读出来。不过,队列没有 “读” 这个操作,读就是出队,从队头中 “删除” 这个音讯。 ...
默认交换机实际上是一个没有名称(empty string)的Direct exchange.每个新建队列都会默认绑定到这个交换机上。绑定的路由键(routing key)名称与队列名称雷同 Direct (直连交换机)最常应用,会依据routingkey进行精准匹配。直连交换机能够散发工作给多个工作者(worker) Topic(主题交换机)依据routingkey进行含糊匹配,将音讯分发给一个或多个队列(delimited by dots)。 routingkey能够有通配符'*','#'。* 示意匹配一个单词,# 匹配0个或多个单词。 因为绑定关系比拟麻烦,该类型只在一些业务简单的队列零碎中利用。 Fanout (扇形交换机)将生产分发给所有绑定的队列,而不会理睬routingkey。长处是转发音讯最快,性能最好。个别会用来解决播送音讯(broadcast routing)。 Headers (头交换机)相似于直连交换机。不同点在与头交换机的路由规定建设在头属性之上而不是路由键。个别开发应用较少
对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 设施接入服务(IoTDA)是华为云物联网平台的外围服务,IoTDA 须要一款牢靠的消息中间件,通过比照多款消息中间件的能力与个性,Apache Pulsar 凭借其多租户设计、计算与存储拆散架构、反对 Key_Shared 模式生产等个性成为华为云物联网消息中间件的首选。本文介绍了 Pulsar 在华为云物联网的上线历程以及上线过程中遇到的问题和相应的解决方案。 华为云设施接入服务介绍设施接入服务(IoTDA)具备海量设施连贯上云、设施和云端双向音讯通信、数据流转、批量设施治理、近程管制和监控、OTA 降级、设施联动规定等能力。下图为华为云物联网架构图,下层为物联网利用,包含车联网、智慧城市、智慧园区等。设施层通过直连网关、边缘网络连接到物联网平台。目前华为云物联网联接数超过 3 亿,IoT 平台竞争力中国第一。 数据流转指用户在物联网平台设置规定后,当设施行为满足规定条件时,平台会触发相应的规定动作来实现用户需要,例如对接到华为云其余服务,提供存储、计算、剖析设施数据的全栈服务,如 DIS、Kafka、OBS、InfluxDb 等,也能够通过其余通信协议和客户的零碎对接,如 HTTP、AMQP。在这些动作中,物联网平台次要做客户端或服务端。依据用户类别,能够将应用场景分为三类: 体量较大的客户个别会抉择推送到消息中间件(如 Pulsar、Kafka)上,并在云上构建本人的业务零碎进行生产解决。中长尾客户通常会抉择将数据推送到本人的数据库(如 MySQL)中进行解决,或由本人的 HTTP 服务器接收数据进行解决。更轻量级的客户会抉择通过 AMQP 协定创立简略的客户端进行连贯。原推送模块的痛点原推送模块采纳 Apache Kafka 计划,这种运行模式自身有一些弊病,且扩容操作简单,为开发和运维团队带来累赘。此外,原推送模块反对客户端类型和服务端类型的推送,但不反对 AMQP 推送,其架构图如下。Consumer 一直从 Kafka 中拉取音讯,并将发送失败的音讯存入数据库,期待重试。这种运行模式带来了很多问题: 即便很多客户的服务器不可达,consumer 仍须要从 Kafka 拉取音讯(因为 Kafka 只有一个 topic)并尝试发送。无奈依据单用户来配置音讯的存储工夫和大小。有些客户的服务器能力差,无法控制将音讯推送到单个客户的速率。 Topic 数量2020 年 5 月,为晋升产品的竞争力,咱们打算让客户通过 AMQP 协定来接管流转数据。AMQP 协定的客户端接入更加简单,而客户可能会将 AMQP 客户端集成在手机端,每天定时上线两小时,这种状况下,咱们须要保障客户在应用时不会呈现数据失落,因而要求消息中间件反对多于规定数量的 topic(有些客户单规定下数据量大,单 topic 无奈撑持)。目前,咱们的规定数量已超过 3 万,预计很快会达到 5 万,并且还会持续增长。 Kafka topic 在底层占用文件句柄,且共享 OS 缓存,无奈反对量级较大的 topic,友商的 Kafka 最多能够撑持 1800 个 topic。咱们要想反对规定数量级别的队列,就必须保护多个 Kafka 集群,下图是咱们基于 Kafka 设计的计划。 ...
对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 导语各位小伙伴们,Pulsar 社区周报更新来啦! 本次 Pulsar 社区周报,为大家出现 Pulsar client、broker、transactions 等内容,帮忙社区小伙伴们把握 Pulsar 我的项目及社区每周停顿,也不便大家更好地参加到 Pulsar 社区中来! 感激本周以下小伙伴为 Apache Pulsar 添砖加瓦(排名不分先后,看看你有没有上榜): jerrypeng、dlg99、rdhabalia、linlinnn、315157973、k2la、eolivelli、wolfstudy、cbornet、abhilashmandaliya、 fantapsody、lhotari、merlimat、devinbost、BewareMyPower、zymap、freeznet、codelipenghui、dragonls、KannarFr接下来,一起看看 5 月 3 日 ~ 5 月 9 日有哪些值得你关注的停顿吧! 本周亮点C++ Client:防止发送零许可的流申请。https://github.com/apache/pul... 贡献者:@BewareMyPower Java Client:防止发送零许可的流申请。https://github.com/apache/pul... 贡献者:@BewareMyPower 重要停顿PIP-45:复原会话后,从新验证 leader 选举。https://github.com/apache/pul... 贡献者:@merlimat 重要个性因为 PR 较多,仅列举较大 PR 停顿,不包含当周全副动静上面 PR 均已合入 Pulsar 主分支Broker:在强制删除 namespace 后,删除残余信息。https://github.com/apache/pul... 贡献者:@315157973 Client:修复 Pulsar 客户端空指针异样的问题。https://github.com/apache/pul... 贡献者:@abhilashmandaliya Bookie:修复 ledger rollover 期间公布回调的 entry 数据为空的问题。https://github.com/apache/pul... 贡献者:@BewareMyPower Enhancement:改良在本地运行启动过程中处理错误的形式。https://github.com/apache/pul... 贡献者:@jerrypeng ...
背景咱们最近在做新业务的技术选型,其中波及到了对消息中间件的抉择;联合咱们的理论状况心愿它能满足以下几个要求: 敌对的云原生反对:因为当初的主力语言是 Go,同时在运维上可能足够简略。官网反对多种语言的 SDK:还有一些 Python、Java 相干的代码须要保护。最好是有一些不便好用的个性,比方:延时音讯、死信队列、多租户等。当然还有一些程度扩容、吞吐量、低提早这些个性就不必多说了,简直所有成熟的消息中间件都能满足这些要求。 基于以上的筛选条件,Pulsar 进入了咱们的视线。 作为 Apache 下的顶级我的项目,以上个性都能很好的反对。 上面咱们来它有什么过人之处。 架构 从官网的架构图中能够看出 Pulsar 次要有以下组件组成: Broker 无状态组件,能够程度扩大,次要用于生产者、消费者连贯;与 Kafka 的 broker 相似,但没有数据存储性能,因而扩大更加轻松。BookKeeper 集群:次要用于数据的长久化存储。Zookeeper 用于存储 broker 与 BookKeeper 的元数据。整体一看仿佛比 Kafka 所依赖的组件还多,这样的确会提供零碎的复杂性;但同样的益处也很显著。 Pulsar 的存储于计算是拆散的,当须要扩容时会非常简单,间接新增 broker 即可,没有其余的心智累赘。 当存储成为瓶颈时也只须要扩容 BookKeeper,不须要人为的做重均衡,BookKeeper 会主动负载。 同样的操作,Kafka 就要简单的多了。 个性多租户多租户也是一个刚需性能,能够在同一个集群中对不同业务、团队的数据进行隔离。 persistent://core/order/create-order以这个 topic 名称为例,在 core 这个租户下有一个 order 的 namespace,最终才是 create-order 的 topic 名称。 在理论应用中租户个别是依照业务团队进行划分,namespace 则是以后团队下的不同业务;这样便能够很清晰的对 topic 进行治理。 通常有比照才会有挫伤,在没有多租户的消息中间件中是如何解决这类问题的呢: 罗唆不分这么细,所有业务线混着用,当团队较小时可能问题不大;一旦业务减少,治理起来会十分麻烦。本人在 topic 之前做一层形象,但其实实质上也是在实现多租户。各个业务团队各自保护本人的集群,这样当然也能解决问题,但运维复杂度天然也就进步了。以上就很直观的看出多租户的重要性了。 Function 函数计算Pulsar 还反对轻量级的函数计算,例如须要对某些音讯进行数据荡涤、转换,而后再公布到另一个 topic 中。 这类需要就能够编写一个简略的函数,Pulsar 提供了 SDK 能够不便的对数据进行解决,最初应用官网工具公布到 broker 中。 ...
对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ Apache Pulsar 社区迎来两位 Transaction 背地的 committer在 3 月下旬,来自 StreamNative 的两位软件工程师高冉、丛搏入选 Apache Pulsar Committer。恭喜高冉、丛搏成为 Apache Pulsar 社区 Committer! 高冉与丛搏属于国内最早一批将 Apache Pulsar 落地企业的“尝鲜者”。两人也是晚期即退出了 Pulsar 社区的贡献者。目前高冉与丛搏的次要工作方向聚焦在为 Pulsar 退出 Transaction 新个性,请在行将公布的 Pulsar 2.8.0 新版本公布中关注他们的成绩。 同时,咱们也对高冉、丛搏做了书面采访,聊聊他们与 Apache Pulsar 的故事。 上面内容依据采访整顿而成。 高冉与 Pulsar大家好,我叫高冉,当初是 StreamNative 工程师,负责 Pulsar SQL、Transaction 和分层存储等工作。 最后接触 Pulsar 是在前一家公司将 Pulsar 落地并优化。因为我的项目须要应用 Pulsar SQL,从调研、落地到应用中遇到问题、解决问题,在对 Pulsar 的一直优化中我在社区提 PR 并开始了成为贡献者的路线。Pulsar 的劣势很多,计算与存储拆散架构设计使其领有其余音讯零碎所不具备的弹性。 成为 Pulsar 开发人员奉献 PR 过程中不免遇到困难。就我个人经历而言,最开始我也对 Pulsar SQL 的应用不甚了解,于是找到社区搭档们学习交换。这个理解过程中少不了社区的很多反对,许多有教训的搭档们提供信息帮忙我少走弯路,让我很快理解了 Pulsar SQL 的工作原理。 ...
第一章、Kafka概述1.1、定义Kafka是一个分布式的基于公布/订阅模式的音讯队列,用于大数据实时处理畛域 1.2、音讯队列1.2.1、利用场景异步解决、解耦、削峰 1.2.2、应用音讯队列的益处解耦:容许你独立批改和扩大两边的处理过程,只有他们遵循雷同的接口标准可恢复性:零碎的一部分组件生效后,不会影响到整个零碎。音讯队列升高了过程之间的耦合度,所以即便一个解决音讯的过程挂掉后,退出到队列的音讯依然可能在零碎复原后被解决缓冲:无效管制音讯流的速度,解决生产和生产数据速度不统一的状况削峰:能够在突发流量的时候扛住压力异步通信: 1.2.3、音讯队列的两种模式点对点:消费者被动拉取数据,音讯收到后将音讯从队列中革除音讯生产者生产音讯发送到Queue中 而后音讯消费者从Queue中取出并且生产音讯。音讯被生产当前,Queue中不再有存储,所以音讯消费者不可能生产到曾经被生产的音讯。Queue反对存在多个消费者,然而对一个音讯而言,只会有一个消费者能够生产。公布订阅:一对多模式,消费者生产数据之后不会革除数据;生产者将数据生产到topic中,订阅了该topic的消费者被动从该topic生产数据。公布到topic的音讯会被所有消费者生产 1.3、kafka架构 1.3.1要害概念producer:生产数据的客户端consumer:生产数据的客户端consumer group:一组生产数据的客户端,一个消费者只能生产一个分区的数据,消费者之间互不影响kafka cluster:Kafka的服务器集群broker:一个broker就是一个kafka服务器。一个集群能够有多台broker,一个broker能够包容多个topictopic:相当于一个队列partition:为了实现零碎的延展性,会将一个topic散布到多台broker下,散布到每台broker下的队列就是一个partition。每个partition都是一个有序的队列replica:为了保障kafka集群的高可用,避免一个broker宕机之后partition的数据失落,须要对partition做备份leader:master partition主分区。生产者生产和消费者生产数据的对象都是leaderfollower:slave partition从分区。从主分区中同步数据;当leader挂掉之后,某个follower会变成新的leader
写在后面预计运维年前没有祭拜服务器,Nginx的问题修复了,Kafka又不行了。明天,原本想再睡会,后果,电话又响了。还是经营,“喂,冰河,到公司了吗?连忙看看服务器吧,又出问题了“。“在路上了,运维那哥们儿还没下班吗”? “还在休假。。。”, 我:“。。。”。哎,这哥们儿是跑路了吗?先不论他,问题还是要解决。 问题重现到公司后,放下我专用的双肩包,拿出我的利器——笔记本电脑,关上后迅速登录监控零碎,发现次要业务零碎没啥问题。一个非核心服务收回了告警,并且监控零碎中显示这个服务频繁的抛出如下异样。 2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]从下面输入的异样信息,大略能够判断出零碎呈现的问题:Kafka消费者在解决完一批poll音讯后,在同步提交偏移量给broker时报错了。大略就是因为以后消费者线程的分区被broker给回收了,因为Kafka认为这个消费者挂掉了,咱们能够从上面的输入信息中能够看出这一点。 ...
对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 导语各位小伙伴们,2021 年 3 月 Pulsar 社区月报来啦!新年伊始,Pulsar 有哪些新年新气象呢?和咱们一起来看看吧。 感激以下各位社区成员本月对 Pulsar 我的项目的反对,让 Apache Pulsar 持续发光发热!(排名不分先后,看看你有没有上榜 ): congbobo184、hangc0276、eolivelli 、Huanli-Meng、 lhotari、zymap、 jerrypeng、david-streamlio、michaeljmarshall、Anonymitaet、kandersen82、klwilson227、freeznet、MarvinCai、RobertIndie、rdhabalia、fmiguelez、315157973、sijia-w、sakurafly123、tuteng、congbobo184、dlg99、timmyyuan、Jennifer88huang、yangl、aahmed-se、BewareMyPower、caladogan、gaoran10、csthomas1、wuzhanpeng、linlinnn、mauza、xiaotongwang1、limingnihao、murong00、merlimat、codelipenghui、fanjeff、patricklucas、golden-yang、mlyahmed、limingnihao 、BewareMyPower、HugeSkull、hnail、massakam、wolfstudy、bithavoc重要停顿BrokerBroker:反对获取利用的 BlacklogQuota。https://github.com/apache/pul... Broker:反对获取利用的 SubscriptionDispatchRate。https://github.com/apache/pul... Broker:将待处理的读订阅指标增加到 stats-internal。https://github.com/apache/pul... Broker:为 PulsarService#getAdminClient 增加 NPE 查看。https://github.com/apache/pul... Broker:避免应用有效的 broker 或 proxy configuration 设置受权。https://github.com/apache/pul... Broker:将 getWorkerService 办法更改为抛出 UnsupportedOperationException。https://github.com/apache/pul... Broker:如果响应曾经提交,则不增加 broker-address 标头。https://github.com/apache/pul... Broker:验证 /offload 申请的参数。https://github.com/apache/pul... Broker:反对获取已利用的 PersistencePolicies。https://github.com/apache/pul... Broker:反对获取已利用的 clusterSubscribeRate。https://github.com/apache/pul... Broker:反对获取已利用 dispatchRate。https://github.com/apache/pul... Broker:反对获取压缩阀值。https://github.com/apache/pul... BuildBuild:从 Pulsar docker 镜像中删除 .deb包。https://github.com/apache/pul... Build:将 pulsar-io-kafka-connect-adaptor 拆分为 JAR 和 NAR 模块。https://github.com/apache/pul... ...
作者:陈航,BIGO 大数据音讯平台团队负责人。本期文章排版:Tango@StreamNative。 对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 背景在上一篇博客中,咱们探讨了 BIGO 在 Pulsar Broker 性能调优过程中遇到的一些问题并提出相应的解决方案。本篇博客,咱们将探讨 BIGO 在 Pulsar 底层分布式存储服务 BookKeeper 的性能调优工作。 对于 BIGO 而言,Apache Pulsar 在 bookie 端(BookKeeper 的单个存储节点)的零碎性能次要存在以下几个问题: 读申请耗时较长,排队重大;Bookie 呈现 direct memory Out of Memory (OOM),导致过程挂掉;压测的时候经常出现 broker direct memory OOM;当 journal 盘为 HDD 时,尽管敞开了 fsync,然而 bookie add entry 99th latency 仍旧很高, 写入性能很差;当大量读申请进入 bookie 时,呈现写被反压,add entry latency 回升。当 Ledger 盘为 HDD 时,体现更加显著。保障 BookKeeper 的稳定性以及高吞吐和低提早是 Pulsar 稳固、吞吐的基石。本文会基于 BookKeeper 基本原理介绍影响读写吞吐和稳定性的因素。咱们打算次要从以下六个方面介绍 bookie 性能调优: ...
作者 StreamNative 文档工程师刘昱、软件工程师张勇。 对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 在 Apache Pulsar 2.6.3 版本公布后的 2 个月,2021 年 3 月 18 日,Apache Pulsar 正式公布了 2.7.1 版本! Apache Pulsar 2.7.1 版本新增诸多优化改良,修复大量破绽,笼罩 Broker、Proxy、Pulsar Perf、Transaction、Pulsar Admin、Pulsar SQL、Client、Function、Pulsar IO 和 Tiered Storage 等方面,进一步丰盛和欠缺 Apache Pulsar 作为云原生流数据平台的能力。 Apache Pulsar 2.7.1 版本总共承受了来自社区约 717 个 commits,合并社区约 210 个 PR,越来越多的小伙伴开始参加到 Pulsar 社区建设中,成为 contributor 的一员。不少代码和文档奉献来自于中国开发者,中国力量越发迅猛。 以下为你具体解读 Apache Pulsar 2.7.1 版本重要的优化改良和破绽修复。 Broker优化 schema 比拟的逻辑在 2.7.1 之前,Pulsar schema 通过将 schema 转换为 bytes 的形式比拟 schema 类型。上传 schema 时,如果 schema 含有空格或者换行符,会造成 schema 类型不兼容。在 2.7.1 版本中,Pulsar schema 应用 equals 来比拟 schema 类型,因而不会造成 schema 类型不兼容。 ...
导读:在 3 月 6 日 TGIP-CN 直播流动上,咱们邀请到 StreamNative 高级工程师吕能,他为大家分享了 Pulsar Function Mesh 的性能与个性。上面是吕能分享视频的简洁文字整顿版本,供大家参考。很快乐明天能跟大家分享 StreamNative 基于 Pulsar Function 做的新工作:Function Mesh,它整个外围的想法是把一些简单的、拆散的、独自治理的 Function 进行统一化治理,基于原生地整合到 Kubernetes 中,并能充分利用其多方面的性能和调度算法。 Pulsar 中的数据处理 首先看一下 Pulsar 中所反对的各种数据处理的模块和形式,次要分为三个方面。第一,是基于 Presto 的交互式查问,Pulsar 中有本人的 Pulsar SQL,基于 Presto 对 Pulsar 整个集群查问;有和 Presto 相干的 Connector,能够间接通过 Presto 集群来查问 topic。 第二,作为音讯队列、音讯解决数据的外围,Pulsar 能够对接各种不同的流数据或者批数据处理的框架,比方 Flink、Spark、Hive。后续咱们会公布 Pulsar 和 Flink SQL 整合的残缺解决方案。 最初是 Pulsar 内建了 Pulsar Function,核心思想在于提供一个最简略的 API,让用户可能不便地解决在 Pulsar 中流动的数据。总结来说,Pulsar Function 是一个轻量级数据处理的过程,次要进行如下操作: 生产来自一个或多个 Pulsar topic 的音讯;将用户提供的解决逻辑利用于每个音讯;将后果公布到一个 Pulsar topic。Pulsar Function何为 Pulsar Function ...
本文转自腾讯云中间件,作者张超,腾讯数据平台部 MQ 团队高级工程师,Apache TubeMQ(incubating) PMC,Kafka-on-Pulsar Maintainer,Apache Pulsar Contributor 腾讯数据平台数平 MQ 团队对 Pulsar 做了深刻调研以及大量的性能和稳定性方面优化,目前曾经在腾讯云音讯队列 TDMQ 落地上线。本文次要简略梳理了 Pulsar 反对的一些传统音讯队列利用场景,以及 Pulsar 新个性对更多场景的反对。 对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 音讯队列概述什么是音讯队列音讯队列(Message Queue,简称MQ),是指在音讯的传输中保留音讯的容器或服务,是一种异步的服务间通信形式,实用于无服务器和微服务架构,是分布式系统实现高性能、高可用、可伸缩等高级特效的重要组件。 常见的支流音讯队列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ、Pulsar 等。而在公司内有 TubeMQ、Ckafka、TDMQ、CMQ、CDMQ、Hippo 等。 音讯队列特点分布式音讯队列都是分布式的,因而才能够提供异步、解耦等性能。 可靠性基于音讯的通信是牢靠的,音讯不会失落。大多数音讯队列都提供将音讯长久化到磁盘的性能。 异步通过音讯队列,可将近程同步调用拆解成为异步调用。对于不须要获取近程调用后果的利用场景来说,性能晋升显著。 松耦合音讯间接由中间件存储和散发。音讯生产者只需关注如何将音讯发送给音讯中介服务器;消费者只需关注如何从中介服务器订阅。生产者和消费者之间是齐全解耦的,不须要晓得彼此的存在。 事件驱动能够将简单的利用零碎重构成为事件驱动的零碎。事件溯源(Event Sourcing),示意一个对象从创立到沦亡,会通过的多种状态。如果把对象的状态变动都存储下来,岂但能够依据状态变动记录获取对象的以后状态,也能够回溯对象的变动过程。音讯队列能很好地反对这样的零碎设计形式,将触发对象状态变动的事件放入音讯队列。 音讯队列分类在 JMS(JAVA Message Service)规范中,有P2P(Point to Point)和 Publish/Subscribe(Pub/Sub) 两种音讯模型。 P2PP2P的特点是每个音讯只有一个消费者。音讯生产者将音讯发送到音讯队列(Queue)中,只有一个消费者可能生产此音讯,生产实现之后音讯即删除。任意一个消费者都能够生产这个音讯,但音讯相对不会被两个消费者反复生产。 Pub/SubPub/Sub 的特点是公布到 Topic 的音讯会被所有订阅者生产。音讯生产者将音讯发送到音讯主题(Topic)中,所有订阅这个主题的消费者都能够生产此音讯,当所有订阅者都生产实现之后能力删除音讯。 音讯的生产者和消费者之间有工夫依赖,只有当时订阅这个主题的消费者才可生产。如果先发送音讯,后订阅主题,那么订阅之前的音讯将不能被这个订阅者生产。 传统企业型音讯队列 ActiveMQ 遵循了 JMS 标准,实现了点对点和公布订阅模型,但其余风行的音讯队列 RabbitMQ、Kafka 并没有遵循 JMS 标准。 而在实时流式架构中,音讯队列的消息传递能够分为队列(Queue)和流(Stream)两类。 队列(Queue)模型队列模型次要是采纳无序或者共享的形式来生产音讯。通过队列模型,用户能够创立多个消费者从单个管道中接管音讯;当一条音讯从队列发送进去后,多个消费者中的只有一个(任何一个都有可能)接管和生产这条音讯。音讯零碎的具体实现决定了最终哪个消费者理论接管到音讯。 队列模型通常与无状态应用程序一起联合应用。无状态应用程序不关怀排序,但它们的确须要可能确认(ACK)或删除单条音讯,以及尽可能地扩大生产并行性的能力。典型的基于队列模型的音讯零碎包含 RabbitMQ 和 RocketMQ。 ...
Pulsar 周报由 StreamNative 翻译整顿。 原文内容来自 StreamNative 官网 Pulsar 周报模块 https://streamnative.io/weekly。本期编辑:Tango@StreamNative。 对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 导语各位小伙伴们,Pulsar 社区周报更新来啦! 本次 Pulsar 社区周报,为大家出现 Pulsar client、broker、Functions、transactions,、authentication 等内容,帮忙社区小伙伴们把握 Pulsar 我的项目及社区每周停顿,也不便大家更好地参加到 Pulsar 社区中来! 感激本周以下小伙伴为 Apache Pulsar 添砖加瓦(排名不分先后,看看你有没有上榜): mauza、xiaotongwang1、eolivelli 、limingnihao 、rdhabalia、linlinnn、 aahmed-se、 murong00、Anonymitaet、315157973、MarvinCai、congbobo184、merlimat、lhotari、codelipenghui、BewareMyPower、fanjeff、patricklucas、golden-yang接下来,一起看看 3 月 1 ~ 7 日有哪些值得你关注的停顿吧! 本周亮点broker:容许启用或禁用 cursor 指标。https://github.com/apache/pul... 贡献者:@linlinnn Transactions:在待处理的 ack 操作上新增 最低偏移量(low watermark)的解决。https://github.com/apache/pul... 贡献者:@congbobo184 重要个性因为 PR 较多,仅列举较大 PR 停顿,不包含当周全副动静 上面 PR 均已合入 Pulsar 主分支Common:优化 ConcurrentOpenHashMap,缩短读锁死的持续时间。https://github.com/apache/pul... 贡献者:@merlimat Utils:转储 pulsar-perf 的 JVM 信息。https://github.com/apache/pul... 贡献者:@eolivelli ...
仿照Kafka,从零开始自实现 MQ,实现了 Kafka 中 80% 的根底性能。学习 Kafka 的话如果只是看文章和源码,可能不久就会忘了,还是本人实现一个「精简版」的 Kafka 吧, 实现性能概览1、基于内存Queue实现生产和生产API [X] 1) 创立内存Queue, 作为底层音讯存储[X] 2) 定义Topic, 反对多个Topic[X] 3) 定义Producer, 反对Send音讯[X] 4) 定义Consumer, 反对Poll音讯2、设计自定义Queue,实现音讯确认和生产offset [X] 1) 自定义内存Message数组模仿Queue。[X] 2) 应用指针记录以后音讯写入地位。[X] 3) 对于每个命名消费者, 用指针记录生产地位3、拆分broker和client(包含producer和consumer) [X] 1) 将Queue保留到web server端[X] 2) 设计音讯读写API接口, 确认接口, 提交offset接口[X] 3) producer和consumer通过httpclient拜访Queue[X] 4) 实现音讯确认, offset提交[X] 5) 实现consumer从offset增量拉取我的项目目录bitkylin-mq 我的项目设计及我的项目能力Server一、Topic保护ArrayList用于模仿长久化音讯「起因:音讯须要随机拜访」设定音讯队列容量,达到容量时无奈再生产音讯以后音讯的最大索引二、ConsumerGroup消费者组由消费者组名和topic名独特决定,即不同topic的消费者组互相独立,不会相互影响需依据topic创立消费者组,即消费者组必须关联topic消费者组创立后,默认从头残缺生产关联topic的所有音讯同一个消费者组内,各个消费者总共生产一次「起码生产一次」所关联topic的所有音讯三、broker一个broker关联一个ConsumerGroup列表和一个Topic列表通过broker裸露的接口,能够展现关联ConsumerGroup列表和Topic列表的概览信息通过broker裸露的接口,能够向一个topic中生产音讯通过broker裸露的接口,能够依据消费者组名和topic名生产音讯注:本次仅实现单个broker,broker后实现了topic和consumerGroup「消费者组」,细节结构图如下: client客户端通过topic名生产音讯客户端依据消费者组名和topic名生产音讯客户端生产音讯时,能够同时取得消费者组的offset「偏移量」客户端生产音讯胜利后,需手动更新消费者组的offset。若不更新,客户端默认无奈生产前面的音讯。客户端生产音讯失败时,不应更新消费者组的offset。此时客户端能够反复生产当条音讯。多个客户端能够应用同一个消费者组生产同一个topic;能够应用不同的消费者组生产同一个topic;能够应用不同的消费者组生产不同的topic客户端工作示意图如下: 我的项目构造本我的项目共提供四个module: bitkylin-mq-serverbitkylin-mq-apibitkylin-mq-client-producerbitkylin-mq-client-consumer各module的介绍如下: 1. bitkylin-mq-server提供MQ服务端,提供broker以及其关联的ConsumerGroup和Topic等,次要实现如下性能: 展现MQ概览信息,包含topic和ConsumerGroup的详细信息创立消费者组,创立消费者组后,即可应用该消费者组生产音讯生产音讯,将音讯发送至指定topic基于指定消费者组生产音讯,生产音讯但不更新关联消费者组的offset基于指定消费者组生产音讯,生产音讯且自动更新关联消费者组的offset手动更新指定消费者组的偏移量2. bitkylin-mq-api提供供客户端应用的api,通过feignClient模式提供,客户端可间接应用,执行RPC,以后实现如下性能: 发送音讯至指定topic订阅指定topic的音讯。主动创立消费者组,应用观察者模式轮询音讯并生产。3. bitkylin-mq-client-producer音讯生产客户端,通过feign-api生产音讯,以后实现如下演示性能:随机向topic名为「topic-1」和「topic-2」的topic中发送音讯,每隔3秒发送一次音讯。 4. bitkylin-mq-client-consumer音讯生产客户端,通过feign-api生产音讯,以后实现如下演示性能: 创立消费者组「spring-group-1」订阅「topic-1」,并打印订阅的音讯。创立消费者组「spring-group-2」订阅「topic-2」,并打印订阅的音讯。代码演示运行module「bitkylin-mq-server」,启动MQ的broker,启动音讯服务。运行module「bitkylin-mq-client-consumer」和「bitkylin-mq-client-producer」,开启音讯订阅演示工作和音讯发送演示工作。此时可通过「bitkylin-mq-client-consumer」的控制台,看到音讯一直被生产。2021-01-24 01:55:58.008 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:12021-01-24 01:56:00.996 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:22021-01-24 01:56:04.000 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:32021-01-24 01:56:07.004 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:42021-01-24 01:56:10.015 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:52021-01-24 01:56:13.011 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:62021-01-24 01:56:16.011 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:72021-01-24 01:56:19.006 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:82021-01-24 01:56:21.997 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:92021-01-24 01:56:24.994 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:102021-01-24 01:56:28.002 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:112021-01-24 01:56:30.991 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:122021-01-24 01:56:34.014 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:132021-01-24 01:56:37.010 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:142021-01-24 01:56:40.004 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:15关上postman,发送如下申请创立专用于postman的消费者组:POST http://localhost:8080/mq/broker/consumer-group/create{ "groupName": "postman-group-1", "topicName": "topic-1"}发送如下申请即可生产音讯,且主动确认「无需手动更新消费者组的offset」POST http://localhost:8080/mq/broker/message/simple-pull{ "groupName": "postman-group-1", "topicName": "topic-1"}能够发现,postman能够独立生产指定topic的音讯,不受Spring程序生产的影响。当然,postman能够间接应用Spring程序统一的消费者组,以独特生产音讯。 ...
Pulsar Authorization 受权Pulsar的受权须要开启认证,且须要独自在Broker和Proxy的配置中开启,否则所有认证通过后的用户角色将对所有资源有权限,如未开启认证则所有客户端对所有资源有权限;受权也反对插件化扩大机制,但应用自带的实现就能够满足需要了。此外还能够配置超级用户角色和代理角色,对于集群的治理和Proxy拜访等十分有用。 开启Pulsar认证配置之前咱们在 Pulsar学习笔记之 Authentication认证机制与插件开发 文章中介绍了Pulsar的认证机制和认证插件的开发,有须要能够移步过来看认证插件的配置,也能够应用官网举荐的认证插件。 开启Pulsar受权配置示例# broker.confauthorizationEnabled=trueauthorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvidersuperUserRoles=user-role-123456proxyRoles=pulsar_proxy_role_1开启受权后,认证通过的用户角色默认是没有任何权限的,须要当时创立租户/命名空间,并给用户角色做受权。 创立租户和命名空间bin/pulsar-admin tenants create my-tenantbin/pulsar-admin namespaces create my-tenant/my-namespacebin/pulsar-admin namespaces grant-permission my-tenant/my-namespace \ --actions lookup,produce,consume \ --role userrole-123457客户端写入有权限命名空间下的TopicAuthenticationTabaltAK at = new AuthenticationTabaltAK("test_access_key2", "test_access_secret");//...Producer<byte[]> producer = client.newProducer() .topic("my-tenant/my-namespace/my-topic") .create();Pulsar resource-quotas 资源配额Pulsar resource-quotas 资源配额 用于限度命名空间级别的出入音讯速率、带宽等 bin/pulsar-admin resource-quotas set \ --memory 20 \ --msgRateIn 2 \ --msgRateOut 20 \ --bandwidthIn 2 \ --bandwidthOut 20 \ --bundle "0x00000000_0x40000000" \ --namespace "my-tenant/my-namespace" bin/pulsar-admin namespaces policies my-tenant/my-namespace bin/pulsar-admin resource-quotas reset-namespace-bundle-quota \ --bundle "0x00000000_0x40000000" \ --namespace "my-tenant/my-namespace"Pulsar backlog-quotas 配置bin/pulsar-admin namespaces set-backlog-quota "my-tenant/my-namespace" \ --limit 100 \ --policy producer_exception / producer_request_hold / consumer_backlog_evictionbin/pulsar-admin namespaces policies my-tenant/my-namespace限度backlog的大小能够失效,同时有一些“小特点”,但影响不大 ...
Pulsar自带的Authentication认证形式有很多种:TLS/Basic/JWT Token/Athenz/Sasl,然而均存在安全性或复杂性的一些问题,且有时候咱们须要和已有的账户零碎做集成,以保持一致的产品体验,此时须要自行开发认证插件。这里介绍一个应用签名机制加强安全性的认证插件开发计划。 Pulsar认证的扩大机制Pulsar认证提供了比拟好的扩大机制,通过实现几个预约义的接口类,就能够不便的插入本人开发的认证实现。这些接口次要包含以下4个: #客户端包装认证参数org.apache.pulsar.client.api.Authenticationorg.apache.pulsar.client.api.AuthenticationDataProvider#服务端验证认证参数org.apache.pulsar.broker.authentication.AuthenticationProviderorg.apache.pulsar.broker.authentication.AuthenticationState此外默认的多种认证形式的代码能够提供十分丰盛的认证实现参考。 待集成的认证服务假如咱们曾经有了一个账号零碎,外面存储有账号名(accessKey)、加盐hash过的明码(accessSecret)等认证须要的信息;咱们须要再开发一个专用的接口,供咱们实现的Pulsar认证插件调用。为防止认证接口被别人滥用,通过Header中的Auth参数做简略比对校验。 接口参数接管POST参数:accessKey,timeStr,paramStr,signature接管Header参数:Auth认证大抵过程对收到的accessKey做格局校验申请账号零碎接口或查询数据库获取accessSecret用跟客户端雷同的形式从新计算签名sginStr:accessKey+timeStr+paramStrsignature=base64(HmacSHA1.init(accessSecret).doFinal(signStr))比对签名,失败报错结构userRole认证通过后响应后果{ "uid":"123456", "userRole": "ur-123456", #命名规定能够自行感觉定}Pulsar认证插件的实现要点客户端包装认证参数传入参数:accessKey,accessSecret生成参数 paramStr:method=akauth&client=$UUID&rand=Math.rand()timeStr:String.valueOf(System.currentTimeMillis()/1000)sginStr:accessKey+timeStr+paramStr签名计算:signature=base64(HmacSHA1.init(accessSecret).doFinal(signStr))传递参数:accessKey,timeStr,paramStr,signature服务端转发认证参数接管参数:accessKey,timeStr,paramStr,signature申请 认证服务接口服务端签名存活期校验AuthenticationProviderTabaltAK.authenticate 判断是否过期// check auth params survival timelong currentTimeSeconds = System.currentTimeMillis() / 1000;long authTimeSeconds = Long.parseLong(authParams.getTimeStr());if ((authTimeSeconds + this.akSignatureSurvivalSeconds) < currentTimeSeconds) { throw new AuthenticationException("auth out of survival time");}TokenAuthenticationState.isExpired 判断是否过期 具体判断逻辑与下面雷同Pulsar认证插件的应用配置Pulsar服务端配置服务端通常在Broker和Proxy的配置文件中配置 # 配置Broker的认证插件和参数authenticationEnabled=trueauthenticationProviders=net.tabalt.pulsar.broker.authentication.AuthenticationProviderTabaltAKtabaltAKServerUrl=http://127.0.0.1/ak #AK服务地址tabaltAKServerAuth=test-auth #AK服务接口认证Token,通过header传递给认证服务tabaltAKSignatureSurvivalSeconds=3600 #签名存活秒数# 配置Broker作为客户端申请其余Broker的认证插件和参数,此处须要配置一个超级账号brokerClientAuthenticationPlugin=net.tabalt.pulsar.client.auth.AuthenticationTabaltAKbrokerClientAuthenticationParameters={"accessKey":"test_access_key","accessSecret":"test_access_secret"}Pulsar客户端配置AuthenticationTabaltAK authAK = new AuthenticationTabaltAK("test_access_key", "test_access_secret");PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://127.0.0.1:6650") .authentication(authAK).build();Producer<byte[]> producer = client.newProducer().topic("my-topic").create();...
编译 Pulsar Jar包拉取开源代码 mkdir ~/dev/apachecd ~/dev/apachegit clone https://github.com/apache/pulsar.gitcd pulsar#从TAG v2.6.1 创立本地分支git checkout -b v2.6.1_build v2.6.1 编译Jar包,并打包成公布包 mvn install -DskipTests# 查看编译后的后果cd distribution/tree ./cd server/targettree ./部署本地单机模式 Pulsar解压下面编译打包好的公布包,并启动单机模式,可用于本地开发测试 cd ~/dev/apache/pulsar/distribution/server/target# 解压编译好的Jar包tar -zxvf apache-pulsar-2.6.1-bin.tar.gzcd apache-pulsar-2.6.1# 批改单机模式的配置文件vim conf/standalone.conf# 启动单机模式Pulsarbin/pulsar standalone# 敞开单机模式PulsarCtrl + c单机模式开启 JWT认证 配置示例 # 生成secret和tokensmkdir -p /tmp/test-jwtbin/pulsar tokens create-secret-key --output /tmp/test-jwt/my-base64-secret.key --base64bin/pulsar tokens create --secret-key file:///tmp/test-jwt/my-base64-secret.key --subject my-jwt-user > /tmp/test-jwt/my-jwt-user.tokens# 批改认证相干的配置项vim conf/standalone.conf authenticationEnabled=true authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken tokenSecretKey=file:///tmp/test-jwt/my-base64-secret.key brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken brokerClientAuthenticationParameters=file:///tmp/test-jwt/my-jwt-user.tokens# 启动单机模式Pulsarbin/pulsar standalone构建Docker镜像将上述编译打包好的公布包构建成Docker镜像 # 构建镜像cd ~/dev/apache/pulsar/docker./build.sh# 查看构建的镜像docker images | grep pulsar推送镜像到镜像仓库 ...
本文转载自 Java 高级架构,原中文版本由闻数起舞翻译自 Lewis Fairweather 的文章《Pulsar Advantages Over Kafka》,文章转载时有改变。Apache Pulsar 欢送大家踊跃踊跃投稿、与社区共同进步,也欢送大家与作者进行交换!排版:Tango@StreamNative 对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ Photo by Mikael Kristenson on Unsplash 介绍最近,我始终在钻研 Pulsar 及其与 Kafka 的比拟。通过疾速搜寻,你会看到这两个最驰名的开源消息传递零碎之间正在进行的"和平"。 作为 Kafka 的用户,我着实对 Kafka 的某些问题感到困惑,但 Pulsar 却让人眼前一亮、令我十分兴奋。所以最初,我设法花了一些工夫理解背景材料,并且做了很多钻研。在本文中,我将重点介绍 Pulsar 的劣势,并阐明 Pulsar 胜于 Kafka 的理由。让咱们开始! Kafka 基础知识Kafka 是消息传递零碎之王。它由 LinkedIn 于 2011 年创立,并在 Confluent 的反对下失去了宽泛的流传。Confluent 已向开源社区公布了许多新性能和附加组件,例如用于模式演变的 Schema Registry,用于从其余数据源轻松流式传输的 Kafka Connect 等。数据库到 Kafka,Kafka Streams 进行分布式流解决,最近应用 KSQL 对 Kafka topic 执行相似 SQL 的查问等等。 ...
本文转载自社区用户:咖啡拿铁,猿辅导后端开发工程师,带你直击Pulsar的外围常识!Apache Pulsar 欢送大家踊跃踊跃投稿、与社区共同进步,也欢送大家与作者进行交换!编辑:Tango@StreamNative 对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 背景Pulsar 是一个由 Yahoo!于 2016 年开源的消息中间件,2018 年成为 Apache 的顶级我的项目。在我之前的文章中写过很多其余消息中间件的文章,比方 Kafka,RocketMQ 等等,如果大家对于音讯队列不理解的能够浏览以下我之前的文章: 你须要理解的 Kafka你应该晓得的 RocketMQ聊聊计算和存储拆散在开源的业界曾经有这么多音讯队列中间件了,Pulsar 作为一个新权势到底有什么长处呢?Pulsar 自从出身就一直的再和其余的音讯队列(Kafka,RocketMQ 等等)做比拟,然而 Pulsar 的设计思维和大多数的音讯队列中间件都不同,具备了高吞吐,低提早,计算存储拆散,多租户,异地复制等性能,所以 Pulsar 也被誉为下一代音讯队列中间件,接下来我会一一对其进行具体的解析。 Pulsar 架构原理 整体的架构和其余的音讯队列中间件差异不是太大,置信大家也看到了很多相熟的名词,接下来会给大家一一解释这些名词的含意。 名词解释Producer:音讯生产者,将音讯发送到 Broker;Consumer:音讯消费者,从 Broker 读取音讯到客户端,进行生产解决;Broker:能够看作是 Pulsar 的 Server,Producer 和 Consumer 都看作是 Client. 音讯解决的节点,Pulsar 的 Broker 和其它消息中间件不一样,它是无状态的没有存储,所以能够无限度的扩大,这个前面也会详解讲到;Bookie::负责所有音讯的长久化,这里采纳的是 Apache Bookeeper;ZK:和 Kafka一样 Pulsar 也是应用 ZK 保留一些元数据,比方配置管理,Topic 调配,租户等等;Service Discovery:能够了解为 Pulsar 中的 Nginx,只用一个 url 就能够和整个 Broker 打交道,当然也能够应用本人的服务发送。客户端收回的读取,更新或删除主题的初始申请将发送给可能不是解决该主题的 Broker 。如果这个 Broker 不能解决该主题的申请,Broker 将会把该申请重定向到能够解决主题申请的 Broker。不论是 Kafka,RocketMQ,还是 Pulsar 作为音讯队列中间件最为重要的组成为以下三个局部: ...
本文作者为 jesse-anderson。内容由 StreamNative 翻译并整顿。 以三个理论应用场景为例,从 CTO 的视角登程,在技术等方面比照 Kafka 和 Pulsar。 浏览本文须要大概 8 分钟。 对于 Apache PulsarApache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。GitHub 地址:http://github.com/apache/pulsar/ 在评估新技术时,高层管理人员的视角通常与中层管理人员、架构师、数据工程师等有所不同。高层管理人员不仅要关注基准测试后果、产品反对的个性,还要从久远角度思考新技术的可靠性,新技术可能为企业带来哪些竞争劣势,以及是否能够缩短上市工夫、节约开销。 我是 Big Data Institute 的常务董事,技术评估是我的一项次要工作。咱们帮忙企业依据业务需要抉择并落地最合适的技术。咱们不与供应商单干,因而客户尤为看中咱们可能主观地评估不同的技术。 在本文中,我将从 CTO 的视角登程,比照 Apache Pulsar 和 Apache Kafka。只进行实践上的比照空洞有效,也不能帮忙咱们作出决策,理论用例才真正值得参考。所以,在本文中,我会通过一些常见的理论应用场景来比照 Pulsar 和 Kafka,即简略音讯应用场景、简单音讯应用场景和高级音讯应用场景。在这些理论应用场景下,Pulsar 和 Kafka 的体现可能帮忙咱们更好地了解二者的性能和劣势,进而作出决策。 简略音讯应用场景假如有一个企业,之前从未应用过音讯零碎,当初须要通过一个简略的音讯零碎,将音讯从地位 A 发送到地位 B,但不须要复制音讯。 数据架构师团队在深入研究 Pulsar 和 Kafka 的业务案例后,得出如下论断:在这一应用场景中,Pulsar 和 Kafka 都没有绝对优势。并且,他们认为在短时间内,该应用场景根本不会产生扭转。 对于相似这样的简略音讯应用场景而言,我也同意 Pulsar 和 Kafka 都没有绝对优势。仅从技术角度登程,Pulsar 和 Kafka 这一回合打成平局,那么咱们只能思考老本。二者的经营老本、员工培训老本别离是多少?我打算依据 Kafka 或 Pulsar 的服务提供商的免费规范进行比照。比照开销时,选好服务提供商也能够在肯定水平上缩小经营老本和员工培训老本。Kafka 的云服务提供商,我参考了应用 Kafka API(Azure) 的 Confluent Cloud、MSK(AWS)和 Event Hubs。Pulsar 的云服务提供商,我抉择 StreamNative Cloud。 ...
一、简介 Apache Kafka 是一个分布式的流解决平台(分布式的基于公布/订阅模式的音讯队列【Message Queue】)。 流解决平台有以下3个个性: 能够让你公布和订阅流式的记录。这一方面与音讯队列或者企业音讯零碎相似。能够贮存流式的记录,并且有较好的容错性。能够在流式记录产生时就进行解决。1.1 音讯队列的两种模式1.1.1 点对点模式生产者将音讯发送到queue中,而后消费者从queue中取出并且生产音讯。音讯被生产当前,queue中不再存储,所以消费者不可能生产到曾经被生产的音讯。Queue反对存在多个消费者,然而对一个音讯而言,只能被一个消费者生产。 1.1.2 公布/订阅模式生产者将音讯公布到topic中,同时能够有多个消费者订阅该音讯。和点对点形式不同,公布到topic的音讯会被所有订阅者生产。 1.2 Kafka 适宜什么样的场景它能够用于两大类别的利用: 结构实时流数据管道,它能够在零碎或利用之间牢靠地获取数据。(相当于message queue)。构建实时流式应用程序,对这些流数据进行转换或者影响。(就是流解决,通过kafka stream topic和topic之间外部进行变动)。为了了解Kafka是如何做到以上所说的性能,从上面开始,咱们将深刻摸索Kafka的个性。 首先是一些概念: Kafka作为一个集群,运行在一台或者多台服务器上。Kafka 通过 topic 对存储的流数据进行分类。每条记录中蕴含一个key,一个value和一个timestamp(工夫戳)。1.3 主题和分区Kafka的音讯通过主题(Topic)进行分类,就好比是数据库的表,或者是文件系统里的文件夹。主题能够被分为若干个分区(Partition),一个分区就是一个提交日志。音讯以追加的形式写入分区,而后以先进先出的程序读取。留神,因为一个主题个别蕴含几个分区,因而无奈在整个主题范畴内保障音讯的程序,但能够保障音讯在单个分区内的程序。主题是逻辑上的概念,在物理上,一个主题是横跨多个服务器的。 Kafka 集群保留所有公布的记录(无论他们是否已被生产),并通过一个可配置的参数——保留期限来管制(能够同时配置工夫和音讯大小,以较小的那个为准)。举个例子, 如果保留策略设置为2天,一条记录公布后两天内,能够随时被生产,两天过后这条记录会被摈弃并开释磁盘空间。 有时候咱们须要减少分区的数量,比方为了扩大主题的容量、升高单个分区的吞吐量或者要在单个消费者组内运行更多的消费者(因为一个分区只能由消费者组里的一个消费者读取)。从消费者的角度来看,基于键的主题增加分区是很艰难的,因为分区数量扭转,键到分区的映射也会变动,所以对于基于键的主题来说,倡议在一开始就设置好分区,防止当前对其进行调整。 (留神:不能缩小分区的数量,因为如果删除了分区,分区外面的数据也一并删除了,导致数据不统一。如果肯定要缩小分区的数量,只能删除topic重建) 1.4 生产者和消费者生产者(发布者)创立音讯,个别状况下,一个音讯会被公布到一个特定的主题上。生产者在默认状况下把音讯平衡的散布到主题的所有分区上,而并不关怀特定音讯会被写入哪个分区。不过,生产者也能够把音讯间接写到指定的分区。这通常通过音讯键和分区器来实现,分区器为键生成一个散列值,并将其映射到指定的分区上。生产者也能够自定义分区器,依据不同的业务规定将音讯映射到分区。 消费者(订阅者)读取音讯,消费者能够订阅一个或者多个主题,并依照音讯生成的程序读取它们。消费者通过查看音讯的偏移量来辨别曾经读取过的音讯。偏移量是一种元数据,它是一个一直递增的整数值,在创立音讯时,kafka会把它增加到音讯里。在给定的分区里,每个音讯的偏移量都是惟一的。消费者把每个分区最初读取的音讯偏移量保留在zookeeper或者kafka上,如果消费者敞开或者重启,它的读取状态不会失落。 消费者是消费者组的一部分,也就是说,会有一个或者多个生产独特读取一个主题。消费者组保障每个分区只能被同一个组内的一个消费者应用。如果一个消费者生效,群组里的其余消费者能够接管生效消费者的工作。 1.5 broker和集群broker:一个独立的kafka服务器被称为broker。broker接管来自生产者的音讯,为音讯设置偏移量,并提交音讯到磁盘保留。broker为消费者提供服务,对读取分区的申请作出相应,返回曾经提交到磁盘上的音讯。 集群:交给同一个zookeeper集群来治理的broker节点就组成了kafka的集群。 broker是集群的组成部分,每个集群都有一个broker同时充当集群控制器的角色。控制器负责管理工作,包含将分区调配给broker和监控broker。在broker中,一个分区从属于一个broker,该broker被称为分区的领袖。一个分区能够调配给多个broker(Topic设置了多个正本的时候),这时会产生分区复制。如下图: broker如何解决申请:broker会在它所监听的每个端口上运行一个Acceptor线程,这个线程会创立一个连贯并把它交给Processor线程去解决。Processor线程(也叫网络线程)的数量是可配的,Processor线程负责从客户端获取申请信息,把它们放进申请队列,而后从响应队列获取响应信息,并发送给客户端。如下图所示: 生产申请和获取申请都必须发送给分区的领袖正本(分区Leader)。如果broker收到一个针对特定分区的申请,而该分区的领袖在另外一个broker上,那么发送申请的客户端会收到一个“非分区领袖”的谬误响应。Kafka客户端要本人负责把生产申请和获取申请发送到正确的broker上。 客户端如何晓得该往哪里发送申请呢?客户端应用了另外一种申请类型——元数据申请。这种申请蕴含了客户端感兴趣的主题列表,服务器的响应音讯里指明了这些主题所蕴含的分区、每个分区都有哪些正本,以及哪个正本是领袖。元数据申请能够发给任意一个broker,因为所有的broker都缓存了这些信息。客户端缓存这些元数据,并且会定时从broker申请刷新这些信息。此外如果客户端收到“非领袖”谬误,它会在尝试从新发送申请之前,先刷新元数据。 1.6 Kafka 基础架构 二、Kafka架构深刻2.1 Kafka工作流程及文件存储机制2.1.1 工作流程 Kafka中音讯是以topic进行分类的,生产者生产音讯,消费者生产音讯,都是面向topic的。 Topic是逻辑上的概念,而partition(分区)是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被一直追加到该log文件末端,且每条数据都有本人的offset。消费者组中的每个消费者,都会实时记录本人生产到哪个offset,以便出错复原时,从上次的地位持续生产。 2.1.2 文件存储机制 因为生产者生产的音讯会一直追加到log文件开端,为避免log文件过大导致数据定位效率低下,Kafka采取了分片和索引的机制,将每个partition分为多个segment。(由log.segment.bytes决定,管制每个segment的大小,也可通过log.segment.ms管制,指定多长时间后日志片段会被敞开)每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规定为:topic名称+分区序号。例如:bing这个topic有3个分区,则其对应的文件夹为:bing-0、bing-1和bing-2。 索引文件和日志文件命名规定:每个 LogSegment 都有一个基准偏移量,用来示意以后 LogSegment 中第一条音讯的 offset。偏移量是一个 64位的长整形数,固定是20位数字,长度未达到,用 0 进行填补。如下图所示: index和log文件以以后segment的第一条音讯的offset命名。index文件记录的是数据文件的offset和对应的物理地位,正是有了这个index文件,能力对任一数据写入和查看领有O(1)的复杂度,index文件的粒度能够通过参数log.index.interval.bytes来管制,默认是是每过4096字节记录一条index。下图为index文件和log文件的构造示意图: ...
1. 音讯队列RocketMQ性能测试案例1.1 RocketMQ测试剖析客户场景,信息共享替换平台:1.替换平台需反对每秒万级别数据传输2.实现跨路段、跨部门、跨行业、跨区域信息即时共享,做到高牢靠、低提早 客户现场展现场景设计思路:1.针对性的编写一套JAVA代码来撑持本次MQ性能POC验证。2.抉择适合规格的单台ECS,在单个TOPIC下运行多线程代码实现和MQ的订阅发送,统计1分钟内订阅和发送的数据交换TPS状况。3.思考POC也要合乎客户理论生产场景中MQ应用逻辑,ECS应运行4个独立的JAR包,两对JAR包穿插通过MQ进行数据交换。4.音讯体内的内容应打印到屏幕,通过音讯轨迹验证音讯的被生产状况。5.验证后果:客户指定场景下8线程异步,1分钟TPS在10K以上。 1.2 创立资源本章节详细描述如何创立音讯队列 RocketMQ 版的资源。 1.2.1 创立RocketMQ实例1.登录Apsara Stack控制台。2.在左侧导航栏中单击中间件产品 > 音讯队列拜访治理控制台界面。 3.在音讯队列页面,抉择区域与部门后,单击MQ,进入MQ控制台。 4.单击左侧导航栏概览后,在概览页面中,单击创立实例。5.在创立实例对话框,抉择实例类型,并输出实例名和形容,而后单击确认。 阐明: 创立完实例后,单击左侧导航栏实例详情,能够查看创立实例的Topic数下限、音讯发送TPS下限、音讯订阅TPS下限和TCP协定接入地址等。 1.2.2 创立 TopicTopic 是音讯队列 RocketMQ 版里对音讯的一级归类,例如能够创立 Topic_Trade 这一 Topic 来辨认交易类音讯,音讯生产者将音讯发送到 Topic_Trade,而音讯消费者则通过订阅该 Topic 来获取和生产音讯。创立Topic要留神一下几点: Topic 不能跨实例应用,例如在实例 A 中创立的 Topic A 不能在实例 B 中应用。Topic 名称必须在同一实例中是惟一的。您可创立不同的 Topic 来发送不同类型的音讯,例如用 Topic A 发送一般音讯,Topic B 发送事务音讯,Topic C 发送定时/延时音讯。1.在控制台左侧导航栏,单击 Topic 治理。2.在 Topic 治理页面上方抉择刚创立的实例,单击创立 Topic 按钮。 3.在创立 Topic 对话框中的 Topic 一栏,输出 Topic 名称,抉择该 Topic 对应的音讯类型,输出该 Topic 的备注内容,而后单击确定。 ...
https://my.oschina.net/u/4318... 最近钻研音讯队列,发现好几个框架,收罗一下进行比照,说一下选型阐明: 1)中小型软件公司,倡议选RabbitMQ。一方面,erlang语言天生具备高并发的个性,而且他的治理界面用起来非常不便。不思考rocketmq和kafka的起因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选性能比拟齐备的,所以kafka排除。RocketMQ也很不错,只是没有RabbitMQ进去的早,文档和网上的材料没有RabbitMQ多,但也是很不错,RocketMQ是阿里出品,当初阿里曾经把RocketMQ捐献给Apache了,保护和更新不是问题 。 2)大型软件公司,依据具体应用在rocketMq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司也能够抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于kafka,依据业务场景抉择,如果有日志采集性能,必定是首选kafka了。具体该选哪个,看应用场景 (下面观点都是集体意见,仅供参考) 个性 ActiveMQ RabbitMQ RocketMQ kafka 开发语言 java erlang java scala 单机吞吐量 万级 万级 10万级 10万级 时效性 ms级 us级 ms级 ms级以内 可用性 高(主从架构) 高(主从架构) 十分高(分布式架构) 十分高(分布式架构) 性能个性 成熟的产品,在很多公司失去利用;有较多的文档;各种协定反对较好 基于erlang开发,所以并发能力很强,性能极其好,延时很低;治理界面较丰盛 MQ性能比拟齐备,扩展性佳 只反对次要的MQ性能,像一些音讯查问,音讯回溯等性能没有提供,毕竟是为大数据筹备的,在大数据畛域利用广。 ActiveMQ、Kafka、RocketMQ、RabbitMQ比拟1.ActiveMQ 长处 单机吞吐量:万级 topic数量都吞吐量的影响: 时效性:ms级 可用性:高,基于主从架构实现高可用性 音讯可靠性:有较低的概率失落数据 性能反对:MQ畛域的性能极其齐备 毛病: 官网社区当初对ActiveMQ 5.x保护越来越少,较少在大规模吞吐的场景中应用。 2.Kafka 号称大数据的杀手锏,谈到大数据畛域内的音讯传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据畛域的宠儿,在数据采集、传输、存储的过程中施展着无足轻重的作用。 Apache Kafka它最后由LinkedIn公司基于独特的设计实现为一个分布式的提交日志零碎( a distributed commit log),之后成为Apache我的项目的一部分。 目前曾经被LinkedIn,Uber, Twitter, Netflix等大公司所驳回。 长处 性能卓越,单机写入TPS约在百万条/秒,最大的长处,就是吞吐量高。 时效性:ms级 可用性:十分高,kafka是分布式的,一个数据多个正本,多数机器宕机,不会失落数据,不会导致不可用 消费者采纳Pull形式获取音讯, 音讯有序, 通过管制可能保障所有音讯被生产且仅被生产一次; 有优良的第三方Kafka Web治理界面Kafka-Manager; ...
最近我的项目发现了一个很诡异的景象,纵使删除了会话且革除了历史音讯,一旦卸载重装利用,之前删除的局部音讯又莫名其妙的从新收到且显示了,见鬼啦~????~,在“福尔摩斯·我”的周密排查下(提工单问了融云的技术支持????),假相只有一个。 假相:原来是因为开启了融云的“多设施音讯同步”服务,在卸载重装利用时,触发了该服务中的“音讯弥补”机制,默认会把当天收发过的音讯从新拉取回来。 如果既须要开明“多设施音讯同步”服务,又须要卸载重装利用时保障之前删除的会话和历史音讯不再显示,该如何解决呢? 计划: 删除会话且革除历史音讯向该会话发送一条不存储不计数的自定义音讯,作用是标识该会话曾经被革除卸载重装利用触发“音讯弥补”机制,除了收到之前收发过的音讯,也会收到标识该会话被革除的自定义音讯在接管到该自定义音讯时,对该会话再做一遍革除操作,也就是“删除会话且革除历史音讯”注: “音讯弥补”默认是当天,也能够批改这个工夫,具体能够征询融云 https://www.rongcloud.cn/ 顺便说一下,他们的技术支持服务还是挺到位的,根本都能失去绝对称心的回答,如果感觉问他们比拟麻烦,能够本人先在文档 https://docs.rongcloud.cn/v4/ 外面找找,说不定会有惊喜哟~
每个时代,都不会亏待会学习的人。大家好,我是 yes。 继上一篇 头条终面:写个消息中间件 ,我提到实现消息中间件的一些关键点,明天就和大家一起深刻生产级别消息中间件 - RocketMQ 的内核实现,来看看真正落地能撑持万亿级音讯容量、低提早的音讯队列到底是如何设计的。 这篇文章我会先介绍整体的架构设计,而后再深刻各外围模块的具体设计、外围流程的分析。 还会提及应用的一些留神点和最佳实际。 对于音讯队列的用途和一些概念不太分明的同学强烈建议先看音讯队列面试连环问,这篇文章介绍了音讯队列的应用场景、基本概念和常见面试题。 话不多说,上车。 RocketMQ 整体架构设计整体的架构设计次要分为四大局部,别离是:Producer、Consumer、Broker、NameServer。 为了更贴合理论,我画的都是集群部署,像 Broker 我还画了主从。 Producer:就是音讯生产者,能够集群部署。它会先和 NameServer 集群中的随机一台建设长连贯,得悉以后要发送的 Topic 存在哪台 Broker Master上,而后再与其建设长连贯,反对多种负载平衡模式发送音讯。Consumer:音讯消费者,也能够集群部署。它也会先和 NameServer 集群中的随机一台建设长连贯,得悉以后要音讯的 Topic 存在哪台 Broker Master、Slave上,而后它们建设长连贯,反对集群生产和播送生产音讯。Broker:次要负责音讯的存储、查问生产,反对主从部署,一个 Master 能够对应多个 Slave,Master 反对读写,Slave 只反对读。Broker 会向集群中的每一台 NameServer 注册本人的路由信息。NameServer:是一个很简略的 Topic 路由注册核心,反对 Broker 的动静注册和发现,保留 Topic 和 Borker 之间的关系。通常也是集群部署,然而各 NameServer 之间不会相互通信, 各 NameServer 都有残缺的路由信息,即无状态。我再用一段话来概括它们之间的交互: 先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker 启动之后会向所有 NameServer 定期(每 30s)发送心跳包,包含:IP、Port、TopicInfo,NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相干信息,代表下线。 ...
我的项目中用到了音讯队列,然而本人还只是大略理解,于是打算零碎的学习一下。学货色肯定要留神领会思维,了解了思维就是学一通百,不留神了解思维,很容易就会吞没在技术的陆地中。MQ与JMSjava是一个喜爱公布规范,让他人实现的语言,这个规范你能够了解为接口,像JDBC就是一组接口,由不同的数据库厂商负责实现,这是一个相当胜利的设计。JMS也是相似于JDBC这样的规范,然而并没有做到像JDBC这么胜利,所有的音讯服务都按JMS来。 已经的我认为MQ和JMS的关系是这样的: 像是接口和实现类一样, 然而实际上: 只有ActiveMQ齐全恪守了JMS协定。RocketMQ、RabbitMQ、KafkaMq并不怎么恪守JMS。 JMS是什么?The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages. It enables distributed communication that is loosely coupled, reliable, and asynchronous. java 音讯服务接口是一个基于java平台创立、发送、接管音讯的规范,在分布式系统中经常被用来解耦合,异步发送音讯。 MQ = message queuemessage queue 音讯队列,首先是队列,队列是一种数据结构,个别状况是先进先出。 那咱们个别用音讯队列来做什么呢? 或者音讯队列能够用来做什么呢 ? 能够用来做中间件那什么是中间件? 查了一些材料,发现并没有严格的定义,目前较为承受的定义是将具体业务和底层逻辑解耦的组件。咱们权且能够将其了解为中介,咱们想要租房的话,通常不会本人去找房东,通常也比拟难找,也比拟麻烦。如果你真的不想让中介占到一点便宜,跟某块地区的房东曾经建设了策略单干关系,于是你每换一个中央就要本人再找一下房东,你于这个地区就产生了重大的耦合,房东也与你产生了耦合。 通常状况下,房东会将本人的房子委托给中介或者托管在某个平台,租房子的人通过中介或者托管平台来看房子。这个中介和被托管的平台咱们就能够了解为中间件。无效的实现了房东和访客解耦,房东和租房子的人通过中介或托管平台交换。 所以你能够看到Redis也能够算进中间件,Redis也能够做音讯队列。 理论的利用场景中,MQ被当做中间件时往往用于两个零碎间进行通信, A零碎将B零碎须要的音讯放入音讯队列中,B零碎从音讯队列中获取。 有人可能会问了,为什么B零碎不间接调用A零碎的接口呢? 要放一个音讯队列呢? 其实也不是不行,那这样就耦合在一起了,也就是说A跟着动,B也要跟着动,软件行业广泛讨厌改变。如果是在分布式系统中,咱们就更心愿调用关系清晰一点了,两个零碎在某种意义上像是两个国家,两个国家之间的交换是通过外交部,而两个零碎之间的交换就是通过音讯队列。 ...
最近这不是赶上了金九银十了啊,而后公司也面试了几个,有幸,我跟着老大也经验了几个面试场景,其中一个让我真的是印象粗浅 他说他在原公司是资深工程师级别,就相当于技术骨干,而后公司我的项目也不错,就在我和老大都感觉不错的时候,我多问了一句,你们的消息中间件是怎么选型的,这大哥给我回了一句:boss决定用这个的扭头看着老大逐步石化的面庞,我就又问了一下他们用的中间件RocketMQ相干内容。。。。剩下的我就不说了(大哥,你面试的时候长点心啊),幸好咱们公司有个我的项目用到的技术跟架构他答复的不错,不然,哎。。。 上面,技术上,咱们来看一下,他们过后为什么会选用RocketMQ的,其实这个技术真的还不错 RocketMQ 介绍Apache RocketMQ 是一款 低提早、高并发、高可用、高牢靠的分布式消息中间件。音讯队列 RocketMQ 可为分布式应用零碎提供异步解耦和削峰填谷的能力,同时也具备互联网利用所需的海量音讯沉积、高吞吐、牢靠重试等个性。 RocketMQ 概念Topic:音讯主题,用于将一类的音讯进行归类,比方订单主题,就是所有订单相干的音讯都能够由这个主题去承载,生产者向这个主题发送音讯。生产者:负责生产音讯并发送音讯到 Topic 的角色。消费者:负责从 Topic 接管并生产音讯 的角色。音讯:生产者向 Topic 发送的内容,会被消费者生产。音讯属性:生产者发送的时候能够为音讯自定义一些业务相干的属性,比方 Message Key 和 Tag 等。Group:一类生产者或消费者,这类生产者或消费者通常生产或生产同一类音讯,且音讯公布或订阅的逻辑统一。为什么要应用 RocketMQ?异步解耦随着微服务架构的风行,服务之间的关系梳理十分重要。异步解耦能够升高服务之间的耦合水平,同时也能进步服务的吞吐量。 应用异步解耦的业务场景十分多,因为每个行业的业务都会不太一样,以一些比拟通用的业务来阐明置信大家都能了解。 比方电商行业的下单业务场景,以最简略的下单流程来说,下单流程如下: 锁库存创立订单用户领取扣减库存给用户发送购买短信告诉给用户减少积分告诉商家发货咱们以下单胜利后,用户进行领取,领取实现会有个逻辑叫领取回调,在回调外面须要去做一些业务逻辑。首先来看下同步解决须要破费的工夫,如下图: 下面的下单流程从 3 到 5 都是能够采纳异步流程进行解决,对于用户来说,领取实现后他就不须要关注前面的流程了。后盾缓缓解决就行了,这样就能简化三个步骤,进步回调的解决工夫。 削峰填谷削峰填谷指的是在大流量的冲击下,利用 RocketMQ 能够抗住刹时的大流量,爱护零碎的稳定性,晋升用户体验。 在电商行业,最常见的流量冲击就是秒杀流动了,利用 RocketMQ 来实现一个残缺的秒杀业务还是与很多须要做的工作,不在本文的范畴内,前面有机会能够独自跟大家聊聊。想通知大家的是像诸如此类的场景能够利用 RocketMQ 来扛住高并发,前提是业务场景反对异步解决。 分布式事务最终一致性家喻户晓,分布式事务有 2PC,TCC,最终一致性等计划。其中应用音讯队列来做最终一致性计划是比拟罕用的。 在电商的业务场景中,交易相干的外围业务肯定要确保数据的一致性。通过引入音讯队列 RocketMQ 版的分布式事务,既能够实现零碎之间的解耦,又能够保障最终的数据一致性。 数据散发数据散发指的是能够将原始数据散发到多个须要应用这份数据的零碎中,实现数据异构的需要。最常见的有将数据散发到 ES, Redis 中为业务提供搜寻,缓存等服务。 除了手动通过音讯机制进行数据散发,还能够订阅 Mysql 的 binlog 来散发,在散发这个场景,须要应用 RocketMQ 的程序音讯来保证数据的一致性。 RocketMQ 架构 图片起源阿里云官网文档 Name Server:是一个简直无状态节点,可集群部署,在音讯队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。就是一个注册核心。Broker:音讯直达角色,负责存储音讯,转发音讯。分为 Master Broker 和 Slave Broker,一个 Master Broker 能够对应多个 Slave Broker,然而一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后须要实现一次将本人注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。生产者:与 Name Server 集群中的其中一个节点(随机)建设长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建设长链接,且定时向 Master Broker 发送心跳。消费者:与 Name Server 集群中的其中一个节点(随机)建设长连贯,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建设长连贯,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既能够从 Master Broker 订阅音讯,也能够从 Slave Broker 订阅音讯,订阅规定由 Broker 配置决定。RocketMQ 音讯类型RocketMQ 反对丰盛的音讯类型,能够满足多场景的业务需要。不同的音讯有不同的利用场景,上面为大家介绍罕用的四种音讯类型。 ...
消息中间件的背景介绍消息中间件可能解决的问题异步 很多业务场景咱们须要把同步的工作变成异步的工作。 拿一个电商平台的注册性能来简略剖析下,用 户注册这一个服务,不单单只是insert一条数据到数据库外面就完事了,还须要发送激活邮件、发送新人红包或者积分、发送营销短信等一系列操作。如果说这外面的每一个操作,都须要耗费1s,那么整个注册过程就须要耗时4s能力响应给用户。 那么咱们须要把这些操作拆出来,优化变成异步解决的逻辑。 咱们能够应用阻塞队列+线程池来实现生产者消费者模式。 然而这种形式只实用于单机,一旦机器宕机,那么原来在阻塞队列中存储的数据内容就失落了。应用消息中间件来解决削峰 用户提交过去的申请,先写入到音讯队列。音讯队列是有长度的,如果音讯队列长度超过指定长度, 间接摈弃。这样就把流量的峰值削掉了。 限流 秒杀的具体外围解决业务,接管音讯队列中音讯进行解决,这里的音讯解决能力取决于生产端自身的 吞吐量 解耦 不同程序语言之间能够通过音讯队列来达到通信。 音讯长久化 可能不必放心应用程序挂了而无奈生产音讯 当然,消息中间件还有更多利用场景,比方在弱一致性事务模型中,能够采纳分布式音讯队列的实现最 大能力告诉形式来实现数据的最终一致性等等 思考消息中间件的设计能够先从根本的需要开始思考最根本反对音讯的收发 网络通信就会思考NIO音讯的存储 长久化,非长久化音讯的序列化,反序列化是否跨语言音讯的确认机制 如何防止音讯的重发高级性能音讯的有序性是否反对事物音讯音讯收发的性能,对高并发大数据的反对是否反对集群音讯的可靠性传输是否反对多协定消息中间件的倒退过程 实际上消息中间件的倒退也是挺有意思的,咱们晓得任何一个技术的呈现都是为了解决理论问题,这个 问题是 通过一种通用的软件总线也就是一种通信零碎,解决应用程序之间沉重的信息通信工作。 最早的小白鼠就是金融交易畛域,因为在过后这个畛域中,交易员须要通过不同的终端实现交易,每台终端显示不同的信息。 如果接入音讯总线,那么交易员只须要在一台终端上操作,而后订阅其余终端感兴趣 的音讯。于是就诞生了公布订阅模型(pubsub),同时诞生了世界上第一个古代音讯队列软件(TIB) The information Bus, TIB容许开发者建设一系列规定去形容音讯内容,只有音讯依照这些规定公布出 去,任何消费者利用都能订阅感兴趣的音讯。 随着TIB带来的苦头被广泛应用在各大畛域,IBM也开始研 究开发本人的消息中间件,3年后IBM的音讯队列IBM MQ产品系列公布,之后的一段时间MQ系列进化 成了WebSphere MQ统治商业音讯队列平台市场。 包含前期微软也研发了本人的音讯队列(MSMQ) 各大厂商纷纷钻研本人的MQ,然而他们是以商业化模式经营本人的MQ软件,商业MQ想要解决的是利用互通的问题,而不是创立标准接口来容许不同MQ产品互通。 不同消息中间件切换的问题 所以有些大型的金融公司可能会应用来 自多个供应商的MQ产品,来服务企业外部不同的利用。那么问题来了,如果利用曾经订阅了TIB MQ的 音讯而后忽然须要生产IBM MQ的音讯,那么整个实现过程会很麻烦。 JMS规范 为了解决这个问题,在2001年诞 生了 Java Message Service(JMS),JMS通过提供公共的Java API形式,暗藏独自MQ产品供应商的实现 接口,从而逾越了不同MQ生产和解决互通问题。从技术层面来说,Java应用程序只须要针对JMS API编 程,抉择适合的MQ驱动即可。JMS会解决其余局部。这种计划实际上是通过独自标准化接口来整合很 多不同的接口,成果还是不错的,然而碰到了互用性的问题。 ...
Kafka 是以后十分风行的一个音讯零碎,最后用作 LinkedIn 的流动流式数据和经营数据处理管道的根底。流动流式数据次要包含页面访问量PV、拜访内容以及检索内容等。经营数据指的是服务器的性能数据(CPU、IO 使用率、申请工夫、服务日志等等数据)。这些数据通常的解决形式是先把各种流动以日志的模式写入某种文件,而后周期性地对这些文件进行统计分析。 近年来,随着互联网的疾速倒退,流动和经营数据处理曾经成为网站软件产品个性中一个至关重要的组成部分,须要一套宏大的基础设施对其提供反对。 什么是 Kafka? Kafka 是一个分布式、反对分区的、多正本的,基于 Zookeeper 协调的分布式音讯零碎,现已募捐给 Apache 基金会。它最大的个性就是能够实时处理大量数据并且反对动静程度扩大,这样的个性能够满足各种需要场景:比方基于 Hadoop 的批处理零碎、低提早的实时零碎、Storm/Spark 流式解决引擎,日志解决零碎,音讯服务等等。 Kafka 零碎架构 Kafka 架构如上图所示,实质就是生产-存储-生产,次要蕴含以下四个局部: Producer Cluster:生产者集群,负责公布音讯到 Kafka broker,个别由多个利用组成。Kafka Cluster:Kafka 服务器集群。这里就是 Kafka 最重要的一部分,这里负责接管生产者写入的数据,并将其长久化到文件存储里,最终将音讯提供给 Consumer Cluster。Zookeeper Cluster:Zookeeper 集群。Zookeeper 负责保护整个 Kafka 集群的 Topic 信息、Kafka Controller 等信息。_Consumer Cluster_:消费者集群,负责从 Kafka broker 读取音讯,个别也由多个利用组成,获取本人想要的信息。Kafka 相干概念解释 Broker Kafka 集群蕴含一个或多个服务器,这种服务器被称为 broker;Topic 每条公布到 Kafka 集群的音讯都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的音讯离开存储,逻辑上一个 Topic 的音讯尽管保留于一个或多个 broker 上但用户只需指定音讯的 Topic 即可生产或生产数据而不用关怀数据存于何处);Partition 是物理上的概念,每个 Topic 蕴含一个或多个 Partition;Producer 负责公布音讯到 Kafka broker;Consumer 音讯消费者,向 Kafka broker 读取音讯的客户端;Consumer Group 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。在 Kafka 架构设计里,无论是生产者、还是消费者、还是音讯存储,都能够动静程度扩大,从而进步整个集群的吞吐量、可扩展性、持久性和容错性,Kafka生来就是一个分布式系统,这赋予了它以下个性: ...
大家好,我是 yes。 最近我始终扎在音讯队列实现细节之中无法自拔,曾经写了 3 篇Kafka源码剖析,还剩很多没肝完。之前还存着RocketMQ源码剖析还没整顿。今儿临时先跳进去盘一盘大方向上的音讯队列有哪些外围留神点。 外围点有很多,为了更贴合理论场景,我从常见的面试问题动手: 如何保障音讯不失落?如果解决反复音讯?如何保障音讯的有序性?如果解决音讯沉积?当然在分析这几个问题之前须要简略的介绍下什么是音讯队列,音讯队列常见的一些根本术语和概念。 接下来进入注释。 什么是音讯队列来看看维基百科怎么说的,顺带学学英语这波不亏: In computer science, message queues and mailboxes are software-engineering components typically used for inter-process communication (IPC), or for inter-thread communication within the same process. They use a queue for messaging – the passing of control or of content. Group communication systems provide similar kinds of functionality.翻译一下:在计算机科学畛域,音讯队列和邮箱都是软件工程组件,通常用于过程间或同一过程内的线程通信。它们通过队列来传递音讯-传递管制信息或内容,群组通信零碎提供相似的性能。 简略的概括下下面的定义:音讯队列就是一个应用队列来通信的组件。 下面的定义没有错,但就当初而言咱们日常所说的音讯队列经常指代的是消息中间件,它的存在不仅仅只是为了通信这个问题。 为什么须要音讯队列从实质上来说是因为互联网的疾速倒退,业务一直扩张,促使技术架构须要一直的演进。 从以前的单体架构到当初的微服务架构,成千盈百的服务之间互相调用和依赖。从互联网初期一个服务器上有 100 个在线用户曾经很了不得,到当初坐拥10亿日活的微信。咱们须要有一个「货色」来解耦服务之间的关系、管制资源正当合时的应用以及缓冲流量洪峰等等。 音讯队列就应运而生了。它罕用来实现:异步解决、服务解耦、流量管制。 异步解决随着公司的倒退你可能会发现你我的项目的申请链路越来越长,例如刚开始的电商我的项目,能够就是粗犷的扣库存、下单。缓缓地又加上积分服务、短信服务等。这一路同步调用下来客户可能等急了,这时候就是音讯队列退场的好时机。 调用链路长、响应就慢了,并且绝对于扣库存和下单,积分和短信没必要这么的 "及时"。因而只须要在下单完结那个流程,扔个音讯到音讯队列中就能够间接返回响应了。而且积分服务和短信服务能够并行的生产这条音讯。 能够看出音讯队列能够缩小申请的期待,还能让服务异步并发解决,晋升零碎总体性能。 ...
大家好,我是 yes。 最近我始终扎在音讯队列实现细节之中无法自拔,曾经写了 3 篇Kafka源码剖析,还剩很多没肝完。之前还存着RocketMQ源码剖析还没整顿。今儿临时先跳进去盘一盘大方向上的音讯队列有哪些外围留神点。 外围点有很多,为了更贴合理论场景,我从常见的面试问题动手: 如何保障音讯不失落?如果解决反复音讯?如何保障音讯的有序性?如果解决音讯沉积?当然在分析这几个问题之前须要简略的介绍下什么是音讯队列,音讯队列常见的一些根本术语和概念。 接下来进入注释。 什么是音讯队列来看看维基百科怎么说的,顺带学学英语这波不亏: In computer science, message queues and mailboxes are software-engineering components typically used for inter-process communication (IPC), or for inter-thread communication within the same process. They use a queue for messaging – the passing of control or of content. Group communication systems provide similar kinds of functionality.翻译一下:在计算机科学畛域,音讯队列和邮箱都是软件工程组件,通常用于过程间或同一过程内的线程通信。它们通过队列来传递音讯-传递管制信息或内容,群组通信零碎提供相似的性能。 简略的概括下下面的定义:音讯队列就是一个应用队列来通信的组件。 下面的定义没有错,但就当初而言咱们日常所说的音讯队列经常指代的是消息中间件,它的存在不仅仅只是为了通信这个问题。 为什么须要音讯队列从实质上来说是因为互联网的疾速倒退,业务一直扩张,促使技术架构须要一直的演进。 从以前的单体架构到当初的微服务架构,成千盈百的服务之间互相调用和依赖。从互联网初期一个服务器上有 100 个在线用户曾经很了不得,到当初坐拥10亿日活的微信。咱们须要有一个「货色」来解耦服务之间的关系、管制资源正当合时的应用以及缓冲流量洪峰等等。 音讯队列就应运而生了。它罕用来实现:异步解决、服务解耦、流量管制。 异步解决随着公司的倒退你可能会发现你我的项目的申请链路越来越长,例如刚开始的电商我的项目,能够就是粗犷的扣库存、下单。缓缓地又加上积分服务、短信服务等。这一路同步调用下来客户可能等急了,这时候就是音讯队列退场的好时机。 调用链路长、响应就慢了,并且绝对于扣库存和下单,积分和短信没必要这么的 "及时"。因而只须要在下单完结那个流程,扔个音讯到音讯队列中就能够间接返回响应了。而且积分服务和短信服务能够并行的生产这条音讯。 能够看出音讯队列能够缩小申请的期待,还能让服务异步并发解决,晋升零碎总体性能。 ...
对于 MQ 的定义Message Queue(MQ)音讯队列中间件,通常咱们在网上看到的对其定义是将音讯的发送和承受拆散来实现应用程序的异步和解耦,给人的直觉是 MQ 是异步的,用来解耦的。但这个只是 MQ 的成果,而不是目标。MQ 真正的目标是为了通信,屏蔽底层简单的通信协定,定义了一套应用层上更加简略的通信协定。 一套分布式系统中两个模块之间通信要么是 HTTP,要么是 TCP,但这两种协定其实都是原始的协定。前者实现通信就必须要做到各客户端都有 WebServer,而且不反对长连贯;后者就更加原始了 — 粘包、心跳、公有的协定。 而 MQ 所要做就是在基于这些现有的协定之上构建一个更简略的通信(生产者/消费者)模型。它定义了两个对象 —发送数据的叫生产者,承受数据的叫消费者,提供一个 SDK 给咱们本人定义生产者和消费者实现音讯通信,且忽视底层通信协定。 带 Broker 的流派这个流派通常有一台服务器作为 Broker,所有的音讯都通过它进行直达。生产者把音讯发送给它就完结本人的工作了,最初 Broker 则把音讯被动推送给消费者(或者消费者被动轮询)。 重 Topic 的 MQKafka、Active MQ 就属于这个流派:生产者发送 key 和数据到 Broker,由 Broker 比拟 key 之后决定给哪个消费者。 .png) 在这种模式下,Topic(主题音讯) 往往是一个比拟大的概念,甚至一个零碎中就可能只有一个 Topic。 尽管这两种音讯队列的架构一样,然而 Kafka 的性能要比 Active MQ 的性能不晓得高到多少倍,所以根本这种类型的 MQ 只有 Kafka一种备选计划。 轻 Topic 的 MQ这种的代表是 RabbitMQ(AMQP)。生产者发送 key 和数据,Borker 收到数据之后会依据 key 通过肯定的逻辑计算出相应的队列,最初消费者订阅队列。 .png) 在这种架构中 Queue 是十分轻量级的(在 RabbitMQ 中它的下限取决于你的内存),消费者关怀的只是本人的 Queue;生产者不用关怀数据最终给谁,只有指定 key 就行了。两头的那层映射在 AMQP 中叫 exchange(交换机)。 ...
Spring整合RabbitMQ如果我是使用者,我应该只需要关注发送信息和接收信息。基于此可以使用Spring框架来继承RabbitMQ,从而简化RabbitMQ的操作。1.在Maven工程中添加依赖 <!-- 添加Spring整合Rabbit依赖的包 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.2.RELEASE</version> </dependency>2.修改Spring的配置文件 <?xml version="1.0" encoding="UTF-8" ?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <bean id="fooMessageListener" class="cn.itding.mq.rabbitmq.spring.FooMessageListener"/> <!-- 配置连接 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest" virtual-host="/" requested-heartbeat="60"/> <!-- 配置RabbitTemplate --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange" routing-key="foo.bar"/> <!-- 配置RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 配置队列名称 --> <rabbit:queue name="myQueue"/> <!-- 配置Topic类型的交换器 --> <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding pattern="foo.*" queue="myQueue"/> </rabbit:bindings> </rabbit:topic-exchange> <!-- 配置监听器 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="fooMessageListener" queue-names="myQueue" /> </rabbit:listener-container></beans>MessageListenerContainer:用来监听容器,为消息入队提供异步处理。RabbitTemplate:用来发送和接收消息。RabbitAdmin:用来声明队列、交换器、绑定。发送消息public class SendMessage { public static void main(String[] args) { ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RabbitTemplate template = classPathXmlApplicationContext.getBean(RabbitTemplate.class); template.convertAndSend("Hello World"); classPathXmlApplicationContext.close(); }}接收消息public class FooMessageListener implements MessageListener { @Override public void onMessage(Message message) { String messageBody = new String(message.getBody()); System.out.println("接收到消息:" + messageBody); }}异步处理的案例某天产品人员说“系统要增加一个用户注册功能,注册成功后用户能收到邮件通知”,开发人员觉得这个不难,于是写了一个注册页面,点击提交按钮后保存用户的注册信息,然后发送邮件,最后返回用户注册成功。过了一段时间,产品人员说“点击注册后响应太慢,能不能优化一下”。开发人员首先想到的是利用多线程,将保存注册和邮件发送并行执行。又没过多久,产品人员说“现在注册响应是快了,但是用户反馈没有收到注册成功个的邮件通知,能不能在发送邮件的时候先保存所发送的邮件内容,如果邮件发送失败则进行补发”。如果有专门提供邮件发送的部门,开发人员直接使用他们提供的服务岂不是更好,而这个部门真是使用了消息的异步处理来完成邮件的可靠发送。下面代码实现:1.添加Maven依赖 ...
1.RabbitMQ特点 保证可靠性。如持久化、传输确认、发布确认等。路由灵活。通过Exchange交换器来路由消息。支持集群。具有高可用性。支持多种协议,除了AMQP,还支持STOMP和MQTT等。支持多语言。提供管理界面。提供跟踪机制。提供插件机制,多方面扩展。2.RabbitMQ基本概念 Message(消息):由消息头和消息体组成。消息体是不透明的,但是消息头则由一系列可选属性组成,包括routing-key(路右键)、priority(优先级)、delivery-mode(消息持久化)等。Publisher(消息生产者):向交换器发布消息的客户端应用程序。Exchange(交换器):接收生产者发送来的消息,并将消息路由到服务器中的队列上。Binding(绑定):消息队列和交换器之间的关联。Queue(消息队列):保存消息,直到发送给消费者。Connection(网络连接):一个TCP连接。Channel(信道):一个双向数据流通道,建立在真实的TCP连接内的虚拟连接。Consumer(消息消费者):从消息队列取走消息的客户端应用程序。Virtual Host(虚拟主机):一批交换器、消息队列等相关的对象。默认vhost是“/”。Broker:消息队列服务器实体。(1)AMQP中的消息路由AMQP中增加了Exchange和Binding的角色。生产者需要把消息发布到Exchange上,然后通过Binding,将消息发送到指定的队列。(2)交换器类型 Direct交换器如果消息的路由键(RoutingKey)和绑定键(BindingKey)完全匹配,就会把消息发送到对应的队列上。Fanout交换器不处理路右键,只是简单的将队列绑定到交换器,当有消息发送到这个交换器时,就会将消息发送到和它绑定的队列上。Topic交换器将路由键和某种模式进行匹配,此时的BindingKey是一种模式。路由键和绑定键中的单词与单词之间是用“.”分割的。其中模式中的“#”表示匹配0到多个单词,“*”表示只匹配一个单词。3.用Java访问RabbitMQ 1.在Maven工程中添加依赖。 <!-- RabbitMQ依赖 --><dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version></dependency>2.消息生产者 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); // 设置RabbitMQ地址 factory.setHost("localhost"); factory.setVirtualHost("/"); // 建立到代理服务器的连接 Connection conn = factory.newConnection(); // 创建信道 Channel channel = conn.createChannel(); // 声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "testRoutingKey"; // 发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); // 关闭信道和连接 channel.close(); conn.close(); }}3.消息消费者 ...