共计 12291 个字符,预计需要花费 31 分钟才能阅读完成。
一、为什么须要音讯零碎
- 1. 解耦:
容许你独立的扩大或批改两边的处理过程,只有确保它们恪守同样的接口束缚。
- 2. 冗余:
音讯队列把数据进行长久化直到它们曾经被齐全解决,通过这一形式躲避了数据失落危险。许多音讯队列所采纳的 ” 插入 - 获取 - 删除 ” 范式中,在把一个音讯从队列中删除之前,须要你的解决零碎明确的指出该音讯曾经被处理完毕,从而确保你的数据被平安的保留直到你应用结束。
- 3. 扩展性:
因为音讯队列解耦了你的处理过程,所以增大音讯入队和解决的频率是很容易的,只有另外减少处理过程即可。
- 4. 灵活性 & 峰值解决能力:
在访问量剧增的状况下,利用依然须要持续发挥作用,然而这样的突发流量并不常见。如果为以能解决这类峰值拜访为规范来投入资源随时待命无疑是微小的节约。应用音讯队列可能使要害组件顶住突发的拜访压力,而不会因为突发的超负荷的申请而齐全解体。
- 5. 可恢复性:
零碎的一部分组件生效时,不会影响到整个零碎。音讯队列升高了过程间的耦合度,所以即便一个解决音讯的过程挂掉,退出队列中的音讯依然能够在零碎复原后被解决。
- 6. 程序保障:
在大多应用场景下,数据处理的程序都很重要。大部分音讯队列原本就是排序的,并且能保证数据会依照特定的程序来解决。(Kafka 保障一个 Partition 内的音讯的有序性)
- 7. 缓冲:
有助于管制和优化数据流通过零碎的速度,解决生产音讯和生产音讯的处理速度不统一的状况。
- 8. 异步通信:
很多时候,用户不想也不须要立刻解决音讯。音讯队列提供了异步解决机制,容许用户把一个音讯放入队列,但并不立刻解决它。想向队列中放入多少音讯就放多少,而后在须要的时候再去解决它们。
二、kafka 架构
2.1 拓扑构造
如下图:
图.1
2.2 相干概念
如图.1 中,kafka 相干名词解释如下:
1.producer:音讯生产者,公布音讯到 kafka 集群的终端或服务。2.broker:kafka 集群中蕴含的服务器。3.topic:每条公布到 kafka 集群的音讯属于的类别,即 kafka 是面向 topic 的。4.partition:partition 是物理上的概念,每个 topic 蕴含一个或多个 partition。kafka 调配的单位是 partition。5.consumer:从 kafka 集群中生产音讯的终端或服务。6.Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条音讯只能被 consumer group 中的一个 Consumer 生产,但能够被多个 consumer group 生产。7.replica:partition 的正本,保障 partition 的高可用。8.leader:replica 中的一个角色,producer 和 consumer 只跟 leader 交互。9.follower:replica 中的一个角色,从 leader 中复制数据。10.controller:kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。11.zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息。
2.3 zookeeper 节点
kafka 在 zookeeper 中的存储构造如下图所示:
图.2
三、producer 公布音讯
3.1 写入形式
producer 采纳 push 模式将音讯公布到 broker,每条音讯都被 append 到 patition 中,属于程序写磁盘(程序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
3.2 音讯路由
producer 发送音讯到 broker 时,会依据分区算法抉择将其存储到哪一个 partition。其路由机制为:
- 指定了 patition,则间接应用;
- 未指定 patition 但指定 key,通过对 key 的 value 进行 hash 选出一个 patition
- patition 和 key 都未指定,应用轮询选出一个 patition。
附上 java 客户端分区源码,高深莫测:
// 创立音讯实例
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException("Invalid timestamp" + timestamp);
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
}
// 计算 patition,如果指定了 patition 则间接应用,否则应用 key 计算
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();
if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int lastPartition = partitions.size() - 1;
if (partition < 0 || partition > lastPartition) {throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
}
return partition;
}
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
// 应用 key 选取 patition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();} else {return DefaultPartitioner.toPositive(nextValue) % numPartitions;
}
} else {
// 对 keyBytes 进行 hash 选出一个 patition
return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
3.3 写入流程
producer 写入音讯序列图如下所示:
图.3
流程阐明:
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将音讯发送给该 leader
- leader 将音讯写入本地 log
- followers 从 leader pull 音讯,写入本地 log 后 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,减少 HW(high watermark,最初 commit 的 offset)并向 producer 发送 ACK
3.4 producer delivery guarantee
个别状况下存在三种状况:
- At most once 音讯可能会丢,但绝不会反复传输
- At least one 音讯绝不会丢,但可能会反复传输
- Exactly once 每条音讯必定会被传输一次且仅传输一次
当 producer 向 broker 发送音讯时,一旦这条音讯被 commit,因为 replication 的存在,它就不会丢。然而如果 producer 发送数据给 broker 后,遇到网络问题而造成通信中断,那 Producer 就无奈判断该条音讯是否曾经 commit。尽管 Kafka 无奈确定网络故障期间产生了什么,然而 producer 能够生成一种相似于主键的货色,产生故障时幂等性的重试屡次,这样就做到了 Exactly once,但目前还并未实现。所以目前默认状况下一条音讯从 producer 到 broker 是确保了 At least once,可通过设置 producer 异步发送实现 At most once。
四、broker 保留音讯
4.1 存储形式
物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有音讯和索引文件),如下:
图.4
4.2 存储策略
无论音讯是否被生产,kafka 都会保留所有音讯。有两种策略能够删除旧数据:
- 基于工夫:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
须要留神的是,因为 Kafka 读取特定音讯的工夫复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与进步 Kafka 性能无关。
4.3 topic 创立与删除
4.3.1 创立 topic
创立 topic 的序列图如下所示:
图.5
流程阐明:
- controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创立,则 controller 会通过 watch 失去该 topic 的 partition/replica 调配。
- controller 从 /brokers/ids 读取以后所有可用的 broker 列表,对于 set_p 中的每一个 partition:
2.1、从调配给该 partition 的所有 replica(称为 AR)中任选一个可用的 broker 作为新的 leader,并将 AR 设置为新的 ISR
2.2、将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
- controller 通过 RPC 向相干的 broker 发送 LeaderAndISRRequest。
4.3.2 删除 topic
删除 topic 的序列图如下所示:
图.6
流程阐明:
- controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 失去该 topic 的 partition/replica 调配。
- 若 delete.topic.enable=false,完结;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
五、kafka HA
5.1 replication
如图.1 所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的状况下,一旦 broker 宕机,其上所有 patition 的数据都不可被生产,同时 producer 也不能再将数据存于其上的 patition。引入 replication 之后,同一个 partition 可能会有多个 replica,而这时须要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。
Kafka 调配 Replica 的算法如下:
-
- 将所有 broker(假如共 n 个 broker)和待调配的 partition 排序
-
- 将第 i 个 partition 调配到第(i mod n)个 broker 上
-
- 将第 i 个 partition 的第 j 个 replica 调配到第((i + j) mode n)个 broker 上
5.2 leader failover
当 partition 对应的 leader 宕机时,须要从 follower 中选举出新 leader。在选举新 leader 时,一个根本的准则是,新的 leader 必须领有旧 leader commit 过的所有音讯。
kafka 在 zookeeper 中(/brokers/…/state)动静保护了一个 ISR(in-sync replicas),由 3.3 节的写入流程可知 ISR 外面的所有 replica 都跟上了 leader,只有 ISR 外面的成员能力选为 leader。对于 f+1 个 replica,一个 partition 能够在容忍 f 个 replica 生效的状况下保障音讯不失落。
当所有 replica 都不工作时,有两种可行的计划:
-
- 期待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不失落,但工夫可能绝对较长。
-
- 抉择第一个活过来的 replica(不肯定是 ISR 成员)作为 leader。无奈保障数据不失落,但绝对不可用工夫较短。
kafka 0.8.* 应用第二种形式。
kafka 通过 Controller 来选举 leader,流程请参考 5.3 节。
5.3 broker failover
kafka broker failover 序列图如下所示:
图.7
流程阐明:
- controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
- controller 从 /brokers/ids 节点读取可用 broker
- controller 决定 set_p,该汇合蕴含宕机 broker 上的所有 partition
- 对 set_p 中的每一个 partition
- 4.1 从 /brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
- 4.2 决定新 leader(如 4.3 节所形容)
- 4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
- 通过 RPC 向相干 broker 发送 leaderAndISRRequest 命令
5.4 controller failover
当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的长期节点隐没,所有存活的 broker 收到 fire 的告诉,每个 broker 都尝试创立新的 controller path,只有一个竞选胜利并入选为 controller。
当新的 controller 入选时,会触发 KafkaController.onControllerFailover 办法,在该办法中实现如下操作:
1. 读取并减少 Controller Epoch。2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。6. 通过 replicaStateMachine 在 Broker Ids Patch(/brokers/ids) 上注册 Watch。7. 初始化 ControllerContext 对象,设置以后所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR 等。8. 启动 replicaStateMachine 和 partitionStateMachine。9. 将 brokerState 状态设置为 RunningAsController。10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。11. 若 auto.leader.rebalance.enable=true(默认值是 true),则启动 partition-rebalance 线程。12. 若 delete.topic.enable=true 且 Delete Topic Patch(/admin/delete_topics) 中有值,则删除相应的 Topic。
6. consumer 生产音讯
6.1 consumer API
kafka 提供了两套 consumer API:
-
- The high-level Consumer API
-
- The SimpleConsumer API
其中 high-level consumer API 提供了一个从 kafka 生产数据的高层形象,而 SimpleConsumer API 则须要开发人员更多地关注细节。
6.1.1 The high-level consumer API
high-level consumer API 提供了 consumer group 的语义,一个音讯只能被 group 内的一个 consumer 所生产,且 consumer 生产音讯时不关注 offset,最初一个 offset 由 zookeeper 保留。
应用 high-level consumer API 能够是多线程的利用,该当留神:
-
- 如果生产线程大于 patition 数量,则有些线程将收不到音讯
-
- 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的音讯
-
- 如果一个线程生产多个 patition,则无奈保障你收到的音讯的程序,而一个 patition 内的音讯是有序的
6.1.2 The SimpleConsumer API
如果你想要对 patition 有更多的控制权,那就应该应用 SimpleConsumer API,比方:
-
- 屡次读取一个音讯
-
- 只生产一个 patition 中的局部音讯
-
- 应用事务来保障一个音讯仅被生产一次
然而应用此 API 时,partition、offset、broker、leader 等对你不再通明,须要本人去治理。你须要做大量的额定工作:
-
- 必须在应用程序中跟踪 offset,从而确定下一条应该生产哪条音讯
-
- 应用程序须要通过程序获知每个 Partition 的 leader 是谁
-
- 须要解决 leader 的变更
应用 SimpleConsumer API 的个别流程如下:
-
- 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
-
- 找出每个 partition 的 follower
-
- 定义好申请,该申请应该能形容应用程序须要哪些数据
-
- fetch 数据
-
- 辨认 leader 的变动,并对之作出必要的响应
以下针对 high-level Consumer API 进行阐明。
6.2 consumer group
如 2.2 节所说,kafka 的调配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所生产(也就保障了一个音讯只能被 group 内的一个 consuemr 所生产),然而多个 group 能够同时生产这个 partition。
kafka 的设计指标之一就是同时实现离线解决和实时处理,依据这一个性,能够应用 spark/Storm 这些实时处理系统对音讯在线解决,同时应用 Hadoop 批处理零碎进行离线解决,还能够将数据备份到另一个数据中心,只须要保障这三者属于不同的 consumer group。如下图所示:
图.8
6.3 生产形式
consumer 采纳 pull 模式从 broker 中读取数据。
push 模式很难适应生产速率不同的消费者,因为音讯发送速率是由 broker 决定的。它的指标是尽可能以最快速度传递音讯,然而这样很容易造成 consumer 来不及解决音讯,典型的体现就是拒绝服务以及网络拥塞。而 pull 模式则能够依据 consumer 的生产能力以适当的速率生产音讯。
对于 Kafka 而言,pull 模式更适合,它可简化 broker 的设计,consumer 可自主管制生产音讯的速率,同时 consumer 能够本人管制生产形式——即可批量生产也可逐条生产,同时还能抉择不同的提交形式从而实现不同的传输语义。
6.4 consumer delivery guarantee
如果将 consumer 设置为 autocommit,consumer 一旦读到数据立刻主动 commit。如果只探讨这一读取音讯的过程,那 Kafka 确保了 Exactly once。
但理论应用中应用程序并非在 consumer 读取完数据就完结了,而是要进行进一步解决,而数据处理与 commit 的程序在很大水平上决定了 consumer delivery guarantee:
- 1. 读完音讯先 commit 再解决音讯。
这种模式下,如果 consumer 在 commit 后还没来得及解决音讯就 crash 了,下次从新开始工作后就无奈读到刚刚已提交而未解决的音讯,这就对应于 At most once
- 2. 读完音讯先解决再 commit。
这种模式下,如果在解决完音讯之后 commit 之前 consumer crash 了,下次从新开始工作时还会解决刚刚未 commit 的音讯,实际上该音讯曾经被解决过了。这就对应于 At least once。
- 3. 如果肯定要做到 Exactly once,就须要协调 offset 和实际操作的输入。
精典的做法是引入两阶段提交。如果能让 offset 和操作输出存在同一个中央,会更简洁和通用。这种形式可能更好,因为许多输入零碎可能不反对两阶段提交。比方,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据自身一起写到 HDFS,那就能够保证数据的输入和 offset 的更新要么都实现,要么都不实现,间接实现 Exactly once。(目前就 high-level API 而言,offset 是存于 Zookeeper 中的,无奈存于 HDFS,而 SimpleConsuemr API 的 offset 是由本人去保护的,能够将之存于 HDFS 中)
总之,Kafka 默认保障 At least once,并且容许通过设置 producer 异步提交来实现 At most once(见文章《kafka consumer 避免数据失落》)。而 Exactly once 要求与内部存储系统合作,侥幸的是 kafka 提供的 offset 能够十分间接非常容易得应用这种形式。
6.5 consumer rebalance
当有 consumer 退出或退出、以及 partition 的扭转(如 broker 退出或退出)时会触发 rebalance。consumer rebalance 算法如下:
-
- 将指标 topic 下的所有 partirtion 排序,存于 PT
-
- 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个 consumer 记为 Ci
-
- N=size(PT)/size(CG),向上取整
-
- 解除 Ci 对原来调配的 partition 的生产权(i 从 0 开始)
-
- 将第 i N 到(i+1)N- 1 个 partition 调配给 Ci
在 0.8.* 版本,每个 consumer 都只负责调整本人所生产的 partition,为了保障整个 consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:
- 1.Herd effect
任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance
- 2.Split Brain
每个 consumer 别离独自通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从 zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的个性决定的,这就会造成不正确的 reblance 尝试。
-
- 调整后果不可控
所有的 consumer 都并不知道其它 consumer 的 rebalance 是否胜利,这可能会导致 kafka 工作在一个不正确的状态。
- 调整后果不可控
基于以上问题,kafka 设计者思考在 0.9.* 版本开始应用核心 coordinator 来管制 consumer rebalance,而后又从简便性和验证要求两方面思考,打算在 consumer 客户端实现调配计划。(见文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此处不再赘述。
七、注意事项
7.1 producer 无奈发送音讯的问题
最开始在本机搭建了 kafka 伪集群,本地 producer 客户端胜利公布音讯至 broker。随后在服务器上搭建了 kafka 集群,在本机连贯该集群,producer 却无奈公布音讯到 broker(奇怪也没有抛错)。最开始狐疑是 iptables 没凋谢,于是凋谢端口,后果还不行(又开始是代码问题、版本问题等等,倒腾了很久)。最初没方法,一项一项查看 server.properties 配置,发现以下两个配置:
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
以上说的就是 advertised.listeners 是 broker 给 producer 和 consumer 连贯应用的,如果没有设置,就应用 listeners,而如果 host_name 没有设置的话,就应用 java.net.InetAddress.getCanonicalHostName() 办法返回的主机名。
批改办法:
- listeners=PLAINTEXT://121.10.26.XXX:9092
- advertised.listeners=PLAINTEXT://121.10.26.XXX:9092
批改后重启服务,失常工作。
作者:cyfonly
出处:http://www.cnblogs.com/cyfonly/