关于mq:阿里二面MQ-消息积压了增加消费者有用吗

大家好,我是不才陈某~ 明天分享一道有意思的面试题。 面试官:RocketMQ 音讯积压了,减少消费者有用吗? 我:这个要看具体的场景,不同的场景下状况是不一样的。 面试官:能够具体说一下吗? 我:如果消费者的数量小于 MessageQueue 的数量,减少消费者能够放慢音讯生产速度,缩小音讯积压。比方一个 Topic 有 4 个 MessageQueue,2 个消费者进行生产,如果减少一个消费者,明细能够放慢拉取音讯的频率。如下图: 关注公众号:码猿技术专栏,回复关键词:1111 获取阿里外部Java性能调优手册! 如果消费者的数量大于等于 MessageQueue 的数量,减少消费者是没有用的。比方一个 Topic 有 4 个 MessageQueue,并且有 4 个消费者进行生产。如下图 面试官:你说的第一种状况,减少消费者肯定能放慢音讯生产的速度吗? 我:这...,个别状况下是能够的。 面试官:有非凡的状况吗? 我:当然有。消费者音讯拉取的速度也取决于本地音讯的生产速度,如果本地音讯生产的慢,就会提早一段时间后再去拉取。 面试官:在什么状况下消费者会提早一段时间后后再去拉取呢? 我:消费者拉取的音讯存在 ProcessQueue,消费者是有流量管制的,如果呈现上面三种状况,就不会被动去拉取: ProcessQueue 保留的音讯数量超过阈值(默认 1000,能够配置);ProcessQueue 保留的音讯大小超过阈值(默认 100M,能够配置);对于非程序生产的场景,ProcessQueue 中保留的最初一条和第一条音讯偏移量之差超过阈值(默认 2000,能够配置)。这部分源码请参考类:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl。面试官:还有其余状况吗? 我:对于程序生产的场景,ProcessQueue 加锁失败,也会提早拉取,这个延迟时间是 3s。 面试官:消费者提早拉取音讯,个别可能是什么起因导致的呢? 我:其实提早拉取的实质就是消费者生产慢,导致下次去拉取的时候 ProcessQueue 中积压的音讯超过阈值。以上面这张架构图为例: 消费者生产慢,可是能上面的起因: 消费者解决的业务逻辑简单,耗时很长;消费者有慢查问,或者数据库负载高导致响应慢;缓存等中间件响应慢,比方 Redis 响应慢;调用内部服务接口响应慢。面试官:对于内部接口响应慢的状况,有什么应答措施吗? 我:这个要分状况探讨。 如果调用内部零碎只是一个告诉,或者调用内部接口的后果并不解决,能够采纳异步的形式,异步逻辑里采纳重试的形式保障接口调胜利。 如果内部接口返回后果必须要解决,能够思考接口返回的后果是否能够缓存默认值(要思考业务可行),在调用失败后采纳疾速降级的形式,应用默认值代替返回接口返回值。 如果这个接口返回后果必须要解决,并且不能缓存,能够把拉取到的音讯存入本地而后给 Broker 间接返回 CONSUME_SUCCESS。等内部零碎恢复正常后再从本地取出来进行解决。 面试官:如果消费者数小于 MessageQueue 数量,并且内部零碎响应失常,为了疾速生产积压音讯而减少消费者,有什么须要思考的吗? 我:内部零碎尽管响应失常,然而减少多个消费者后,内部零碎的接口调用量会突增,如果达到吞吐量下限,内部零碎会响应变慢,甚至被打挂。 同时也要思考本地数据库、缓存的压力,如果数据库响应变慢,解决音讯的速度就会变慢,起不到缓解音讯积压的作用。 面试官:新减少了消费者后,怎么给它调配 MessageQueue 呢? ...

June 13, 2023 · 2 min · jiezi

关于mq:消息中间件基础知识

by zhimaxingzhe from 消息中间件基础知识 欢送分享链接,转载请注明出处,尊重版权,若急用请分割受权。 https://zhimaxingzhe.github.io前言本文梳理笔者的MQ常识,从消息中间件的基础知识讲起,在有了基础知识后,对市面上各支流的消息中间件进行具体的解析,包含 RabbitMQ、RocketMQ、Kafka、Pulsar,最初再横向比照这几款支流的消息中间件。 介绍MQ的文章网上千千万,最好的学习路径还是官网文档,文中介绍的这几款MQ都在致力推广本人,所以文档在权威性、全面性、专业性、时效性都是无人能及其左右,当初的官网文档甚至本人做竞品比对,比方RocketMQ就本人放了比对表格在首页。所以要学好哪一款MQ,就去看它的官网吧,地址放在文末参考资料中了。 最好的学习办法是带着问题去寻找答案,以费曼学习法为规范,产出可教学的材料,所以本文多是集体的所学梳理和所想记录,集体只是无限,不免有所疏漏,文中有谬误和疏漏请不吝赐教,感激!若有帮忙,请一键三连吧。文中许多图片内容是随笔摘抄,若有触犯,侵删。 消息中间件的倒退曾经有近40年历史,早在上个世纪80年代就诞生了第一款音讯队列 The Information Bus。 到90年代 IBM、Oracle、Microsoft 纷纷推出自家的MQ,但都是免费且闭源的产品,次要面向高端的企业用户,这些MQ个别都采纳高端硬件,软硬件一体机交付,须要洽购专门的保护服务,MQ自身的架构是单机的架构,用户的自主性较差。 进入新世纪后,随着技术成熟,人们开始探讨MQ的协定,诞生了JMS、AMPQ 两大协定规范,随之别离有 ActiveMQ、RabbitMQ的具体实现,并且是开源共建的,这使得这两款MQ在过后迅速风行开来,MQ的应用门槛也随之升高,越来越多零碎融入了MQ作为根底能力。 再起初PC互联网、挪动互联网的爆发式倒退,因为传统的音讯队列无奈接受亿级用户的拜访流量和海量数据传输,诞生了互联网消息中间件,外围能力是全面采纳分布式架构、具备很强的横向扩大能力,开源典型代表有 Kafka、RocketMQ、Pulsar。Kafka 的诞生还将消息中间件从Messaging畛域延长到了 Streaming 畛域,从分布式应用的异步解耦场景延长到大数据畛域的流存储和流计算场景。Pulsar 更是在 Kafka 之后集大家之成,在企业级利用上做得更好,存储和计算拆散的设计使得拓展更加轻松。 现在,IoT、云计算、云原生引领了新的技术趋势。面向IoT的场景,音讯队列开始从云内服务端利用通信,延长到边缘机房和物联网终端设备,反对MQTT等物联网标准协议也成了各大音讯队列的标配,咱们看到Pulsar、Kafka、RocketMQ 都在致力追随时代步调,拓展本人在各种应用场景下的能力。 1、消息中间件的定义在早些年MQ始终被叫做音讯队列,就能够定义为传递音讯的容器,随着时代的倒退,MQ 都在致力拓展进去越来越多的性能,越来越多需要加在MQ纸上,消息中间件的能力越来越强,利用的场景也越来越多,如果非要用一个定义来概括只能是形象进去一些概念,概括为跨服务之间传递信息的软件。 2、用处异步解决能够把接口申请依据业务的时效性水平,将不紧急的解决逻辑生成音讯、事件放到MQ当中,再由专门的零碎解决该音讯、事件;如日志上报、归档事件、数据推送、数据分析、触发策略、变更举荐、增加积分、发送告诉音讯等。 削峰填谷作为零碎外部的一个音讯池,抵制洪峰,对后端服务起到爱护作用。流量洪峰进来的时候,会转换为音讯落到MQ当中,后端服务能够依据本人的解决能力来,流量不会间接冲击到后端服务,特地是落库、IO等操作。 服务解耦缩小零碎、模块之间间接对接带来的耦合,交互对立按MQ中音讯的协定,按需生产和生产,耦合水平大大降低。 公布订阅零碎产生的行为不须要通过接口等形式来告诉到相干服务,只须要公布一次音讯,订阅者都能生产到音讯,执行服务本身的本职工作。 当然,所有收益都是有代价的,对于零碎架构自身来说,会引入新组件,带来零碎复杂度的晋升,整体零碎的可靠性也会是挑战,减少消息中间件的运维老本,还会带来整体零碎一致性的问题。所以须要衡量本身零碎是否有必要引入MQ,能解决什么痛点,投入产出是否让组织称心,对于自身流量不大的零碎来说,放弃简略架构是大快人心的事件,毕竟,越简略越稳固,越耐用。 3、音讯模型队列模型 一种是音讯队列,生产者往队列写音讯,消费者从这个队列生产音讯,当然生产者能够是多个,消费者也能够是多个,然而一条音讯只能被生产一次,具体怎么做的,这就波及到具体的应用需要和每一款消息中间件的实现了,前面第二局部的时候会波及到。这是最早的音讯模型,这也是为什么音讯队列MQ这个名字也始终有人在用吧。 订阅模型 起初上个世纪80年代有人提出公布订阅模式,就是topic模式,生产者公布的音讯,消息中间件会把音讯投递给每一个订阅者,这个投递的过程有可能是推也可能是拉,反对哪一种也要看每一款的具体实现。 4、音讯协定从音讯的生命周期来看,音讯的生产、存储、生产的整个过程来标准步骤要达到的规范。比方金融级别的音讯协定会要求保障音讯生产过程中不丢、不反复,存储过程中也能有持久性、一致性的要求,在生产过程中保障音讯正确被生产,如不反复、不错位等。 常见的音讯协定: 接下来举例 AMPQ 协定的生产、生产过程规范。 AMQP 协定高级音讯队列协定(Advanced Message Queuing Protocol),一个提供对立音讯服务的应用层规范高级音讯队列协定,是应用层协定的一个凋谢规范,为面向音讯的中间件设计。基于此协定的客户端与消息中间件可传递音讯,并不受客户端/中间件不同产品,不同的开发语言等条件的限度。 生产音讯生产音讯 MQTT 协定MQTT(音讯队列遥测传输)是ISO 规范(ISO/IEC PRF 20922)下基于公布/订阅范式的音讯协定。它工作在 TCP/IP协定族上,是为硬件性能低下的近程设施以及网络情况蹩脚的状况下而设计的公布/订阅型音讯协定。 MQTT协定是轻量、简略、凋谢和易于实现的,这些特点使它适用范围十分宽泛。在很多状况下,包含受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶然拨号的医疗设施、智能家居、及一些小型化设施中已宽泛应用。 其余协定另外还有STOMP、OpenMessaging等,这里不做开展。以后市面上支流的消息中间件多是有自定义的协定倒退起来的,如Kafka 在最开始并不算是一个消息中间件,而是用于日志记录零碎的一部分,所以并不是基于某种中间件音讯协定来做的,而是基于TCP/IP,依据自定义的音讯格局,来传递日志音讯,为满足对于音讯失落是有肯定容忍度的;在起初逐渐倒退到能够反对正好一次(Exactly Once)语义,实际上是通过 At Least Once + 幂等性 = Exactly Once 。 ...

February 1, 2023 · 3 min · jiezi

关于mq:关于mq重复消息

producer反复发送, RocketMQ的broker是不论的, Kafka开启幂等后, 每个音讯都会带一个"过程pid+topic+自增序列号"的惟一标识, 重试的音讯的惟一标识是雷同的.consumer反复生产, 能够用bitmap来做去重.前提是mq必须有自定义路由, 让同一个音讯发到同一个队列, 反复的音讯会在同一个队列. 一个consumer可能对应多个队列(或分区),那么就须要用一个map或list来别离保留各个分区的bitmap,因为bitmap是数组下标来判断是否曾经生产, 所以音讯的惟一id要是数字, 如果是字符+数字, 也得是枚举类型的字符, 这样就只能用map来对枚举字段进行维度划分. 这样搞有个问题, 就是rebalance的时候, 这种计划可能还是会产生反复生产, 解决方案就是监听rebalance, rebalance之后, 就把本人的bitmap播送进来(能够用zk,redis,也能够发一个mq), consumer会取到rebalance之后本人负责的队列的bitmap

August 25, 2022 · 1 min · jiezi

关于mq:简单易用的任务队列beanstalkd

更多技术文章,请关注我的集体博客 www.immaxfang.com 和小公众号 Max的学习札记。概述beanstalkd 是一个简略疾速的分布式工作队列零碎,协定基于 ASCII 编码运行在 TCP 上。其最后设计的目标是通过后盾异步执行耗时工作的形式升高高容量 Web 利用的页面延时。其具备简略、轻量、易用等特点,也反对对工作优先级、延时/超时重发等管制,同时还有泛滥语言版本的客户端反对,这些长处使得它成为各种须要队列零碎场景的一种常见抉择。 beanstalkd 长处如他官网的介绍,simple&fast,应用非常简单,适宜须要引入音讯队列又不想引入 kafka 这类重型的 mq,保护成本低;同时,它的性能十分高,大部分场景下都能够 cover 住。反对长久化反对音讯优先级,topic,延时音讯,音讯重试等支流语言客户端都反对,还能够依据 beanstalkd 协定自行实现。beanstalkd 有余无最大内存管制,当业务音讯极多时,服务可能会不稳固。官网没有提供集群故障切换计划(主从或哨兵等),须要本人解决。beanstalkd 重点概念job工作,队列中的根本单元,每个 job 都会有 id 和优先级。有点相似其余音讯队列中的 message 的概念。但 job 有各种状态,下文介绍生命周期局部会重点介绍。job 寄存在 tube 中。 tube管道,用来存储同一类型的 job。有点相似其余音讯队列中的 topic 的概念。beanstalkd 通过 tube 来实现多任务队列,beanstalkd 中能够有多个管道,每个管道有本人的 producer 和 consumer,管道之间相互不影响。 producerjob 生产者。通过 put 命令将一个 job 放入到一个 tube 中。 consumerjob 消费者。通过 reserve 来获取 job,通过 delete、release、bury 来扭转 job 的状态。 beanstalkd 生命周期上文介绍到,beanstalkd 中 job 有状态辨别,在整个生命周期中,job 可能有四种状态:READY, RESERVED, DELAYED, BURIED。只有处于READY状态的 job 能力被生产。下图介绍了各状态之间的流转状况。 ...

July 21, 2022 · 3 min · jiezi

关于mq:python3调用rocket-mq

installhttps://github.com/apache/roc... wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm Installationpip install rocketmq-client-pythonProducerfrom rocketmq.client import Producer, Messageproducer = Producer('test-litx')producer.set_name_server_address('rxxrocketmq-namesrv.sit.dexxxxm:9876')producer.start()msg = Message('itworkspace')msg.set_keys('status')msg.set_tags('json')msg.set_body('{"alertname": "test666666666","building": "t11111tttt"}')ret = producer.send_sync(msg)print(ret.status, ret.msg_id, ret.offset)producer.shutdown() PushConsumerimport timefrom rocketmq.client import PushConsumer, ConsumeStatusdef callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESSconsumer = PushConsumer('itworkxxxce-consumer')consumer.set_name_server_address('red-rocketmq-namesrv.sit.devops.xixxxu.com:9876')consumer.subscribe('itwxxce', callback)consumer.start()time.sleep(3)consumer.shutdown()

July 8, 2022 · 1 min · jiezi

关于mq:系统学习消息队列RabbitMQ的消息发布确认

1.MQ发送信息的时候产生的问题2.MQ的公布确认原理3.MQ的公布确认策略 1.MQ发送信息的时候产生的问题 咱们在前一篇博客零碎学习音讯队列——RabbitMQ的音讯应答和长久化中学过,当消费者挂掉的时候,有音讯重发,当队列挂掉的时候,有音讯长久化,然而咱们却无奈保障生产者发送到队列的音讯是否确定发送胜利,这个时候就有了音讯的公布确认。 2.MQ的公布确认原理当咱们的信道被设置成公布确认(confirm)模式,那么所有在该信道下面公布的音讯都会被指派一个惟一的ID,一旦音讯胜利投递,broker就会发送一个确认给生产者,生产者此时就晓得音讯曾经投递胜利,生产者就会把这条音讯进行删除。 confirm模式能够是同步的,也能够是异步的,同步的状况下是发送之后马上进行确认,异步的话生产者能够无需期待确认只管发送音讯,如果某些音讯失去确认,生产者将就能够通过回调办法来确认音讯。 3.MQ的公布确认策略 3.1)开启确认公布公布确认模式默认是没有开启的,咱们须要调用办法将它关上。 Channel channel = connection.createChannel(); //开启公布确认 channel.confirmSelect();3.2)单个确认公布 这是一种简略的同步确认形式,发送一条音讯,确认一条音讯,后续的音讯能力持续发送。 长处:简略易懂。毛病:公布速度过慢,如果后面的音讯没有失去确认,前面的音讯就不得发送,容易阻塞。 public class ProducerSingle { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开 try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个队列 * 1.队列名称 * 2.队列外面的音讯是否长久化 默认音讯存储在内存中 * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产 * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除 * 5.其余参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //开始工夫 long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message = i + ""; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //确认是否发送胜利,服务端返回 false 或超时工夫内未返回,生产者能够音讯重发 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("音讯发送胜利"); } } //发送完结工夫 long end = System.currentTimeMillis(); System.out.println("公布" + 1000 + "个独自确认音讯,耗时" + (end - begin) + "ms"); } }}3.3)批量确认公布单个确认公布的速度十分慢,其实咱们能够先发送一批,而后确认一批,再公布一批。长处:比单个确认公布速度快,吞吐量大。毛病:当其中一个音讯出问题的时候,不晓得是哪个音讯呈现了问题,咱们必须将整个批处理音讯保留在内存里,以记录重要的音讯后从新公布音讯。这种办法也是阻塞的,一样阻塞音讯的公布。 ...

June 15, 2022 · 3 min · jiezi

关于mq:系统学习消息队列RabbitMQ的消息应答和持久化

1.MQ的Hellow World程序2.MQ的音讯应答3.MQ的长久化 1.MQ的Hellow World程序依据上一篇文章零碎学习音讯队列——RabbitMQ的根底概念,咱们学习了mq的基础理论,实践诚然重要,实际也尤其重要,咱们先来编写一个RabbitMq的Hello World程序,来感受一下队列。 在这里咱们创立一个音讯生产者,两个音讯消费者,轮询进行散发音讯。 pom: <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>连贯工具类: import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author sulingfeng * @title: RabbitMqUtils * @date 2022/6/14 9:33 */public class RabbitMqUtils { //失去一个连贯的工具类 public static Channel getChannel() throws Exception { //创立一个连贯工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; }}生产者: import com.rabbitmq.client.Channel;import java.util.Scanner;/** * @author sulingfeng * @title: Producer * @date 2022/6/13 19:55 */public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开 try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个队列 * 1.队列名称 * 2.队列外面的音讯是否长久化 默认音讯存储在内存中 * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产 * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除 * 5.其余参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); //从控制台当中承受信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); /** * 发送一个音讯 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其余的参数信息 * 4.发送音讯的音讯体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送音讯实现:"+message); } } }}消费者: ...

June 14, 2022 · 3 min · jiezi

关于mq:系统学习消息队列RabbitMQ的基础概念

1.什么是MQ2.为什么要应用MQ3.MQ的分类和抉择4.RabbitMQ的根底概念5.RabbitMQ的装置 1.什么是MQMQ(message queue),全名为音讯队列,接下来咱们从数据结构、操作系统和日常开发的角度来解释一下这个概念。从数据结构的角度上来看:MQ是一个队列,领有先进先出个性的一种数据结构。 从操作系统的角度上来看:操作系统上有一个概念,叫做管道通信。所谓管道,就是连贯一个读线程和一个写线程,以实现它们之间的通信的一个共享文件。有了管道,就意味着两个线程能够一边写,一边读,既不必放心读写管制的问题,也能够防止写过程忙碌/读过程闲暇或者写过程闲暇/读过程忙碌造成的阻塞,使两个过程有一个很好的解耦。 从日常开发的角度上来看:咱们在日常开发的时候,常常会须要有排队、异步、限流等操作,有了mq当前,音讯只须要发送给mq,不必依赖其它服务。 2.为什么要应用MQ咱们应用mq个别有三个目标:削峰填谷,利用解耦,异步解决,咱们来挨个解释一下这三个概念。 2.1)削峰填谷假如当初有一个电商零碎,一秒钟能解决一万个申请,在失常的时间段,零碎的生产能力齐全能够消化。然而在秒杀/流动/热品的顶峰期间,如果一瞬间有两万个申请,那么这个时候操作系统是解决不了的,当初有了MQ,咱们就能够把所有的音讯放在MQ里,而后让零碎能生产的速度进行解决,就达到了流量的一个削峰。在服务器应用顶峰后,往往会有一段低谷,就把生产不过去的那一部分订单丢到零碎应用低谷期间进行解决,这就是削峰填谷。 2.2)利用解耦咱们假如还是一个电商零碎,其中会有领取,订单,库存,物流等零碎,当用户下单时,上游零碎挨个调用上游零碎,然而任何一个子系统呈现了故障,下单都会失败,用户体验不佳。当零碎调用形式变为音讯队列的时候,咱们只须要把音讯发送给其它零碎,就算其它零碎出了故障,也能够保障本人的零碎胜利,在故障复原之后,因为音讯还能够保留在mq当中,能够持续生产,两头用户感知不到某些零碎的异样,晋升零碎可用性。 2.3)异步解决比方咱们当初有一个注册性能,有输出邀请码和不输出邀请码两个选项,如果输出邀请码,邀请者会取得处分。此时,咱们就能够让音讯队列来进行实现,把处分的逻辑放在队列的消费者外面,音讯生产者只须要实现注册逻辑,再发送一条音讯给队列,就能够大大减少逻辑的解决工夫,无需阻塞。 3.MQ的分类和抉择 3.1)ActiveMQ 长处:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,音讯可靠性较低的概率失落数据 毛病:官网社区当初对 ActiveMQ 5.x 保护越来越少,高吞吐量场景较少应用。 3.2)Kafka 长处: 性能卓越,单机写入 TPS 约在百万条/秒,最大的长处,就是吞吐量高。时效性 ms 级可用性十分高,kafka 是分布式的,一个数据多个正本,多数机器宕机,不会失落数据,不会导致不可用,消费者采纳 Pull 形式获取音讯, 音讯有序, 通过管制可能保障所有音讯被生产且仅被生产一次;有优良的第三方KafkaWeb 治理界面 Kafka-Manager;在日志畛域比拟成熟,被多家公司和多个开源我的项目应用;性能反对: 性能较为简单,次要反对简略的 MQ 性能,在大数据畛域的实时计算以及日志采集被大规模应用 毛病:Kafka 单机超过 64 个队列/分区,Load 会产生显著的飙高景象,队列越多,load 越高,发送音讯响应工夫变长,应用短轮询形式,实时性取决于轮询间隔时间,生产失败不反对重试;反对音讯程序,然而一台代理宕机后,就会产生音讯乱序,社区更新较慢; 3.3)RocketMQ 长处:单机吞吐量十万级,可用性十分高,分布式架构,音讯能够做到 0 失落,MQ 性能较为欠缺,还是分布式的,扩展性好,反对 10 亿级别的音讯沉积,不会因为沉积导致性能降落,源码是 java 咱们能够本人浏览源码,定制本人公司的 MQ 毛病:反对的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度个别,没有在MQ外围中去实现 JMS 等接口,有些零碎要迁徙须要批改大量代码 3.4)RabbitMQ 长处:因为 erlang 语言的高并发个性,性能较好;吞吐量到万级,MQ 性能比拟齐备,强壮、稳固、易用、跨平台、反对多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,反对 AJAX 文档齐全;开源提供的治理界面十分棒,用起来很好用,社区活跃度高;更新频率相当高 ...

June 13, 2022 · 1 min · jiezi

关于mq:mq从零开始实现-mq11消费者消息回执添加分组信息-pull-message-ack-groupName

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 【mq】从零开始实现 mq-08-配置优化 fluent 【mq】从零开始实现 mq-09-消费者拉取音讯 pull message 【mq】从零开始实现 mq-10-消费者拉取音讯回执 pull message ack 【mq】从零开始实现 mq-11-消费者音讯回执增加分组信息 pull message ack groupName 状态回执上一节咱们实现了音讯的回执,然而存在一个问题。 同一个音讯,能够被不同的 groupName 进行生产,所以回执是须要依据 groupName 进行离开的,这个上一节中脱漏了。 Broker 推送音讯的调整以前推送音讯是间接推送,然而短少 groupName 信息。 订阅列表获取获取订阅列表的实现调整如下: public List<ChannelGroupNameDto> getPushSubscribeList(MqMessage mqMessage) { final String topicName = mqMessage.getTopic(); Set<ConsumerSubscribeBo> set = pushSubscribeMap.get(topicName); if(CollectionUtil.isEmpty(set)) { return Collections.emptyList(); } //2. 获取匹配的 tag 列表 final List<String> tagNameList = mqMessage.getTags(); Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>(); for(ConsumerSubscribeBo bo : set) { String tagRegex = bo.getTagRegex(); if(RegexUtil.hasMatch(tagNameList, tagRegex)) { String groupName = bo.getGroupName(); MapUtil.putToListMap(groupMap, groupName, bo); } } //3. 依照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 抉择 final String shardingKey = mqMessage.getShardingKey(); List<ChannelGroupNameDto> channelGroupNameList = new ArrayList<>(); for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) { List<ConsumerSubscribeBo> list = entry.getValue(); ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey); final String channelId = bo.getChannelId(); BrokerServiceEntryChannel entryChannel = registerMap.get(channelId); if(entryChannel == null) { log.warn("channelId: {} 对应的通道信息为空", channelId); continue; } final String groupName = entry.getKey(); ChannelGroupNameDto channelGroupNameDto = ChannelGroupNameDto.of(groupName, entryChannel.getChannel()); channelGroupNameList.add(channelGroupNameDto); } return channelGroupNameList;}ChannelGroupNameDto 的定义如下: ...

May 17, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq09消费者拉取消息-pull-message

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 【mq】从零开始实现 mq-08-配置优化 fluent 【mq】从零开始实现 mq-09-消费者拉取音讯 pull message 音讯的推与拉大家好,我是老马。 这一节咱们来一起看一下 MQ 音讯中的推和拉两种模式。 推音讯由 broker 间接推送给消费者,实时性比拟好。 毛病是如果消费者解决不过去,就会造成大量问题。 拉音讯由消费者定时从 broker 拉取,长处是实现简略,能够依据消费者本人的解决能力来生产。 毛病是实时性绝对较差。 理论业务中,须要联合具体的场景,抉择适合的策略。 拉取策略实现push 策略咱们首先看一下 push 策略的简化外围实现: package com.github.houbb.mq.consumer.core;/** * 推送生产策略 * * @author binbin.hou * @since 1.0.0 */public class MqConsumerPush extends Thread implements IMqConsumer { @Override public void run() { // 启动服务端 log.info("MQ 消费者开始启动服务端 groupName: {}, brokerAddress: {}", groupName, brokerAddress); //1. 参数校验 this.paramCheck(); try { //0. 配置信息 //1. 初始化 //2. 连贯到服务端 //3. 标识为可用 //4. 增加钩子函数 //5. 启动实现当前的事件 this.afterInit(); log.info("MQ 消费者启动实现"); } catch (Exception e) { log.error("MQ 消费者启动异样", e); throw new MqException(ConsumerRespCode.RPC_INIT_FAILED); } } /** * 初始化实现当前 */ protected void afterInit() { } // 其余办法 /** * 获取生产策略类型 * @return 类型 * @since 0.0.9 */ protected String getConsumerType() { return ConsumerTypeConst.PUSH; }}咱们在 push 中预留了一个 afterInit 办法,便于子类重载。 ...

May 11, 2022 · 3 min · jiezi

关于mq:mq从零开始实现-mq08配置优化-fluent

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 【mq】从零开始实现 mq-08-配置优化 fluent fluent大家好,我是老马。 fluent 的配置形式,是我集体十分喜爱的一种配置形式。 传统的 java 应用 get/set 办法进行属性设置。 相似这种: MqBroker mqBroker = new MqBroker();mqBroker.setPort(9999);mqBroker.setAddress("127.0.0.1");fluent 写法能够让咱们写起来代码更加晦涩: MqBroker.newInstance().port(9999).address("127.0.0.1")写起来更加丝滑晦涩。 Broker 配置属性/** * 端口号 */private int port = BrokerConst.DEFAULT_PORT;/** * 调用治理类 * * @since 1.0.0 */private final IInvokeService invokeService = new InvokeService();/** * 消费者治理 * * @since 0.0.3 */private IBrokerConsumerService registerConsumerService = new LocalBrokerConsumerService();/** * 生产者治理 * * @since 0.0.3 */private IBrokerProducerService registerProducerService = new LocalBrokerProducerService();/** * 长久化类 * * @since 0.0.3 */private IMqBrokerPersist mqBrokerPersist = new LocalMqBrokerPersist();/** * 推送服务 * * @since 0.0.3 */private IBrokerPushService brokerPushService = new BrokerPushService();/** * 获取响应超时工夫 * @since 0.0.3 */private long respTimeoutMills = 5000;/** * 负载平衡 * @since 0.0.7 */private ILoadBalance<ConsumerSubscribeBo> loadBalance = LoadBalances.weightRoundRobbin();/** * 推送最大尝试次数 * @since 0.0.8 */private int pushMaxAttempt = 3;flent 配置public MqBroker port(int port) { this.port = port; return this;}public MqBroker registerConsumerService(IBrokerConsumerService registerConsumerService) { this.registerConsumerService = registerConsumerService; return this;}public MqBroker registerProducerService(IBrokerProducerService registerProducerService) { this.registerProducerService = registerProducerService; return this;}public MqBroker mqBrokerPersist(IMqBrokerPersist mqBrokerPersist) { this.mqBrokerPersist = mqBrokerPersist; return this;}public MqBroker brokerPushService(IBrokerPushService brokerPushService) { this.brokerPushService = brokerPushService; return this;}public MqBroker respTimeoutMills(long respTimeoutMills) { this.respTimeoutMills = respTimeoutMills; return this;}public MqBroker loadBalance(ILoadBalance<ConsumerSubscribeBo> loadBalance) { this.loadBalance = loadBalance; return this;}Producer 配置属性/** * 分组名称 */private String groupName = ProducerConst.DEFAULT_GROUP_NAME;/** * 中间人地址 */private String brokerAddress = "127.0.0.1:9999";/** * 获取响应超时工夫 * @since 0.0.2 */private long respTimeoutMills = 5000;/** * 检测 broker 可用性 * @since 0.0.4 */private volatile boolean check = true;/** * 调用治理服务 * @since 0.0.2 */private final IInvokeService invokeService = new InvokeService();/** * 状态治理类 * @since 0.0.5 */private final IStatusManager statusManager = new StatusManager();/** * 生产者-两头服务端服务类 * @since 0.0.5 */private final IProducerBrokerService producerBrokerService = new ProducerBrokerService();/** * 为残余的申请等待时间 * @since 0.0.5 */private long waitMillsForRemainRequest = 60 * 1000;/** * 负载平衡策略 * @since 0.0.7 */private ILoadBalance<RpcChannelFuture> loadBalance = LoadBalances.weightRoundRobbin();/** * 音讯发送最大尝试次数 * @since 0.0.8 */private int maxAttempt = 3;fluent 配置public MqProducer groupName(String groupName) { this.groupName = groupName; return this;}public MqProducer brokerAddress(String brokerAddress) { this.brokerAddress = brokerAddress; return this;}public MqProducer respTimeoutMills(long respTimeoutMills) { this.respTimeoutMills = respTimeoutMills; return this;}public MqProducer check(boolean check) { this.check = check; return this;}public MqProducer waitMillsForRemainRequest(long waitMillsForRemainRequest) { this.waitMillsForRemainRequest = waitMillsForRemainRequest; return this;}public MqProducer loadBalance(ILoadBalance<RpcChannelFuture> loadBalance) { this.loadBalance = loadBalance; return this;}public MqProducer maxAttempt(int maxAttempt) { this.maxAttempt = maxAttempt; return this;}Consuemr 配置属性/** * 组名称 */private String groupName = ConsumerConst.DEFAULT_GROUP_NAME;/** * 中间人地址 */private String brokerAddress = "127.0.0.1:9999";/** * 获取响应超时工夫 * @since 0.0.2 */private long respTimeoutMills = 5000;/** * 检测 broker 可用性 * @since 0.0.4 */private volatile boolean check = true;/** * 为残余的申请等待时间 * @since 0.0.5 */private long waitMillsForRemainRequest = 60 * 1000;/** * 调用治理类 * * @since 1.0.0 */private final IInvokeService invokeService = new InvokeService();/** * 音讯监听服务类 * @since 0.0.5 */private final IMqListenerService mqListenerService = new MqListenerService();/** * 状态治理类 * @since 0.0.5 */private final IStatusManager statusManager = new StatusManager();/** * 生产者-两头服务端服务类 * @since 0.0.5 */private final IConsumerBrokerService consumerBrokerService = new ConsumerBrokerService();/** * 负载平衡策略 * @since 0.0.7 */private ILoadBalance<RpcChannelFuture> loadBalance = LoadBalances.weightRoundRobbin();/** * 订阅最大尝试次数 * @since 0.0.8 */private int subscribeMaxAttempt = 3;/** * 勾销订阅最大尝试次数 * @since 0.0.8 */private int unSubscribeMaxAttempt = 3;fluent 配置public MqConsumerPush subscribeMaxAttempt(int subscribeMaxAttempt) { this.subscribeMaxAttempt = subscribeMaxAttempt; return this;}public MqConsumerPush unSubscribeMaxAttempt(int unSubscribeMaxAttempt) { this.unSubscribeMaxAttempt = unSubscribeMaxAttempt; return this;}public MqConsumerPush groupName(String groupName) { this.groupName = groupName; return this;}public MqConsumerPush brokerAddress(String brokerAddress) { this.brokerAddress = brokerAddress; return this;}public MqConsumerPush respTimeoutMills(long respTimeoutMills) { this.respTimeoutMills = respTimeoutMills; return this;}public MqConsumerPush check(boolean check) { this.check = check; return this;}public MqConsumerPush waitMillsForRemainRequest(long waitMillsForRemainRequest) { this.waitMillsForRemainRequest = waitMillsForRemainRequest; return this;}public MqConsumerPush loadBalance(ILoadBalance<RpcChannelFuture> loadBalance) { this.loadBalance = loadBalance; return this;}小结这一节的实现非常简单,能够说是没有啥技术难度。 ...

May 10, 2022 · 3 min · jiezi

关于mq:mq从零开始实现-mq07负载均衡-load-balance

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 【mq】从零开始实现 mq-07-负载平衡 load balance 为什么须要负载平衡大家好,我是老马。 这一节让咱们看一下如何实现 MQ 的负载平衡。 为什么须要负载平衡呢? 作用负载平衡最外围的作用: (1)能够防止单点故障 (2)能够让申请均分的扩散到每一个节点 实现思路负载平衡实现的形式比拟多,最简略的就是随机抉择一个。 拓展浏览: 从零手写实现负载平衡 http://houbb.github.io/2020/0... MQ 中用到负载平衡的中央生产者发送生产者发送音讯时,能够发送给任一 broker。 broker 推送给消费者broker 接管到音讯当前,在推送给消费者时,也能够任一抉择一个。 消费者的生产 ACK消费者生产完,状态回执给 broker,能够抉择任一一个。 音讯黏连有些音讯比拟非凡,比方须要保障生产的有序性,能够通过 shardingKey 的形式,在负载的时候固定到指定的片区。 代码实现生产者发送对立调整获取 channel 的办法。 @Overridepublic Channel getChannel(String key) { // 期待启动实现 while (!statusManager.status()) { log.debug("期待初始化实现..."); DateUtil.sleep(100); } RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance, channelFutureList, key); return rpcChannelFuture.getChannelFuture().channel();}工具类实现为外围实现: /** * 负载平衡 * * @param list 列表 * @param key 分片键 * @return 后果 * @since 0.0.7 */public static <T extends IServer> T loadBalance(final ILoadBalance<T> loadBalance, final List<T> list, String key) { if(CollectionUtil.isEmpty(list)) { return null; } if(StringUtil.isEmpty(key)) { LoadBalanceContext<T> loadBalanceContext = LoadBalanceContext.<T>newInstance() .servers(list); return loadBalance.select(loadBalanceContext); } // 获取 code int hashCode = Objects.hash(key); int index = hashCode % list.size(); return list.get(index);}如果指定了 shardingKey,那么依据 shadringKey 进行 hash 判断。 ...

May 8, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq06消费者心跳检测-heartbeat

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat 为什么须要心跳?心跳(heartbeat ),顾名思义就是心脏的跳动。 医学上个别通过心跳是否跳动,来判断一个人是否活着。 那么,分布式服务中如何判断一个服务是否还活着呢? 实现思路比方 mq 中,broker 须要把音讯实时推送给在线的消费者。 那么如何判断一个消费者是否活着呢? 咱们能够让消费者定时,比方每 5 秒钟给 broker 发送一个心跳包,思考到网络提早等,如果间断 1min 都没有收到心跳,咱们则移除这个消费者,认为服务曾经挂了。 消费者实现上代码! 心跳实现心跳能够是一个很简略的音讯体。 @Overridepublic void heartbeat() { final MqHeartBeatReq req = new MqHeartBeatReq(); final String traceId = IdHelper.uuid32(); req.setTraceId(traceId); req.setMethodType(MethodType.C_HEARTBEAT); req.setAddress(NetUtil.getLocalHost()); req.setPort(0); req.setTime(System.currentTimeMillis()); log.debug("[HEARTBEAT] 往服务端发送心跳包 {}", JSON.toJSON(req)); // 告诉全副 for(RpcChannelFuture channelFuture : channelFutureList) { try { Channel channel = channelFuture.getChannelFuture().channel(); callServer(channel, req, null); } catch (Exception exception) { log.error("[HEARTBEAT] 往服务端解决异样", exception); } }}消费者把心跳告诉所有的 broker. ...

May 6, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq05实现优雅停机

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 【mq】从零开始实现 mq-05-实现优雅停机 为什么须要优雅敞开?我记得多年前,那个时候 rpc 框架支流用的还是 dubbo,每次都是中午还是上线,上线上完根本都是凌晨 2-3 点。 为什么要中午上线呢? 因为这个时候个别业务流量最低。 还有就是上线公布,每次都要人工期待一段几分钟。 因为 rpc 调用入口曾经敞开了,然而自身可能还没有解决完。 那么有没有办法能够让服务的敞开更加优雅,而不是人工期待呢? 实现思路人工期待几分钟的形式个别能够解决问题,然而大部分状况是无用功,还比拟浪费时间。 比拟天然的一种形式是引入钩子函数。 当利用筹备敞开时,首先判断是否存在解决中的申请,不存在则间接敞开;存在,则期待申请实现再敞开。 实现生产者和消费者是相似的,咱们以生产者为例。 启动实现的调整@Overridepublic synchronized void run() { this.paramCheck(); // 启动服务端 log.info("MQ 生产者开始启动客户端 GROUP: {} brokerAddress: {}", groupName, brokerAddress); try { //0. 配置信息 ProducerBrokerConfig config = ProducerBrokerConfig.newInstance() .groupName(groupName) .brokerAddress(brokerAddress) .check(check) .respTimeoutMills(respTimeoutMills) .invokeService(invokeService) .statusManager(statusManager); //1. 初始化 this.producerBrokerService.initChannelFutureList(config); //2. 连贯到服务端 this.producerBrokerService.registerToBroker(); //3. 标识为可用 statusManager.status(true); //4. 增加钩子函数 final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook(); rpcShutdownHook.setStatusManager(statusManager); rpcShutdownHook.setInvokeService(invokeService); rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest); rpcShutdownHook.setDestroyable(this.producerBrokerService); ShutdownHooks.rpcShutdownHook(rpcShutdownHook); log.info("MQ 生产者启动实现"); } catch (Exception e) { log.error("MQ 生产者启动遇到异样", e); throw new MqException(ProducerRespCode.RPC_INIT_FAILED); }}状态治理类这里咱们引入 statusManager 治理整体的状态。 ...

May 5, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq04启动检测与实现优化

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 【mq】从零开始实现 mq-04-启动检测与实现优化 上一节咱们引入了中间人 broker,让音讯的生产者和消费者解耦。 这一节咱们对初始化代码进行优化,便于前期拓展保护。 生产者启动优化启动实现整体实现调整如下: @Overridepublic synchronized void run() { this.paramCheck(); // 启动服务端 log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}", groupName, port, brokerAddress); try { //channel future this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress, initChannelHandler(), check); // register to broker this.registerToBroker(); // 标识为可用 enableFlag = true; log.info("MQ 生产者启动实现"); } catch (Exception e) { log.error("MQ 生产者启动遇到异样", e); throw new MqException(ProducerRespCode.RPC_INIT_FAILED); }}看起来是不是比起原来清新很多呢? 然而复杂性只会转移,不会隐没。 答案就是封装到 initChannelFutureList 中去了。 ...

May 3, 2022 · 2 min · jiezi

关于mq:mq从零开始实现-mq03引入-broker-中间人

前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】从零开始实现 mq-02-如何实现生产者调用消费者? 【mq】从零开始实现 mq-03-引入 broker 中间人 上一节咱们学习了如何实现生产者给消费者发送音讯,然而是通过直连的形式。 那么如何能力达到解耦的成果呢? 答案就是引入 broker,音讯的中间人。 MqBroker 实现外围启动类相似咱们后面 consumer 的启动实现: package com.github.houbb.mq.broker.core;/** * @author binbin.hou * @since 1.0.0 */public class MqBroker extends Thread implements IMqBroker { // 省略 private ChannelHandler initChannelHandler() { MqBrokerHandler handler = new MqBrokerHandler(); handler.setInvokeService(invokeService); handler.setRegisterConsumerService(registerConsumerService); handler.setRegisterProducerService(registerProducerService); handler.setMqBrokerPersist(mqBrokerPersist); handler.setBrokerPushService(brokerPushService); handler.setRespTimeoutMills(respTimeoutMills); return handler; } @Override public void run() { // 启动服务端 log.info("MQ 中间人开始启动服务端 port: {}", port); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(workerGroup, bossGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf)) .addLast(initChannelHandler()); } }) // 这个参数影响的是还没有被accept 取出的连贯 .option(ChannelOption.SO_BACKLOG, 128) // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 .childOption(ChannelOption.SO_KEEPALIVE, true); // 绑定端口,开始接管进来的链接 ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly(); log.info("MQ 中间人启动实现,监听【" + port + "】端口"); channelFuture.channel().closeFuture().syncUninterruptibly(); log.info("MQ 中间人敞开实现"); } catch (Exception e) { log.error("MQ 中间人启动异样", e); throw new MqException(BrokerRespCode.RPC_INIT_FAILED); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }}initChannelHandler 中有不少新面孔,咱们前面会具体介绍。 ...

April 30, 2022 · 10 min · jiezi

关于mq:MQjava-从零开始实现消息队列-mq02如何实现生产者调用消费者

前景回顾上一节咱们学习了如何实现基于 netty 客服端和服务端的启动。 【mq】从零开始实现 mq-01-生产者、消费者启动 【mq】java 从零开始实现音讯队列 mq-02-如何实现生产者调用消费者? 那么客户端如何调用服务端呢? 咱们本节就来一起实现一下。 消费者实现启动类的调整ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf)) .addLast(new MqConsumerHandler(invokeService)); } }) // 这个参数影响的是还没有被accept 取出的连贯 .option(ChannelOption.SO_BACKLOG, 128) // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 .childOption(ChannelOption.SO_KEEPALIVE, true);这里咱们通过指定分隔符解决 netty 粘包问题。 解决 netty 粘包问题MqConsumerHandler 解决类MqConsumerHandler 的实现如下,增加对应的业务解决逻辑。 package com.github.houbb.mq.consumer.handler;/** * @author binbin.hou * @since 1.0.0 */public class MqConsumerHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(MqConsumerHandler.class); /** * 调用治理类 * @since 1.0.0 */ private final IInvokeService invokeService; public MqConsumerHandler(IInvokeService invokeService) { this.invokeService = invokeService; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); RpcMessageDto rpcMessageDto = null; try { rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class); } catch (Exception exception) { log.error("RpcMessageDto json 格局转换异样 {}", new String(bytes)); return; } if (rpcMessageDto.isRequest()) { MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx); if(commonResp == null) { log.debug("以后音讯为 null,疏忽解决。"); return; } writeResponse(rpcMessageDto, commonResp, ctx); } else { final String traceId = rpcMessageDto.getTraceId(); // 抛弃掉 traceId 为空的信息 if(StringUtil.isBlank(traceId)) { log.debug("[Server Response] response traceId 为空,间接抛弃", JSON.toJSON(rpcMessageDto)); return; } // 增加音讯 invokeService.addResponse(traceId, rpcMessageDto); } }}rpc 音讯体定义为了统一标准,咱们的 rpc 音讯体 RpcMessageDto 定义如下: ...

April 23, 2022 · 8 min · jiezi

关于mq:mq从零开始实现-mq01生产者消费者启动

MQ 是什么?MQ(Message Queue)音讯队列,是根底数据结构中“先进先出”的一种数据结构。 指把要传输的数据(音讯)放在队列中,用队列机制来实现消息传递——生产者产生音讯并把音讯放入队列,而后由消费者去解决。 消费者能够到指定队列拉取音讯,或者订阅相应的队列,由MQ服务端给其推送音讯。 MQ 的作用?音讯队列中间件是分布式系统中重要的组件,次要解决利用解耦,异步音讯,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。 解耦:一个业务须要多个模块独特实现,或者一条音讯有多个零碎须要对应解决,只须要主业务实现当前,发送一条MQ,其余模块生产MQ音讯,即可实现业务,升高模块之间的耦合。 异步:主业务执行完结后隶属业务通过MQ,异步执行,减低业务的响应工夫,进步用户体验。 削峰:高并发状况下,业务异步解决,提供高峰期业务解决能力,防止零碎瘫痪。 ps: 以上内容摘选自百科。 实现 mq 的筹备工作maven 引入<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.42.Final</version></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version></dependency>模块划分The message queue in java. 作为 mq 的从零开始的学习我的项目,目前已开源。 我的项目的模块如下: 模块阐明mq-common公共代码mq-broker注册核心mq-producer音讯生产者mq-consumer音讯消费者音讯消费者接口定义package com.github.houbb.mq.consumer.api;/** * @author binbin.hou * @since 1.0.0 */public interface IMqConsumer { /** * 订阅 * @param topicName topic 名称 * @param tagRegex 标签正则 */ void subscribe(String topicName, String tagRegex); /** * 注册监听器 * @param listener 监听器 */ void registerListener(final IMqConsumerListener listener);}IMqConsumerListener 作为音讯监听类的接口,定义如下: ...

April 21, 2022 · 3 min · jiezi

关于mq:MQ面试问题整理

我的项目中为什么应用MQ长处解耦<br/>同一块业务的相干能力被很多个我的项目须要,后续也有极大可能会被其余我的项目须要。将该局部能力解耦,通过MQ进行音讯的对立解决。若有其余我的项目或业务须要相干性能,则对此进行订阅,解决相干业务。缩小我的项目间接依赖。<br/>异步<br/>大型项目相干合作人员,参加团队越来越多,我的项目之间的关联越来越深。造成整个链路特地长,且问题排查不容易。<br/>削峰<br/>对于局部业务,在某些工夫点,可能因为刹时申请过大,造成所有申请间接拜访至数据库对数据库造成压力,从而影响我的项目原有业务。个别MySQL数据库的解决能力在2k次/s,MQ的顶峰解决能力1w+。能无效的解决<br/><!-- more -->毛病零碎可用性升高因为新减少了MQ,因为MQ的可用性、可靠性会造成零碎可用性的额外负担。<br/> 可用性问题如何解决<br/>不同的MQ对此的解决方案不一样。ActiveMQZookeeper及多个activeMQ,批改长久化为性能更好的LevelDB替换掉默认的KahaDB.<br/>长处:简略实现了可用性。<br/>毛病:每次理论可用的MQ只有一个,一旦有一个MQ呈现问题,相干的音讯就会失落。<br/>RocketMQ 单master模式,一旦呈现问题,就会整个服务不可用。个别不会间接生产应用,适宜集体学习;多master模式,多个master节点组成集群。单个master呈现问题不影响。长处:所有模式中性能最高的。 毛病:单个master节点宕机期间,未被生产的音讯在节点复原之前不可用,音讯实时性就受到影响。留神:应用同步刷盘可保障音讯不失落,同时topic绝对应的queue应该散布在集权中各个master节点,而不是只在某一个master节点上;多master多slave异步复制模式,在多master模式下,每个master节点至多都有一个对应的slave;<br/>master节点可读可写,slave只可读 不可写。长处:在master宕机时,消费者能够间接从slave读取音讯,音讯实时性不会受到影响,性能简直与多master一样。毛病:应用异步复制可能造成音讯失落问题。多master多slave同步双写,同上一种模式相似,区别在于master与slave之间的数据同步形式。长处:同步双写的同步模式能保证数据不失落。毛病:发送单个音讯时长会比拟长,性能会比拟差;同步刷盘,异步刷盘的区别:在于多master之间的数据同步写入磁盘模式。 同步复制异步复制概念即等 Master 和 Slave 均写胜利后才反馈给客户端写胜利状态只有 Master 写胜利,就反馈客户端写胜利状态可靠性可靠性高,若 Master 呈现故障,Slave 上有全副的备份数据,容易复原若 Master 呈现故障,可能存在一些数据还没来得及写入 Slave,可能会失落效率因为是同步复制,会减少数据写入提早,升高零碎吞吐量因为只有写入 Master 即可,故数据写入提早较低,吞吐量较高理论利用中的举荐把Master和Slave设置成ASYNC_FLUSH的异步刷盘形式,主从之间配置成SYNC_MASTER的同步复制形式,这样即便有一台机器出故障,依然能够保证数据不丢。可靠性问题如何解决<br/>引入MQ造成的数据可靠性问题,或者如何解决音讯失落问题?整个MQ的链路中,可能存在音讯失落的地位仅有三个,一个是生产端,一个是MQ本身,最初一个就是上游的消费者。<br/>不同的MQ对此解决方案均不一样,需别离剖析。ActiveMQ生产端程序在每次发往MQ的音讯后,可接管到MQ对其的Ack响应, 零碎复杂度进步减少了MQ,引入了音讯是否会反复生产,音讯失落问题。<br/> 数据一致性引入MQ之后,繁多事务会被分拆为多个事务。引入了分布式事务一致性的问题。<br/> MQ如何解决反复生产问题幂等校验上游不能反复下发<br/>程序本人保障不反复创立音讯,应用业务事务进行管制。<br/>上游不能反复生产<br/>将音讯信息入库,应用数据库、Redis进行音讯存储。有音讯过去应用惟一主键进行存储,每次进行校验是否存在,不存在持续解决,否则依据业务进行疏忽或更新;<br/>将音讯信息入库,依据业务进行惟一主键束缚,若有反复数据,则会报惟一主键抵触,也插入不了;如果仅是redis的set操作,则无需解决。即时反复也不影响;MQ如何解决音讯失落问题http://yannis.club

February 22, 2022 · 1 min · jiezi

关于mq:RabbitMQ-Exchange-Types-四种类型

交换器RabbitMQ 消息传递模型的核心思想是生产者从不间接向队列发送任何音讯。生产者只将音讯发送到 Exchange 交换器中,并不知道音讯是否会被传送到队列。交换器负责接管生产者生产的音讯,并通过肯定路由规定将音讯发送到指定的队列,起到一个传递的作用 类型介绍RabbitMQ 罕用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP标准里还提到两种 Exchange Type,别离为 system 与自定义,这里不予以形容)。 fanoutfanout 类型的 Exchange 路由规定十分。它会把所有发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中。这种模式在 RabboitMQ 官网介绍中称之为:公布/订阅 图 1 fanout Exchange 来看下官网给出的代码示例: emit_log.php <?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');$channel = $connection->channel();$exchange_name = 'exchange_name';$channel->exchange_declare($exchange_name, 'fanout', false, false, false);$data = implode(' ', array_slice($argv, 1));if (empty($data)) { $data = "info: Hello World!";}$msg = new AMQPMessage($data);$channel->basic_publish($msg, 'logs');echo ' [x] Sent ', $data, "\n";$channel->close();$connection->close();receive_logs.php ...

July 16, 2021 · 3 min · jiezi

关于mq:MQ-面试要点

为什么要应用MQ当面试官问出 “为什么要应用MQ” 时,其实也就是在问 MQ 有哪些作用? 1. 解耦一个我的项目随着业务迭代会一直减少上游调用方,依照传统形式,那么每减少一个上游,就须要批改一次以后我的项目的代码,减少新的上游交互逻辑,工作量不堪称不大。 而如果在这里思考接入 MQ 作为两头代理,那么只须要将数据写入到 MQ,而不必思考简单的交互通信。并且,上游减少时,根本也不必批改上游代码,只须要让上游监听对应的通道即可。甚至,配合上动静配置,能够实现上游零批改地减少上游服务。 2. 异步如果须要进步高延时接口的响应速度。那么,就能够思考应用 MQ 来实现异步执行,只须要将申请信息写入 MQ 就能够立刻返回,实现高响应速度,至于残余的工作交给 MQ 即可。 3. 流量削峰如果业务中有高流量的场景,为了防止高流量一下子打到数据库层,造成数据库瘫痪,能够思考将申请写入 MQ,即 MQ 作为一个缓冲区,而后在零碎反对的生产速度下,通过上游服务一直地生产 MQ 中累积的音讯。当高峰期过后,MQ 中的积压音讯将缓缓被生产掉。 MQ长处和毛病长处如上所述: 解耦异步流量削峰毛病升高了零碎的可用性。原先只有上下游,现在两头多了个 MQ。一旦 MQ 故障,整个零碎将不可用。进步了零碎的复杂性。多了 MQ 之后,开发过程中可能须要思考 MQ 反复生产,音讯失落,如果保障音讯程序的问题。一致性问题。MQ 作为两头代理,多个上游执行过程中,某个或多个执行失败,那么势必造成数据的不统一。不同的MQ区别 总结而言,如果是技术挑战不高,举荐应用 RabbitMQ(Erlang语言妨碍Java工程师深入研究把握它);如果研发能力强的,能够思考 RocketMQ;如果是用于大数据畛域,那么举荐 Kafka。 不同MQ架构剖析RabbitMQRocketMQKafka如何保障音讯不反复生产(幂等)如果将数据写入到 Mysql,须要依据表主键判断行是否存在进行新增或更新即可;如果将数据写入到 redis,间接写入即可。不过,须要一个业务惟一ID作为 redis 的 key;如果没有应用到 Mysql 或 redis,那么还是须要应用其中一个来存储曾经生产过的音讯,解决办法同上。如何保障音讯不失落(以RabbitMQ为例)1. 保障生产者不丢数据一是能够思考开启 RabbitMQ 提供的事务操作。当产生音讯失落异样时,能够回滚事务,而后从新发送。这种形式下会极大地升高 MQ 吞吐量; 二是开启 channel 的 confirm 模式。生产者发送音讯给 MQ 后,MQ 会回传一个 ack。如果生产者没有收到,就能够重试。该监听办法是异步的,不会阻塞,能够间接进行下一条的发送。 2. 保障 MQ 不丢数据这里须要开启 MQ 的长久化。RabbitMQ 开启长久化分两步,一是开启 queue 的长久化,这样 MQ 会长久化 queue 的元数据;二是设置发送音讯时的音讯选项,是否长久化。 ...

May 16, 2021 · 1 min · jiezi

关于mq:常见消息队列

April 4, 2021 · 0 min · jiezi

关于mq:网关交易同步优化

现有架构:对于mysql的操作: Req:一次插入Rsp:一次查问,一次批改交易同步:一次查问,一次删除Mysql表构造为交易全副信息,作用为匹配交易 定时工作: 同步失常交易同步开放平台已超时但又有响应的交易长期表剩下只有申请但没有响应的交易重放的交易只会记录一次,因为长期表依据traceNo,appId,timestamp匹配 批改后架构: 流程: 申请插入req的队列响应蕴含残缺交易,插入rsp,监听rsp队列将交易插入es,延时异步删除req定时工作将只有申请的交易标记为“无响应”插入rsp,再同步至esFlink监听rsp进行交易告警对于mysql的操作: Req:一次插入Rsp:延时删除定时工作:一次查问劣势: 流程简略,没有简单的匹配规定可记录所有的交易,蕴含重放交易Mysql表构造为交易申请,作用为记录申请,如果须要增加交易字段,不必批改表构造。即扩展性更好实时性更好,对mysql压力更小告警可用flink进行解决,可配置更简单的规定非凡的交易状况: 失常状况下每个申请在开放平台都有响应,不论是超时还是重放的交易,响应应该在mysq中找到申请信息。申请无任何响应(gateway承受申请后忽然挂掉):定时工作查看1分钟(gateway超时工夫)前在mysql的申请,标记为未响应(在es中查问?)插入rsp队列响应在mysql未找到申请(响应比申请先到mysql,网关超时又来了业务响应):延时删除申请标记为网关超时又来了业务响应(mq超时但响应又来,业务响应在mysql中找不req):Es记录两条流水号雷同的交易,一条为网关超时,一条没有申请信息反复的交易,即雷同的申请参数反复发(req插入mysql时可能会报错雷同ID,rsp删除mysql时找不到req):es全副记录,不论是否反复响应来了的删除策略: Req来了后插入mysql,收到rsp后删除如果响应查不到申请,过五百毫秒再查,防止响应比申请先入库的状况;如果没有,应该为业务已超时但响应又回来的交易,再在es查;如果还没有,标记为mysql找不到记录。

February 20, 2021 · 1 min · jiezi

关于mq:ZMQ-指南第二章-ZeroMQ进阶

第二章 ZeroMQ进阶第一章咱们简略试用了ZMQ的若干通信模式:申请-应答模式、公布-订阅模式、管道模式。这一章咱们将学习更多在理论开发中会应用到的货色: 本章波及的内容有: 创立和应用ZMQ套接字应用套接字发送和接管音讯应用ZMQ提供的异步I/O套接字构建你的应用程序在繁多线程中应用多个套接字失当地解决致命和非致命谬误解决诸如Ctrl-C的中断信号正确地敞开ZMQ应用程序查看ZMQ应用程序的内存泄露发送和接管多帧音讯在网络中转发音讯建设简略的音讯队列代理应用ZMQ编写多线程应用程序应用ZMQ在线程间传递信号应用ZMQ协调网络中的节点应用标识创立长久化套接字在公布-订阅模式中创立和应用音讯信封如何让长久化的订阅者可能从解体中复原应用阈值(HWM)避免内存溢出零的哲学ØMQ一词中的Ø让咱们纠结了很久。一方面,这个特殊字符会升高ZMQ在谷歌和推特中的收录量;另一方面,这会惹恼某些丹麦语种的民族,他们会嚷道Ø并不是一个奇怪的0。 一开始ZMQ代表零中间件、零提早,同时,它又有了新的含意:零治理、零老本、零节约。总的来说,零示意最小、最简,这是贯通于该项目标哲理。咱们致力于缩小复杂程度,进步易用性。 套接字API说实话,ZMQ有些移花接木的嫌疑。不过咱们并不会为此赔罪,因为这种概念上的切换相对不会有害处。ZMQ提供了一套相似于BSD套接字的API,但将很多音讯解决机制的细节暗藏了起来,你会逐步适应这种变动,并乐于用它进行编程。 套接字事实上是用于网络编程的标准接口,ZMQ之所那么吸引人眼球,起因之一就是它是建设在规范套接字API之上。因而,ZMQ的套接字操作非常容易了解,其生命周期次要蕴含四个局部: 创立和销毁套接字:zmq_socket(), zmq_close()配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()为套接字建设连贯:zmq_bind(), zmq_connect()发送和接管音讯:zmq_send(), zmq_recv()如以下C代码: void *mousetrap;// Create socket for catching micemousetrap = zmq_socket (context, ZMQ_PULL);// Configure the socketint64_t jawsize = 10000;zmq_setsockopt (mousetrap, ZMQ_HWM, &jawsize, sizeof jawsize);// Plug socket into mouse holezmq_connect (mousetrap, "tcp://192.168.55.221:5001");// Wait for juicy mouse to arrivezmq_msg_t mouse;zmq_msg_init (&mouse);zmq_recv (mousetrap, &mouse, 0);// Destroy the mousezmq_msg_close (&mouse);// Destroy the socketzmq_close (mousetrap);请留神,套接字永远是空指针类型的,而音讯则是一个数据结构(咱们下文会讲述)。所以,在C语言中你通过变量传递套接字,而用援用传递音讯。记住一点,在ZMQ中所有的套接字都是由ZMQ治理的,只有音讯是由程序员治理的。 创立、销毁、以及配置套接字的工作和解决一个对象差不多,但请记住ZMQ是异步的,伸缩性很强,因而在将其利用到网络结构中时,可能会须要多一些工夫来了解。 应用套接字构建拓扑构造在连贯两个节点时,其中一个须要应用zmq_bind(),另一个则应用zmq_connect()。通常来讲,应用zmq_bind()连贯的节点称之为服务端,它有着一个较为固定的网络地址;应用zmq_connect()连贯的节点称为客户端,其地址不固定。咱们会有这样的说法:绑定套接字至端点;连贯套接字至端点。端点指的是某个广为周知网络地址。 ZMQ连贯和传统的TCP连贯是有区别的,次要有: 应用多种协定,inproc(过程内)、ipc(过程间)、tcp、pgm(播送)、epgm;当客户端应用zmq_connect()时连贯就曾经建设了,并不要求该端点已有某个服务应用zmq_bind()进行了绑定;连贯是异步的,并由一组音讯队列做缓冲;连贯会体现出某种音讯模式,这是由创立连贯的套接字类型决定的;一个套接字能够有多个输出和输入连贯;ZMQ没有提供相似zmq_accept()的函数,因为当套接字绑定至端点时它就主动开始承受连贯了;应用程序无奈间接和这些连贯打交道,因为它们是被封装在ZMQ底层的。在很多架构中都应用了相似于C/S的架构。服务端组件式比较稳定的,而客户端组件则较为动静,来去自如。所以说,服务端地址对客户端而言往往是可见的,反之则不然。这样一来,架构中应该将哪些组件作为服务端(应用zmq_bind()),哪些作为客户端(应用zmq_connect()),就很显著了。同时,这须要和你应用的套接字类型相分割起来,咱们下文会具体讲述。 让咱们试想一下,如果先关上了客户端,后关上服务端,会产生什么?传统网络连接中,咱们关上客户端时肯定会收到零碎的报错信息,但ZMQ让咱们可能自在地启动架构中的组件。当客户端应用zmq_connect()连贯至某个端点时,它就曾经可能应用该套接字发送音讯了。如果这时,服务端启动起来了,并应用zmq_bind()绑定至该端点,ZMQ将主动开始转发音讯。 服务端节点能够仅应用一个套接字就能绑定至多个端点。也就是说,它可能应用不同的协定来建设连贯: zmq_bind (socket, "tcp://*:5555");zmq_bind (socket, "tcp://*:9999");zmq_bind (socket, "ipc://myserver.ipc");当然,你不能屡次绑定至同一端点,这样是会报错的。 ...

January 22, 2021 · 13 min · jiezi

关于mq:ZMQ-指南第一章-ZeroMQ基础

ZMQ 指南作者: Pieter Hintjens <ph@imatix.com>, CEO iMatix Corporation. 翻译: 张吉 <jizhang@anjuke.com>, 安居客团体 好租网工程师 With thanks to Bill Desmarais, Brian Dorsey, CAF, Daniel Lin, Eric Desgranges, Gonzalo Diethelm, Guido Goldstein, Hunter Ford, Kamil Shakirov, Martin Sustrik, Mike Castleman, Naveen Chawla, Nicola Peduzzi, Oliver Smith, Olivier Chamoux, Peter Alexander, Pierre Rouleau, Randy Dryburgh, John Unwin, Alex Thomas, Mihail Minkov, Jeremy Avnet, Michael Compton, Kamil Kisiel, Mark Kharitonov, Guillaume Aubert, Ian Barber, Mike Sheridan, Faruk Akgul, Oleg Sidorov, Lev Givon, Allister MacLeod, Alexander D'Archangel, Andreas Hoelzlwimmer, Han Holl, Robert G. Jakabosky, Felipe Cruz, Marcus McCurdy, Mikhail Kulemin, Dr. Gergő Érdi, Pavel Zhukov, Alexander Else, Giovanni Ruggiero, Rick "Technoweenie", Daniel Lundin, Dave Hoover, Simon Jefford, Benjamin Peterson, Justin Case, Devon Weller, Richard Smith, Alexander Morland, Wadim Grasza, Michael Jakl, and Zed Shaw for their contributions, and to Stathis Sideris for Ditaa. ...

January 22, 2021 · 8 min · jiezi

关于mq:RabbitMQ安装

https://www.erlang.org/downlo... 装置:tar -zxvf otp_src_19.3.tar.gzcd otp_src_19.3/./configure --prefix=/opt/erlang yum install ncurses-devel openssl-devel unixODBC-devel gcc-c++ 装置wxWidgetshttp://www.wxwidgets.org/down...https://github.com/wxWidgets/... yum install gtk3.i686yum install gtk3-develyum install mesa*yum install freeglut* ./configure --with-opengl --enable-debug --enable-unicode

November 9, 2020 · 1 min · jiezi

关于mq:中间件MQ如何保证消息的可靠性

分布式事务的一种计划:音讯可靠性 + 最终一致性, 那么 如何保障音讯的可靠性? 1. 音讯失落(最重要)音讯失落:如何保障音讯不失落? 1.1. 网络起因发不出-> 记录日志+定时工作解决比方网络起因公布进来:怎么弄? catch异样从新while(true)发几次?NoNoNo! 如果是网络起因, 很可能发不进来, 正确做法: 数据库建一个mq_message的表, 每发一条mq音讯都记录到库;定时工作扫, 状态是 "未发送胜利" 的记录, 从新发送;1.2 Broker长久化失败->生产者发送确认+失败重发咱们的音讯都要设置长久化保留, 咱们胜利将音讯发送到mq的Broker,然而broker须要将音讯长久化, 而后能力进入到queue里, 这个两头mq挂了, 怎么保障音讯不失落? Producer-->(Broker(Exchange)-->(Queue))-->Consumer 咱们应用生产者发送者的确认机制: 只有发送到broker, broker长久化实现音讯到达送给队列当前, broker才会给发送者确认;这样就能保障发送胜利 1.3 Consumer生产失落->敞开主动确认, 应用手动ACKMQ生产时宕机或生产失败?敞开生产时主动确认, 应用 手动manual ACK, 生产完确认! 2. 音讯反复2.1 音讯反复的2种情景:比方消费者生产音讯, 收到了两遍 consumer生产失败, 手动basicReject回绝了, 这种是容许的;consumer生产实现,手动ACK之前忽然宕机, 此时没给broker确认音讯生产实现;这样, 就有可能会再发给其余的消费者,造成音讯反复; 生产胜利,ack宕机,音讯由 unack变为ready, 而后broker会从新发送主旨: 设计生产时, 做到 幂等生产 2.2 幂等生产2.2.1 生产做状态判断业务设置状态, 生产时做状态判断, 解决过就疏忽 2.2.2 防重表:惟一标识防重表:发送的每一个音讯都有惟一标识, 解决过就疏忽 3. 音讯积压音讯积压会带来MQ性能的降落, 所以肯定要解决音讯积压的问题 3.1 音讯积压的3种起因3.1.1 消费者宕机3.1.2 消费者自身生产能力有余3.1.3 发送者流量太大3.2 解决音讯积压的计划3.2.1 限度业务流量+限度发送流量(上策)3.2.2 上线更多的消费者, 加大生产规模3.2.2.3 若生产逻辑简单,能够设置:先纯生产-记录数据库-离线解决能够解决mq压力, 转移mq压力 ...

September 9, 2020 · 1 min · jiezi

关于mq:常用的MQ中间件技术选型比较

参考:https://www.cnblogs.com/imstudy/p/11064589.html

August 20, 2020 · 1 min · jiezi

消息队列-MQ-1-3分钟让你快速了解消息队列

1 什么是音讯队列队列是一个根底的“先进先出”的数据结构,音讯队列,就是一个用来寄存音讯的队列。一个根本的音讯队列的模型如下图,生产者负责往音讯队列里写入音讯,消费者通过订阅生产者公布的音讯就能够生产音讯。 2 为什么要音讯队列次要的性能有利用解藕,流量消峰,音讯散发,除此之前还有放弃一致性和不便动静扩容等性能。 1.利用解耦比方电商零碎,用户创立订单,会波及到,“订单零碎”,”库存零碎“,“物流零碎” 和 “领取零碎”,如果这些是耦合的,则任何一个零碎的故障都会导致订单创立失败。 可是如果应用音讯队列进行利用解耦,则如下图,比方“物流零碎”产生故障,则物流零碎须要解决的内容被缓存在音讯队列里,用户的下单操作仍然能够实现,等到几分钟后物流零碎复原,这时再补充解决缓存在音讯队列里的订单音讯即可,终端用户是感知不到这几分钟的系统故障的。 2.流量消峰当高并发的时候,利用零碎的压力增大,此时通过音讯队列把大量的申请暂存起来,而后扩散到绝对长的一段时间内解决,大大缓解利用零碎的压力。 仍然以电商零碎为例,比方双11秒杀流动,假如“订单零碎” 每秒能解决10000次申请,可是此时流入的是20000次申请。要解决这样的状况,要么: 减少服务器性能,让他能反对20000次申请,可是这种形式不够经济,因为大多数状况下10000次申请的解决能力曾经入不敷出。订单超过10000次后就不容许用户下单,这个想也不太可能应用音讯队列,把1秒内下单订单扩散成一段时间来解决。这时尽管有些用户能在下单后几十秒能力收到下单胜利的状态。可是总比不能下单的体验好。 3.音讯散发数据产生方只须要把数据放到音讯队列,数据生产方依据本人的须要订阅感兴趣的数据,不同数据生产方能够反复也能够不反复,互不烦扰,也不须要和数据产生方关联。 如下图各个子系统将日志数据不停的写入音讯队列, 数据生产方比方“日志解决1” 还能够把本人解决后的数据从新放入音讯队列供其余数据生产方应用,能够防止反复计算。 应用场景,比方对于用户数据进行用户画像,精准推送等。 4.异步A零碎接管一个申请后,须要: 在本人本地写入数据 3ms在B零碎写入数据 300ms在C零碎写入数据 450ms在D零碎写入数据 200ms则总共须要 3+300+450+200=980ms 可是如果应用MQ,A零碎间断发送3条申请到MQ 5ms,则此时A零碎从承受一个申请到返回响应给客户,总共 3+5=8ms, 比之前的980ms快了很多。 3 音讯队列的两种模式点对点与公布式订阅. 3.1 PTP点对点音讯从一个生产者(producer/sender)通过一个队列传给另一个消费者(consumer/receiver)。能够有多个消费者监听一个队列,然而如果有一个消费者生产了音讯,则其余消费者则获取不到音讯了。所以叫做点到点模式。发送者和接收者之间在工夫上没有依赖性,队列保留着音讯,能够放在内存 中也能够长久化,直到他们被生产或超时。 3.2 公布订阅 Publish/Subscribe发布者把音讯公布到一个topic,多个订阅者能够订阅同一个topic,公布的音讯能够被所有的订阅者生产。所以叫做公布订阅模式。发布者和订阅者之间有工夫上的依赖性。如果发布者公布音讯的时候,订阅者处于某种原因没有接管到,这个音讯就失落了。 4 音讯队列的毛病零碎可用性升高:零碎引入的内部依赖越多则越容易挂掉,以下面的电商为例子,原本订单零碎,调用领取零碎,库存零碎,物流零碎的接口就好了,当初引入MQ进入,万一MQ挂了,整套零碎就奔溃了,所以升高零碎可用性。 零碎复杂性减少:退出MQ后,须要思考,如何保障一致性,如何保障音讯不被反复生产,如何报纸额音讯的传递程序,如何保障音讯牢靠传输等等。 一致性问题:比方下面的电商零碎,订单零碎解决完,间接返回胜利,人都认为你胜利了,然而问题是,物流零碎,领取零碎和库存零碎,只有其中一个挂了,那就会呈现数据不统一的问题。 音讯反复生产的问题:比方kafka里,kafka会给每条数据调配一个offset,消费者依照offset的编号程序去生产,生产后提交offset。消费者不是一生产好一条数据就立即去提交offset,是定期提交一次offset,如果在消费者还没提交的时候,消费者的过程重启了,那么此时曾经生产过的音讯的offset就没有提交,这次kafka会认为你没有生产这条数据,而后从新给你发送。导致反复生产。 数据失落:生产者在写音讯的时候,在网络传输过程就失落了生产者把音讯存到MQ,可是MQ本人出错了,没有保留胜利MQ收到音讯,并且暂存在内存里,可是在消费者还没来得及生产之前MQ挂了,导致暂存在内存的数据失落消费者生产到这个音讯,可是还没来得及解决本人就坏了,导致MQ认为他曾经解决完了,不再给他发消息。 5 常见的音讯队列 此外还有Amazon SQS/SNS 和 Aliyun MQTT.下一章节会着重介绍下 Aliyun MQTT和 RocketMQ的区别和应用场景。 参考:《RocketMQ实战与原理解析》https://my.oschina.net/u/4383...https://www.jianshu.com/p/fdd...https://www.erlang-solutions....

July 13, 2020 · 1 min · jiezi

聊聊rocketmqclientgo的QueueSelector

序本文主要研究一下rocketmq-client-go的QueueSelector QueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go type QueueSelector interface { Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue}QueueSelector接口,定义了Select方法manualQueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go type manualQueueSelector struct{}func NewManualQueueSelector() QueueSelector { return new(manualQueueSelector)}func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { return message.Queue}manualQueueSelector的select方法直接返回message.QueueNewRandomQueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go type randomQueueSelector struct { rander *rand.Rand}func NewRandomQueueSelector() QueueSelector { s := new(randomQueueSelector) s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) return s}func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { i := r.rander.Intn(len(queues)) return queues[i]}NewRandomQueueSelector方法先创建randomQueueSelector,然后设置其rander;Select方法通过r.rander.Intn(len(queues))随机选择index,然后从queue取值roundRobinQueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go type roundRobinQueueSelector struct { sync.Locker indexer map[string]*int32}func NewRoundRobinQueueSelector() QueueSelector { s := &roundRobinQueueSelector{ Locker: new(sync.Mutex), indexer: map[string]*int32{}, } return s}func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { t := message.Topic if _, exist := r.indexer[t]; !exist { r.Lock() if _, exist := r.indexer[t]; !exist { var v = int32(0) r.indexer[t] = &v } r.Unlock() } index := r.indexer[t] i := atomic.AddInt32(index, 1) if i < 0 { i = -i atomic.StoreInt32(index, 0) } qIndex := int(i) % len(queues) return queues[qIndex]}roundRobinQueueSelector的qIndex为int(i) % len(queues)hashQueueSelectorrocketmq-client-go-v2.0.0/producer/selector.go ...

July 4, 2020 · 2 min · jiezi

activemq-延时消息

在mq 的安装目录下 的conf/activemq.xml在配置文件的40 行里加上schedulerSupport="true"<broker xmlns\="http://activemq.apache.org/schema/core" brokerName\="localhost" dataDirectory\="${activemq.data}" schedulerSupport\="true"\> 将修改的文件保存,服务重启,mq 的延时功能就可以正常执行了

June 30, 2020 · 1 min · jiezi

RabbitMQ入门指南

简介RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理(英语:Message broker)软件(亦称面向消息的中间件(英语:Message-oriented middleware))。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在OTP(Open Telecom Platform)上的。所有主要的编程语言均有与代理接口通讯的客户端函式库。 主要特性可靠性(Reliability)RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 灵活的路由(Flexible Routing)在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 消息集群(Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 高可用(Highly Available Queues)队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 多种协议(Multi-protocol)RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 多语言客户端(Many Clients)RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 管理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 跟踪机制(Tracing)如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。 基本概念(以下使用AMQP-0-9-1版本)先放一张主要结构图 BrokerBroker(可以理解为RabbitMQ)从发布者(发布消息的应用程序,也称为生产者)接收消息,并将其路由到消费者(处理消息的应用程序)。 ExchangeExchange是 AMQP 0-9-1实体,用于发送消息。 Exchange接收消息并将其路由到零个或多个队列。 所使用的路由算法取决于称为绑定的交换类型和规则。 Amqp 0-9-1型经纪商提供四种交易所类型: 类型默认名称Direct exchange空字符串或amq.directFanout exchangeamq.fanoutTopic exchangeamq.topicHeaders exchangeamq.match(以及 RabbitMQ 中的 amq.headers)Default Exchange默认交换是直接交换,没有代理预先声明的名称(空字符串)。 它有一个特殊的属性,这使得它对简单的应用程序非常有用: 创建的每个队列都自动用与队列名相同的路由键绑定到它。 Direct Exchange直接交换器根据邮件路由键将邮件传递到队列。 直接交换对于消息的单播路由是理想的(尽管它们也可以用于广播路由)。 以下是它的工作原理: ...

June 22, 2019 · 1 min · jiezi

多维度对比5款主流分布式MQ消息队列妈妈再也不担心我的技术选型了

1、引言对于即时通讯系统(包括IM、消息推送系统等)来说,MQ消息中件间是非常常见的基础软件,但市面上种类众多、各有所长的MQ消息中件间产品,该怎么去选择?这是个问题! 对于很多经验不足的开发者来说,一个公司内部用的IM聊天系统,总用户量也不过百十来人,动辄就是Kafka、MongoDB,美其名曰为了高性能和可扩展性,真是大炮打蚊子。而对于中大型的即时通讯场景来说,有的开发者确为了贪图使用简单、资料全面,反而使用臃肿不堪的ActiveMQ,这就有点失去章法了。 唧唧歪歪这么多,那什么样的场景到底该用哪种MQ消息中件间产品合适?读完本文您或许就有了答案。 本文将从17个维度综合对比Kafka、RabbitMQ、ZeroMQ、RocketMQ、ActiveMQ这5款当前最主流的MQ消息中间件产品,希望能为您的下一次产品的架构设计和MQ消息中间件选型提供参考依据。 学习交流: 即时通讯/推送技术开发交流4群:101279154[推荐]移动端IM开发入门文章:《新手入门一篇就够:从零开发移动端IM》(本文同步发布于:http://www.52im.net/thread-26...) 2、相关资料官网地址: Kafka:http://kafka.apache.org/ RabbitMQ:https://www.rabbitmq.com/ ZeroMQ:http://zeromq.org/ RocketMQ:http://rocketmq.apache.org/ ActiveMQ:http://activemq.apache.org/ 相关文章: 《IM开发基础知识补课(五):通俗易懂,正确理解并用好MQ消息队列》 《IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?》 3、维度1:资料文档1)Kafka:资料数量中等。有Kafka作者自己写的书,网上资料也有一些。 2)RabbitMQ:资料数量多。有一些不错的书,网上资料多。 3)ZeroMQ:资料数量少。专门写ZeroMQ的书较少,网上的资料多是一些代码的实现和简单介绍。 4)RocketMQ:资料数量少。专门写RocketMQ的书目前有了两本;网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。 5)ActiveMQ:资料数量多。没有专门写ActiveMQ的书,网上资料多。 4、维度2:开发语言1)Kafka:Scala 2)RabbitMQ:Erlang 3)ZeroMQ:C语言 4)RocketMQ:Java 5)ActiveMQ:Java 5、维度3:支持的协议1)Kafka:自己定义的一套…(基于TCP) 2)RabbitMQ:AMQP 3)ZeroMQ:TCP、UDP 4)RocketMQ:自己定义的一套… 5)ActiveMQ:OpenWire、STOMP、REST、XMPP、AMQP 6、维度4:消息存储1)Kafka: 内存、磁盘、数据库。支持大量堆积。 Kafka的最小存储单元是分区,一个topic包含多个分区,Kafka创建主题时,这些分区会被分配在多个服务器上,通常一个broker一台服务器。 分区首领会均匀地分布在不同的服务器上,分区副本也会均匀的分布在不同的服务器上,确保负载均衡和高可用性,当新的broker加入集群的时候,部分副本会被移动到新的broker上。 根据配置文件中的目录清单,Kafka会把新的分区分配给目录清单里分区数最少的目录。 默认情况下,分区器使用轮询算法把消息均衡地分布在同一个主题的不同分区中,对于发送时指定了key的情况,会根据key的hashcode取模后的值存到对应的分区中。 2)RabbitMQ: 内存、磁盘。支持少量堆积。 RabbitMQ的消息分为持久化的消息和非持久化消息,不管是持久化的消息还是非持久化的消息都可以写入到磁盘。 持久化的消息在到达队列时就写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。 非持久化的消息一般只存在于内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存。 引入镜像队列机制,可将重要队列“复制”到集群中的其他broker上,保证这些队列的消息不会丢失。 配置镜像的队列,都包含一个主节点master和多个从节点slave,如果master失效,加入时间最长的slave会被提升为新的master,除发送消息外的所有动作都向master发送,然后由master将命令执行结果广播给各个slave,RabbitMQ会让master均匀地分布在不同的服务器上,而同一个队列的slave也会均匀地分布在不同的服务器上,保证负载均衡和高可用性。 3)ZeroMQ: 消息发送端的内存或者磁盘中。不支持持久化。 4)RocketMQ: 磁盘。支持大量堆积。 commitLog文件存放实际的消息数据,每个commitLog上限是1G,满了之后会自动新建一个commitLog文件保存数据。 ConsumeQueue队列只存放offset、size、tagcode,非常小,分布在多个broker上。 ConsumeQueue相当于CommitLog的索引文件,消费者消费时会从consumeQueue中查找消息在commitLog中的offset,再去commitLog中查找元数据。 ConsumeQueue存储格式的特性,保证了写过程的顺序写盘(写CommitLog文件),大量数据IO都在顺序写同一个commitLog,满1G了再写新的。 加上RocketMQ是累计4K才强制从PageCache中刷到磁盘(缓存),所以高并发写性能突出。 5)ActiveMQ: 内存、磁盘、数据库。支持少量堆积。 7、维度5:消息事务1)Kafka:支持 2)RabbitMQ:支持。客户端将信道设置为事务模式,只有当消息被RabbitMQ接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降 3)ZeroMQ:不支持 4)RocketMQ:支持 5)ActiveMQ:支持 8、维度6:负载均衡8.1 Kafka支持负载均衡。 1)一个broker通常就是一台服务器节点。 对于同一个Topic的不同分区,Kafka会尽力将这些分区分布到不同的Broker服务器上,zookeeper保存了broker、主题和分区的元数据信息。 分区首领会处理来自客户端的生产请求,Kafka分区首领会被分配到不同的broker服务器上,让不同的broker服务器共同分担任务。 每一个broker都缓存了元数据信息,客户端可以从任意一个broker获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。 2)Kafka的消费者组订阅同一个topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。 ...

June 21, 2019 · 3 min · jiezi

开源一个kafka增强okmq100

本工具的核心思想就是:赌。只有两个基础组件同时死亡,才会受到严重影响。哦,断电除外。mq是个好东西,我们都在用。这也决定了mq应该是高高高可用的。某团就因为这个组件,出了好几次生产事故,呵呵。 大部分业务系统,要求的消息语义都是at least once,即都会有重复消息,但保证不会丢。即使这样,依然有很多问题: 一、mq可用性无法保证。 mq的意外死亡,造成生产端发送失败。很多消息要通过扒取日志进行回放,成本高耗时长。 二、mq阻塞业务正常进行。 mq卡顿或者网络问题,会造成业务线程卡在mq的发送方法上,正常业务进行不下去,造成灾难性的后果。 三、消息延迟。 mq死了就用不着说了,消息还没投胎就已死亡。消息延迟主要是客户端消费能力不强,或者是消费通道单一造成的。 使用组合存储来保证消息的可靠投递,就是okmq。 注意:okmq注重的是可靠性。对于顺序性、事务等其他要素,不予考虑。当然,速度是必须的。设计想法我即使用两套redis来模拟一些mq操作,都会比现有的一些解决方案要强。但这肯定不是我们需要的,因为redis的堆积能力太有限,内存占用率直线上升的感觉并不太好。 但我们可以用redis来作为额外的发送确认机制。这个想法,在《使用多线程增加kafka消费能力》一文中曾经提到过,现在到了实现的时候了。 首先看下使用ApiOkmqKafkaProducer producer = new ProducerBuilder().defaultSerializer().eanbleHa("redis").any("okmq.redis.mode", "single").any("okmq.redis.endpoint", "127.0.0.1:6379").any("okmq.redis.poolConfig.maxTotal", 100).servers("localhost:9092").clientID("okMQProducerTest").build();Packet packet = new Packet();packet.setTopic("okmq-test-topic");packet.setContent("i will send you a msg");producer.sendAsync(packet, null);producer.shutdown();以redis为例我们按照数字标号来介绍: 1、 在消息发送到kafka之前,首先入库redis。由于后续回调需要用到一个唯一表示,我们在packet包里添加了一个uuid。 2、 调用底层的api,进行真正的消息投递。 3、 通过监听kafka的回调,删除redis中对应的key。在这里可以得到某条消息确切的的ack时间。那么长时间没有删除的,就算是投递失败的消息。 4、 后台会有一个线程进行这些失败消息的遍历和重新投递。我们叫做recovery。最复杂的也就是这一部分。对于redis来说,会首先争抢一个持续5min的锁,然后遍历相关hashkey。 所以,对于以上代码,redis发出以下命令: 1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354"1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" ""1559206423.397300 [0 127.0.0.1:62858] "HSET" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{\"content\":\"i will send you a msg104736623015238\",\"topic\":\"okmq-test-topic\",\"identify\":\"2b9b33fd-95fd-4cd6-8815-4c572f13f76e\",\"timestamp\":1559206423318}"1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e"1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000"1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash"1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0"1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354"1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock"1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"以上问题解答所以对于以上的三个问题,回答如下:一、mq可用性无法保证。 ...

June 5, 2019 · 1 min · jiezi

Kafka压缩详解初稿

Kafka压缩概括需要理解kafka压缩则需要理解Kafka的存储格式. Kafka存储格式RecordBatch baseOffset: int64batchLength: int32partitionLeaderEpoch: int32magic: int8 (current magic value is 2)crc: int32attributes: int16 bit 0~2: 0: no compression 1: gzip 2: snappy 3: lz4 4: zstd bit 3: timestampType bit 4: isTransactional (0 means not transactional) bit 5: isControlBatch (0 means not a control batch) bit 6~15: unusedlastOffsetDelta: int32firstTimestamp: int64maxTimestamp: int64producerId: int64producerEpoch: int16baseSequence: int32records: [Record]Record length: varintattributes: int8 bit 0~7: unusedtimestampDelta: varintoffsetDelta: varintkeyLength: varintkey: byte[]valueLen: varintvalue: byte[]Headers => [Header]Record Header ...

May 21, 2019 · 2 min · jiezi

Kafka消息过长详解

Kafka发送消息大小问题⚠️ 本文实验的Kafka版本为2.11版本. 消息概述kafka中的消息指的就是一条ProducerRecord,里面除了携带发送的数据之外,还包含: topic 发往的Topicpartition 发往的分区headers 头信息key 数据value 数据timestamp-long 时间戳Producer生产消息过长在生产者发送消息的时候,并不是上面所有的信息都算在发送的消息大小.详情见下面代码. 上面的代码会将value序列化成字节数组,参与序列化的有topic,headers,key. 用来验证value是否超出长度的是ensureValidRecordSize(serializedSize);方法. ensureValidRecordSize从两个方面验证,一个是maxRequestSize(max.request.size),另一个是totalMemorySize(buffer.memory), 只有当value的长度同时小于时,消息才可以正常发送. private void ensureValidRecordSize(int size) { if (size > this.maxRequestSize) throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the maximum request size you have configured with the " + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration."); if (size > this.totalMemorySize) throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the " + ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration.");}单条消息过长或产生如下错误. ...

May 19, 2019 · 2 min · jiezi

RabbitMQ快速入门

一、前言RabbitMQ其实是我最早接触的一个MQ框架,我记得当时是在大学的时候跑到图书馆一个人去看,由于RabbitMQ官网的英文还不算太难,因此也是参考官网学习的,一共有6章,当时是用Node来开发的,当时花了一下午看完了,也理解了。而现在回过头来再看,发现已经忘记了个差不多了,现在再回过头来继续看看,然乎记之。以防再忘,读者看时最好有一定的MQ基础。 二、RabbitMQ首先我们需要知道的是RabbitMQ它是基于高级队列协议(AMQP)的,它是Elang编写的,下面将围绕RabbitMQ队列、交换机、RPC三个重点进行展开。 2.1、队列存储消息的地方,多个生产者可以将消息发送到一个队列,多个消费者也可以消费同一个队列的消息。 注意:当多个消费者监听一个队列,此时生产者发送消息到队列只有一个消费者被消费,并且消费端的消费方式是按照消费端在内部启动的顺序轮询(round-robin)。2.2、消费者消费消息的一方 public class Send { private final static String QUEUE_NAME = "hello"; private final static String IP = "172.16.12.162"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); }catch (Exception e){ e.printStackTrace(); } }}public class Recv { private final static String QUEUE_NAME = "hello"; private final static String IP = "172.16.12.162"; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }catch (Exception e){ e.printStackTrace(); } }}2.3、小结1、Rabbit是如何保证消息被消费的?答:通过ack机制。每当一个消息被消费端消费的时候,消费端可以发送一个ack给RabbitMQ,这样RabbitMQ就知道了该条消息已经被完整消费并且可以被delete了。;如果一条消息被消费但是没有发送ack,那么此时RabbitMQ将会认为需要重新消费该消息,如果此时还有其它的消费者,那么此时RabbitMQ将会把这条消息交给它处理。 ...

May 7, 2019 · 3 min · jiezi

利用blink+MQ实现流计算中的超时统计问题

案例与解决方案汇总页:阿里云实时计算产品案例&解决方案汇总一. 背景介绍菜鸟的物流数据本身就有链路复杂、实操节点多、汇总维度多、考核逻辑复杂的特点,对于实时数据的计算存在很大挑战。经过仓配ETL团队的努力,目前仓配实时数据已覆盖了绝大多数场景,但是有这样一类特殊指标:“晚点超时指标”(例如:出库超6小时未揽收的订单量),仍存在实时汇总计算困难。原因在于:流计算是基于消息触发计算的,若没有消息到达到则无法计算,这类指标恰好是要求在指定的超时时间计算出有多少未达到的消息。然而,这类指标对于指导实操有着重要意义,可以告知运营小二当前多少订单积压在哪些作业节点,应该督促哪些实操人员加快作业,这对于物流的时效KPI达成至关重要。之前的方案是:由产品前端根据用户的请求查询OLAP数据库,由OLAP从明细表出结果。大促期间,用户请求量大,加之数据量大,故对OLAP的明细查询造成了比较大的压力。二. 解决方案2.1 问题定义“超时晚点指标” 是指,一笔订单的两个相邻的实操节点node_n-1 、node_n 的完成时间 time_n-1、time_n,当满足 : time_n is null && current_time - time_n-1 > kpi_length 时,time_flag_n 为 true , 该笔订单计入 超时晚点指标的计数。如下图,有一笔订单其 node_1 为出库节点,时间为time_1 = ‘2018-06-18 00:00:00’ ,运营对出库与揽收之间考核的时长 kpi_length = 6h, 那么当前自然时间 current_time > ‘2018-06-18 06:00:00’ 时,且node_2揽收节点的time_2 为null,则该笔订单的 timeout_flag_2 = true , “出库超6小时未揽收订单量” 加1。由于要求time_2 为null,即要求没有揽收消息下发的情况下让流计算做汇总值更新,这违背了流计算基于消息触发的基本原理,故流计算无法直接算出这种“超时晚点指标”。决问题的基本思路是:在考核时刻(即 kpi_time = time_n-1+kpi_length )“制造”出一条消息下发给流计算,触发汇总计算。继续上面的例子:在考核时刻“2018-06-18 06:00:00”利用MetaQ定时消息功能“制造”出一条消息下发给流计算汇总任务,触发对该笔订单的 time_out_flag_2 的判断,增加汇总计数。同时,还利用 Blink 的Retraction 机制,当time_2 由null变成有值的时候,Blink 可以对 time_out_flag_2 更新,重新计数。2.2 方案架构如上图所示:Step1: Blink job1 接收来自上游系统的订单数据,做清洗加工,生成订单明细表:dwd_ord_ri,利用TT下发给Blink job2 和 Blink job3。Step2:Blink job2 收到 dwd_ord_ri后,对每笔订单算出考核时刻 kpi_time = time_n-1+kpi_length,作为MetaQ消息的“TIMER_DELIVER_MS” 属性,写入MetaQ。MetaQ的定时消息功能,可以根据用户写入的TIMER_DELIVER_MS 在指定时刻下发给消费者,即上图中的Blink job3。Step3:Blink job3 接收 TT、MetaQ 两个消息源,先做Join,再对time_flag判断,最后做Aggregate计算。同一笔订单,dwd_ord_ri、timing_msg任意一个消息到来,都会触发join,time_flag判断,aggregate重新计算一遍,Blink的Retraction可对结果进行实时更新。2.3 实现细节本方案根据物流场景中多种实操节点、多种考核时长的特点,从Blink SQL代码 和 自定义Sink两方面做了特殊设计,从而实现了灵活配置、高效开发。(1) Blink job2 — 生成定时消息关键Blink SQL 代码如下。约定每条record的第一个字段为投递时间列表,即MetaQ向消费者下发消息的时刻List,也就是上面所说的多个考核时刻。第二个字段为保序字段,比如在物流场景中经常以订单code、运单号作为保序主键。该代码实现了对每个出库的物流订单,根据其出库时间,向后延迟6小时(21600000毫秒)、12小时(43200000毫秒)、24小时(86400000毫秒)由MetaQ向消费者下发三个定时消息。create table metaq_timing_msg(deliver_time_list varchar comment ‘投递时间列表’, – 约定第一个字段为投递时间listlg_code varchar comment ‘物流订单code’, – 约定第二字段为保序主键node_name varchar comment ‘节点名称’,node_time varchar comment ‘节点时间’,)WITH(type = ‘custom’,class = ‘com.alibaba.xxx.xxx.udf.MetaQTimingMsgSink’,tag = ‘store’,topic = ‘blink_metaq_delay_msg_test’,producergroup = ‘blinktest’,retrytimes = ‘5’,sleeptime = ‘1000’);insert into metaq_timing_msgselectconcat_ws(’,’,cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 21600000) as varchar), –6小时cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 43200000) as varchar), –12小时cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 86400000) as varchar) –24小时) as deliver_time_list,lg_code,‘wms’ as node_name,store_out_time as node_timefrom(selectlg_code,FIRST_VALUE(store_out_time) as store_out_timefrom srctablegroup by lg_code)bwhere store_out_time is not null ;(2) Blink 自定义Sink — MetaQTimingMsg SinkBlink的当前版本还不支持 MetaQ的定时消息功能的Sink,故利用 Blink的自定义Sink功能,并结合菜鸟物流数据的特点开发了MetaQTimingMsg Sink。关键代码如下(实现 writeAddRecord 方法)。@Overridepublic void writeAddRecord(Row row) throws IOException {Object deliverTime = row.getField(0);String[] deliverTimeList = deliverTime.toString().split(",");for(String dTime:deliverTimeList){ String orderCode = row.getField(1).toString(); String key = orderCode + “_” + dTime; Message message = newMessage(row, dTime, key); boolean result = sendMessage(message,orderCode); if(!result){ LOG.error(orderCode + " : " + dTime + " send failed"); } }}private Message newMessage(Row row,String deliverMillisec,String key){ //Support Varbinary Type Insert Into MetaQ Message message = new Message(); message.setKeys(key); message.putUserProperty(“TIMER_DELIVER_MS”,deliverMillisec); int arity = row.getArity(); Object[] values = new Object[arity]; for(int i=0;i<arity;i++){ values[i]=row.getField(i); } String lineStr=org.apache.commons.lang3.StringUtils.join(values, FIELD_DELIMITER); try { byte[] bytes = lineStr.getBytes(ENCODING); message.setBody(bytes); message.setWaitStoreMsgOK(true); } catch (UnsupportedEncodingException e) { LOG.error(“create new message error”,e); } return message;}private boolean sendMessage(Message message,String orderCode){ long retryTime = 0; boolean isSendSuccess = true; if(message != null){ message.setTopic(topicName); message.setTags(tagName); } SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { …. // 针对物流订单code的hash算法 return list.get(index.intValue()); } },orderCode); if(!result.getSendStatus().equals(SendStatus.SEND_OK)){ LOG.error("" + orderCode +" write to metaq result is " + result.getSendStatus().toString()); isSendSuccess = false; } return isSendSuccess;}}(3)Blink job3 — 汇总计算关键Blink SQL 代码如下,统计了每个仓库的“出库超6小时未揽收物理订单”、“出库超12小时未揽收物理订单”、“出库超24小时未揽收物理订单”的汇总值。代码中使用了“stringLast()”函数处理来自dwd_ord_ri的每条消息,以取得每个物流订单的最新出库揽收情况,利用Blink Retraction机制,更新汇总值。create view dws_store_view as select t1.store_code, max(t1.store_name) as store_name, count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 21600 then t2.lg_code end ) as tms_not_collect_6h_ord_cnt, —出库超6小时未揽收物流订单量 count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 43200 then t2.lg_code end ) as tms_not_collect_12h_ord_cnt,—出库超6小时未揽收物流订单量 count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 86400 then t2.lg_code end ) as tms_not_collect_24h_ord_cnt —出库超6小时未揽收物流订单量from ( select lg_code, coalesce(store_code,’-1’) as store_code, store_name, store_out_time, tms_collect_time from ( select lg_code, max(store_code) as store_code, max(store_name) as store_name, stringLast(store_out_time) as store_out_time, stringLast(tms_collect_time)as tms_collect_time, from dwd_ord_ri group by lg_code ) a ) t1left outer join ( select lg_code, from timing_msg where node_name = ‘wms’ group by lg_code) t2on t1.lg_code = t2.lg_codegroup by t1.store_code ;三. 方案优势3.1 配置灵活我们从“Blink SQL 代码” 和“自定义MetaQ” 两个方面设计,用户可以根据具体的业务场景,在Blink SQL的一个view里就能实现多种节点多种考核时间的定时消息生成,而不是针对每一个实操节点的每一种定时指标都要写一个view,这样大大节省了代码量,提升了开发效率。例如对于仓库节点的出库超6小时未揽收、超12小时未揽收、超24小时未揽收,这三个指标利用上述方案,仅需在Blink job2的中metaq_timing_msg的第一个字段deliver_time_list中拼接三个kpi_length,即6小时、12小时、24小时为一个字符串即可,由MetaQTimingMsg Sink自动拆分成三条消息下发给MetaQ。对于不同的节点的考核,仅需在node_name,node_time填写不同的节点名称和节点实操时间即可。3.2 主键保序如2.3节所述,自定义的Sink中 实现了MetaQ的 MessageQueueSelector 接口的 select() 方法,同时在Blink SQL 生成的MetaQ消息默认第二个字段为保序主键字段。从而,可以根据用户自定义的主键,保证同一主键的所有消息放在同一个通道内处理,从而保证按主键保序,这对于流计算非常关键,能够实现数据的实时准确性。3.3 性能优良让专业的团队做专业的事。个人认为,这种大规模的消息存储、消息下发的任务本就应该交给“消息中间件”来处理,这样既可以做到计算与消息存储分离,也可以方便消息的管理,比如针对不同的实操节点,我们还可以定义不同的MetaQ的tag。另外,正如2.2节所述,我们对定时消息量做了优化。考虑到一笔订单的属性字段或其他节点更新会下发多条消息,我们利用了Blink的FIRST_VALUE函数,在Blink job2中同一笔订单的的一种考核指标只下发一条定时消息,大大减少了消息量,减轻了Blink的写压力,和MetaQ的存储。四. 自我介绍马汶园 阿里巴巴 -菜鸟网络—数据部 数据工程师菜鸟仓配实时研发核心成员,主导多次仓配大促实时数据研发,对利用Blink的原理与特性解决物流场景问题有深入思考与理解。本文作者:付空阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

March 6, 2019 · 3 min · jiezi

NATS--NATS Streaming持久化

前言最近项目中需要使用到一个消息队列,主要用来将原来一些操作异步化。根据自己的使用场景和熟悉程度,选择了NATS Streaming。之所以,选择NATS Streaming。一,因为我选型一些中间件,我会优先选取一些自己熟悉的语言编写的,这样方便排查问题和进一步的深究。二,因为自己一直做k8s等云原生这块,偏向于cncf基金会管理的项目,毕竟这些项目从一开始就考虑了如何部署在k8s当中。三,是评估项目在不断发展过程中,引入的组件是否能够依旧满足需求。消息队列的使用场景如果问为什么这么做,需要说一下消息队列的使用场景。之前看知乎的时候,看到一些回答比较认同,暂时拿过来,更能形象表达。感谢ScienJus同学的精彩解答。消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。使用场景的话,举个例子:假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:校验用户名等信息,如果没问题会在数据库中添加一个用户记录如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他发送给用户一个包含操作指南的系统通知等等……但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。所以在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。其实,总结一下消息队列的作用削峰,形象点的话,可以比喻为蓄水池。比如elk日志收集系统中的kafka,主要在日志高峰期的时候,在牺牲实时性的同时,保证了整个系统的安全。同步系统异构化。原先一个同步操作里的诸多步骤,可以考虑将一些不影响主线发展的步骤,通过消息队列异步处理。比如,电商行业,一个订单完成之后,一般除了直接返回给客户购买成功的消息,还要通知账户组进行扣费,通知处理库存变化,通知物流进行派送等,通知一些用户组做一些增加会员积分等操作等。NATS Streaming 简介NATS Streaming是一个由NATS驱动的数据流系统,用Go编程语言编写。 NATS Streaming服务器的可执行文件名是nats-streaming-server。 NATS Streaming与核心NATS平台无缝嵌入,扩展和互操作。 NATS Streaming服务器作为Apache-2.0许可下的开源软件提供。 Synadia积极维护和支持NATS Streaming服务器。特点除了核心NATS平台的功能外,NATS Streaming还提供以下功能:增强消息协议NATS Streaming使用谷歌协议缓冲区实现自己的增强型消息格式。这些消息通过二进制数据流在NATS核心平台进行传播,因此不需要改变NATS的基本协议。NATS Streaming信息包含以下字段: - 序列 - 一个全局顺序序列号为主题的通道 - 主题 - 是NATS Streaming 交付对象 - 答复内容 - 对应"reply-to"对应的对象内容 - 数据 - 真是数据内容 - 时间戳 - 接收的时间戳,单位是纳秒 - 重复发送 - 标志这条数据是否需要服务再次发送 - CRC32 - 一个循环冗余数据校验选项,在数据存储和数据通讯领域里,为了保证数据的正确性所采用的检错手段,这里使用的是 IEEE CRC32 算法 - 消息/事件的持久性 NATS Streaming提供了可配置的消息持久化,持久目的地可以为内存或者文件。另外,对应的存储子系统使用了一个公共接口允许我们开发自己自定义实现来持久化对应的消息 - 至少一次的发送 NATS Streaming提供了发布者和服务器之间的消息确认(发布操作) 和订阅者和服务器之间的消息确认(确认消息发送)。其中消息被保存在服务器端内存或者辅助存储(或其他外部存储器)用来为需要重新接受消息的订阅者进行重发消息。 - 发布者发送速率限定 NATS Streaming提供了一个连接选项叫 MaxPubAcksInFlight,它能有效的限制一个发布者可能随意的在任何时候发送的未被确认的消息。当达到这个配置的最大数量时,异步发送调用接口将会被阻塞,直到未确认消息降到指定数量之下。- 每个订阅者的速率匹配/限制 NATS Streaming运行指定的订阅中设置一个参数为 MaxInFlight,它用来指定已确认但未消费的最大数据量,当达到这个限制时,NATS Streaming 将暂停发送消息给订阅者,直到未确认的数据量小于设定的量为止以主题重发的历史数据 新订阅的可以在已经存储起来的订阅的主题频道指定起始位置消息流。通过使用这个选项,消息就可以开始发送传递了: 1. 订阅的主题存储的最早的信息 2. 与当前订阅主题之前的最近存储的数据,这通常被认为是 “最后的值” 或 “初值” 对应的缓存 3. 一个以纳秒为基准的 日期/时间 4. 一个历史的起始位置相对当前服务的 日期/时间,例如:最后30秒 5. 一个特定的消息序列号持久订阅 订阅也可以指定一个“持久化的名称”可以在客户端重启时不受影响。持久订阅会使得对应服务跟踪客户端最后确认消息的序列号和持久名称。当这个客户端重启或者重新订阅的时候,使用相同的客户端ID 和 持久化的名称,对应的服务将会从最早的未被确认的消息处恢复。docker 运行NATS Streaming在运行之前,前面已经讲过NATS Streaming 相比nats,多了持久化的一个future。所以我们在接下来的demo演示中,会重点说这点。运行基于memory的持久化示例:docker run -ti -p 4222:4222 -p 8222:8222 nats-streaming:0.12.0你将会看到如下的输出:[1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0[1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3[1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5[1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1[1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set][4] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222[1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222[1] 2019/02/26 08:13:01.770581 [INF] Server is ready[1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state…[1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state[1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY[1] 2019/02/26 08:13:02.052552 [INF] STREAM: ———- Store Limits ———-[1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels: 100 *[1] 2019/02/26 08:13:02.052586 [INF] STREAM: ——— Channels Limits ——–[1] 2019/02/26 08:13:02.052601 [INF] STREAM: Subscriptions: 1000 *[1] 2019/02/26 08:13:02.052613 [INF] STREAM: Messages : 1000000 *[1] 2019/02/26 08:13:02.052624 [INF] STREAM: Bytes : 976.56 MB *[1] 2019/02/26 08:13:02.052635 [INF] STREAM: Age : unlimited *[1] 2019/02/26 08:13:02.052649 [INF] STREAM: Inactivity : unlimited *[1] 2019/02/26 08:13:02.052697 [INF] STREAM: ———————————-可以看出默认的是基于内存的持久化。运行基于file的持久化示例:docker run -ti -v /Users/gao/test/mq:/datastore -p 4222:4222 -p 8222:8222 nats-streaming:0.12.0 -store file –dir /datastore -m 8222你将会看到如下的输出:[1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0[1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM[1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5[1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1[1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set][5] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222[1] 2019/02/26 08:16:07.643932 [INF] Server is ready[1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state…[1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state[1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE[1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore[1] 2019/02/26 08:16:07.933633 [INF] STREAM: ———- Store Limits ———-[1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels: 100 *[1] 2019/02/26 08:16:07.933697 [INF] STREAM: ——— Channels Limits ——–[1] 2019/02/26 08:16:07.933711 [INF] STREAM: Subscriptions: 1000 *[1] 2019/02/26 08:16:07.933749 [INF] STREAM: Messages : 1000000 *[1] 2019/02/26 08:16:07.933793 [INF] STREAM: Bytes : 976.56 MB *[1] 2019/02/26 08:16:07.933837 [INF] STREAM: Age : unlimited *[1] 2019/02/26 08:16:07.933857 [INF] STREAM: Inactivity : unlimited *[1] 2019/02/26 08:16:07.933885 [INF] STREAM: ———————————-PS如果部署在k8s当中,那么就可以采取基于file的持久化,通过挂载一个块存储来保证,数据可靠。比如,aws的ebs或是ceph的rbd。4222为客户端连接的端口。8222为监控端口。启动以后访问:localhost:8222,可以看到如下的网页:启动参数解析Streaming Server Options: -cid, –cluster_id <string> Cluster ID (default: test-cluster) -st, –store <string> Store type: MEMORY|FILE|SQL (default: MEMORY) –dir <string> For FILE store type, this is the root directory -mc, –max_channels <int> Max number of channels (0 for unlimited) -msu, –max_subs <int> Max number of subscriptions per channel (0 for unlimited) -mm, –max_msgs <int> Max number of messages per channel (0 for unlimited) -mb, –max_bytes <size> Max messages total size per channel (0 for unlimited) -ma, –max_age <duration> Max duration a message can be stored (“0s” for unlimited) -mi, –max_inactivity <duration> Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited) -ns, –nats_server <string> Connect to this external NATS Server URL (embedded otherwise) -sc, –stan_config <string> Streaming server configuration file -hbi, –hb_interval <duration> Interval at which server sends heartbeat to a client -hbt, –hb_timeout <duration> How long server waits for a heartbeat response -hbf, –hb_fail_count <int> Number of failed heartbeats before server closes the client connection –ft_group <string> Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore -sl, –signal <signal>[=<pid>] Send signal to nats-streaming-server process (stop, quit, reopen) –encrypt <bool> Specify if server should use encryption at rest –encryption_cipher <string> Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES –encryption_key <sting> Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable insteadStreaming Server Clustering Options: –clustered <bool> Run the server in a clustered configuration (default: false) –cluster_node_id <string> ID of the node within the cluster if there is no stored ID (default: random UUID) –cluster_bootstrap <bool> Bootstrap the cluster if there is no existing state by electing self as leader (default: false) –cluster_peers <string> List of cluster peer node IDs to bootstrap cluster state. –cluster_log_path <string> Directory to store log replication data –cluster_log_cache_size <int> Number of log entries to cache in memory to reduce disk IO (default: 512) –cluster_log_snapshots <int> Number of log snapshots to retain (default: 2) –cluster_trailing_logs <int> Number of log entries to leave after a snapshot and compaction –cluster_sync <bool> Do a file sync after every write to the replication log and message store –cluster_raft_logging <bool> Enable logging from the Raft library (disabled by default)Streaming Server File Store Options: –file_compact_enabled <bool> Enable file compaction –file_compact_frag <int> File fragmentation threshold for compaction –file_compact_interval <int> Minimum interval (in seconds) between file compactions –file_compact_min_size <size> Minimum file size for compaction –file_buffer_size <size> File buffer size (in bytes) –file_crc <bool> Enable file CRC-32 checksum –file_crc_poly <int> Polynomial used to make the table used for CRC-32 checksum –file_sync <bool> Enable File.Sync on Flush –file_slice_max_msgs <int> Maximum number of messages per file slice (subject to channel limits) –file_slice_max_bytes <size> Maximum file slice size - including index file (subject to channel limits) –file_slice_max_age <duration> Maximum file slice duration starting when the first message is stored (subject to channel limits) –file_slice_archive_script <string> Path to script to use if you want to archive a file slice being removed –file_fds_limit <int> Store will try to use no more file descriptors than this given limit –file_parallel_recovery <int> On startup, number of channels that can be recovered in parallel –file_truncate_bad_eof <bool> Truncate files for which there is an unexpected EOF on recovery, dataloss may occurStreaming Server SQL Store Options: –sql_driver <string> Name of the SQL Driver (“mysql” or “postgres”) –sql_source <string> Datasource used when opening an SQL connection to the database –sql_no_caching <bool> Enable/Disable caching for improved performance –sql_max_open_conns <int> Maximum number of opened connections to the databaseStreaming Server TLS Options: -secure <bool> Use a TLS connection to the NATS server without verification; weaker than specifying certificates. -tls_client_key <string> Client key for the streaming server -tls_client_cert <string> Client certificate for the streaming server -tls_client_cacert <string> Client certificate CA for the streaming serverStreaming Server Logging Options: -SD, –stan_debug=<bool> Enable STAN debugging output -SV, –stan_trace=<bool> Trace the raw STAN protocol -SDV Debug and trace STAN –syslog_name On Windows, when running several servers as a service, use this name for the event source (See additional NATS logging options below)Embedded NATS Server Options: -a, –addr <string> Bind to host address (default: 0.0.0.0) -p, –port <int> Use port for clients (default: 4222) -P, –pid <string> File to store PID -m, –http_port <int> Use port for http monitoring -ms,–https_port <int> Use port for https monitoring -c, –config <string> Configuration fileLogging Options: -l, –log <string> File to redirect log output -T, –logtime=<bool> Timestamp log entries (default: true) -s, –syslog <string> Enable syslog as log method -r, –remote_syslog <string> Syslog server addr (udp://localhost:514) -D, –debug=<bool> Enable debugging output -V, –trace=<bool> Trace the raw protocol -DV Debug and traceAuthorization Options: –user <string> User required for connections –pass <string> Password required for connections –auth <string> Authorization token required for connectionsTLS Options: –tls=<bool> Enable TLS, do not verify clients (default: false) –tlscert <string> Server certificate file –tlskey <string> Private key for server certificate –tlsverify=<bool> Enable TLS, verify client certificates –tlscacert <string> Client certificate CA for verificationNATS Clustering Options: –routes <string, …> Routes to solicit and connect –cluster <string> Cluster URL for solicited routesCommon Options: -h, –help Show this message -v, –version Show version –help_tls TLS help.源码简单分析NATS Streaming 持久化目前NATS Streaming支持以下4种持久化方式:MEMORYFILESQLRAFT其实看源码可以知道:NATS Streaming的store基于接口实现,很容易扩展到更多的持久化方式。具体的接口如下:// Store is the storage interface for NATS Streaming servers.//// If an implementation has a Store constructor with StoreLimits, it should be// noted that the limits don’t apply to any state being recovered, for Store// implementations supporting recovery.//type Store interface { // GetExclusiveLock is an advisory lock to prevent concurrent // access to the store from multiple instances. // This is not to protect individual API calls, instead, it // is meant to protect the store for the entire duration the // store is being used. This is why there is no Unlock API. // The lock should be released when the store is closed. // // If an exclusive lock can be immediately acquired (that is, // it should not block waiting for the lock to be acquired), // this call will return true with no error. Once a store // instance has acquired an exclusive lock, calling this // function has no effect and true with no error will again // be returned. // // If the lock cannot be acquired, this call will return // false with no error: the caller can try again later. // // If, however, the lock cannot be acquired due to a fatal // error, this call should return false and the error. // // It is important to note that the implementation should // make an effort to distinguish error conditions deemed // fatal (and therefore trying again would invariably result // in the same error) and those deemed transient, in which // case no error should be returned to indicate that the // caller could try later. // // Implementations that do not support exclusive locks should // return false and ErrNotSupported. GetExclusiveLock() (bool, error) // Init can be used to initialize the store with server’s information. Init(info *spb.ServerInfo) error // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc…). Name() string // Recover returns the recovered state. // Implementations that do not persist state and therefore cannot // recover from a previous run MUST return nil, not an error. // However, an error must be returned for implementations that are // attempting to recover the state but fail to do so. Recover() (*RecoveredState, error) // SetLimits sets limits for this store. The action is not expected // to be retroactive. // The store implementation should make a deep copy as to not change // the content of the structure passed by the caller. // This call may return an error due to limits validation errors. SetLimits(limits *StoreLimits) error // GetChannelLimits returns the limit for this channel. If the channel // does not exist, returns nil. GetChannelLimits(name string) *ChannelLimits // CreateChannel creates a Channel. // Implementations should return ErrAlreadyExists if the channel was // already created. // Limits defined for this channel in StoreLimits.PeChannel map, if present, // will apply. Otherwise, the global limits in StoreLimits will apply. CreateChannel(channel string) (*Channel, error) // DeleteChannel deletes a Channel. // Implementations should make sure that if no error is returned, the // channel would not be recovered after a restart, unless CreateChannel() // with the same channel is invoked. // If processing is expecting to be time consuming, work should be done // in the background as long as the above condition is guaranteed. // It is also acceptable for an implementation to have CreateChannel() // return an error if background deletion is still happening for a // channel of the same name. DeleteChannel(channel string) error // AddClient stores information about the client identified by clientID. AddClient(info *spb.ClientInfo) (*Client, error) // DeleteClient removes the client identified by clientID from the store. DeleteClient(clientID string) error // Close closes this store (including all MsgStore and SubStore). // If an exclusive lock was acquired, the lock shall be released. Close() error}官方也提供了mysql和pgsql两种数据的支持:postgres.db.sqlCREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));CREATE INDEX Idx_ChannelsName ON Channels (name(256));CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);– Updates for 0.10.0ALTER TABLE Clients ADD proto BYTEA;mysql.db.sqlCREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, row BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row), INDEX Idx_SubsPendingSeq(seq));CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);# Updates for 0.10.0ALTER TABLE Clients ADD proto BLOB;总结后续会详细解读一下代码实现和一些集群部署。当然肯定少不了如何部署高可用的集群在k8s当中。参阅文章:NATS Streaming详解 ...

February 26, 2019 · 11 min · jiezi

关于微服务架构的思考

最近在项目中遇到了一些问题,一个比较多的问题服务和服务直接调用混乱 a服务调用b b服务调用c c服务调用d 导致后期升级会出现很多问题 如果有个流程图也许会好些 但是没有 因此我陷入了思考, 如果进行重构的话那什么样的架构会是较好的价格 我想 设计模式的六大原则 在此也一样适用什么是好的架构明确的分工,服务之间优雅的调用我给出的一个结果这里简单画的一个草图先介绍一下查询:对应查询操作操作:对应增删改操作分为四层 ui: 页面及后台调用网关层: 路由聚合层:查询聚合 操作聚合服务层:订单服务 商品服务遵循的原则各个服务只专注于自己的功能 由聚合层来协调服务之间的关系维护与调用上层通过http调用下层 下层通过mq通知上层 同级不能调用服务要想调用服务 如 a服务想调用b服务 可以 a通过mq传递给聚合层 然后聚合层根据消息调用b ,服务之前的调用交给 聚合层维护后面还会不断完善这篇文章的

February 17, 2019 · 1 min · jiezi

RocketMQ搭建

RocketMQ也已经加入了apache的开源项目,今天说说windows下的搭建1 下载安装包方式1(自己编译)下载:https://www.apache.org/dyn/cl…方式2,编译好的下载:https://www.apache.org/dyn/cl…2.使用maven编译(方式一需要这一步) > unzip rocketmq-all-4.4.0-source-release.zip > cd rocketmq-all-4.4.0/ > mvn -Prelease-all -DskipTests clean install -U > cd distribution/target/apache-rocketmq3.先启动Name Server> nohup sh bin/mqnamesrv & > tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success…windows平台:配置环境变量:ROCKETMQ_HOME:解压后的mq目录路径,注意是bin目录所在的那层目录start mqnamesrv.cmd4.启动Broker > nohup sh bin/mqbroker -n localhost:9876 & > tail -f ~/logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success…windows平台:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true5.关闭服务> sh bin/mqshutdown brokerThe mqbroker(36695) is running…Send shutdown request to mqbroker(36695) OK> sh bin/mqshutdown namesrvThe mqnamesrv(36664) is running…Send shutdown request to mqnamesrv(36664) OK6.可视化管理控制台rocketmq-console-ng下载源码 https://github.com/apache/inc…进入rocketmq-console目录,打包mvn package运行,需要jdk1.8进入target目录java -jar rocketmq-console-ng-1.0.0.jar –server.port=12581 –rocketmq.config.namesrvAddr=127.0.0.1:9876如果不想配置这么多参数,你可以直接在rocketmq-console-ng目录里的application.properties文件中修改完毕后再打包,这样就只需要运行java -jar rocketmq-console-ng-1.0.0.jar启动成功后,我们就可以通过浏览器访问http://localhost:12581进入控制台界面了,如下图: ...

January 29, 2019 · 1 min · jiezi

高级开发人员必备技术:MQ

也许在你们公司从没有使用过MQ,也不知道这东西是用来干什么的,但是一旦你进入大公司你就会发现,这东西处处可见。今天就来说说MQ方面的东西,我公众号有activemq的 demo,大家可以自己去看看。什么是MQMessage Queue简称MQ,中文消息队列。“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。你可以把它理解为一个中间件,帮你完成多个系统或应用间的消息传递。为什么要使用MQ首先它有3个核心,解耦,异步,削峰,因此我们可以想到以下使用场景:你的系统要和多个系统发生关系,别的系统要从你这获取一些数据,今天A系统和你要这样的数据,明天B系统说你的数据有问题,后天C系统让你加个别的数据。你一个人要维护解决很多问题,忙得不可开交。而有了MQ,你就可以通过Pub/Sub 发布订阅消息这么一个模型,系统就跟其它系统彻底解耦了。只要把消息放到队列里,其它系统就自己去处理自己需要的数据,自己不再考虑该给谁发送数据。比如:下完订单,不再去通知库存做同步处理。把该物品的信息放在队列中,库存自己去选择什么时候去处理计算自己的库存。还是上面的例子,之前的流程,user浏览器端发起购物请求3ms,订单系统处理数据库300ms,之后库存系统处理数据库300ms,这样同步情况下加起来就要603ms。即使前端有加载提示框,等待时间超过300ms,人眼是能感受到这种延迟的,体验很不好,速度越快才能留住user。现在使用MQ采用异步消息处理,假如消息放进队列需要3ms,那么最终的响应时间是3+3=6ms,对于创建订单和库存计算user并不关心,这样极大的提高了响应时间。一个大的网站或是应用,总会迎来访问量的高峰,可能是营销活动突然带来的大流量,或是节假日。比如双十一,购物人数突然猛增,并发数提高,数据库的压力突然增大,超出了每秒钟的处理能力,就会导致网站瘫痪。使用mq后,数据库可以不必立马处理这么多的请求,可以自己选择能承受的消息慢慢去处理。所有的消息积压在队列中,而不是同时积压到数据库。加入队列中积压了1亿条数据,而我的数据库只能每秒处理100万条数据,那我就每秒从队列中取出100万条来处理,始终不会超出阈值,这样数据库就不会被挤垮。把峰值慢慢消耗。现在想想你为什么没有使用到mq吧?或是考略使用mq使用后带来的威胁任何事物都有它的两面性,既然有优点那也有缺点:系统可用性降低万一mq挂了,队列里面的数据没有了,其它系统数据还没处理完,这可咋整?系统的复杂度提高了你用个mq是爽了,其它系统也要对应的修改自己的系统,来消费队列中的消息。硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。一致性问题你订单系统操作成功了,但是库存系统却失败了,这样导致了数据的不一致。所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。主流的MQ产品KafkaActiveMQRabbitMQ特性ActiveMQRabbitMQRocketMQKafka单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用上面表格来自:https://github.com/doocs/adva…推荐使用早期大家都在使用ActiveMQ ,适合小型项目,如果你尝试使用MQ,你可以选择。RabbitMQ社区活跃度比较高,开源,有问题可以在社区寻求帮助。但是底层使用了erlang 语言,不是小公司又能力掌控的 。RocketMQ 阿里出品,是用的中国公司比较多,经历过使用场景的考验,且自家产品也在用,不用担心。但是社区活跃度不高。推荐使用。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

January 25, 2019 · 1 min · jiezi

XXL-MQ v1.2.2 发布,分布式消息队列

Release Notes1、访问令牌(accessToken):为提升系统安全性,消息中心和客户端进行安全性校验,双方AccessToken匹配才允许通讯;2、支持批量注册、摘除,提升注册发现性能;升级 xxl-rpc 至 v1.3.1;3、升级 pom 依赖至较新版本;4、表结构调整提升兼容性,表名转小写;5、客户端取消Consumer非空的限制;简介XXL-MQ是一款轻量级分布式消息队列,支持 “并发消息、串行消息、广播消息、延迟消息、事务消息、失败重试、超时控制” 等消息特性。现已开放源代码,开箱即用。特性1、简单易用: 一行代码即可发布一条消息; 一行注解即可订阅一个消息主题;2、轻量级: 部署简单,不依赖第三方服务,一分钟上手;3、消息中心HA:消息中心支持集群部署,可大大提高系统可用性,以及消息吞吐能力;4、消费者HA:消费者支持集群部署,保证消费者可用性;5、三种消息模式:并行消息:消息平均分配在该主题在线消费者,分片方式并行消费;适用于吞吐量较大的消息场景,如邮件发送、短信发送等业务逻辑串行消息:消息固定分配给该主题在线消费者中其中一个,FIFO方式串行消费;适用于严格限制并发的消息场景,如秒杀、抢单等排队业务逻辑;广播消息:消息将会广播发送给该主题在线消费者分组,全部分组都会消费该消息,但是一个分组下只会消费一次;适用于广播场景,如广播更新缓存等6、延时消息: 支持设置消息的延迟生效时间, 到达设置的生效时间时该消息才会被消费;适用于延时消费场景,如订单超时取消等;7、事务性: 消费者开启事务开关后,消息事务性保证只会成功执行一次;8、失败重试: 支持设置消息的重试次数, 在消息执行失败后将会按照设置的值进行消息重试执行,直至重试次数耗尽或者执行成功;9、超时控制: 支持自定义消息超时时间,消息消费超时将会主动中断;10、吞吐量: 依赖于部署的消费中心集群和DB性能;DB可借助多表提升性能,不考虑DB的情况下,吞吐量可以无限横向扩展;可参考示例项目性能测试用例,单机TPS过万;11、消息可见: 系统中每一条消息可通过Web界面在线查看,甚至支持编辑消息内容和消息状态;12、消息可追踪: 支持追踪每一条消息的执行路径, 便于排查业务问题;13、消息失败告警:支持以Topic粒度监控消息,存在失败消息时主动推送告警邮件;默认提供邮件方式失败告警,同时预留扩展接口,可方面的扩展短信、钉钉等告警方式;14、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;15、消息持久化:全部消息持久化存储,消息中心支持通过配置选择是否清理过期消息。16、访问令牌(accessToken):为提升系统安全性,消息中心和客户端进行安全性校验,双方AccessToken匹配才允许通讯;文档地址中文文档技术交流社区交流

December 21, 2018 · 1 min · jiezi

Kafka消息系统基础知识索引

我们在《360度测试:KAFKA会丢数据么?其高可用是否满足需求?》这篇文章中,详细说明了KAFKA是否适合用在业务系统中。但有些朋友,还不知道KAFKA为何物,以及它为何存在。这在工作和面试中是比较吃亏的,因为不知道什么时候起,KAFKA似乎成了一种工程师的必备技能。一些观念的修正从 0.9 版本开始,Kafka 的标语已经从“一个高吞吐量,分布式的消息系统”改为"一个分布式流平台"。Kafka不仅仅是一个队列,而且是一个存储,有超强的堆积能力。Kafka不仅用在吞吐量高的大数据场景,也可以用在有事务要求的业务系统上,但性能较低。Kafka不是Topic越多越好,由于其设计原理,在数量达到阈值后,其性能和Topic数量成反比。引入了消息队列,就等于引入了异步,不管你是出于什么目的。这通常意味着业务流程的改变,甚至产品体验的变更。消息系统是什么典型场景上图是一些小系统的典型架构。考虑订单的业务场景,有大量的请求指向我们的业务系统,如果直接经过复杂的业务逻辑进入业务表,将会有大量请求超时失败。所以我们加入了一张中间缓冲表(或者Redis),用来承接用户的请求。然后,有一个定时任务,不断的从缓冲表中获取数据,进行真正的业务逻辑处理。这种设计有以下几个问题:定时任务的轮询间隔不好控制。业务处理容易延迟。无法横向扩容处理能力,且会引入分布式锁、顺序性保证等问题。当其他业务也需要这些订单数据的时候,业务逻辑就必须要加入到定时任务里。当访问量增加、业务逻辑复杂化的时候,消息队列就呼之欲出了。请求会暂存在消息队列,然后实时通过推(或者拉)的方式进行处理。在此场景下,消息队列充当了削峰和冗余的组件。消息系统的作用削峰 用于承接超出业务系统处理能力的请求,使业务平稳运行。这能够大量节约成本,比如某些秒杀活动,并不是针对峰值设计容量。缓冲 在服务层和缓慢的落地层作为缓冲层存在,作用与削峰类似,但主要用于服务内数据流转。比如批量短信发送。解耦 项目尹始,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。冗余 消息数据能够采用一对多的方式,供多个毫无关联的业务使用。健壮性 消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。消息系统要求消息系统即然这么重要,那么除了能够保证高可用,对它本身的特性也有较高需求。大体有下面几点:性能要高 包含消息投递和消息消费,都要快。一般通过增加分片数获取并行处理能力。消息要可靠 在某些场景,不能丢消息。生产、消费、MQ端都不能丢消息。一般通过增加副本,强制刷盘来解决。扩展性要好 能够陪你把项目做大,陪你到天荒地老。增加节点集群增大后,不能降低性能。生态成熟 监控、运维、多语言支持、社区的活跃。KAFKA名词解释基本功能Kafka是一个分布式消息(存储)系统。分布式系统通过分片增加并行度;通过副本增加可靠性,kafka也不例外。我们来看一下它的结构,顺便解释一下其中的术语。你在一台机器上安装了Kafka,那么这台机器就叫Broker,KAFKA集群包含了一个或者多个这样的实例。负责往KAFKA写入数据的组件就叫做Producer,消息的生产者一般写在业务系统里。发送到KAFKA的消息可能有多种,如何区别其分类?就是Topic的概念。一个主题分布式化后,可能会存在多个Broker上。将Topic拆成多个段,增加并行度后,拆成的每个部分叫做Partition,分区一般平均分布在所有机器上。那些消费Kafka中数据的应用程序,就叫做Consumer,我们给某个主题的某个消费业务起一个名字,这么名字就叫做Consumer Group扩展功能Connector 连接器Task,包含Source和Sink两种接口,给用户提供了自定义数据流转的可能。比如从JDBC导入到Kafka,或者将Kafka数据直接落地到DB。Stream 类似于Spark Stream,能够进行流数据处理。但它本身没有集群,只是在KAFKA集群上的抽象。如果你想要实时的流处理,且不需要Hadoop生态的某些东西,那么这个比较适合你。Topic我们的消息就是写在主题里。有了多个Topic,就可以对消息进行归类与隔离。比如登录信息写在user_activity_topic,日志消息写在log_topic中。每一个topic都可以调整其分区数量。假设我们的集群有三个Broker,那么当分区数量为1的时候,消息就仅写在其中一个节点上;当我们的分区为3,消息会根据hash写到三个节点上;当我们的分区为6,那每个节点将会有2个分区信息。增加分区可以增加并行度,但不是越多越好。一般,6-12最佳,最好能够被节点数整除,避免数据倾斜。每个分区都由一系列有序的、不可变的消息组成,这些消息被顺序的追加。分区中的每个消息都有一个连续的序列号叫做offset。Kafka将保留配置时间内的所有消息,所以它也是一个临时存储。在这段时间内,所有的消息都可被消费,并且可以通过改变offset的值进行重复、多次消费。Offset一般由消费者管理,当然也可以通过程序按需要设置。Offset只有commit以后,才会改变,否则,你将一直获取重复的数据。新的kafka已经将这些Offset的放到了一个专有的主题:__consumer_offsets,就是上图的紫色区域。值得一提的是,消费者的个数,不要超过分区的个数。否则,多出来的消费者,将接收不到任何数据。ISR分布式系统保证数据可靠性的一个常用手段就是增加副本个数,ISR就是建立在这个手段上。ISR全称"In-Sync Replicas",是保证HA和一致性的重要机制。副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。一般2-3个为宜。副本有两个要素,一个是数量要够多,一个是不要落在同一个实例上。ISR是针对与Partition的,每个分区都有一个同步列表。N个replicas中,其中一个replica为leader,其他都为follower, leader处理partition的所有读写请求,其他的都是备份。与此同时,follower会被动定期地去复制leader上的数据。如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除。当ISR中所有Replica都向Leader发送ACK时,leader才commit。Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。当Leader节点失效,也会依赖Zk进行新的Leader选举。Offset转移到Kafka内部的Topic以后,KAFKA对ZK的依赖就越来越小了。可靠性消息投递语义At least once可能会丢消息,但不不会重复At most once不不丢消息,但可能重复,所以消费端要做幂等Exactly once消息不不会丢,且保证只投递⼀一次整体的消息投递语义需要Producer端和Consumer端两者来保证。KAFKA默认是At most once,也可以通过配置事务达到Exactly once,但效率很低,不推荐。ACK当生产者向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:1(默认) 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。0 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。-1 producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。KAFKA为什么快Cache Filesystem Cache PageCache缓存 顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。Zero-copy 零拷⻉,少了一次内存交换。Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。使用场景传递业务消息用户活动日志 • 监控项等日志流处理,比如某些聚合Commit Log,作为某些重要业务的冗余下面是一个日志方面的典型使用场景。压测KAFKA自带压测工具,如下。./kafka-producer-perf-test.sh –topic test001 –num- records 1000000 –record-size 1024 –throughput -1 –producer.config ../config/producer.properties配置管理关注点应⽤用场景 不同的应用场景有不一样的配置策略和不一样的SLA服务水准。需要搞清楚自己的消息是否允许丢失或者重复,然后设定相应的副本数量和ACK模式。Lag 要时刻注意消息的积压。Lag太高意味着处理能力有问题。如果在低峰时候你的消息有积压,那么当大流量到来,必然会出问题。扩容 扩容后会涉及到partition的重新分布,你的网络带宽可能会是瓶颈。磁盘满了 建议设置过期天数,或者设置磁盘最大使用量。log.retention.bytes过期删除 磁盘空间是有限的,建议保留最近的记录,其余自动删除。log.retention.hours log.retention.minutes log.retention.ms 监控管理工具KafkaManager 雅虎出品,可管理多个Kafka集群,是目前功能最全的管理工具。但是注意,当你的Topic太多,监控数据会占用你大量的带宽,造成你的机器负载增高。其监控功能偏弱,不满足需求。KafkaOffsetMonitor 程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。Kafka Web Console 监控功能较为全面,可以预览消息,监控Offset、Lag等信息,不建议在生产环境中使用。Burrow 是LinkedIn开源的一款专门监控consumer lag的框架。支持报警,只提供HTTP接口,没有webui。Availability Monitor for Kafka 微软开源的Kafka可用性、延迟性的监控框架,提供JMX接口,用的很少。Rebalance消费端Rebalance消费端的上线下线会造成分区与消费者的关系重新分配,造成Rebalance。业务会发生超时、抖动等。服务端reassign服务器扩容、缩容,节点启动、关闭,会造成数据的倾斜,需要对partition进行reassign。在kafka manager后台可以手动触发这个过程,使得分区的分布更加平均。这个过程会造成集群间大量的数据拷贝,当你的集群数据量大,这个过程会持续数个小时或者几天,谨慎操作。linkedin开源了其自动化管理工具cruise-control,有自动化运维需求的不妨一看。结尾本文是KAFKA相关的最基础的知识,基本涵盖了大部分简单的面试题。为了达到Exactly once这个语义,KAFKA做了很多努力,努力的结果就是几乎不可用,吞吐量实在是太低了。如果你真要将“高可靠”挂在嘴上,不如做好“补偿策略”。性能不成,最终的结果可能是整体不可用;而数据丢失,仅是极端情况下的一部分小数据而已。你会如何权衡呢?大流量下的KAFKA是非常吓人的,数据经常将网卡打满。而一旦Broker当机,如果单节点有上T的数据,光启动就需要半个小时,它还要作为Follower去追赶其他Master分区的数据。所以,不要让你的KAFKA集群太大,故障恢复会是一场灾难。启动以后,如果执行reassign,又会是另一番折腾了。 ...

December 18, 2018 · 1 min · jiezi

NSQ源码-nsqlookupd

为什么选择nsq之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点rabbitmq的代码,以及后来的emqtt都看过一点, 所以对消息队列这块是情有独钟。转到go后也在关注消息队列这块,nsq是一个golng的消息系统, 而且架构也非常的简单。所以想通过源码的学习来掌握一些语言技巧。nsq的架构与代码结构nsq的的话主要有三个模块构成, 这里直接复制官方的介绍:nsqd: is the daemon that receives, queues, and delivers messages to clients.nsqlookupd: is the daemon that manages topology information and provides an eventually consistent discovery service.nsqadmin: is a web UI to introspect the cluster in realtime (and perform various administrative tasks).这里是一个消息投递的过程, 显示了消息怎么从nsqd到达consumer, 缺少了producer和nsqlookupd. nsqlookupd主要提供了两个功能:向nsqd提供一个topic和channel的注册信息对consumser提供了toic和channel的查询功能然后consumer查询到nsqd之后就是上面看到的动态图了, consumer直接和nsqd通信, 下面是一个更全面一点的时序图整个项目的代码结构也是围绕上面的三个模块构建:internal(公共部分的实现)nsqadmin(对nsqadmin的时间)nsqd(对nsqd的实现)nsqlookupd(对nsqlookupd的实现)总共也就这四个package,是不是有很想看下去的冲动(smile).lookupd的启动流程经过上面的介绍,我们对lookupd有里简单的认识.首先他是一个独立的进程, 为topic和channel的发现服务. 但不参与时间的消息投递. 对lookup的实现是在nsq/apps/nsqlookupd/nsqlookupd.go和nsq/nsqlookupd/中. lookupd的启动是使用了一个叫go-srv的windows wrapper.通过在nsq/apps/nsqlookupd/nsqlookupd.go中实现:type Service interface { // Init is called before the program/service is started and after it’s // determined if the program is running as a Windows Service. Init(Environment) error // Start is called after Init. This method must be non-blocking. Start() error // Stop is called in response to os.Interrupt, os.Kill, or when a // Windows Service is stopped. Stop() error}来完成整个进程的管理,go-srv帮助我们做了系统信号的管理, 下面来看下lookupd的启动流程,实例化一个NSQLookupd对象// apps/nsqlookupd/nsqlookupd.go daemon := nsqlookupd.New(opts) // 实例化一个NSQLookupd的对象 err := daemon.Main() // 开始启动NSQLookupd // nsq/nsqlookupd/nsqlookupd.gofunc New(opts *Options) *NSQLookupd { …. n := &NSQLookupd{ opts: opts, // 启动参数 DB: NewRegistrationDB(), // 内从里面的一个数据库,主要用来存储tpoic/channel以及nsqd的消息 } … return n}开始启动// Main starts an instance of nsqlookupd and returns an// error if there was a problem starting up.func (l *NSQLookupd) Main() error { ctx := &Context{l} // 启动两场go routine来处理tcp/http的请求 tcpListener, err := net.Listen(“tcp”, l.opts.TCPAddress) if err != nil { return fmt.Errorf(“listen (%s) failed - %s”, l.opts.TCPAddress, err) } httpListener, err := net.Listen(“tcp”, l.opts.HTTPAddress) if err != nil { return fmt.Errorf(“listen (%s) failed - %s”, l.opts.TCPAddress, err) } l.tcpListener = tcpListener l.httpListener = httpListener tcpServer := &tcpServer{ctx: ctx} l.waitGroup.Wrap(func() { protocol.TCPServer(tcpListener, tcpServer, l.logf) }) httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { http_api.Serve(httpListener, httpServer, “HTTP”, l.logf) }) return nil}下面是一个lookupd里面的进程模型lookupd里的主要数据结构在上面创建一个instance的时候我们看到创建一个NewRegistrationDB()的函数, 这里就是存储lookupd所有数据结构的地方.每个topic/channe/clientl就是一个Registration的key, 然后value对应的就是该topic/channel对应的nsqd信息.所有的接口都是在操作上面的那个数据结构.lookupd和其他模块的交互在进程模型中我们看到一个tcp server和一个http seerver, 和其他模块之间的交互都是在里面完成的.看下tcp server的处理有新的tcp连接进来,创建一个新的go routine去服务该请求// /nsq/internal/tcp_server.gofunc TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { for { … go handler.Handle(clientConn) }实例化一个protocol对象// /nsq/nsqlookupd/tcp.gofunc (p *tcpServer) Handle(clientConn net.Conn) { … prot.IOLoop(clientConn) …}对请求的具体处理// /nsq/nsqlookupd/lookup_protocol_v1.gofunc (p *LookupProtocolV1) IOLoop(conn net.Conn) error { … p.Exec(client, reader, params) …}// /nsq/nsqlookupd/lookup_protocol_v1.gofunc (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case “PING”: // NSQD的心跳包 return p.PING(client, params) case “IDENTIFY”: // NQSD启动时候的indentify就是我们上面看到的peerInfo return p.IDENTIFY(client, reader, params[1:]) case “REGISTER”: // 注册topic/channel信息到lookupd return p.REGISTER(client, reader, params[1:]) case “UNREGISTER”: // unregister topic/lookup 信息 return p.UNREGISTER(client, reader, params[1:]) } return nil, protocol.NewFatalClientErr(nil, “E_INVALID”, fmt.Sprintf(“invalid command %s”, params[0]))}上面就是整个tcp server的流程, 每个连接都是一个go routine. 相对tcp server来说的话http server就简单很多, 如果你对httprouter熟悉的话就更简单了就是对RegistrationDB的增删查改. http测的api的话可以参考:官方的文档总结lookupd是其中比较简单的模块,通过源码的学习我们可以更好的掌握go的一些技巧,也鼓励大家通过一一些开源的代码来掌握语言的一些技巧。其实通过lookupd我们可以抽象一套自己的HTTP/TCP服务端架构来。 ...

December 1, 2018 · 2 min · jiezi