Kafka消息系统基础知识索引

我们在《360度测试:KAFKA会丢数据么?其高可用是否满足需求?》这篇文章中,详细说明了KAFKA是否适合用在业务系统中。但有些朋友,还不知道KAFKA为何物,以及它为何存在。这在工作和面试中是比较吃亏的,因为不知道什么时候起,KAFKA似乎成了一种工程师的必备技能。一些观念的修正从 0.9 版本开始,Kafka 的标语已经从“一个高吞吐量,分布式的消息系统”改为"一个分布式流平台"。Kafka不仅仅是一个队列,而且是一个存储,有超强的堆积能力。Kafka不仅用在吞吐量高的大数据场景,也可以用在有事务要求的业务系统上,但性能较低。Kafka不是Topic越多越好,由于其设计原理,在数量达到阈值后,其性能和Topic数量成反比。引入了消息队列,就等于引入了异步,不管你是出于什么目的。这通常意味着业务流程的改变,甚至产品体验的变更。消息系统是什么典型场景上图是一些小系统的典型架构。考虑订单的业务场景,有大量的请求指向我们的业务系统,如果直接经过复杂的业务逻辑进入业务表,将会有大量请求超时失败。所以我们加入了一张中间缓冲表(或者Redis),用来承接用户的请求。然后,有一个定时任务,不断的从缓冲表中获取数据,进行真正的业务逻辑处理。这种设计有以下几个问题:定时任务的轮询间隔不好控制。业务处理容易延迟。无法横向扩容处理能力,且会引入分布式锁、顺序性保证等问题。当其他业务也需要这些订单数据的时候,业务逻辑就必须要加入到定时任务里。当访问量增加、业务逻辑复杂化的时候,消息队列就呼之欲出了。请求会暂存在消息队列,然后实时通过推(或者拉)的方式进行处理。在此场景下,消息队列充当了削峰和冗余的组件。消息系统的作用削峰 用于承接超出业务系统处理能力的请求,使业务平稳运行。这能够大量节约成本,比如某些秒杀活动,并不是针对峰值设计容量。缓冲 在服务层和缓慢的落地层作为缓冲层存在,作用与削峰类似,但主要用于服务内数据流转。比如批量短信发送。解耦 项目尹始,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。冗余 消息数据能够采用一对多的方式,供多个毫无关联的业务使用。健壮性 消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。消息系统要求消息系统即然这么重要,那么除了能够保证高可用,对它本身的特性也有较高需求。大体有下面几点:性能要高 包含消息投递和消息消费,都要快。一般通过增加分片数获取并行处理能力。消息要可靠 在某些场景,不能丢消息。生产、消费、MQ端都不能丢消息。一般通过增加副本,强制刷盘来解决。扩展性要好 能够陪你把项目做大,陪你到天荒地老。增加节点集群增大后,不能降低性能。生态成熟 监控、运维、多语言支持、社区的活跃。KAFKA名词解释基本功能Kafka是一个分布式消息(存储)系统。分布式系统通过分片增加并行度;通过副本增加可靠性,kafka也不例外。我们来看一下它的结构,顺便解释一下其中的术语。你在一台机器上安装了Kafka,那么这台机器就叫Broker,KAFKA集群包含了一个或者多个这样的实例。负责往KAFKA写入数据的组件就叫做Producer,消息的生产者一般写在业务系统里。发送到KAFKA的消息可能有多种,如何区别其分类?就是Topic的概念。一个主题分布式化后,可能会存在多个Broker上。将Topic拆成多个段,增加并行度后,拆成的每个部分叫做Partition,分区一般平均分布在所有机器上。那些消费Kafka中数据的应用程序,就叫做Consumer,我们给某个主题的某个消费业务起一个名字,这么名字就叫做Consumer Group扩展功能Connector 连接器Task,包含Source和Sink两种接口,给用户提供了自定义数据流转的可能。比如从JDBC导入到Kafka,或者将Kafka数据直接落地到DB。Stream 类似于Spark Stream,能够进行流数据处理。但它本身没有集群,只是在KAFKA集群上的抽象。如果你想要实时的流处理,且不需要Hadoop生态的某些东西,那么这个比较适合你。Topic我们的消息就是写在主题里。有了多个Topic,就可以对消息进行归类与隔离。比如登录信息写在user_activity_topic,日志消息写在log_topic中。每一个topic都可以调整其分区数量。假设我们的集群有三个Broker,那么当分区数量为1的时候,消息就仅写在其中一个节点上;当我们的分区为3,消息会根据hash写到三个节点上;当我们的分区为6,那每个节点将会有2个分区信息。增加分区可以增加并行度,但不是越多越好。一般,6-12最佳,最好能够被节点数整除,避免数据倾斜。每个分区都由一系列有序的、不可变的消息组成,这些消息被顺序的追加。分区中的每个消息都有一个连续的序列号叫做offset。Kafka将保留配置时间内的所有消息,所以它也是一个临时存储。在这段时间内,所有的消息都可被消费,并且可以通过改变offset的值进行重复、多次消费。Offset一般由消费者管理,当然也可以通过程序按需要设置。Offset只有commit以后,才会改变,否则,你将一直获取重复的数据。新的kafka已经将这些Offset的放到了一个专有的主题:__consumer_offsets,就是上图的紫色区域。值得一提的是,消费者的个数,不要超过分区的个数。否则,多出来的消费者,将接收不到任何数据。ISR分布式系统保证数据可靠性的一个常用手段就是增加副本个数,ISR就是建立在这个手段上。ISR全称"In-Sync Replicas",是保证HA和一致性的重要机制。副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。一般2-3个为宜。副本有两个要素,一个是数量要够多,一个是不要落在同一个实例上。ISR是针对与Partition的,每个分区都有一个同步列表。N个replicas中,其中一个replica为leader,其他都为follower, leader处理partition的所有读写请求,其他的都是备份。与此同时,follower会被动定期地去复制leader上的数据。如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除。当ISR中所有Replica都向Leader发送ACK时,leader才commit。Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。当Leader节点失效,也会依赖Zk进行新的Leader选举。Offset转移到Kafka内部的Topic以后,KAFKA对ZK的依赖就越来越小了。可靠性消息投递语义At least once可能会丢消息,但不不会重复At most once不不丢消息,但可能重复,所以消费端要做幂等Exactly once消息不不会丢,且保证只投递⼀一次整体的消息投递语义需要Producer端和Consumer端两者来保证。KAFKA默认是At most once,也可以通过配置事务达到Exactly once,但效率很低,不推荐。ACK当生产者向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:1(默认) 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。0 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。-1 producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。KAFKA为什么快Cache Filesystem Cache PageCache缓存 顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。Zero-copy 零拷⻉,少了一次内存交换。Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。使用场景传递业务消息用户活动日志 • 监控项等日志流处理,比如某些聚合Commit Log,作为某些重要业务的冗余下面是一个日志方面的典型使用场景。压测KAFKA自带压测工具,如下。./kafka-producer-perf-test.sh –topic test001 –num- records 1000000 –record-size 1024 –throughput -1 –producer.config ../config/producer.properties配置管理关注点应⽤用场景 不同的应用场景有不一样的配置策略和不一样的SLA服务水准。需要搞清楚自己的消息是否允许丢失或者重复,然后设定相应的副本数量和ACK模式。Lag 要时刻注意消息的积压。Lag太高意味着处理能力有问题。如果在低峰时候你的消息有积压,那么当大流量到来,必然会出问题。扩容 扩容后会涉及到partition的重新分布,你的网络带宽可能会是瓶颈。磁盘满了 建议设置过期天数,或者设置磁盘最大使用量。log.retention.bytes过期删除 磁盘空间是有限的,建议保留最近的记录,其余自动删除。log.retention.hours log.retention.minutes log.retention.ms 监控管理工具KafkaManager 雅虎出品,可管理多个Kafka集群,是目前功能最全的管理工具。但是注意,当你的Topic太多,监控数据会占用你大量的带宽,造成你的机器负载增高。其监控功能偏弱,不满足需求。KafkaOffsetMonitor 程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。Kafka Web Console 监控功能较为全面,可以预览消息,监控Offset、Lag等信息,不建议在生产环境中使用。Burrow 是LinkedIn开源的一款专门监控consumer lag的框架。支持报警,只提供HTTP接口,没有webui。Availability Monitor for Kafka 微软开源的Kafka可用性、延迟性的监控框架,提供JMX接口,用的很少。Rebalance消费端Rebalance消费端的上线下线会造成分区与消费者的关系重新分配,造成Rebalance。业务会发生超时、抖动等。服务端reassign服务器扩容、缩容,节点启动、关闭,会造成数据的倾斜,需要对partition进行reassign。在kafka manager后台可以手动触发这个过程,使得分区的分布更加平均。这个过程会造成集群间大量的数据拷贝,当你的集群数据量大,这个过程会持续数个小时或者几天,谨慎操作。linkedin开源了其自动化管理工具cruise-control,有自动化运维需求的不妨一看。结尾本文是KAFKA相关的最基础的知识,基本涵盖了大部分简单的面试题。为了达到Exactly once这个语义,KAFKA做了很多努力,努力的结果就是几乎不可用,吞吐量实在是太低了。如果你真要将“高可靠”挂在嘴上,不如做好“补偿策略”。性能不成,最终的结果可能是整体不可用;而数据丢失,仅是极端情况下的一部分小数据而已。你会如何权衡呢?大流量下的KAFKA是非常吓人的,数据经常将网卡打满。而一旦Broker当机,如果单节点有上T的数据,光启动就需要半个小时,它还要作为Follower去追赶其他Master分区的数据。所以,不要让你的KAFKA集群太大,故障恢复会是一场灾难。启动以后,如果执行reassign,又会是另一番折腾了。 ...

December 18, 2018 · 1 min · jiezi

当Elasticsearch遇见Kafka

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~本文由michelmu发表于云+社区专栏Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种数据源的兼容能力也是其成功的秘诀之一。而Elasticsearch强大的数据源兼容能力,主要来源于其核心组件之一的Logstash, Logstash通过插件的形式实现了对多种数据源的输入和输出。Kafka是一种高吞吐量的分布式发布订阅消息系统,是一种常见的数据源,也是Logstash支持的众多输入输出源的其中一个。本文将从实践的角度,研究使用Logstash Kafka Input插件实现将Kafka中数据导入到Elasticsearch的过程。使用Logstash Kafka插件连接Kafka和Elasticsearch1 Logstash Kafka input插件简介Logstash Kafka Input插件使用Kafka API从Kafka topic中读取数据信息,使用时需要注意Kafka的版本及对应的插件版本是否一致。该插件支持通过SSL和Kerveros SASL方式连接Kafka。另外该插件提供了group管理,并使用默认的offset管理策略来操作Kafka topic。 Logstash默认情况下会使用一个单独的group来订阅Kafka消息,每个Logstash Kafka Consumer会使用多个线程来增加吞吐量。当然也可以多个Logstash实例使用同一个group_id,来均衡负载。另外建议把Consumer的个数设置为Kafka分区的大小,以提供更好的性能。2 测试环境准备2.1 创建Elasticsearch集群为了简化搭建过程,本文使用了腾讯云Elasticsearch service。腾讯云Elasticsearch service不仅可以实现Elasticsearch集群的快速搭建,还提供了内置Kibana,集群监控,专用主节点,Ik分词插件等功能,极大的简化了Elasticsearch集群的创建和管理工作。2.2 创建Kafka服务Kafka服务的搭建采用腾讯云CKafka来完成。与Elasticsearch Service一样,腾讯云CKafka可以实现Kafka服务的快速创建,100%兼容开源Kafka API(0.9版本)。2.3 服务器除了准备Elasticsearch和Kafka,另外还需要准备一台服务器,用于运行Logstash以连接Elasticsearch和Kafka。本文采用腾讯云CVM服务器2.4 注意事项1) 需要将Elasticsearch、Kafka和服务器创建在同一个网络下,以便实现网络互通。由于本文采用的是腾讯云相关的技术服务,因此只需要将Elasticsearch service,CKafka和CVM创建在同一个私有网路(VPC)下即可。2) 注意获取Elasticsearch serivce,CKafka和CVM的内网地址和端口,以便后续服务使用 本次测试中:服务ipportElasticsearch service192.168.0.89200Ckafka192.168.13.109092CVM192.168.0.13-3 使用Logstash连接Elasticsearch和Kafka3.1 Kafka准备可以参考[CKafka 使用入门] 按照上面的教程1) 创建名为kafka_es_test的topic2) 安装JDK3) 安装Kafka工具包4) 创建producer和consumer验证kafka功能3.2 安装LogstashLogstash的安装和使用可以参考[一文快速上手Logstash]3.3 配置Logstash Kafka input插件创建kafka_test_pipeline.conf文件内容如下:input{ kafka{ bootstrap_servers=>“192.168.13.10:9092” topics=>[“kafka_es_test”] group_id=>“logstash_kafka_test” }}output{ elasticsearch{ hosts=>[“192.168.0.8:9200”] }}其中定义了一个kafka的input和一个elasticsearch的output 对于Kafka input插件上述三个参数为必填参数,除此之外还有一些对插件行为进行调整的一些参数如: auto_commit_interval_ms 用于设置Consumer提交offset给Kafka的时间间隔 consumer_threads 用于设置Consumer的线程数,默认为1,实际中应设置与Kafka Topic分区数一致 fetch_max_wait_ms 用于指定Consumer等待一个fetch请求达到fetch_min_bytes的最长时间 fetch_min_bytes 用于指定Consumer fetch请求应返回的最小数据量 topics_pattern 用于通过正则订阅符合某一规则的一组topic 更多参数参考:[Kafka Input Configuration Options]3.4 启动Logstash以下操作在Logstash根目录中进行1) 验证配置./bin/logstash -f kafka_test_pipeline.conf –config.test_and_exit如有错误,根据提示修改配置文件。若配置正确会得到如下结果Sending Logstash’s logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties[2018-11-11T15:24:01,598][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>“netflow”, :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"}[2018-11-11T15:24:01,603][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>“fb_apache”, :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"}Configuration OK[2018-11-11T15:24:01,746][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash2) 启动Logstash./bin/logstash -f kafka_test_pipeline.conf –config.reload.automatic观察日志是否有错误提示,并及时处理3.4 启动Kafka Producer以下操作在Kafka工具包根目录下进行./bin/kafka-console-producer.sh –broker-list 192.168.13.10:9092 –topic kafka_es_test写入测试数据This is a message3.5 Kibana验证结果登录Elasticsearch对应Kibana, 在Dev Tools中进行如下操作1) 查看索引GET _cat/indices可以看到一个名为logstash-xxx.xx.xx的索引被创建成功green open .kibana QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kbgreen open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb2) 查看写入的数据GET logstash-2018.11.11/_search可以看到数据已经被成功写入{ “took”: 0, “timed_out”: false, “_shards”: { “total”: 5, “successful”: 5, “skipped”: 0, “failed”: 0 }, “hits”: { “total”: 1, “max_score”: 1, “hits”: [ { “_index”: “logstash-2018.11.11”, “_type”: “logs”, “_id”: “AWcBsEegMu-Dkjm1ap3H”, “_score”: 1, “_source”: { “message”: “This is a message”, “@version”: “1”, “@timestamp”: “2018-11-11T07:33:09.079Z” } } ] }}4 总结Logstash作为Elastic Stack中数据采集和处理的核心组件,为Elasticsearch提供了强大的数据源兼容能力。从测试过程可以看出,使用Logstash实现kafka和Elaticsearch的连接过程相当简单方便。另外Logstash的数据处理功能,也使得采用该架构的系统对数据映射和处理有天然的优势。 然而,使用Logstash实现Kafka和Elasticsearch的连接,并不是连接Kafka和Elasticsearch的唯一方案,另一种常见的方案是使用Kafka Connect, 可以参考“当Elasticsearch遇见Kafka–Kafka Connect”相关阅读【每日课程推荐】机器学习实战!快速入门在线广告业务及CTR相应知识 ...

November 12, 2018 · 1 min · jiezi

从源码分析如何优雅的使用 Kafka 生产者

前言在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢?正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。内容较多,对源码感兴趣的朋友请系好安全带????(源码基于 v0.10.0.0 版本分析)。同时最好是有一定的 Kafka 使用经验,知晓基本的用法。简单的消息发送在分析之前先看一个简单的消息发送是怎么样的。以下代码基于 SpringBoot 构建。首先创建一个 org.apache.kafka.clients.producer.Producer 的 bean。主要关注 bootstrap.servers,它是必填参数。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094。其余几个参数暂时不做讨论,后文会有详细介绍。接着注入这个 bean 即可调用它的发送函数发送消息。这里我给某一个 Topic 发送了 10W 条数据,运行程序消息正常发送。但这仅仅只是做到了消息发送,对消息是否成功送达完全没管,等于是纯异步的方式。同步那么我想知道消息到底发送成功没有该怎么办呢?其实 Producer 的 API 已经帮我们考虑到了,发送之后只需要调用它的 get() 方法即可同步获取发送结果。发送结果:这样的发送效率其实是比较低下的,因为每次都需要同步等待消息发送的结果。异步为此我们应当采取异步的方式发送,其实 send() 方法默认则是异步的,只要不手动调用 get() 方法。但这样就没法获知发送结果。所以查看 send() 的 API 可以发现还有一个参数。Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);Callback 是一个回调接口,在消息发送完成之后可以回调我们自定义的实现。执行之后的结果:同样的也能获取结果,同时发现回调的线程并不是上文同步时的主线程,这样也能证明是异步回调的。同时回调的时候会传递两个参数:RecordMetadata 和上文一致的消息发送成功后的元数据。Exception 消息发送过程中的异常信息。但是这两个参数并不会同时都有数据,只有发送失败才会有异常信息,同时发送元数据为空。所以正确的写法应当是:至于为什么会只有参数一个有值,在下文的源码分析中会一一解释。源码分析现在只掌握了基本的消息发送,想要深刻的理解发送中的一些参数配置还是得源码说了算。首先还是来谈谈消息发送时的整个流程是怎么样的,Kafka 并不是简单的把消息通过网络发送到了 broker 中,在 Java 内部还是经过了许多优化和设计。发送流程为了直观的了解发送的流程,简单的画了几个在发送过程中关键的步骤。从上至下依次是:初始化以及真正发送消息的 kafka-producer-network-thread IO 线程。将消息序列化。得到需要发送的分区。写入内部的一个缓存区中。初始化的 IO 线程不断的消费这个缓存来发送消息。步骤解析接下来详解每个步骤。初始化调用该构造方法进行初始化时,不止是简单的将基本参数写入 KafkaProducer。比较麻烦的是初始化 Sender 线程进行缓冲区消费。初始化 IO 线程处:可以看到 Sender 线程有需要成员变量,比如:acks,retries,requestTimeout等,这些参数会在后文分析。序列化消息在调用 send() 函数后其实第一步就是序列化,毕竟我们的消息需要通过网络才能发送到 Kafka。其中的 valueSerializer.serialize(record.topic(), record.value()); 是一个接口,我们需要在初始化时候指定序列化实现类。我们也可以自己实现序列化,只需要实现 org.apache.kafka.common.serialization.Serializer 接口即可。路由分区接下来就是路由分区,通常我们使用的 Topic 为了实现扩展性以及高性能都会创建多个分区。如果是一个分区好说,所有消息都往里面写入即可。但多个分区就不可避免需要知道写入哪个分区。通常有三种方式。指定分区可以在构建 ProducerRecord 为每条消息指定分区。这样在路由时会判断是否有指定,有就直接使用该分区。这种一般在特殊场景下会使用。自定义路由策略如果没有指定分区,则会调用 partitioner.partition 接口执行自定义分区策略。而我们也只需要自定义一个类实现 org.apache.kafka.clients.producer.Partitioner 接口,同时在创建 KafkaProducer 实例时配置 partitioner.class 参数。通常需要自定义分区一般是在想尽量的保证消息的顺序性。或者是写入某些特有的分区,由特别的消费者来进行处理等。默认策略最后一种则是默认的路由策略,如果我们啥都没做就会执行该策略。该策略也会使得消息分配的比较均匀。来看看它的实现:简单的来说分为以下几步:获取 Topic 分区数。将内部维护的一个线程安全计数器 +1。与分区数取模得到分区编号。其实这就是很典型的轮询算法,所以只要分区数不频繁变动这种方式也会比较均匀。写入内部缓存在 send() 方法拿到分区后会调用一个 append() 函数:该函数中会调用一个 getOrCreateDeque() 写入到一个内部缓存中 batches。消费缓存在最开始初始化的 IO 线程其实是一个守护线程,它会一直消费这些数据。通过图中的几个函数会获取到之前写入的数据。这块内容可以不必深究,但其中有个 completeBatch 方法却非常关键。调用该方法时候肯定已经是消息发送完毕了,所以会调用 batch.done() 来完成之前我们在 send() 方法中定义的回调接口。从这里也可以看出为什么之前说发送完成后元数据和异常信息只会出现一个。Producer 参数解析发送流程讲完了再来看看 Producer 中比较重要的几个参数。acksacks 是一个影响消息吞吐量的一个关键参数。主要有 [all、-1, 0, 1] 这几个选项,默认为 1。由于 Kafka 不是采取的主备模式,而是采用类似于 Zookeeper 的主备模式。前提是 Topic 配置副本数量 replica > 1。当 acks = all/-1 时:意味着会确保所有的 follower 副本都完成数据的写入才会返回。这样可以保证消息不会丢失!但同时性能和吞吐量却是最低的。当 acks = 0 时:producer 不会等待副本的任何响应,这样最容易丢失消息但同时性能却是最好的!当 acks = 1 时:这是一种折中的方案,它会等待副本 Leader 响应,但不会等到 follower 的响应。一旦 Leader 挂掉消息就会丢失。但性能和消息安全性都得到了一定的保证。batch.size这个参数看名称就知道是内部缓存区的大小限制,对他适当的调大可以提高吞吐量。但也不能极端,调太大会浪费内存。小了也发挥不了作用,也是一个典型的时间和空间的权衡。上图是几个使用的体现。retriesretries 该参数主要是来做重试使用,当发生一些网络抖动都会造成重试。这个参数也就是限制重试次数。但也有一些其他问题。因为是重发所以消息顺序可能不会一致,这也是上文提到就算是一个分区消息也不会是完全顺序的情况。还是由于网络问题,本来消息已经成功写入了但是没有成功响应给 producer,进行重试时就可能会出现消息重复。这种只能是消费者进行幂等处理。高效的发送方式如果消息量真的非常大,同时又需要尽快的将消息发送到 Kafka。一个 producer 始终会收到缓存大小等影响。那是否可以创建多个 producer 来进行发送呢?配置一个最大 producer 个数。发送消息时首先获取一个 producer,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List 中,保存时做好同步处理防止并发问题。获取发送者时可以按照默认的分区策略使用轮询的方式获取(保证使用均匀)。这样在大量、频繁的消息发送场景中可以提高发送效率减轻单个 producer 的压力。关闭 Producer最后则是 Producer 的关闭,Producer 在使用过程中消耗了不少资源(线程、内存、网络等)因此需要显式的关闭从而回收这些资源。默认的 close() 方法和带有超时时间的方法都是在一定的时间后强制关闭。但在过期之前都会处理完剩余的任务。所以使用哪一个得视情况而定。总结本文内容较多,从实例和源码的角度分析了 Kafka 生产者。希望看完的朋友能有收获,同时也欢迎留言讨论。不出意外下期会讨论 Kafka 消费者。如果对你有帮助还请分享让更多的人看到。欢迎关注公众号一起交流: ...

October 11, 2018 · 1 min · jiezi

reactor-kafka小试牛刀

序本文主要展示一下如何使用reactor-kafkamaven <dependency> <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> <version>1.0.1.RELEASE</version> </dependency>准备启动zookeepercd zookeeper-3.4.13sh bin/zkServer.sh startZooKeeper JMX enabled by defaultZooKeeper remote JMX Port set to 8999ZooKeeper remote JMX authenticate set to falseZooKeeper remote JMX ssl set to falseZooKeeper remote JMX log4j set to trueUsing config: zookeeper-3.4.13/bin/../conf/zoo.cfg-n Starting zookeeper …STARTED启动kafkacd kafka_2.11-1.1.1sh bin/kafka-server-start.sh config/server.properties创建topiccd kafka_2.11-1.1.1sh bin/kafka-topics.sh –create –topic demotopic –replication-factor 1 –partitions 3 –zookeeper localhost:2181Created topic “demotopic”.实例producer @Test public void testProducer() throws InterruptedException { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG, “sample-producer”); props.put(ProducerConfig.ACKS_CONFIG, “all”); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); SenderOptions<Integer, String> senderOptions = SenderOptions.create(props); KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions); SimpleDateFormat dateFormat = new SimpleDateFormat(“HH:mm:ss:SSS z dd MMM yyyy”); CountDownLatch latch = new CountDownLatch(100); sender.<Integer>send(Flux.range(1, 100) .map(i -> SenderRecord.create(new ProducerRecord<>(TOPIC, i, “Message_” + i), i))) .doOnError(e -> log.error(“Send failed”, e)) .subscribe(r -> { RecordMetadata metadata = r.recordMetadata(); System.out.printf(“Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n”, r.correlationMetadata(), metadata.topic(), metadata.partition(), metadata.offset(), dateFormat.format(new Date(metadata.timestamp()))); latch.countDown(); }); latch.await(10, TimeUnit.SECONDS); sender.close(); }consumer @Test public void testConsumer() throws InterruptedException { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.CLIENT_ID_CONFIG, “sample-consumer”); props.put(ConsumerConfig.GROUP_ID_CONFIG, “sample-group”); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”); ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(props); SimpleDateFormat dateFormat = new SimpleDateFormat(“HH:mm:ss:SSS z dd MMM yyyy”); CountDownLatch latch = new CountDownLatch(100); ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(TOPIC)) .addAssignListener(partitions -> log.debug(“onPartitionsAssigned {}”, partitions)) .addRevokeListener(partitions -> log.debug(“onPartitionsRevoked {}”, partitions)); Flux<ReceiverRecord<Integer, String>> kafkaFlux = KafkaReceiver.create(options).receive(); Disposable disposable = kafkaFlux.subscribe(record -> { ReceiverOffset offset = record.receiverOffset(); System.out.printf(“Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n”, offset.topicPartition(), offset.offset(), dateFormat.format(new Date(record.timestamp())), record.key(), record.value()); offset.acknowledge(); latch.countDown(); }); latch.await(10, TimeUnit.SECONDS); disposable.dispose(); }小结reactor-kafka对kafka的api进行封装,改造为reactive streams模式,这样用起来更为顺手,熟悉reactor的开发人员可以轻车熟路。docreactor-kafka-samplesReactor Kafka Reference Guide ...

October 5, 2018 · 2 min · jiezi

设计一个百万级的消息推送系统

前言首先迟到的祝大家中秋快乐。最近一周多没有更新了。其实我一直想憋一个大招,分享一些大家感兴趣的干货。鉴于最近我个人的工作内容,于是利用这三天小长假憋了一个出来(其实是玩了两天????)。先简单说下本次的主题,由于我最近做的是物联网相关的开发工作,其中就不免会遇到和设备的交互。最主要的工作就是要有一个系统来支持设备的接入、向设备推送消息;同时还得满足大量设备接入的需求。所以本次分享的内容不但可以满足物联网领域同时还支持以下场景:基于 WEB 的聊天系统(点对点、群聊)。WEB 应用中需求服务端推送的场景。基于 SDK 的消息推送平台。技术选型要满足大量的连接数、同时支持双全工通信,并且性能也得有保障。在 Java 技术栈中进行选型首先自然是排除掉了传统 IO。那就只有选 NIO 了,在这个层面其实选择也不多,考虑到社区、资料维护等方面最终选择了 Netty。最终的架构图如下:现在看着蒙没关系,下文一一介绍。协议解析既然是一个消息系统,那自然得和客户端定义好双方的协议格式。常见和简单的是 HTTP 协议,但我们的需求中有一项需要是双全工的交互方式,同时 HTTP 更多的是服务于浏览器。我们需要的是一个更加精简的协议,减少许多不必要的数据传输。因此我觉得最好是在满足业务需求的情况下定制自己的私有协议,在我这个场景下其实有标准的物联网协议。如果是其他场景可以借鉴现在流行的 RPC 框架定制私有协议,使得双方通信更加高效。不过根据这段时间的经验来看,不管是哪种方式都得在协议中预留安全相关的位置。协议相关的内容就不过讨论了,更多介绍具体的应用。简单实现首先考虑如何实现功能,再来思考百万连接的情况。注册鉴权在做真正的消息上、下行之前首先要考虑的就是鉴权问题。就像你使用微信一样,第一步怎么也得是登录吧,不能无论是谁都可以直接连接到平台。所以第一步得是注册才行。如上面架构图中的 注册/鉴权 模块。通常来说都需要客户端通过 HTTP 请求传递一个唯一标识,后台鉴权通过之后会响应一个 token,并将这个 token 和客户端的关系维护到 Redis 或者是 DB 中。客户端将这个 token 也保存到本地,今后的每一次请求都得带上这个 token。一旦这个 token 过期,客户端需要再次请求获取 token。鉴权通过之后客户端会直接通过TCP 长连接到图中的 push-server 模块。这个模块就是真正处理消息的上、下行。保存通道关系在连接接入之后,真正处理业务之前需要将当前的客户端和 Channel 的关系维护起来。假设客户端的唯一标识是手机号码,那就需要把手机号码和当前的 Channel 维护到一个 Map 中。这点和之前 SpringBoot 整合长连接心跳机制 类似。同时为了可以通过 Channel 获取到客户端唯一标识(手机号码),还需要在 Channel 中设置对应的属性:public static void putClientId(Channel channel, String clientId) {channel.attr(CLIENT_ID).set(clientId);}获取时手机号码时:public static String getClientId(Channel channel) {return (String)getAttribute(channel, CLIENT_ID);}这样当我们客户端下线的时便可以记录相关日志:String telNo = NettyAttrUtil.getClientId(ctx.channel());NettySocketHolder.remove(telNo);log.info(“客户端下线,TelNo=” + telNo);这里有一点需要注意:存放客户端与 Channel 关系的 Map 最好是预设好大小(避免经常扩容),因为它将是使用最为频繁同时也是占用内存最大的一个对象。消息上行接下来则是真正的业务数据上传,通常来说第一步是需要判断上传消息输入什么业务类型。在聊天场景中,有可能上传的是文本、图片、视频等内容。所以我们得进行区分,来做不同的处理;这就和客户端协商的协议有关了。可以利用消息头中的某个字段进行区分。更简单的就是一个 JSON 消息,拿出一个字段用于区分不同消息。不管是哪种只有可以区分出来即可。消息解析与业务解耦消息可以解析之后便是处理业务,比如可以是写入数据库、调用其他接口等。我们都知道在 Netty 中处理消息一般是在 channelRead() 方法中。在这里可以解析消息,区分类型。但如果我们的业务逻辑也写在里面,那这里的内容将是巨多无比。甚至我们分为好几个开发来处理不同的业务,这样将会出现许多冲突、难以维护等问题。所以非常有必要将消息解析与业务处理完全分离开来。这时面向接口编程就发挥作用了。这里的核心代码和 「造个轮子」——cicada(轻量级 WEB 框架) 是一致的。都是先定义一个接口用于处理业务逻辑,然后在解析消息之后通过反射创建具体的对象执行其中的处理函数即可。这样不同的业务、不同的开发人员只需要实现这个接口同时实现自己的业务逻辑即可。伪代码如下:想要了解 cicada 的具体实现请点击这里:https://github.com/TogetherOS/cicada上行还有一点需要注意;由于是基于长连接,所以客户端需要定期发送心跳包用于维护本次连接。同时服务端也会有相应的检查,N 个时间间隔没有收到消息之后将会主动断开连接节省资源。这点使用一个 IdleStateHandler 就可实现,更多内容可以查看 Netty(一) SpringBoot 整合长连接心跳机制TCP-Heartbeat/#%E6%9C%8D%E5%8A%A1%E7%AB%AF%E5%BF%83%E8%B7%B3)。消息下行有了上行自然也有下行。比如在聊天的场景中,有两个客户端连上了 push-server,他们直接需要点对点通信。这时的流程是:A 将消息发送给服务器。服务器收到消息之后,得知消息是要发送给 B,需要在内存中找到 B 的 Channel。通过 B 的 Channel 将 A 的消息转发下去。这就是一个下行的流程。甚至管理员需要给所有在线用户发送系统通知也是类似:遍历保存通道关系的 Map,挨个发送消息即可。这也是之前需要存放到 Map 中的主要原因。伪代码如下:具体可以参考:https://github.com/crossoverJie/netty-action/分布式方案单机版的实现了,现在着重讲讲如何实现百万连接。百万连接其实只是一个形容词,更多的是想表达如何来实现一个分布式的方案,可以灵活的水平拓展从而能支持更多的连接。再做这个事前首先得搞清楚我们单机版的能支持多少连接。影响这个的因素就比较多了。服务器自身配置。内存、CPU、网卡、Linux 支持的最大文件打开数等。应用自身配置,因为 Netty 本身需要依赖于堆外内存,但是 JVM 本身也是需要占用一部分内存的,比如存放通道关系的大 Map。这点需要结合自身情况进行调整。结合以上的情况可以测试出单个节点能支持的最大连接数。单机无论怎么优化都是有上限的,这也是分布式主要解决的问题。架构介绍在将具体实现之前首先得讲讲上文贴出的整体架构图。先从左边开始。上文提到的 注册鉴权 模块也是集群部署的,通过前置的 Nginx 进行负载。之前也提过了它主要的目的是来做鉴权并返回一个 token 给客户端。但是 push-server 集群之后它又多了一个作用。那就是得返回一台可供当前客户端使用的 push-server。右侧的 平台 一般指管理平台,它可以查看当前的实时在线数、给指定客户端推送消息等。推送消息则需要经过一个推送路由(push-server)找到真正的推送节点。其余的中间件如:Redis、Zookeeper、Kafka、MySQL 都是为了这些功能所准备的,具体看下面的实现。注册发现首先第一个问题则是 注册发现,push-server 变为多台之后如何给客户端选择一台可用的节点是第一个需要解决的。这块的内容其实已经在 分布式(一) 搞定服务注册与发现 中详细讲过了。所有的 push-server 在启动时候需要将自身的信息注册到 Zookeeper 中。注册鉴权 模块会订阅 Zookeeper 中的节点,从而可以获取最新的服务列表。结构如下:以下是一些伪代码:应用启动注册 Zookeeper。对于注册鉴权模块来说只需要订阅这个 Zookeeper 节点:路由策略既然能获取到所有的服务列表,那如何选择一台刚好合适的 push-server 给客户端使用呢?这个过程重点要考虑以下几点:尽量保证各个节点的连接均匀。增删节点是否要做 Rebalance。首先保证均衡有以下几种算法:轮询。挨个将各个节点分配给客户端。但会出现新增节点分配不均匀的情况。Hash 取模的方式。类似于 HashMap,但也会出现轮询的问题。当然也可以像 HashMap 那样做一次 Rebalance,让所有的客户端重新连接。不过这样会导致所有的连接出现中断重连,代价有点大。由于 Hash 取模方式的问题带来了一致性 Hash算法,但依然会有一部分的客户端需要 Rebalance。权重。可以手动调整各个节点的负载情况,甚至可以做成自动的,基于监控当某些节点负载较高就自动调低权重,负载较低的可以提高权重。还有一个问题是:当我们在重启部分应用进行升级时,在该节点上的客户端怎么处理?由于我们有心跳机制,当心跳不通之后就可以认为该节点出现问题了。那就得重新请求注册鉴权模块获取一个可用的节点。在弱网情况下同样适用。如果这时客户端正在发送消息,则需要将消息保存到本地等待获取到新的节点之后再次发送。有状态连接在这样的场景中不像是 HTTP 那样是无状态的,我们得明确的知道各个客户端和连接的关系。在上文的单机版中我们将这个关系保存到本地的缓存中,但在分布式环境中显然行不通了。比如在平台向客户端推送消息的时候,它得首先知道这个客户端的通道保存在哪台节点上。借助我们以前的经验,这样的问题自然得引入一个第三方中间件用来存放这个关系。也就是架构图中的存放路由关系的 Redis,在客户端接入 push-server 时需要将当前客户端唯一标识和服务节点的 ip+port 存进 Redis。同时在客户端下线时候得在 Redis 中删掉这个连接关系。这样在理想情况下各个节点内存中的 map 关系加起来应该正好等于 Redis 中的数据。伪代码如下:这里存放路由关系的时候会有并发问题,最好是换为一个 lua 脚本。推送路由设想这样一个场景:管理员需要给最近注册的客户端推送一个系统消息会怎么做?结合架构图假设这批客户端有 10W 个,首先我们需要将这批号码通过平台下的 Nginx 下发到一个推送路由中。为了提高效率甚至可以将这批号码再次分散到每个 push-route 中。拿到具体号码之后再根据号码的数量启动多线程的方式去之前的路由 Redis 中获取客户端所对应的 push-server。再通过 HTTP 的方式调用 push-server 进行真正的消息下发(Netty 也很好的支持 HTTP 协议)。推送成功之后需要将结果更新到数据库中,不在线的客户端可以根据业务再次推送等。消息流转也许有些场景对于客户端上行的消息非常看重,需要做持久化,并且消息量非常大。在 push-sever 做业务显然不合适,这时完全可以选择 Kafka 来解耦。将所有上行的数据直接往 Kafka 里丢后就不管了。再由消费程序将数据取出写入数据库中即可。其实这块内容也很值得讨论,可以先看这篇了解下:强如 Disruptor 也发生内存溢出?后续谈到 Kafka 再做详细介绍。分布式问题分布式解决了性能问题但却带来了其他麻烦。应用监控比如如何知道线上几十个 push-server 节点的健康状况?这时就得监控系统发挥作用了,我们需要知道各个节点当前的内存使用情况、GC。以及操作系统本身的内存使用,毕竟 Netty 大量使用了堆外内存。同时需要监控各个节点当前的在线数,以及 Redis 中的在线数。理论上这两个数应该是相等的。这样也可以知道系统的使用情况,可以灵活的维护这些节点数量。日志处理日志记录也变得异常重要了,比如哪天反馈有个客户端一直连不上,你得知道问题出在哪里。最好是给每次请求都加上一个 traceID 记录日志,这样就可以通过这个日志在各个节点中查看到底是卡在了哪里。以及 ELK 这些工具都得用起来才行。总结本次是结合我日常经验得出的,有些坑可能在工作中并没有踩到,所有还会有一些遗漏的地方。就目前来看想做一个稳定的推送系统其实是比较麻烦的,其中涉及到的点非常多,只有真正做过之后才会知道。看完之后觉得有帮助的还请不吝转发分享。欢迎关注公众号一起交流: ...

September 25, 2018 · 1 min · jiezi

消息队列二三事

最近在看kafka的代码,就免不了想看看消息队列的一些要点:服务质量(QOS)、性能、扩展性等等,下面一一探索这些概念,并谈谈在特定的消息队列如kafka或者mosquito中是如何具体实现这些概念的。服务质量服务语义服务质量一般可以分为三个级别,下面说明它们不同语义。At most once至多一次,消息可能丢失,但绝不会重复传输。生产者:完全依赖底层TCP/IP的传输可靠性,不做特殊处理,所谓“发送即忘”。kafka中设置acks=0。消费者:先保存消费进度,再处理消息。kafka中设置消费者自动提交偏移量并设置较短的提交时间间隔。At least once至少一次,消息绝不会丢,但是可能会重复。生产者:要做消息防丢失的保证。kafka中设置acks=1 或 all并设置retries>0。消费者:先处理消息,再保存消费进度。kafka中设置消费者自动提交偏移量并设置很长的提交时间间隔,或者直接关闭自动提交偏移量,处理消息后手动调用同步模式的偏移量提交。Exactly once精确一次,每条消息肯定会被传输一次且仅一次。这个级别光靠消息队列本身并不好保证,有可能要依赖外部组件。生产者:要做消息防丢失的保证。kafka中设置acks=1 或 all并设置retries>0。mosquito中通过四步握手与DUP、MessageID等标识来实现单次语义。消费者:要做消息防重复的保证,有多种方案,如:在保存消费进度和处理消息这两个操作中引入两阶段提交协议;让消息幂等;让消费处理与进度保存处于一个事务中来保证原子性。kafka中关闭自动提交偏移量,并设置自定义的再平衡监听器,监听到分区发生变化时从外部组件读取或者存储偏移量,保证自己或者其他消费者在更换分区时能读到最新的偏移量从而避免重复。总之就是结合ConsumerRebalanceListener、seek和一个外部系统(如支持事务的数据库)共同来实现单次语义。此外,kafka还提供了GUID以便用户自行实现去重。kafka 0.11版本通过3个大的改动支持EOS:1.幂等的producer;2. 支持事务;3. 支持EOS的流式处理(保证读-处理-写全链路的EOS)。这三个级别可靠性依次增加,但是延迟和带宽占用也会增加,所以实际情况中,要依据业务类型做出权衡。可靠性上面的三个语义不仅需要生产者和消费者的配合实现,还要broker本身的可靠性来进行保证。可靠性就是只要broker向producer发出确认,就一定要保证这个消息可以被consumer获取。kafka 中一个topic有多个partition,每个partition又有多个replica,所有replica中有一个leader,ISR是一定要同步leader后才能返回提交成功的replica集,OSR内的replica尽力的去同步leader,可能数据版本会落后。在kafka工作的过程中,如果某个replica同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。可以配置min.insync.replicas指定ISR中的replica最小数量,默认该值为1。LEO是分区的最新数据的offset,当数据写入leader后,LEO就立即执行该最新数据,相当于最新数据标识位。HW是当写入的数据被同步到所有的ISR中的副本后,数据才认为已提交,HW更新到该位置,HW之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到,相当于所有副本同步数据标识位。每个partition的所有replica需要进行leader选举(依赖ZooKeeper)。在leader宕机后,只能从ISR列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW之前的数据,可以保证在切换了leader后,消费者可以继续看到HW之前已经提交的数据。当ISR中所有replica都宕机该partition就不可用了,可以设置unclean.leader.election.enable=true,该选项使得kafka选择任何一个活的replica成为leader然后继续工作,此replica可能不在ISR中,就可能导致数据丢失。所以实际使用中需要进行可用性与可靠性的权衡。kafka建议数据可靠存储不依赖于数据强制刷盘(会影响整体性能),而是依赖于replica。顺序消费顺序消费是指消费者处理消息的顺序与生产者投放消息的顺序一致。主要可能破坏顺序的场景是生产者投放两条消息AB,然后A失败重投递导致消费者拿到的消息是BA。kafka中能保证分区内部消息的有序性,其做法是设置max.in.flight.requests.per.connection=1,也就是说生产者在未得到broker对消息A的确认情况下是不会发送消息B的,这样就能保证broker存储的消息有序,自然消费者请求到的消息也是有序的。但是我们明显能感觉到这会降低吞吐量,因为消息不能并行投递了,而且会阻塞等待,也没法发挥 batch 的威力。如果想要整个topic有序,那就只能一个topic一个partition了,一个consumer group也就只有一个consumer了。这样就违背了kafka高吞吐的初衷。重复消费重复消费是指一个消息被消费者重复消费了。 这个问题也是上面第三个语义需要解决的。一般的消息系统如kafka或者类似的rocketmq都不能也不提倡在系统内部解决,而是配合第三方组件,让用户自己去解决。究其原因还是解决问题的成本与解决问题后获得的价值不匹配,所以干脆不解决,就像操作系统对待死锁一样,采取“鸵鸟政策”。但是kafka 0.11还是处理了这个问题,见发行说明,维护者是想让用户无可挑剔嘛 [笑cry]。性能衡量一个消息系统的性能有许多方面,最常见的就是下面几个指标。连接数是指系统在同一时刻能支持多少个生产者或者消费者的连接总数。连接数和broker采用的网络IO模型直接相关,常见模型有:单线程、连接每线程、Reactor、Proactor等。单线程一时刻只能处理一个连接,连接每线程受制于server的线程数量,Reactor是目前主流的高性能网络IO模型,Proactor由于操作系统对真异步的支持不太行所以尚未流行。kafka的broker采用了类似于Netty的Reactor模型:1(1个Acceptor线程)+N(N个Processor线程)+M(M个Work线程)。其中Acceptor负责监听新的连接请求,同时注册OPACCEPT事件,将新的连接按照RoundRobin的方式交给某个Processor线程处理。每个Processor都有一个NIO selector,向 Acceptor分配的 SocketChannel 注册 OPREAD、OPWRITE事件,对socket进行读写。N由num.networker.threads决定。Worker负责具体的业务逻辑如:从requestQueue中读取请求、数据存储到磁盘、把响应放进responseQueue中等等。M的大小由num.io.threads决定。Reactor模型一般基于IO多路复用(如select,epoll),是非阻塞的,所以少量的线程能处理大量的连接。如果大量的连接都是idle的,那么Reactor使用epoll的效率是杠杠的,如果大量的连接都是活跃的,此时如果没有Proactor的支持就最好把epoll换成select或者poll。具体做法是-Djava.nio.channels.spi.SelectorProvider把sun.nio.ch包下面的EPollSelectorProvider换成PollSelectorProvider。QPS是指系统每秒能处理的请求数量。QPS通常可以体现吞吐量(该术语很广,可以用TPS/QPS、PV、UV、业务数/小时等单位体现)的大小。kafka中由于可以采用 batch 的方式(还可以压缩),所以每秒钟可以处理的请求很多(因为减少了解析量、网络往复次数、磁盘IO次数等)。另一方面,kafka每一个topic都有多个partition,所以同一个topic下可以并行(注意不是并发哟)服务多个生产者和消费者,这也提高了吞吐量。平均响应时间平均响应时间是指每个请求获得响应需要的等待时间。kafka中处理请求的瓶颈(也就是最影响响应时间的因素)最有可能出现在哪些地方呢?网络? 有可能,但是这个因素总体而言不是kafka能控制的,kafka可以对消息进行编码压缩并批量提交,减少带宽占用;磁盘? 很有可能,所以kafka从分利用OS的pagecache,并且对磁盘采用顺序写,这样能大大提升磁盘的写入速度。同时kafka还使用了零拷贝技术,把普通的拷贝过程:disk->read buffer->app buffer->socket buffer->NIC buffer 中,内核buffer到用户buffer的拷贝过程省略了,加快了处理速度。此外还有文件分段技术,每个partition都分为多个segment,避免了大文件操作的同时提高了并行度。CPU? 不大可能,因为消息队列的使用并不涉及大量的计算,常见消耗有线程切换、编解码、压缩解压、内存拷贝等,这些在大数据处理中一般不是瓶颈。并发数是指系统同时能处理的请求数量数。一般而言,QPS = 并发数/平均响应时间 或者说 并发数 = QPS*平均响应时间。 这个参数一般只能估计或者计算,没法直接测。顾名思义,机器性能越好当然并发数越高咯。此外注意用上多线程技术并且提高代码的并行度、优化IO模型、减少减少内存分配和释放等手段都是可以提高并发数的。扩展性消息系统的可扩展性是指要为系统组件添加的新的成员的时候比较容易。kafka中扩展性的基石就是topic采用的partition机制。第一,Kafka允许Partition在cluster中的Broker之间移动,以此来解决数据倾斜问题。第二,支持自定义的Partition算法,比如你可以将同一个Key的所有消息都路由到同一个Partition上去(来获得顺序)。第三,partition的所有replica通过ZooKeeper来进行集群管理,可以动态增减副本。第四,partition也支持动态增减。对于producer,不存在扩展问题,只要broker还够你连接就行。对于consumer,一个consumer group中的consumer可以增减,但是最好不要超过一个topic的partition数量,因为多余的consumer并不能提升处理速度,一个partition在同一时刻只能被一个consumer group中的一个consumer消费代码上的可扩展性就属于设计模式的领域了,这里不谈。参考《kafka技术内幕》Kafka的存储机制以及可靠性Kafka 0.11.0.0 是如何实现 Exactly-once 语义的查看原文,来自mageekchiu。总结不到位的地方请不吝赐教。

August 30, 2018 · 1 min · jiezi

Kafka源码系列之源码分析zookeeper在kafka的作用

以kafka0.8.2.2源码为例给大家进行讲解的。纯属个人爱好,希望大家对不足之处批评指正。__一,zookeeper在分布式集群的作用____1,数据发布与订阅(配置中心)__发布与订阅模型,即所谓的配置中心,顾名思义就是讲发布者将数据发布到zk节点上,共订阅者动态获取数据,实现配置的集中式管理和动态更新。例如,全局的配置信息,服务服务框架的地址列表就非常适合使用。__2,负载均衡__即软件负载均衡。最典型的是消息中间件的生产、消费者负载均衡。 ...

May 30, 2018 · 1 min · jiezi

kafka [B cannot be cast to java.lang.String

自己写了一个Kafka发送消息的demo,但是发送消息的时候,却报了kafka [B cannot be cast to java.lang.String的错误,后来找到了解决办法原来是因为在定义config文件的时候,针对 serializer.class部分,错误的当成了StringEncoder,其实修改成默认的encoder就行了 props.put("serializer.class", "kafka.serializer.DefaultEncoder");

October 24, 2017 · 1 min · jiezi