关于源码分析:SOFARegistry-源码|数据分片之核心路由表-SlotTable-剖析

45次阅读

共计 23224 个字符,预计需要花费 59 分钟才能阅读完成。

文|程征征(花名:泽睿 )

高德软件开发工程师

负责高德新场景业务摸索开发与保护 对畛域驱动、网络通讯、数据一致性有肯定的钻研与实际

本文 23009 字 浏览约 25 分钟

第一次关注 SOFA 社区是在开发一个故障剔除组件时,发现 SOFARPC 中也有相似的组件。在 SOFARPC 的设计中,入口采纳了一种无缝插入的设计形式,使得在不毁坏凋谢关闭准则前提下,引入单机故障剔除能力。并且是基于内核设计和总线设计,做到可插拔、零侵入,整个故障剔除模块是通过 SPI 动静加载的。统计信息的收集也是通过事件驱动的形式,在 RPC 同步或异步调用实现后,会向事件总线 EventBus 发送对应事件。事件总线接管到对应的事件,以执行后续的故障剔除逻辑。

基于以上优良的设计,我也将其纳为己用,也因而开启了在 SOFA 社区的开源摸索之路。陆续钻研了 SOFABoot、SOFARPC 以及 MOSN 等,自我感觉每一个我的项目的代码程度都很高,对我本人的代码晋升有很大的帮忙。

SOFARegistry 是一个开源的注册核心提供了服务的公布注册订阅等性能,反对海量的服务注册订阅申请。作为一个名源码爱好者,尽管看过 SOFA 的架构文章大抵理解其中的设计哲学,然而因为没有从代码中理解过细节,实际上也是只知其一; 不知其二。恰好借助 SOFARegistry 开拓的源码剖析流动,基于本人的趣味抉择了 SlotTable 这个工作。

SOFARegistry 对于服务数据是分片进行存储的,因而每一个 data server 只会承当一部分的服务数据,具体哪份数据存储在哪个 data server 是有一个称为 SlotTable 的路由表提供的,session 能够通过 SlotTable 对对应的 data derver 进行读写服务数据,slot 对应的 data follower 能够通过 SlotTable 寻址 leader 进行数据同步。

保护 SlotTable 是由 Meta 的 leader 负责的,Meta 会保护 data 的列表,会利用这份列表以及 data 上报的监控数据创立 SlotTable,后续 data 的高低线会触发 Meta 批改 SlotTable,SlotTable 会通过心跳分发给集群中各个节点。

贡献者前言

SOFARegistry 对于服务数据是分片进行存储的,因而每一个 data server 只会承当一部分的服务数据,具体哪份数据存储在哪个 data server 是有一个称为 SlotTable 的路由表提供的,session 能够通过 SlotTable 对对应的 data derver 进行读写服务数据,slot 对应的 data follower 能够通过 SlotTable 寻址 leader 进行数据同步。

保护 SlotTable 是由 Meta 的 leader 负责的,Meta 会保护 data 的列表,会利用这份列表以及 data 上报的监控数据创立 SlotTable,后续 data 的高低线会触发 Meta 批改 SlotTable,SlotTable 会通过心跳分发给集群中各个节点。

SlotTable 在 SOFARegistry 是十分外围的概念,简称路由表。简略来说 SOFARegistry 是须要将公布订阅数据存储在不同的机器节点上,能力保证数据存储的横向扩大。那么不同机器上到底存储哪些数据,就是 SlotTable 来保留的。

SlotTable 保留了 Slot 和机器节点之间的映射关系,数据通过 Hash 定位到某一个 Slot 上,通过 Slot 找到对应的机器 node 节点,将数据存储到对应的机器上。在这过程中有很多细节须要咱们理解。比如说每一个 Slot 对应 leader 和 follow 节点是如何调配的。如果机器负载不均衡该如何均衡,SlotTable 的更新是如何进行的呢? 这些都是很乏味的细节实现。

1. DataServer 更新 SlotTable 路由表过程。

如上图所示 session 和 data 节点定时会向 Meta 节点上报心跳、Meta 节点保护了 data 以及 session 节点列表信息、并且在心跳申请中将返回 SlotTable 路由表信息、data 节点将路由表 SlotTable 保留在本地中。

2. SlotTable 更新均衡算法

由前文可知、SOFARegistry 采纳了数据分片存储在 DataServer 节点之上、那么随之而来的问题就是数据如何分片呢?

SOFARegistry 采纳预调配的形式。

传统的一致性 Hash 算法有数据分布范畴不固定的个性,该个性使得服务注册数据在服务器节点宕机、下线、扩容之后,须要从新存储排布,这为数据的同步带来了艰难。大多数的数据同步操作是利用操作日志记录的内容来进行的,传统的一致性 Hash 算法中,数据的操作日志是以节点分片来划分的,节点变动导致数据分布范畴的变动。

在计算机领域,大多数难题都能够通过减少一个中间层来解决,那么对于数据分布范畴不固定所导致的数据同步难题,也能够通过同样的思路来解决。

这里的问题在于,当节点下线后,若再以以后存活节点 ID 一致性 Hash 值去同步数据,就会导致已失效节点的数据操作日志无奈获取到,既然数据存储在会变动的中央无奈进行数据同步,那么如果把数据存储在不会变动的中央是否就能保证数据同步的可行性呢?答案是必定的,这个中间层就是预分片层,通过把数据与预分片这个不会变动的层互相对应就能解决这个数据同步的难题。

目前业界次要代表我的项目如 Dynamo、Casandra、Tair、Codis、Redis cluster 等,都采纳了预分片机制来实现这个不会变动的层。

当时将数据存储范畴等分为 N 个 slot 槽位,数据间接与 slot 绝对应,数据的操作日志与相应的 solt 对应,slot 的数目不会因为节点的高低线而产生变动,由此保障了数据同步的可行性。除此之外,还须要引进“路由表”的概念,如图 13,“路由表”负责寄存每个节点和 N 个 slot 的映射关系,并保障尽量把所有 slot 平均地调配给每个节点。这样,当节点高低线时,只须要批改路由表内容即可。放弃 slot 不变,即保障了弹性扩缩容,也大大降低了数据同步的难度。

实际上上述 Slot节点 的映射关系在源码中以 SlotTable 和 Slot 的形式进行表白。源码如下代码块所示。


public final class SlotTable implements Serializable {public static final SlotTable INIT = new SlotTable(-1, Collections.emptyList());
  // 最初一次更新的工夫 epoch
  private final long epoch;
  // 保留了 所有的 slot 信息;slotId ---> slot 对象的映射
  private final Map<Integer, Slot> slots;
}
public final class Slot implements Serializable, Cloneable {
  public enum Role {
    Leader,
    Follower,
  }
  private final int id;
  // 以后 slot 的 leader 节点
  private final String leader;
  // 最近更新工夫
  private final long leaderEpoch;
  // 以后 slot 的 follow 节点
  private final Set<String> followers;
}

因为节点在动态变化中、所以 Slot 和 节点的映射也在时刻变动中、那么咱们接下来的重点就是 SlotTable 的变更过程。SlotTable 的变更是在 Meta 节点中触发、当有服务高低线的时候会触发 SlotTable 的变更、除此之外也会定期执执行 SlotTable 的变更。

SlotTable 的整个同步更新步骤如图所示。

代码参考
com.alipay.sofa.registry.server.Meta.slot.arrange.ScheduledSlotArranger#arrangeSync.

SlotTable 的定期变更是通过在初始化 ScheduledSlotArranger 时候实例化守护线程一直的 定期执行 外部工作 Arranger 的 arrangeSync 办法来实现 SlotTable 变更的。大抵流程如下所示。

因为负责 SlotTable 的更新是在 MetaServer 中的主节点更新的。

所以更新 SlotTable 的第一步就是判断是否是主节点。主节点才负责真正的 SlotTable 变更步骤。

第二步是获取最新的 DataServer 节点,因为 重新分配 SlotTable 实质上是 对 DataServer 节点和 slot 槽位之间的映射关系进行重新分配。所以必定须要获取到以后正在存活的 DataServer 节点信息,从而不便的对之进行 slot 调配。

(这里获取正在存活的 DataServer 也就是有和 MetaServer 维持心跳的 DataServer, 底层是从
com.alipay.sofa.registry.server.Meta.lease.impl.SimpleLeaseManager中获取,感兴趣能够查看相干源码)。

第三部是调配前置校验,实际上一些边界条件的判断、例如 DataServer 是否为空、DataServer 的大小是否大于配置的 minDataNodeNum,只有满足这些条件才进行变更。

第四步 执行 trayArrageSlot 办法、进入到该办法外部之中。

首先获取过程外部锁、实际上是一个 ReentrantLock,这里次要是为了防止定时工作屡次同时执行 SlotTable 的调配工作。

private final Lock lock = new ReentrantLock();

随后便是依据以后的 Data 节点信息创立 SlotTableBuilder、这里的 SlotTableBuilder 又是何方神圣呢?回到 SlotTable 更新的形式、个别是创立一个新的 SlotTable 对象、而后用这个新创建的对象去代替老的 SlotTable 对象、从而实现变更 SlotTable 操作、个别不会间接对老的 SlotTable 间接进行增删该 操作、这样并发导致的一致性问题很难管制。所以基于此、SlotTableBuilder 从它的名称就能够看出 它是 SlotTable 的创建者、外部聚合了 SlotBuilder 对象。其实和 SlotTable 相似的、SlotTable 外部聚合了 Slot 信息。

在查看 SlotTable 变更算法之前、咱们先理解一下 SlotTableBuilder 的创立过程。SlotBuilder 的构造如下所示。

public class SlotTableBuilder {
  // 以后正在创立的 Slot 信息
  private final Map<Integer, SlotBuilder> buildingSlots = Maps.newHashMapWithExpectedSize(256);
  // 反向查问索引数据、通过 节点查问该节点目前负责哪些 slot 的数据的治理。private final Map<String, DataNodeSlot> reverseMap = Maps.newHashMap();
  //slot 槽的个数
  private final int slotNums;
  //follow 节点的数量
  private final int followerNums;
  // 最近一次更新的工夫
  private long epoch;
}

SlotTableBuilder 能够看出外部聚合了一个 buildingSlots、标识正在创立的 Slot。因为 SlotTable 是由 Slot 形成的、这点也很容易了解。除此之外 SlotTableBuilder 外部也聚合了一个 reverseMap,代表反向查问索引,这个映射的 key 是 dataServer、value 是 DataNodeSlot 对象. DataNodeSlot 源码如下。

/**
    通过 Slot 找 leader 和 follows.
    实质上是通过节点找 Slot,以后节点作为 leaders 的 slot、和以以后节点作为 follower 的节点.
    也就是说 以后我这个节点、我在那些 slot 中作为 leader,对应的是 Set<Integer> leaders.
    以及我以后这个节点在哪些 slot 中作为 follow,对应存储在 Set<Integer> follows.
**/
public final class DataNodeSlot  {
  private final String dataNode;
  private final Set<Integer> leaders = Sets.newTreeSet();
  private final Set<Integer> followers = Sets.newTreeSet();}

用一张图来表白 DataNodeSlot 如下所示。可见它和图 1 是刚好相同的映射。通过节点查找 与该节点有关联的 slot 信息、因为前面要常常用到这一层查问、所以间接将这种关系保留下来。为了前面陈说不便、这里统计几种陈说形式。

  1. 节点被作为 leader 的 slot 汇合咱们称为 : 节点 leader 的 slot 汇合。
  2. 节点被作为 follow 的 slot 汇合咱们称为 : 节点 follow 的 slot 汇合。
  3. SlotTable 关联的所有节点统称为: SlotTable 的节点列表

再回到 SlotTableBuilder 创立

private SlotTableBuilder createSlotTableBuilder(SlotTable slotTable, 
                                                List<String> currentDataNodeIps,
                                                int slotNum,int replicas) {
    // 通过 NodeComparator 包装以后新增的、删除的的节点.
    NodeComparator comparator = new NodeComparator(slotTable.getDataServers(), currentDataNodeIps);
    SlotTableBuilder slotTableBuilder = new SlotTableBuilder(slotTable, slotNum, replicas);
    // 执行 slotTableBuilder 的初始化
    slotTableBuilder.init(currentDataNodeIps);

    // 在这里将曾经下线的 data 节点删除掉、// 其中曾经删除的 是通过 NodeComparator 外部的 getDiff 办法 实现的。comparator.getRemoved().forEach(slotTableBuilder::removeDataServerSlots);
    return slotTableBuilder;
}

办法参数 SlotTable 是通过 SlotManager 对象获取到旧的的 SlotTable 对象。currentDataNodeIps 代表以后存活的 dataServer (通过心跳维持和 MetaServer 的连贯) 而后传入 createSlotTableBuilder 办法外部。createSlotTableBuilder 办法外部通过 NodeComparator 对象计算并且包装了 旧的 “SlotTable 的节点列表 ” 与 传入的 currentDataNodeIps 之前的差别值。包含以后 CurrentDataNodeIps 中 新增和删除的 DataServer。随之调用 SlotTableBuilder 的 init 办法。执行 SlotTableBuilder 的初始化。

SlotTableBuilder 的 init 源码如下。

 public void init(List<String> dataServers) {for (int slotId = 0; slotId < slotNums; slotId++) {Slot slot = initSlotTable == null ? null : initSlotTable.getSlot(slotId);
      if (slot == null) {getOrCreate(slotId);
        continue;
      }
     //1. 从新新建一个 SlotBuilder 将原来的 Slot 外面的数据拷贝过去
     //2. 拷贝 leader 节点
      SlotBuilder slotBuilder =
          new SlotBuilder(slotId, followerNums, slot.getLeader(), slot.getLeaderEpoch());
     //3. 拷贝 follow 节点。slotBuilder.addFollower(initSlotTable.getSlot(slotId).getFollowers())
      buildingSlots.put(slotId, slotBuilder);
    }
     //4. 初始化反向查问索引数据、通过 节点查问该节点目前治理哪些 slot
    initReverseMap(dataServers);
  }

由下面的代码能够看出实际上 init 做了这么一件事件: 初始化 SlotBuilder 外部的 slotBuilder 对象、并且将原来旧的 SlotTable 的 leader 和 follow 节点全副拷贝过来了。留神在实例化 SlotTableBuilder 的时候传入了旧的 SlotTable 也就是这里的 initSlotTable 对象。

init 办法最初一步的 initReverseMap 从名称能够看出构建了一个实例化反向路由表、反向查找表、从 Node 节点到 Slot 的查找性能、因为在之后的解决当中常常会用到 某一个 data 节点负责了那些 slot 的 leader 角色、以及哪些 slot 的 follow 角色. 所以这里做了一层索引解决。

再回到 ScheduledSlotArranger 类中 createSlotTableBuilder 办法最初一步,此时 SlotTableBulder 外部曾经实现了 旧的 SlotTable 的数据拷贝。

comparator.getRemoved().forEach(slotTableBuilder::removeDataServerSlots);  

上文咱们说过 comparator 对象外部保留了 新的 dataServer 和旧的 ‘SlotTable 的节点列表 ’ 比拟信息。

所以在新的 dataServer 中曾经删除的节点、咱们须要从 SlotTableBuilder 中删除。外部的删除逻辑也是迭代所有的 SlotBuilder 比拟 leader 和以后节点是否雷同、雷同则删除、follow 同理。

public void removeDataServerSlots(String dataServer) {for (SlotBuilder slotBuilder : buildingSlots.values()) {
      // 删除该 SlotBuilder  follow 节点中的 dataServer
      slotBuilder.removeFollower(dataServer)
      // 如果该 SlotBuilder 的 leader 节点是 dataServer,// 那么设置该 slotBuilder 的 leader 节点为空、须要从新进行调配
      if (dataServer.equals(slotBuilder.getLeader())) {slotBuilder.setLeader(null);
      }
    }
    reverseMap.remove(dataServer);
}

总结来说创立 SlotTableBuilder 的过程就是依据旧的 SlotTable 实例化 SlotTableBuilder (外部的 SlotBuilder)、计算 旧的 ‘SlotTable 的节点列表 ’ 和以后最新的 dataServer 的差别值、更新 SlotTableBuilder 外部的 SlotBuilder 相干的 leader 和 follow 值。

到这一步实际上曾经做完了 SlotTableBuilder 的构建过程。到这里想想接下来该做什么呢?
能够想想,如果咱们触发 SlotTable 重新分配的是某一个 dataA 节点下线了,那么在 slotTableBuilder::removeDataServerSlots 这一步会将咱们正在创立的 SlotTableBuilder 中的 dataA 所治理的 Slot 的 leader 或者 follow 删除掉,那么该 Slot 的 leader 或者 follow 很可能就会变成空。也就是说该 Slot 没有 data 节点解决申请。于是咱们依据以后 SlotBuilder 中是否有为实现调配的 Slot 来决定是否进行重新分配操作, 是否有未实现调配的 Slot 代码块如下。

  public boolean hasNoAssignedSlots() {for (SlotBuilder slotBuilder : buildingSlots.values()) {if (StringUtils.isEmpty(slotBuilder.getLeader())) {
        // 以后 Slot 的 leader 节点为空
        return true;
      }
      if (slotBuilder.getFollowerSize() < followerNums) {
        // 以后 Slot 的 follow 节点的个数小于配置的 followerNums
        return true;
      }
    }
    return false;
  }

创立实现 SlotTableBuilder 并且有没有实现调配的 Slot, 执行真正的调配过程,如下图所示。

由图可知调配过程最初委托给 DefaultSlotAssigner,DefaultSlotAssigner 在构造方法中实例化了 以后正在创立的 SlotTableBuilder /currentDataServers 的视图 /MigrateSlotGroup, 其中 MigrateSlotGroup

外部保留的是那些短少 leader 以及 followSlot

public class MigrateSlotGroup {
   // 哪些 Slot 短少 leader
  private final Set<Integer> leaders = Sets.newHashSet();
  // 哪些 Slot 短少 follow 以及短少的个数
  private final Map<Integer, Integer> lackFollowers = Maps.newHashMap();}

assign 代码如下. 代码中先调配 短少 leader 的 slot、随后调配短少 follow 的 slot

public SlotTable assign() {BalancePolicy balancePolicy = new NaiveBalancePolicy();
    final int ceilAvg =
        MathUtils.divideCeil(slotTableBuilder.getSlotNums(), currentDataServers.size());
    final int high = balancePolicy.getHighWaterMarkSlotLeaderNums(ceilAvg);
    
    // 调配短少 leader 的 slot
    if (tryAssignLeaderSlots(high)) {slotTableBuilder.incrEpoch();
    } 
    // 调配短少 follow 的 slot
    if (assignFollowerSlots()) {slotTableBuilder.incrEpoch();
    } 
    return slotTableBuilder.build();}

leader 节点调配

进入 tryAssignLeaderSlots 办法外部查看具体调配算法细节。通过代码正文的形式来解释具体实现。

private boolean tryAssignLeaderSlots(int highWatermark) {
    // 依照 follows 节点的数量 从大到小排序 0 比拟非凡排在最初面,0 为什么比拟非凡呢、因为无论怎么调配、// 最终抉择进去的 leader 肯定不是该 slot 的 follow、因为该 slot 的 follow 为空
    // 优先安顿 follow 节点比拟少的 Slot
    // 其实这点也能够想明确的。这些没有 leader 的 slot 调配程序必定是要依据 follow 节点越少的优先调配最好
    // 以避免这个 follow 也挂了、那么数据就有可能会失落了。List<Integer> leaders =
        migrateSlotGroup.getLeadersByScore(new FewerFollowerFirstStrategy(slotTableBuilder));
    for (int slotId : leaders) {List<String> currentDataNodes = Lists.newArrayList(currentDataServers);
       // 抉择 nextLeader 节点算法?
      String nextLeader =
          Selectors.slotLeaderSelector(highWatermark, slotTableBuilder, slotId)
              .select(currentDataNodes);
      // 判断 nextLeader 是否是以后 slot 的 follow 节点 将 follow 节点晋升为主节点的。boolean nextLeaderWasFollower = isNextLeaderFollowerOfSlot(slotId, nextLeader);
      // 将以后 slot 的 leader 节点用抉择进去的 nextLeader 替换
      slotTableBuilder.replaceLeader(slotId, nextLeader);
      if (nextLeaderWasFollower) {
        // 因为以后 Slot 将 follow 节点晋升为 leader 节点了、那么该 Slot 必定 follows 个数又不够了、须要再次调配 follow 节点
        migrateSlotGroup.addFollower(slotId);
      }
    }
    return true;
  }

下面调配 leader 代码中外围抉择 nextLeader 办法。

 String nextLeader =
          Selectors.slotLeaderSelector(highWatermark, slotTableBuilder, slotId)
              .select(currentDataNodes);

通过 Selectors 抉择一个 适合的 leader 节点。

持续追踪 DefaultSlotLeaderSelector.select 办法外部。同理咱们采纳代码正文的形式来解释具体实现。

public String select(Collection<String> candidates) {
  //candidates: 以后所有的候选节点,也是 tryAssignLeaderSlots 办法传入的 currentDataServers
  Set<String> currentFollowers = slotTableBuilder.getOrCreate(slotId).getFollowers();
  Collection<String> followerCandidates = Lists.newArrayList(candidates);
  followerCandidates.retainAll(currentFollowers);
  // 通过 followerCandidates.retainAll(currentFollowers)) 之后 followerCandidates 
  // 仅仅保留 以后 Slot 的 follow 节点
  // 并且采取了一个策略是 以后 follow 节点作为其余 Slot 的 leader 起码的优先、// 用直白的话来说。// 以后 follower 越是没有被当做其余 Slot 的 leader 节点、那么
  // 证实他就是越 '闲' 的。必然优先思考抉择它作为 leader 节点。String leader = new LeastLeaderFirstSelector(slotTableBuilder).select(followerCandidates);
  if (leader != null) {DataNodeSlot dataNodeSlot = slotTableBuilder.getDataNodeSlot(leader);
    if (dataNodeSlot.getLeaders().size() < highWaterMark) {return leader;}
  }
  // 从其余的机器中抉择一个,优先选择充当 leader 的 slot 个数起码的那一个 DataServer
  return new LeastLeaderFirstSelector(slotTableBuilder).select(candidates);
}

通过下面 select 办法源码正文置信能够很容易了解 SOFARegistry 的做法。总结来说,就是首先从 以后 slot 的 follow 节点中找出 leader,因为在此状况下不须要做数据迁徙,相当于主节点挂了,晋升备份节点为主节点实现高可用。然而具体抉择哪一个,SOFARegistry 采取的策略是
在所有的 follow 节点中找出最 “ 闲 ” 的那一个,然而如果它所有的 follow 节点作为 leader 节点治理的 Slot 个数大于 highWaterMark,那么证实该 Slot 的所有 follow 节点都太 ” 忙 ” 了,那么就会从全副存活的机器中抉择一个 “ 当作为 leader 节点治理的 Slot 个数 ” 起码的那一个,然而这种状况其实有数据同步开销的。

follow 节点调配

同理通过源码注解形式来详述

  private boolean assignFollowerSlots() {
    // 应用 FollowerEmergentScoreJury 排序得分策略表明
    // 某一个 slot 短少越多 follow、排序越靠前。List<MigrateSlotGroup.FollowerToAssign> followerToAssigns =
        migrateSlotGroup.getFollowersByScore(new FollowerEmergentScoreJury());
    int assignCount = 0;
    for (MigrateSlotGroup.FollowerToAssign followerToAssign : followerToAssigns) {
      // 以后待调配的 slotId
      final int slotId = followerToAssign.getSlotId();
      // 以后 slotId 槽中还有多少待调配的 follow 从节点。顺次迭代调配。for (int i = 0; i < followerToAssign.getAssigneeNums(); i++) {final List<String> candidates = Lists.newArrayList(currentDataServers);
        // 依据上文中的 DataNodeSlot 构造、根据 节点被作为 follow 的 slot 的个数从小到大排序。// follows 个数一样、依照起码作为 leader 节点进行排序。// 其实最终目标就是找到最 "闲" 的那一台机器。candidates.sort(Comparators.leastFollowersFirst(slotTableBuilder));
        boolean assigned = false;
        for (String candidate : candidates) {DataNodeSlot dataNodeSlot = slotTableBuilder.getDataNodeSlot(candidate);
          // 跳过曾经是它的 follow 或者 leader 节点的 Node 节点
          if (dataNodeSlot.containsFollower(slotId) || dataNodeSlot.containsLeader(slotId)) {continue;}
          // 给以后 slotId 增加候选 follow 节点。slotTableBuilder.addFollower(slotId, candidate);
          assigned = true;
          assignCount++;
          break;
        }
      }
    }
    return assignCount != 0;
  }

如之前所述、MigrateSlotGroup 保留了 须要进行重新分配 leader 以及 follow 的 Slot 信息。算法的次要步骤如下。

  1. 找到所有没有足够 follow 的 Slot 信息
  2. 依据 短少 follow 个数越多越优先准则排序
  3. 迭代所有短少 follow 的 Slot 信息 这里是 被 MigrateSlotGroup.FollowerToAssign 包装
  4. 外部循环迭代短少 follow 大小、增加给该 Slot 所需的 follow
  5. 对候选 dataServer 进行排序、依照“闲、忙“成都进行排序
  6. 执行增加 follow 节点

到此、我么曾经给短少 leader 或者 follow 的 Slot 实现了节点调配。

SlotTable 均衡算法

理解完 SlotTable 的变更过程以及算法之后、置信大家对此有了本人的了解。那么 SlotTable 的均衡过程其实也是相似的。详情能够参考源码com.alipay.sofa.registry.server.Meta.slot.balance.DefaultSlotBalancer。

因为在节点的频繁高低线过程中、势必会导致某一些节点的负载 (负责的 slot 治理数量) 过高、某些节点的负载又很低、这样须要一种动态平衡机制来保障节点的绝对负载平衡。

入口在 DefaultSlotBalancer.balance 办法外部

public SlotTable balance() {
    // 均衡 leader 节点
    if (balanceLeaderSlots()) {LOGGER.info("[balanceLeaderSlots] end");
      slotTableBuilder.incrEpoch();
      return slotTableBuilder.build();}
    if (balanceHighFollowerSlots()) {LOGGER.info("[balanceHighFollowerSlots] end");
      slotTableBuilder.incrEpoch();
      return slotTableBuilder.build();}
    if (balanceLowFollowerSlots()) {LOGGER.info("[balanceLowFollowerSlots] end");
      slotTableBuilder.incrEpoch();
      return slotTableBuilder.build();}
    // check the low watermark leader, the follower has balanced
    // just upgrade the followers in low data server
    if (balanceLowLeaders()) {LOGGER.info("[balanceLowLeaders] end");
      slotTableBuilder.incrEpoch();
      return slotTableBuilder.build();}
    return null;
}

因为篇幅限度、这里只剖析 leader 节点均衡过程。如上源码中的 balanceLeaderSlots() 其余过程和 它相似、感兴趣的读者也能够本人查找源码剖析。

进入 balanceLeaderSlots 办法外部。

  private boolean balanceLeaderSlots() {
    // 这里就是找到每一个节点 dataServer 作为 leader 的 slot 个数的最大天花板值 ----> 
    // 容易想到的计划必定是均匀形式、一共有 slotNum 个 slot、// 将这些 slot 的 leader 归属平均分配给 currentDataServer
    final int leaderCeilAvg = MathUtils.divideCeil(slotNum, currentDataServers.size());
    if (upgradeHighLeaders(leaderCeilAvg)) {
      // 如果有替换过 leader、那么就间接返回、不必进行 migrateHighLeaders 操作
      return true;
    }
    if (migrateHighLeaders(leaderCeilAvg)) {
      // 通过下面的 upgradeHighLeaders 操作
      // 不能找到 follow 进行迁徙、因为所有的 follow 也都很忙、在 exclude 当中、// 所以没法找到一个 follow 进行迁徙。那么咱们尝试迁徙 follow。// 因为 highLeader 的所有 follower 都是比较忙、所以须要将这些忙的节点进行迁徙、期待给这些 highLeader 所负责的 slot 替换一些比拟安闲的 follow
      return true;
    }
    return false;
  }

咱们重点关注 upgradeHighLeaders 办法、同理采纳源码注解的形式

 private boolean upgradeHighLeaders(int ceilAvg) {
    //"如果一个节点的 leader 的 slot 个数大于阈值、那么就会用指标 slot 的 follow 节点来替换以后 leader"  最多挪动 maxMove 次数
    final int maxMove = balancePolicy.getMaxMoveLeaderSlots();
    // 了解来说这块能够间接将节点的 leader 个数大于 ceilAvg 的 节点用其余节点替换就能够了、为什么还要再次向上取整呢?
    // 次要是避免 slotTable 呈现抖动,所以设定了触发变更的高低阈值 这里向上取整、是作为一个不均衡阈值来应用、// 就是只针对于不均衡多少 (这个多少能够管制) 的进行再均衡解决
    final int threshold = balancePolicy.getHighWaterMarkSlotLeaderNums(ceilAvg);
    int balanced = 0;
    Set<String> notSatisfies = Sets.newHashSet();
    // 循环执行替换操作、默认执行 maxMove 次
    while (balanced < maxMove) {
      int last = balanced;
      //1. 找到 哪些节点的 leader 个数 超过 threshold、并对这些节点依照 leader 的个数的从大到小排列。final List<String> highDataServers = findDataServersLeaderHighWaterMark(threshold);
      if (highDataServers.isEmpty()) {break;}
      // 没有任何 follow 节点能用来降职到 leader 节点
      if (notSatisfies.containsAll(highDataServers)) {break;}
      //2. 找到能够作为新的 leader 的 节点,然而不蕴含曾经不能增加任何 leader 的节点、因为这些节点的 leader 曾经超过阈值了。final Set<String> excludes = Sets.newHashSet(highDataServers);
      excludes.addAll(findDataServersLeaderHighWaterMark(threshold - 1));
      for (String highDataServer : highDataServers) {if (notSatisfies.contains(highDataServer)) {
          // 如果该节点曾经在不满足替换条件队列中、则不在进行查找可替换节点操作
          continue;
        }
        // 找到能够作为新的 leader 的 节点,然而不蕴含曾经不能增加任何 leader 的节点、因为这些节点的 leader 曾经超过阈值了。// 算法过程是: 
        //1. 从 highDataServer 所负责的所有 slot 中找到某一个 slot、这个 slot 满足一个条件就是: 该 slot 的 follow 节点中有一个最闲(也就是 节点的 leader 的最小)
        //2. 找到这个 slot、咱们只须要替换该 slot 的 leader 为找到的 follow
        
        // 其实站在宏观的角度来说就是将 highDataServer 节点 leader 的所有 slot 的 follow 节点依照闲忙水平进行排序、// 找到那个最闲的、而后让他当 leader。这样就替换了 highDataServer 当 leader 了
        Tuple<String, Integer> selected = selectFollower4LeaderUpgradeOut(highDataServer, excludes);
        if (selected == null) {
          // 没有找到任何 follow 节点用来代替 highDataServer 节点、所以该节点不满足可替换条件、退出到 notSatisfies 不可替换队列中. 以便于外层循环间接过滤。notSatisfies.add(highDataServer);
          continue;
        }
        // 找到 highDataServer 节点的某一个可替换的 slotId
        final int slotId = selected.o2;
        // 找到 slotId 替换 highDataServer 作为 leader 的节点 newLeaderDataServer
        final String newLeaderDataServer = selected.o1; 
        // 用 newLeaderDataServer 替换 slotId 旧的 leader 节点。slotTableBuilder.replaceLeader(slotId, newLeaderDataServer); 
        balanced++;
      }
      if (last == balanced) break;
    }
    return balanced != 0;
  }

进入要害查找可替换的 slotId 和新的 leader 节点的过程,同理采纳源码注解的形式。

  /*
    从 leaderDataServer 所 leader 的所有 slot 中、抉择一个能够替换的 slotId
    和新的 leader 来替换 leaderDataServer
   */
  private Tuple<String, Integer> selectFollower4LeaderUpgradeOut(String leaderDataServer, Set<String> excludes) {
    // 获取以后 leaderDataServer 节点 leader 或者 follow 的 slotId 视图。DataNodeSlot 构造咱们上文有说过。final DataNodeSlot dataNodeSlot = slotTableBuilder.getDataNodeSlot(leaderDataServer);

    Set<Integer> leaderSlots = dataNodeSlot.getLeaders();
    Map<String, List<Integer>> dataServers2Followers = Maps.newHashMap();
    //1. 从 dataNodeSlot 获取 leaderDataServer 节点 leader 的所有 slotId: leaderSlots
    for (int slot : leaderSlots) {
      //2. 从 slotTableBuilder 中找出以后 slot 的 follow
      List<String> followerDataServers = slotTableBuilder.getDataServersOwnsFollower(slot);
      //3. 去掉 excludes,失去候选节点,因为 excludes 必定不会是新的 leader 节点
      followerDataServers = getCandidateDataServers(excludes, null, followerDataServers);
      //4. 构建 候选节点到 slotId 汇合的映射关系。for (String followerDataServer : followerDataServers) {
        List<Integer> followerSlots =
            dataServers2Followers.computeIfAbsent(followerDataServer, k -> Lists.newArrayList());
        followerSlots.add(slot);
      }
    }
    if (dataServers2Followers.isEmpty()) {
      // 当 leaderDataServer 节点的 follow 都是 excludes 中的成员时候、那么就有可能是空的。return null;
    }
    List<String> dataServers = Lists.newArrayList(dataServers2Followers.keySet());
    // 依照 候选节点的 leader 的 slot 个数升序排序、也就是也就是找到那个最不忙的,感兴趣能够查看 leastLeadersFirst 办法外部实现。dataServers.sort(Comparators.leastLeadersFirst(slotTableBuilder));
    final String selectedDataServer = dataServers.get(0);
    List<Integer> followers = dataServers2Followers.get(selectedDataServer);
    return Tuple.of(selectedDataServer, followers.get(0));
  }

至此咱们实现了 高负载 leader 节点的替换、在此过程中如果有替换过、那么间接返回、如果没有替换过、咱们会继续执行 DefaultSlotBalancer 中的 migrateHighLeaders 操作。因为如果通过 DefaultSlotBalancer 中的 upgradeHighLeaders 操作之后没有进行过任何 leader 的替换、那么证实 高负载的 leader 节点同样它的 follow 节点也很忙、所以须要做得就是对这些忙的 follow 节点也要进行迁徙。咱们持续通过源码正文的形式来查看具体的过程。

private boolean migrateHighLeaders(int ceilAvg) {final int maxMove = balancePolicy.getMaxMoveFollowerSlots();
    final int threshold = balancePolicy.getHighWaterMarkSlotLeaderNums(ceilAvg);

    int balanced = 0;
    while (balanced < maxMove) {
      int last = balanced;
      // 1. find the dataNode which has leaders more than high water mark
      //    and sorted by leaders.num desc
      final List<String> highDataServers = findDataServersLeaderHighWaterMark(threshold);
      if (highDataServers.isEmpty()) {return false;}
      // 2. find the dataNode which could own a new leader
      // exclude the high
      final Set<String> excludes = Sets.newHashSet(highDataServers);
      // exclude the dataNode which could not add any leader
      excludes.addAll(findDataServersLeaderHighWaterMark(threshold - 1));
      final Set<String> newFollowerDataServers = Sets.newHashSet();
      // only balance highDataServer once at one round, avoid the follower moves multi times
      for (String highDataServer : highDataServers) {
        Triple<String, Integer, String> selected =
            selectFollower4LeaderMigrate(highDataServer, excludes, newFollowerDataServers);
        if (selected == null) {continue;}
        final String oldFollower = selected.getFirst();
        final int slotId = selected.getMiddle();
        final String newFollower = selected.getLast();
        slotTableBuilder.removeFollower(slotId, oldFollower);
        slotTableBuilder.addFollower(slotId, newFollower);
        newFollowerDataServers.add(newFollower);
        balanced++;
      }
      if (last == balanced) break;
    }
    return balanced != 0;
  }

3. session 和 data 节点如何应用路由表

上文咱们 理解了 SlotTable 路由表在心跳中从 Meta 节点获取并且更新到本地中、那么 session 和 data 节点如何应用路由表呢。首先咱们先看看 session 节点如何应用 SlotTable 路由表。session 节点承当着客户端的公布订阅申请,并且通过 SlotTable 路由表对 data 节点的数据进行读写; session 节点本地 SlotTable 路由表保留在 SlotTableCacheImpl。

public final class SlotTableCacheImpl implements SlotTableCache {
  // 不同计算 slot 地位的算法形象
  private final SlotFunction slotFunction = SlotFunctionRegistry.getFunc();

  // 本地路由表、心跳中从 Meta 节点获取到。private volatile SlotTable slotTable = SlotTable.INIT;
    
    
  // 依据 dataInfoId 获取 slotId
  @Override
  public int slotOf(String dataInfoId) {return slotFunction.slotOf(dataInfoId);
  }
}

源码中的 SlotFunctionRegistry 注册了两种算法实现。别离是 crc32 和 md5 实现、源码如下所示。

public final class SlotFunctionRegistry {private static final Map<String, SlotFunction> funcs = Maps.newConcurrentMap();

  static {register(Crc32cSlotFunction.INSTANCE);
    register(MD5SlotFunction.INSTANCE);
  }

  public static void register(SlotFunction func) {funcs.put(func.name(), func);
  }

  public static SlotFunction getFunc() {return funcs.get(SlotConfig.FUNC);
  }
}

轻易抉择某一个算法、例如 MD5SlotFunction、依据 dataInfoId 计算 slotId 的实现如下。

public final class MD5SlotFunction implements SlotFunction {public static final MD5SlotFunction INSTANCE = new MD5SlotFunction();

  private final int maxSlots;
  private final MD5HashFunction md5HashFunction = new MD5HashFunction();

  private MD5SlotFunction() {this.maxSlots = SlotConfig.SLOT_NUM;}
  // 计算 slotId 的最底层逻辑。可见也是通过取 hash 而后对 slot 槽个数取余
  @Override
  public int slotOf(Object o) {
    // make sure >=0
    final int hash = Math.abs(md5HashFunction.hash(o));
    return hash % maxSlots;
  }
}

理解了具体依据 DataInfoId 来通过 SlotTable 获取具体的数据 slotId,咱们来看看在 session 节点中何时触发计算 datInfoId 的 slotId。咱们能够想想,个别 session 节点应用来解决客户端的公布订阅申请,那么当有公布申请的时候,公布的数据同时也会向 data 节点写入公布的元数据,那么必定须要晓得该数据保留在哪一台机器上,此时就须要依据 dataInfoId 找到对应的 slotId,进而找到对应的 leader 节点,通过网络通讯工具将公布申请转发给该节点解决,session 数据接管公布申请解决 handler 为 PublisherHandler。

如下面时序图所示、在 DataNodeServiceImpl 最初的 commitReq 办法中会将公布申请增加到外部的 BlockingQueue 当中去,DataNodeServiceImpl 外部的 Worker 对象会生产 BlockingQueue 中外部执行真正的数据写入过程。具体源码请参考 :

private final class Worker implements Runnable {
    final BlockingQueue<Req> queue;
    Worker(BlockingQueue<Req> queue) {this.queue = queue;}

    @Override
    public void run() {for (; ;) {final Req firstReq = queue.poll(200, TimeUnit.MILLISECONDS);
          if (firstReq != null) {
            Map<Integer, LinkedList<Object>> reqs =
                drainReq(queue, sessionServerConfig.getDataNodeMaxBatchSize());
            // 因为 slot 的个数有可能大于 work/blockingQueue 的个数、所以
            // 并不是一个 slot 对应一个 work、那么一个 blockQueue 中可能存在发往多个 slot 的数据、这里
            // 有可能一次发送不完这么多数据、须要分批发送、将首先进入队列的优先发送了。LinkedList<Object> firstBatch = reqs.remove(firstReq.slotId);
            if (firstBatch == null) {firstBatch = Lists.newLinkedList();
            }
            firstBatch.addFirst(firstReq.req);
            request(firstReq.slotId, firstBatch);
            for (Map.Entry<Integer, LinkedList<Object>> batch : reqs.entrySet()) {request(batch.getKey(), batch.getValue());
            }
          }
        }
     }
 }

private boolean request(BatchRequest batch) {final Slot slot = getSlot(batch.getSlotId());
  batch.setSlotTableEpoch(slotTableCache.getEpoch());
  batch.setSlotLeaderEpoch(slot.getLeaderEpoch());
  sendRequest(new Request() {
        @Override
        public Object getRequestBody() {return batch;}

        @Override
        public URL getRequestUrl() {
          // 通过 slot 路由表找到对应的 leader data 节点,这
          // 个路由表是 心跳中从 Meta 节点获取来的。return getUrl(slot);
        }
      });
  return true;
}

借此源码解析的工作让我认真的钻研了 SlotTable 数据预分片机制,真正理解了一个工业级别的数据分片是如何实现的,高可用是如何做的,以及在性能实现中的各种取舍。

因而和大家分享这篇对 SOFARegistry SlotTable 的分析,欢送大家留言领导。

也有幸参加了 SOFARegistry 的社区会议,在社区前辈们的领导下认真理解了具体细节的设计考量,一起探讨 SOFARegistry 的将来倒退。

欢送退出,参加 SOFA 社区的源码解析

目前 SOFARegistry 源码解析工作已发完,Layotto 源码解析还有 1 个工作待认领,有趣味的敌人能够试试看。

Layotto

待认领工作:WebAssembly 相干

https://github.com/mosn/layotto/issues/427

之后也会推出其余我的项目的源码解析流动,敬请期待 …

正文完
 0