共计 4563 个字符,预计需要花费 12 分钟才能阅读完成。
最近在看 kafka 的代码,就免不了想看看消息队列的一些要点:服务质量(QOS)、性能、扩展性等等,下面一一探索这些概念,并谈谈在特定的消息队列如 kafka 或者 mosquito 中是如何具体实现这些概念的。
服务质量
服务语义
服务质量一般可以分为三个级别,下面说明它们不同语义。
At most once
至多一次,消息可能丢失,但绝不会重复传输。生产者:完全依赖底层 TCP/IP 的传输可靠性,不做特殊处理,所谓“发送即忘”。kafka 中设置 acks=0。消费者:先保存消费进度,再处理消息。kafka 中设置消费者自动提交偏移量并设置较短的提交时间间隔。
At least once
至少一次,消息绝不会丢,但是可能会重复。生产者:要做消息防丢失的保证。kafka 中设置 acks=1 或 all 并设置 retries>0。消费者:先处理消息,再保存消费进度。kafka 中设置消费者自动提交偏移量并设置很长的提交时间间隔,或者直接关闭自动提交偏移量,处理消息后手动调用同步模式的偏移量提交。
Exactly once
精确一次,每条消息肯定会被传输一次且仅一次。这个级别光靠消息队列本身并不好保证,有可能要依赖外部组件。生产者:要做消息防丢失的保证。kafka 中设置 acks=1 或 all 并设置 retries>0。mosquito 中通过四步握手与 DUP、MessageID 等标识来实现单次语义。消费者:要做消息防重复的保证,有多种方案,如:在保存消费进度和处理消息这两个操作中引入两阶段提交协议;让消息幂等;让消费处理与进度保存处于一个事务中来保证原子性。kafka 中关闭自动提交偏移量,并设置自定义的再平衡监听器,监听到分区发生变化时从外部组件读取或者存储偏移量,保证自己或者其他消费者在更换分区时能读到最新的偏移量从而避免重复。总之就是结合 ConsumerRebalanceListener、seek 和一个外部系统(如支持事务的数据库)共同来实现单次语义。此外,kafka 还提供了 GUID 以便用户自行实现去重。kafka 0.11 版本通过 3 个大的改动支持 EOS:1. 幂等的 producer;2. 支持事务;3. 支持 EOS 的流式处理 (保证读 - 处理 - 写全链路的 EOS)。这三个级别可靠性依次增加,但是延迟和带宽占用也会增加,所以实际情况中,要依据业务类型做出权衡。
可靠性
上面的三个语义不仅需要生产者和消费者的配合实现,还要 broker 本身的可靠性来进行保证。可靠性就是只要 broker 向 producer 发出确认,就一定要保证这个消息可以被 consumer 获取。
kafka 中一个 topic 有多个 partition,每个 partition 又有多个 replica,所有 replica 中有一个 leader,ISR 是一定要同步 leader 后才能返回提交成功的 replica 集,OSR 内的 replica 尽力的去同步 leader,可能数据版本会落后。在 kafka 工作的过程中,如果某个 replica 同步速度慢于 replica.lag.time.max.ms 指定的阈值,则被踢出 ISR 存入 OSR,如果后续速度恢复可以回到 ISR 中。可以配置 min.insync.replicas 指定 ISR 中的 replica 最小数量,默认该值为 1。LEO 是分区的最新数据的 offset,当数据写入 leader 后,LEO 就立即执行该最新数据,相当于最新数据标识位。HW 是当写入的数据被同步到所有的 ISR 中的副本后,数据才认为已提交,HW 更新到该位置,HW 之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到,相当于所有副本同步数据标识位。
每个 partition 的所有 replica 需要进行 leader 选举(依赖 ZooKeeper)。在 leader 宕机后,只能从 ISR 列表中选取新的 leader,无论 ISR 中哪个副本被选为新的 leader,它都知道 HW 之前的数据,可以保证在切换了 leader 后,消费者可以继续看到 HW 之前已经提交的数据。当 ISR 中所有 replica 都宕机该 partition 就不可用了,可以设置 unclean.leader.election.enable=true,该选项使得 kafka 选择任何一个活的 replica 成为 leader 然后继续工作,此 replica 可能不在 ISR 中,就可能导致数据丢失。所以实际使用中需要进行可用性与可靠性的权衡。
kafka 建议数据可靠存储不依赖于数据强制刷盘(会影响整体性能),而是依赖于 replica。
顺序消费
顺序消费是指消费者处理消息的顺序与生产者投放消息的顺序一致。主要可能破坏顺序的场景是生产者投放两条消息 AB,然后 A 失败重投递导致消费者拿到的消息是 BA。
kafka 中能保证分区内部消息的有序性,其做法是设置 max.in.flight.requests.per.connection=1,也就是说生产者在未得到 broker 对消息 A 的确认情况下是不会发送消息 B 的,这样就能保证 broker 存储的消息有序,自然消费者请求到的消息也是有序的。但是我们明显能感觉到这会降低吞吐量,因为消息不能并行投递了,而且会阻塞等待,也没法发挥 batch 的威力。如果想要整个 topic 有序,那就只能一个 topic 一个 partition 了,一个 consumer group 也就只有一个 consumer 了。这样就违背了 kafka 高吞吐的初衷。
重复消费
重复消费是指一个消息被消费者重复消费了。这个问题也是上面第三个语义需要解决的。
一般的消息系统如 kafka 或者类似的 rocketmq 都不能也不提倡在系统内部解决,而是配合第三方组件,让用户自己去解决。究其原因还是解决问题的成本与解决问题后获得的价值不匹配,所以干脆不解决,就像操作系统对待死锁一样,采取“鸵鸟政策”。但是 kafka 0.11 还是处理了这个问题,见发行说明,维护者是想让用户无可挑剔嘛 [笑 cry]。
性能
衡量一个消息系统的性能有许多方面,最常见的就是下面几个指标。
连接数
是指系统在同一时刻能支持多少个生产者或者消费者的连接总数。连接数和 broker 采用的网络 IO 模型直接相关,常见模型有:单线程、连接每线程、Reactor、Proactor 等。单线程一时刻只能处理一个连接,连接每线程受制于 server 的线程数量,Reactor 是目前主流的高性能网络 IO 模型,Proactor 由于操作系统对真异步的支持不太行所以尚未流行。
kafka 的 broker 采用了类似于 Netty 的 Reactor 模型:1(1 个 Acceptor 线程)+N(N 个 Processor 线程)+M(M 个 Work 线程)。其中 Acceptor 负责监听新的连接请求,同时注册 OPACCEPT 事件,将新的连接按照 RoundRobin 的方式交给某个 Processor 线程处理。每个 Processor 都有一个 NIO selector,向 Acceptor 分配的 SocketChannel 注册 OPREAD、OPWRITE 事件,对 socket 进行读写。N 由 num.networker.threads 决定。Worker 负责具体的业务逻辑如:从 requestQueue 中读取请求、数据存储到磁盘、把响应放进 responseQueue 中等等。M 的大小由 num.io.threads 决定。
Reactor 模型一般基于 IO 多路复用(如 select,epoll),是非阻塞的,所以少量的线程能处理大量的连接。如果大量的连接都是 idle 的,那么 Reactor 使用 epoll 的效率是杠杠的,如果大量的连接都是活跃的,此时如果没有 Proactor 的支持就最好把 epoll 换成 select 或者 poll。具体做法是 -Djava.nio.channels.spi.SelectorProvider 把 sun.nio.ch 包下面的 EPollSelectorProvider 换成 PollSelectorProvider。
QPS
是指系统每秒能处理的请求数量。QPS 通常可以体现吞吐量(该术语很广,可以用 TPS/QPS、PV、UV、业务数 / 小时等单位体现)的大小。
kafka 中由于可以采用 batch 的方式(还可以压缩),所以每秒钟可以处理的请求很多(因为减少了解析量、网络往复次数、磁盘 IO 次数等)。另一方面,kafka 每一个 topic 都有多个 partition,所以同一个 topic 下可以并行(注意不是并发哟)服务多个生产者和消费者,这也提高了吞吐量。
平均响应时间
平均响应时间是指每个请求获得响应需要的等待时间。
kafka 中处理请求的瓶颈(也就是最影响响应时间的因素)最有可能出现在哪些地方呢?网络?有可能,但是这个因素总体而言不是 kafka 能控制的,kafka 可以对消息进行编码压缩并批量提交,减少带宽占用;磁盘?很有可能,所以 kafka 从分利用 OS 的 pagecache,并且对磁盘采用顺序写,这样能大大提升磁盘的写入速度。同时 kafka 还使用了零拷贝技术,把普通的拷贝过程:disk->read buffer->app buffer->socket buffer->NIC buffer 中,内核 buffer 到用户 buffer 的拷贝过程省略了,加快了处理速度。此外还有文件分段技术,每个 partition 都分为多个 segment,避免了大文件操作的同时提高了并行度。CPU?不大可能,因为消息队列的使用并不涉及大量的计算,常见消耗有线程切换、编解码、压缩解压、内存拷贝等,这些在大数据处理中一般不是瓶颈。
并发数
是指系统同时能处理的请求数量数。一般而言,QPS = 并发数 / 平均响应时间 或者说 并发数 = QPS* 平均响应时间。
这个参数一般只能估计或者计算,没法直接测。顾名思义,机器性能越好当然并发数越高咯。此外注意用上多线程技术并且提高代码的并行度、优化 IO 模型、减少减少内存分配和释放等手段都是可以提高并发数的。
扩展性
消息系统的可扩展性是指要为系统组件添加的新的成员的时候比较容易。
kafka 中扩展性的基石就是 topic 采用的 partition 机制。第一,Kafka 允许 Partition 在 cluster 中的 Broker 之间移动,以此来解决数据倾斜问题。第二,支持自定义的 Partition 算法,比如你可以将同一个 Key 的所有消息都路由到同一个 Partition 上去(来获得顺序)。第三,partition 的所有 replica 通过 ZooKeeper 来进行集群管理,可以动态增减副本。第四,partition 也支持动态增减。
对于 producer,不存在扩展问题,只要 broker 还够你连接就行。对于 consumer,一个 consumer group 中的 consumer 可以增减,但是最好不要超过一个 topic 的 partition 数量,因为多余的 consumer 并不能提升处理速度,一个 partition 在同一时刻只能被一个 consumer group 中的一个 consumer 消费
代码上的可扩展性就属于设计模式的领域了,这里不谈。
参考
《kafka 技术内幕》Kafka 的存储机制以及可靠性 Kafka 0.11.0.0 是如何实现 Exactly-once 语义的
查看原文,来自 mageekchiu。总结不到位的地方请不吝赐教。