1. 调配策略的作用

咱们在剖析生产者的时候有专门写过文章剖析生产者的分区调配策略

Kafka生产者的3种分区策略

生成者的调配策略是把咱们产生的音讯抉择一个适合的分区去发送,

那么明天咱们要解说一下 消费者的分区调配策略 他要做的事件是

同一个生产组中 给不同消费者调配可能生产的分区数;

同一个生产组中,一个分区只会被一个消费者生产。

2. 调配策略的抉择

2.1 调配策略配置

每个生产组客户端都能够配置一个partition.assignment.strategy属性 并且能够配置多个本人反对的调配策略,例如:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RoundRobinAssignor

默认策略是 org.apache.kafka.clients.consumer.RoundRobinAssignor

2.2 抉择适合的策略

既然每个客户端成员都能够配置多个本人反对的调配策略,  那么GroupCoordinator(生产组协调器)应用哪个调配策略去调配这些资源呢?

必定是须要生产组上面的所有成员都应用同一种调配策略来进行调配。所以GroupCoordinator就面临着抉择哪个调配策略。

抉择的逻辑如下

  1. 抉择所有Member都反对的调配策略
  2. 在1的根底上,优先选择每个partition.assignment.strategy配置靠前的策略。

请看上面的2个例子

caseconsumer-0consumer-1consumer-2选中策略
case-1roundrobin,rangrang,roundrobin,strickroundrobin,rangroundrobin
case-2strick,roundrobin,rangrang,roundrobinstrick ,rangrang

Case-1

  1. 所有反对的调配策略为:roundrobin,rang
  2. 每个consumer都在1的根底上,给本人排最后面的投票, consumer-0投roundrobin, consumer-1投rang, consumer-3投roundrobin;这样算下来 roundrobin是有2票的, 那么就抉择roundrobin为调配策略;

Case-2

  1. 所有反对的调配策略为:rang
  2. 都不必投票, 间接抉择rang入选

如果新Member退出Group的时候, 带上的调配策略跟现有Group中所有Member(Group有Member的状况下)都反对的协定都不穿插

那么就会抛出异样:INCONSISTENT_GROUP_PROTOCOL

[2022-09-08 14:34:12,508] INFO [Consumer clientId=client2, groupId=consumer0] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.[2022-09-08 14:34:12,511] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)

这个协定的抉择的代码逻辑在 GroupMetadata#selectProtocol

调用的机会是以后发动JoinGroup的Member都实现JoinGroup,并调用onCompleteJoin

具体详情能够看 :  Kafka消费者JoinGroupRequest流程解析

3. 调配策略计算和流传

3.1 调配策略计算机会

既然咱们曾经晓得了分区调配策略的抉择, 那么什么时候会触发这个策略的逻辑计算呢?

如果你有看过之前的文章: Kafka消费者JoinGroupRequest流程解析 那么对此就必定会有肯定的理解

当所有的Member(成员)发动JoinGroup申请, 并且组协调器(GroupCoordinator)也都解决失常,就会回调以后发动JoinGroup申请的Member(成员)

其中有个最特地的就是, 组协调器(GroupCoordinator)会把所有的Member(成员)的元信息打包一并返回给那个Leader Member, 而Follow Member是不会返回的。

Leader Member 承受到回调并拿到这个元信息之后, 就开始去计算每个成员应该被调配到的分区。

代码定位

ConsumerCoordinator#performAssignment

@Override    protected Map<String, ByteBuffer> performAssignment(String leaderId,                                                        String assignmentStrategy,                                                        List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {        ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);        if (assignor == null)            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);        //省略局部代码...                // 更新一下所有订阅的Topic的元信息        // 如果有变更的元信息则更新一下        updateGroupSubscription(allSubscribedTopics);         //省略局部代码...        Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();        if (protocol == RebalanceProtocol.COOPERATIVE) {            validateCooperativeAssignment(ownedPartitions, assignments);        }          //省略局部代码...    }

下面的代码次要是 依据调配策略,获取调配策略实例, 而后调用 assign办法进行计算,失去调配形式。

然而最终调用的计算逻辑是每个AbstractPartitionAssignor实现类的assign办法。

并且也能够实现自定义的调配策略.只须要实现接口AbstractPartitionAssignor就行。

3.2 调配策略流传

在 3.1 调配策略计算机会 中咱们晓得调配策略的计算机会, 那么计算好了之后如何告知其余的Member, 他们对应的调配状态呢?

当每个Member收到JoinGroup的回调之后, 他们会发动一个SyncGroupRequest, 其中Leader Member就会把刚刚计算好的调配策略, 一起当做入参发动申请。请看下图

下面发动的申请也只是告知了组协调器(GroupCoordinator)调配的状况, 最终还是须要组协调器(GroupCoordinator)来告知每个Member的。

那么这个告知的过程就是所有Member都同步实现后的回调 ;

具体请看:KafkaConsumer SyncGroupRequest详解

4. 图解所有调配策略

下面所有的铺垫都解说分明了,那么目前Kafka反对哪些调配策略呢?

咱们来一一剖析一下

4.1 RangeAssignor 范畴分区调配策略

partition.assignment.strategy=]org.apache.kafka.clients.consumer.RangeAssignor

这也是默认的调配策略

它是以单个Topic为一个维度来计算调配的, 他只负责将每一个Topic的分区尽可能平衡的调配给消费者

  1. 生产组外面所有消费者(Member)依照字母排序, 给Topic的分区依照分区号排序。
  2. 先计算每个分区起码平均分配多少个分区数, 而后余下的一一分 举个例子:Topic为Topic1 有11个分区;有3个消费者订阅 那么均匀每个 11/3=3余2, 那么后面两个能够分到4个分区,最初一个分到3个;[ 4, 4, 3 ]

    他们最终调配形式如下

    | 消费者 |

    Member:client1-ba0ebe99-cd09-42e9-87b9-11b6f828bfcaTopic1-0, Topic1-1, Topic1-2, Topic1-3
    Member:client2-cbfb4cf2-c878-41d2-852c-86d56dbb99c2Topic1-4, Topic1-5, Topic1-6, Topic1-7
    Member:client3-ad60e7a5-204f-4741-b66f-3da3acb0a2f9Topic1-8, Topic1-9, Topic1-10

    调配是先分完一个消费者再调配下一个的,跟遍历是有区别。clientId-1 先分到 [ 0 , 1 , 2 , 3 ] 号分区, 前面的接着分。[图片上传失败...(image-83ca39-1663553888622)]

图外面的Member就是消费者, 对生产组来说他外部的对象是Member

Range弊病

Range针对单个Topic的状况下显得比拟平衡, 然而如果Topic很多的话, Member排序靠前的可能会比Member排序靠后的负载多很多。

看,像这种状况, 3个Member都订阅了这4个Topic, 可是Member这么多分区愣是没有调配到1个

4.2 RoundRobinAssignor 轮询分区策略

把所有Member排序, 所有TopicPartition排序。轮训遍历调配

Member-3下线


RoundRobin的一些弊病

如果成员订阅的Topic不尽相同的时候, 最终后果也不可能会齐全平衡的。

如果图中的Memner-3比另外两个多订阅了Topic-4,那他总共就生产了6个分区了, 然而另外两个别离只生产了2个分区。

如果这里的Member-3把分区 Topic2-0、Topic3-1 分给另外两个那才是最平衡的状况。

那么有什么策略能解决这个问题吗?接下来咱们另外一个分区策略 -- 粘性分区

4.3 StickyAssignor 粘性分区策略

下面介绍的两种分区调配形式,多多少少都会有一些调配上的偏差,  而且每次重新分配的时候都是把所有的都从新来计算并调配一遍, 那么每次调配的后果都会偏差很多, 如果咱们在计算的时候可能思考上一次的分配情况,来尽量的缩小调配的变动,不失为一种优化计划。

咱们之前在讲生产者的时候也讲过粘性分区: 

Kafka生产者的3种分区策略

那么消费者的粘性分区策略是什么样子的呢?

指标:

  1. 分区的调配尽量的平衡
  2. 每一次重调配的后果尽量与上一次调配后果保持一致

当这两个指标发生冲突时,优先保障第一个指标。第一个指标是每个调配算法都尽量尝试去实现的,而第二个指标才真正体现出StickyAssignor个性的。

首先, StickyAssignor粘性分区在进行调配的时候,是以RoundRobinAssignor的调配逻辑来计算的,然而它又补救了RoundRobinAssignor的一些可能造成不平衡的弊病。

比方在讲RoundRobinAssignor弊病的那种case, 然而在StickyAssignor中就是下图的分配情况

把RoundRobinAssignor的弊病给优化了

体现粘性分区中央就在于重新分配的时候了, 还是下面的case(上图左边的StickAssignor), 如果 Member-2 离线了

粘性分区的计算形式把把离线的那个Member所属的分区调配给其余的Member, 在其余的Member已领有的分区不变的前提下,尽量的平衡。

Member-2 有3个分区, 能够分两个分区给Member-1,分1个分区给Member-3  最终调配图如下:

4.4 CooperativeStickyAssignor策略

下面剖析的StickyAssignor粘性分区策略,次要作用是保障消费者客户端在重均衡之后可能维持本来的调配计划。

然而StickyAssignor还是属于 RebalanceProtocol.EAGER  协定, 重均衡的时候须要每个客户端都要先放弃以后持有的资源。

为了解决这个问题, 所以就有了 CooperativeStickyAssignor调配策略

你能够了解为 CooperativeStickyAssignor 的调配策略跟StickyAssignor的策略差不多。

然而它在此基础上是用的RebalanceProtocol.COOPERATIVE协定。渐进式的重均衡。

后续专门写一篇文章来解说一下这一块内容,挖个坑0.0

4.5 自定义调配策略

咱们先看一下分区策略的类图

咱们想要自定义调配策略,只须要实现接口:

public interface ConsumerPartitionAssignor {    /**     * 返回序列化后的自定义数据     */    default ByteBuffer subscriptionUserData(Set<String> topics) {        return null;    }    /**     * 分区调配的计算逻辑     */    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);    /**     * 当组成员从领导者那里收到其调配时调用的回调     */    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {    }    /**     * 指明应用的再均衡协定     * 默认应用RebalanceProtocol.EAGER协定, 另外一个可选项为 RebalanceProtocol.COOPERATIVE     */    default List<RebalanceProtocol> supportedProtocols() {        return Collections.singletonList(RebalanceProtocol.EAGER);    }    /**     * Return the version of the assignor which indicates how the user metadata encodings     * and the assignment algorithm gets evolved.     */    default short version() {        return (short) 0;    }    /**     * 分配器的名字     * 例如 RangeAssignor、RoundRobinAssignor、StickyAssignor、CooperativeStickyAssignor     * 对应的名字为     * range、roundrobin、sticky、cooperative-sticky     */    String name();

当然咱们也能够依据本人的需要来实现其余的抽象类

比方:AbstractStickyAssignor抽象类就是专门给粘性分区应用的抽象类

5. 重均衡协定

下面咱们讲的是分区策略, 然而分区策略实质上又分为两大类

  1. RebalanceProtocol.EAGER
  2. RebalanceProtocol.COOPERATIVE   合作重均衡,kafak2.4出的性能。

这两个区别是

EAGER 从新均衡协定要求消费者在参加从新均衡事件之前始终撤销其领有的所有分区。因而,它容许齐全改选调配

COOPERATIVE协定容许消费者在参加再均衡事件之前保留其以后领有的分区。调配者不应该立刻重新分配任何领有的分区,而是能够批示消费者须要撤销分区,以便能够在下一次从新均衡事件中将被撤销的分区重新分配给其余消费者

COOPERATIVE协定将一次全局重均衡,改成每次小规模重均衡,直至最终收敛均衡的过程。

COOPERATIVE无效的改良来在此之前EAGER协定重均衡而触发的stop-the-world(STW)

咱们下面讲的调配策略3种策略都是 RebalanceProtocol.EAGER  协定

  1. RangeAssignor 范畴分区调配策略
  2. RoundRobinAssignor 轮询分区策略
  3. StickyAssignor 粘性分区策略

CooperativeStickyAssignor调配策略是应用的 RebalanceProtocol.COOPERATIVE协定

对于更多的对于重均衡协定的解说,请看: Kafka 重均衡的两种协定解说