关于java:图解Kafka消费者分区分配策略

30次阅读

共计 6977 个字符,预计需要花费 18 分钟才能阅读完成。

1. 调配策略的作用

咱们在剖析生产者的时候有专门写过文章剖析生产者的分区调配策略

Kafka 生产者的 3 种分区策略

生成者的调配策略是把咱们产生的音讯抉择一个适合的分区去发送,

那么明天咱们要解说一下 消费者的分区调配策略 他要做的事件是

同一个生产组中 给不同消费者调配可能生产的分区数;

同一个生产组中, 一个分区只会被一个消费者生产。

2. 调配策略的抉择

2.1 调配策略配置

每个生产组客户端都能够配置一个 partition.assignment.strategy 属性 并且能够配置多个本人反对的调配策略,例如:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RoundRobinAssignor

默认策略是 org.apache.kafka.clients.consumer.RoundRobinAssignor

2.2 抉择适合的策略

既然每个客户端成员都能够配置多个本人反对的调配策略,  那么 GroupCoordinator(生产组协调器) 应用哪个调配策略去调配这些资源呢?

必定是须要生产组上面的所有成员都应用同一种调配策略来进行调配。所以 GroupCoordinator 就面临着抉择哪个调配策略。

抉择的逻辑如下

  1. 抉择所有 Member 都反对的调配策略
  2. 在 1 的根底上, 优先选择每个 partition.assignment.strategy 配置靠前的策略。

请看上面的 2 个例子

case consumer-0 consumer-1 consumer-2 选中策略
case-1 roundrobin,rang rang,roundrobin,strick roundrobin,rang roundrobin
case-2 strick,roundrobin,rang rang,roundrobin strick ,rang rang

Case-1

  1. 所有反对的调配策略为:roundrobin,rang
  2. 每个 consumer 都在 1 的根底上, 给本人排最后面的投票, consumer- 0 投 roundrobin, consumer- 1 投 rang, consumer- 3 投 roundrobin;这样算下来 roundrobin 是有 2 票的, 那么就抉择 roundrobin 为调配策略;

Case-2

  1. 所有反对的调配策略为:rang
  2. 都不必投票, 间接抉择 rang 入选

如果新 Member 退出 Group 的时候, 带上的调配策略跟现有 Group 中所有 Member(Group 有 Member 的状况下)都反对的协定都不穿插

那么就会抛出异样:INCONSISTENT_GROUP_PROTOCOL

[2022-09-08 14:34:12,508] INFO [Consumer clientId=client2, groupId=consumer0] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
[2022-09-08 14:34:12,511] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)

这个协定的抉择的代码逻辑在 GroupMetadata#selectProtocol

调用的机会是以后发动 JoinGroup 的 Member 都实现 JoinGroup,并调用onCompleteJoin

具体详情能够看 :  Kafka 消费者 JoinGroupRequest 流程解析

3. 调配策略计算和流传

3.1 调配策略计算机会

既然咱们曾经晓得了分区调配策略的抉择, 那么什么时候会触发这个策略的逻辑计算呢?

如果你有看过之前的文章:Kafka 消费者 JoinGroupRequest 流程解析 那么对此就必定会有肯定的理解

当所有的 Member(成员)发动 JoinGroup 申请, 并且组协调器 (GroupCoordinator) 也都解决失常, 就会回调以后发动 JoinGroup 申请的 Member(成员)

其中有个最特地的就是, 组协调器 (GroupCoordinator) 会把所有的 Member(成员)的元信息打包一并返回给那个 Leader Member, 而 Follow Member 是不会返回的。

Leader Member 承受到回调并拿到这个元信息之后, 就开始去计算每个成员应该被调配到的分区。

代码定位

ConsumerCoordinator#performAssignment

@Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment protocol:" + assignmentStrategy);

        // 省略局部代码...
        
        // 更新一下所有订阅的 Topic 的元信息
        // 如果有变更的元信息则更新一下
        updateGroupSubscription(allSubscribedTopics);

         // 省略局部代码...

        Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();

        if (protocol == RebalanceProtocol.COOPERATIVE) {validateCooperativeAssignment(ownedPartitions, assignments);
        }

          // 省略局部代码...
    }

下面的代码次要是 依据调配策略, 获取调配策略实例, 而后调用 assign办法进行计算, 失去调配形式。

然而最终调用的计算逻辑是每个 AbstractPartitionAssignor 实现类的 assign 办法。

并且也能够实现自定义的调配策略. 只须要实现接口 AbstractPartitionAssignor 就行。

3.2 调配策略流传

在 3.1 调配策略计算机会 中咱们晓得调配策略的计算机会, 那么计算好了之后如何告知其余的 Member, 他们对应的调配状态呢?

当每个 Member 收到 JoinGroup 的回调之后, 他们会发动一个SyncGroupRequest, 其中 Leader Member 就会把刚刚计算好的调配策略, 一起当做入参发动申请。请看下图

下面发动的申请也只是告知了 组协调器 (GroupCoordinator) 调配的状况, 最终还是须要 组协调器 (GroupCoordinator) 来告知每个 Member 的。

那么这个告知的过程就是所有 Member 都同步实现后的回调;

具体请看:KafkaConsumer SyncGroupRequest 详解

4. 图解所有调配策略

下面所有的铺垫都解说分明了,那么目前 Kafka 反对哪些调配策略呢?

咱们来一一剖析一下

4.1 RangeAssignor 范畴分区调配策略

partition.assignment.strategy=]org.apache.kafka.clients.consumer.RangeAssignor

这也是默认的调配策略

它是以单个 Topic 为一个维度来计算调配的, 他只负责将每一个 Topic 的分区尽可能平衡的调配给消费者

  1. 生产组外面所有消费者 (Member) 依照字母排序,给 Topic 的分区依照分区号排序。
  2. 先计算每个分区起码平均分配多少个分区数, 而后余下的一一分 举个例子:Topic 为 Topic1 有 11 个分区;有 3 个消费者订阅 那么均匀每个 11/3= 3 余 2, 那么后面两个能够分到 4 个分区,最初一个分到 3 个;[4, 4, 3]

    他们最终调配形式如下

    | 消费者 |

    Member:client1-ba0ebe99-cd09-42e9-87b9-11b6f828bfca Topic1-0, Topic1-1, Topic1-2, Topic1-3
    Member:client2-cbfb4cf2-c878-41d2-852c-86d56dbb99c2 Topic1-4, Topic1-5, Topic1-6, Topic1-7
    Member:client3-ad60e7a5-204f-4741-b66f-3da3acb0a2f9 Topic1-8, Topic1-9, Topic1-10

    调配是先分完一个消费者再调配下一个的, 跟遍历是有区别。clientId-1 先分到 [0 , 1 , 2 , 3] 号分区, 前面的接着分。[图片上传失败 …(image-83ca39-1663553888622)]

图外面的 Member 就是消费者, 对生产组来说他外部的对象是 Member

Range 弊病

Range 针对单个 Topic 的状况下显得比拟平衡,然而如果 Topic 很多的话, Member 排序靠前的可能会比 Member 排序靠后的负载多很多。

看, 像这种状况, 3 个 Member 都订阅了这 4 个 Topic, 可是 Member 这么多分区愣是没有调配到 1 个

4.2 RoundRobinAssignor 轮询分区策略

把所有 Member 排序, 所有 TopicPartition 排序。轮训遍历调配

Member- 3 下线


RoundRobin 的一些弊病

如果成员订阅的 Topic 不尽相同的时候, 最终后果也不可能会齐全平衡的。

如果图中的 Memner- 3 比另外两个多订阅了 Topic-4,那他总共就生产了 6 个分区了, 然而另外两个别离只生产了 2 个分区。

如果这里的 Member- 3 把分区 Topic2-0、Topic3-1 分给另外两个那才是最平衡的状况。

那么有什么策略能解决这个问题吗?接下来咱们另外一个分区策略 — 粘性分区

4.3 StickyAssignor 粘性分区策略

下面介绍的两种分区调配形式, 多多少少都会有一些调配上的偏差,  而且每次重新分配的时候都是把所有的都从新来计算并调配一遍, 那么每次调配的后果都会偏差很多, 如果咱们在计算的时候可能思考上一次的分配情况, 来尽量的缩小调配的变动, 不失为一种优化计划。

咱们之前在讲生产者的时候也讲过粘性分区: 

Kafka 生产者的 3 种分区策略

那么消费者的粘性分区策略是什么样子的呢?

指标:

  1. 分区的调配尽量的平衡
  2. 每一次重调配的后果尽量与上一次调配后果保持一致

当这两个指标发生冲突时,优先保障第一个指标。第一个指标是每个调配算法都尽量尝试去实现的,而第二个指标才真正体现出 StickyAssignor 个性的。

首先, StickyAssignor 粘性分区 在进行调配的时候, 是以 RoundRobinAssignor 的调配逻辑来计算的, 然而它又补救了 RoundRobinAssignor 的一些可能造成不平衡的弊病。

比方在讲 RoundRobinAssignor 弊病的那种 case, 然而在 StickyAssignor 中就是下图的分配情况

把 RoundRobinAssignor 的弊病给优化了

体现粘性分区中央就在于重新分配的时候了, 还是下面的 case(上图左边的 StickAssignor), 如果 Member-2 离线了

粘性分区的计算形式把把离线的那个 Member 所属的分区调配给其余的 Member, 在其余的 Member 已领有的分区不变的前提下, 尽量的平衡。

Member-2 有 3 个分区, 能够分两个分区给 Member-1,分 1 个分区给 Member-3  最终调配图如下:

4.4 CooperativeStickyAssignor 策略

下面剖析的 StickyAssignor 粘性分区策略, 次要作用是保障消费者客户端在重均衡之后可能维持本来的调配计划。

然而 StickyAssignor 还是属于 RebalanceProtocol.EAGER  协定, 重均衡的时候须要每个客户端都要先放弃以后持有的资源。

为了解决这个问题, 所以就有了 CooperativeStickyAssignor调配策略

你能够了解为 CooperativeStickyAssignor 的调配策略跟 StickyAssignor 的策略差不多。

然而它在此基础上是用的 RebalanceProtocol.COOPERATIVE 协定。渐进式的重均衡。

后续专门写一篇文章来解说一下这一块内容, 挖个坑 0.0

4.5 自定义调配策略

咱们先看一下分区策略的类图

咱们想要自定义调配策略, 只须要实现接口:

public interface ConsumerPartitionAssignor {

    /**
     * 返回序列化后的自定义数据
     */
    default ByteBuffer subscriptionUserData(Set<String> topics) {return null;}

    /**
     * 分区调配的计算逻辑
     */
    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);

    /**
     * 当组成员从领导者那里收到其调配时调用的回调
     */
    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { }

    /**
     * 指明应用的再均衡协定
     * 默认应用 RebalanceProtocol.EAGER 协定, 另外一个可选项为 RebalanceProtocol.COOPERATIVE
     */
    default List<RebalanceProtocol> supportedProtocols() {return Collections.singletonList(RebalanceProtocol.EAGER);
    }

    /**
     * Return the version of the assignor which indicates how the user metadata encodings
     * and the assignment algorithm gets evolved.
     */
    default short version() {return (short) 0;
    }

    /**
     * 分配器的名字
     * 例如 RangeAssignor、RoundRobinAssignor、StickyAssignor、CooperativeStickyAssignor
     * 对应的名字为
     * range、roundrobin、sticky、cooperative-sticky
     */
    String name();

当然咱们也能够依据本人的需要来实现其余的抽象类

比方:AbstractStickyAssignor抽象类就是专门给粘性分区应用的抽象类

5. 重均衡协定

下面咱们讲的是分区策略, 然而分区策略实质上又分为两大类

  1. RebalanceProtocol.EAGER
  2. RebalanceProtocol.COOPERATIVE   合作重均衡,kafak2.4 出的性能。

这两个区别是

EAGER 从新均衡协定要求消费者在参加从新均衡事件之前始终撤销其领有的所有分区。因而,它容许齐全改选调配

COOPERATIVE协定容许消费者在参加再均衡事件之前保留其以后领有的分区。调配者不应该立刻重新分配任何领有的分区,而是能够批示消费者须要撤销分区,以便能够在下一次从新均衡事件中将被撤销的分区重新分配给其余消费者

COOPERATIVE 协定将一次全局重均衡,改成每次小规模重均衡,直至最终收敛均衡的过程。

COOPERATIVE 无效的改良来在此之前 EAGER 协定重均衡而触发的 stop-the-world(STW)

咱们下面讲的调配策略 3 种策略都是 RebalanceProtocol.EAGER  协定

  1. RangeAssignor 范畴分区调配策略
  2. RoundRobinAssignor 轮询分区策略
  3. StickyAssignor 粘性分区策略

CooperativeStickyAssignor 调配策略是应用的 RebalanceProtocol.COOPERATIVE协定

对于更多的对于重均衡协定的解说, 请看: Kafka 重均衡的两种协定解说

正文完
 0