This article takes a look at the MembershipProtocol of scalecube-cluster.

MembershipProtocol

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java

/**
 * Cluster Membership Protocol component responsible for managing information about existing members
 * of the cluster.
 */
public interface MembershipProtocol {

  /**
   * Starts running cluster membership protocol. After started it begins to receive and send cluster
   * membership messages
   */
  Mono<Void> start();

  /** Stops running cluster membership protocol and releases occupied resources. */
  void stop();

  /** Listen changes in cluster membership. */
  Flux<MembershipEvent> listen();

  /**
   * Returns list of all members of the joined cluster. This will include all cluster members
   * including local member.
   *
   * @return all members in the cluster (including local one)
   */
  Collection<Member> members();

  /**
   * Returns list of all cluster members of the joined cluster excluding local member.
   *
   * @return all members in the cluster (excluding local one)
   */
  Collection<Member> otherMembers();

  /**
   * Returns local cluster member which corresponds to this cluster instance.
   *
   * @return local member
   */
  Member member();

  /**
   * Returns cluster member with given id or null if no member with such id exists at joined
   * cluster.
   *
   * @return member by id
   */
  Optional<Member> member(String id);

  /**
   * Returns cluster member by given address or null if no member with such address exists at joined
   * cluster.
   *
   * @return member by address
   */
  Optional<Member> member(Address address);
}
  • The MembershipProtocol interface defines the start, stop, listen, members, otherMembers and member methods

MembershipProtocolImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {

  private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);

  private enum MembershipUpdateReason {
    FAILURE_DETECTOR_EVENT,
    MEMBERSHIP_GOSSIP,
    SYNC,
    INITIAL_SYNC,
    SUSPICION_TIMEOUT
  }

  // Qualifiers

  public static final String SYNC = "sc/membership/sync";
  public static final String SYNC_ACK = "sc/membership/syncAck";
  public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";

  private final Member localMember;

  // Injected

  private final Transport transport;
  private final MembershipConfig config;
  private final List<Address> seedMembers;
  private final FailureDetector failureDetector;
  private final GossipProtocol gossipProtocol;
  private final MetadataStore metadataStore;
  private final CorrelationIdGenerator cidGenerator;

  // State

  private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
  private final Map<String, Member> members = new HashMap<>();

  // Subject

  private final FluxProcessor<MembershipEvent, MembershipEvent> subject =
      DirectProcessor.<MembershipEvent>create().serialize();

  private final FluxSink<MembershipEvent> sink = subject.sink();

  // Disposables

  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Scheduled

  private final Scheduler scheduler;
  private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap<>();

  /**
   * Creates new instantiates of cluster membership protocol with given transport and config.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param failureDetector failure detector
   * @param gossipProtocol gossip protocol
   * @param metadataStore metadata store
   * @param config membership config parameters
   * @param scheduler scheduler
   * @param cidGenerator correlation id generator
   */
  public MembershipProtocolImpl(
      Member localMember,
      Transport transport,
      FailureDetector failureDetector,
      GossipProtocol gossipProtocol,
      MetadataStore metadataStore,
      MembershipConfig config,
      Scheduler scheduler,
      CorrelationIdGenerator cidGenerator) {

    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.failureDetector = Objects.requireNonNull(failureDetector);
    this.gossipProtocol = Objects.requireNonNull(gossipProtocol);
    this.metadataStore = Objects.requireNonNull(metadataStore);
    this.localMember = Objects.requireNonNull(localMember);
    this.scheduler = Objects.requireNonNull(scheduler);
    this.cidGenerator = Objects.requireNonNull(cidGenerator);

    // Prepare seeds
    seedMembers = cleanUpSeedMembers(config.getSeedMembers());

    // Init membership table with local member record
    membershipTable.put(localMember.id(), new MembershipRecord(localMember, ALIVE, 0));

    // fill in the table of members with local member
    members.put(localMember.id(), localMember);

    actionsDisposables.addAll(
        Arrays.asList(
            // Listen to incoming SYNC and SYNC ACK requests from other members
            transport
                .listen() //
                .publishOn(scheduler)
                .subscribe(this::onMessage, this::onError),

            // Listen to events from failure detector
            failureDetector
                .listen()
                .publishOn(scheduler)
                .subscribe(this::onFailureDetectorEvent, this::onError),

            // Listen to membership gossips
            gossipProtocol
                .listen()
                .publishOn(scheduler)
                .subscribe(this::onMembershipGossip, this::onError)));
  }

  @Override
  public Flux<MembershipEvent> listen() {
    return subject.onBackpressureBuffer();
  }

  @Override
  public Mono<Void> start() {
    // Make initial sync with all seed members
    return Mono.create(
        sink -> {
          // In case no members at the moment just schedule periodic sync
          if (seedMembers.isEmpty()) {
            schedulePeriodicSync();
            sink.success();
            return;
          }
          // If seed addresses are specified in config - send initial sync to those nodes
          LOGGER.debug("Making initial Sync to all seed members: {}", seedMembers);

          //noinspection unchecked
          Mono<Message>[] syncs =
              seedMembers
                  .stream()
                  .map(
                      address -> {
                        String cid = cidGenerator.nextCid();
                        return transport
                            .requestResponse(prepareSyncDataMsg(SYNC, cid), address)
                            .filter(this::checkSyncGroup);
                      })
                  .toArray(Mono[]::new);

          // Process initial SyncAck
          Flux.mergeDelayError(syncs.length, syncs)
              .take(1)
              .timeout(Duration.ofMillis(config.getSyncTimeout()), scheduler)
              .publishOn(scheduler)
              .flatMap(message -> onSyncAck(message, true))
              .doFinally(
                  s -> {
                    schedulePeriodicSync();
                    sink.success();
                  })
              .subscribe(
                  null,
                  ex -> LOGGER.info("Exception on initial SyncAck, cause: {}", ex.toString()));
        });
  }

  @Override
  public void stop() {
    // Stop accepting requests, events and sending sync
    actionsDisposables.dispose();

    // Cancel remove members tasks
    for (String memberId : suspicionTimeoutTasks.keySet()) {
      Disposable future = suspicionTimeoutTasks.get(memberId);
      if (future != null && !future.isDisposed()) {
        future.dispose();
      }
    }
    suspicionTimeoutTasks.clear();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Collection<Member> members() {
    return new ArrayList<>(members.values());
  }

  @Override
  public Collection<Member> otherMembers() {
    return new ArrayList<>(members.values())
        .stream()
        .filter(member -> !member.equals(localMember))
        .collect(Collectors.toList());
  }

  @Override
  public Member member() {
    return localMember;
  }

  @Override
  public Optional<Member> member(String id) {
    return Optional.ofNullable(members.get(id));
  }

  @Override
  public Optional<Member> member(Address address) {
    return new ArrayList<>(members.values())
        .stream()
        .filter(member -> member.address().equals(address))
        .findFirst();
  }

  //......
}
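  • MembershipProtocolImpl implements the MembershipProtocol interface; it defines the MembershipUpdateReason enum (FAILURE_DETECTOR_EVENT, MEMBERSHIP_GOSSIP, SYNC, INITIAL_SYNC, SUSPICION_TIMEOUT)
  • The constructor puts the local member into membershipTable (ALIVE, incarnation 0) and into members, then subscribes on the given scheduler to three streams: transport.listen() is handled by onMessage, failureDetector.listen() by onFailureDetectorEvent, and gossipProtocol.listen() by onMembershipGossip
  • When seedMembers is empty, start just calls schedulePeriodicSync, which runs doSync every syncInterval. Otherwise it sends a SYNC to every seed member via transport.requestResponse, takes the first SyncAck that passes checkSyncGroup within syncTimeout and merges it with onSyncAck; whether or not such an ack arrives, doFinally then calls schedulePeriodicSync and completes the returned Mono. stop disposes actionsDisposables (the subscriptions and the periodic sync task), disposes every pending future in suspicionTimeoutTasks, and completes the event sink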
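To make the start flow more concrete, here is a minimal plain-Java sketch that uses CompletableFuture instead of Reactor. InitialSyncSketch, firstSuccess and start are hypothetical names, and the string replies stand in for SyncAck messages. It only models the idea described above: the first successful seed reply wins, failed replies are ignored, a timeout bounds the wait, and periodic sync is scheduled in every case, as doFinally does. It is not the library's code; for instance, Reactor's mergeDelayError would signal an error as soon as all seeds have failed, whereas this sketch simply waits for the timeout. It needs Java 9+ (completeOnTimeout, failedFuture, List.of).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the initial-sync idea in start(); not scalecube code.
final class InitialSyncSketch {

  /**
   * Completes with the first successful reply, or with null if none arrives within timeoutMs.
   * Failed requests are ignored, loosely like mergeDelayError(...).take(1).timeout(...).
   */
  static <T> CompletableFuture<T> firstSuccess(List<CompletableFuture<T>> requests, long timeoutMs) {
    CompletableFuture<T> winner = new CompletableFuture<>();
    for (CompletableFuture<T> request : requests) {
      request.thenAccept(winner::complete); // only successes compete; later ones are no-ops
    }
    return winner.completeOnTimeout(null, timeoutMs, TimeUnit.MILLISECONDS);
  }

  /** Returns the reply used for the initial merge (null if none); always schedules periodic sync. */
  static <T> T start(List<CompletableFuture<T>> seedSyncs, long syncTimeoutMs, Runnable schedulePeriodicSync) {
    if (seedSyncs.isEmpty()) {
      schedulePeriodicSync.run(); // no seeds: go straight to periodic sync
      return null;
    }
    T ack = firstSuccess(seedSyncs, syncTimeoutMs).join();
    schedulePeriodicSync.run(); // like doFinally: runs whether or not an ack arrived
    return ack;
  }

  public static void main(String[] args) {
    CompletableFuture<String> down = CompletableFuture.failedFuture(new RuntimeException("seed down"));
    CompletableFuture<String> silent = new CompletableFuture<>(); // never answers
    CompletableFuture<String> up = CompletableFuture.completedFuture("SyncAck from seed-3");
    String ack = start(List.of(down, silent, up), 1000, () -> System.out.println("periodic sync scheduled"));
    System.out.println("initial SyncAck: " + ack);
  }
}
```

Because the periodic sync is scheduled unconditionally, a node whose seeds are all unreachable at startup still keeps trying to converge later through doSync.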

onMessage

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  private void onMessage(Message message) {
    if (checkSyncGroup(message)) {
      if (SYNC.equals(message.qualifier())) {
        onSync(message).subscribe(null, this::onError);
      }
      if (SYNC_ACK.equals(message.qualifier())) {
        if (message.correlationId() == null) { // filter out initial sync
          onSyncAck(message, false).subscribe(null, this::onError);
        }
      }
    }
  }

  private boolean checkSyncGroup(Message message) {
    if (message.data() instanceof SyncData) {
      SyncData syncData = message.data();
      return config.getSyncGroup().equals(syncData.getSyncGroup());
    }
    return false;
  }

  /** Merges incoming SYNC data, merges it and sending back merged data with SYNC_ACK. */
  private Mono<Void> onSync(Message syncMsg) {
    return Mono.defer(
        () -> {
          LOGGER.debug("Received Sync: {}", syncMsg);
          return syncMembership(syncMsg.data(), false)
              .doOnSuccess(
                  avoid -> {
                    Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
                    Address address = syncMsg.sender();
                    transport
                        .send(address, message)
                        .subscribe(
                            null,
                            ex ->
                                LOGGER.debug(
                                    "Failed to send SyncAck: {} to {}, cause: {}",
                                    message,
                                    address,
                                    ex.toString()));
                  });
        });
  }

  private Mono<Void> onSyncAck(Message syncAckMsg, boolean onStart) {
    return Mono.defer(
        () -> {
          LOGGER.debug("Received SyncAck: {}", syncAckMsg);
          return syncMembership(syncAckMsg.data(), onStart);
        });
  }

  private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
    return Mono.defer(
        () -> {
          MembershipUpdateReason reason =
              onStart ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC;

          return Mono.whenDelayError(
              syncData
                  .getMembership()
                  .stream()
                  .filter(r1 -> !r1.equals(membershipTable.get(r1.id())))
                  .map(r1 -> updateMembership(r1, reason))
                  .toArray(Mono[]::new));
        });
  }

  //......
}
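  • onMessage first uses checkSyncGroup to make sure the message carries SyncData for this node's syncGroup; it then calls onSync if message.qualifier() is SYNC, or onSyncAck if it is SYNC_ACK. A SYNC_ACK with a non-null correlationId is skipped here, because it answers the initial sync and is already consumed by the requestResponse in start
  • onSync runs syncMembership and, on success, replies to the sender with a SYNC_ACK carrying the same correlationId; onSyncAck also calls syncMembership but sends nothing back
  • syncMembership walks the records in syncData's membership, keeps only those that differ from the corresponding membershipTable entry, and calls updateMembership for each one (reason INITIAL_SYNC during start, SYNC otherwise), combining the results with Mono.whenDelayError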
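The dispatch rules above can be modeled without a transport. In this hypothetical sketch, Msg and Rec are simplified stand-ins for scalecube's Message, SyncData and MembershipRecord, and a record is reduced to a member id, a status string and an incarnation. It shows the sync-group check, the SYNC versus SYNC_ACK routing with the correlationId filter, and the syncMembership filter that forwards only records differing from the local table (collected in a list instead of calling updateMembership).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of onMessage/checkSyncGroup/syncMembership; not scalecube code.
final class SyncDispatchSketch {
  static final String SYNC = "sc/membership/sync";
  static final String SYNC_ACK = "sc/membership/syncAck";

  /** Simplified membership record: id, status and incarnation. */
  static final class Rec {
    final String id; final String status; final int incarnation;
    Rec(String id, String status, int incarnation) {
      this.id = id; this.status = status; this.incarnation = incarnation;
    }
    @Override public boolean equals(Object o) {
      if (!(o instanceof Rec)) return false;
      Rec r = (Rec) o;
      return id.equals(r.id) && status.equals(r.status) && incarnation == r.incarnation;
    }
    @Override public int hashCode() { return Objects.hash(id, status, incarnation); }
  }

  /** Simplified message carrying SyncData (records plus syncGroup). */
  static final class Msg {
    final String qualifier; final String syncGroup; final String correlationId; final List<Rec> records;
    Msg(String qualifier, String syncGroup, String correlationId, List<Rec> records) {
      this.qualifier = qualifier; this.syncGroup = syncGroup;
      this.correlationId = correlationId; this.records = records;
    }
  }

  final String syncGroup;
  final Map<String, Rec> membershipTable = new HashMap<>();
  final List<String> repliesSent = new ArrayList<>();  // SYNC_ACKs that would be sent
  final List<Rec> passedToUpdate = new ArrayList<>();  // records that would reach updateMembership

  SyncDispatchSketch(String syncGroup) { this.syncGroup = syncGroup; }

  void onMessage(Msg m) {
    if (!syncGroup.equals(m.syncGroup)) return;          // checkSyncGroup
    if (SYNC.equals(m.qualifier)) {
      syncMembership(m.records);
      repliesSent.add(SYNC_ACK + " cid=" + m.correlationId); // the reply echoes the cid
    } else if (SYNC_ACK.equals(m.qualifier) && m.correlationId == null) {
      syncMembership(m.records);                          // acks to the initial sync are skipped
    }
  }

  void syncMembership(List<Rec> incoming) {
    for (Rec r1 : incoming) {
      if (!r1.equals(membershipTable.get(r1.id))) passedToUpdate.add(r1); // only changed records
    }
  }
}
```

The filter in syncMembership matters because every SYNC carries the whole table; comparing against the local copy first keeps unchanged records from triggering redundant updates.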

onFailureDetectorEvent

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  /** Merges FD updates and processes them. */
  private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
    MembershipRecord r0 = membershipTable.get(fdEvent.member().id());
    if (r0 == null) { // member already removed
      return;
    }
    if (r0.status() == fdEvent.status()) { // status not changed
      return;
    }
    LOGGER.debug("Received status change on failure detector event: {}", fdEvent);
    if (fdEvent.status() == ALIVE) {
      // TODO: Consider to make more elegant solution
      // Alive won't override SUSPECT so issue instead extra sync with member to force it spread
      // alive with inc + 1
      Message syncMsg = prepareSyncDataMsg(SYNC, null);
      Address address = fdEvent.member().address();
      transport
          .send(address, syncMsg)
          .subscribe(
              null,
              ex ->
                  LOGGER.debug(
                      "Failed to send {} to {}, cause: {}", syncMsg, address, ex.toString()));
    } else {
      MembershipRecord record =
          new MembershipRecord(r0.member(), fdEvent.status(), r0.incarnation());
      updateMembership(record, MembershipUpdateReason.FAILURE_DETECTOR_EVENT)
          .subscribe(null, this::onError);
    }
  }

  //......
}
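  • onFailureDetectorEvent ignores the event if the member is no longer in membershipTable or its status has not changed. If the new status is ALIVE, it does not update the record directly but sends a SYNC message to fdEvent.member().address(): as the TODO in the code notes, ALIVE does not override SUSPECT, so the extra sync makes the member itself spread ALIVE with an incremented incarnation. For any other status it builds a record with the same incarnation and calls updateMembership with MembershipUpdateReason.FAILURE_DETECTOR_EVENT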
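The branching in onFailureDetectorEvent fits in a small decision function. This is a hypothetical sketch: the Status and Action enums are illustrative rather than scalecube types, and the real method performs the network send or the table update instead of returning a value.

```java
// Hypothetical decision table for onFailureDetectorEvent; not scalecube code.
final class FdEventSketch {
  enum Status { ALIVE, SUSPECT, DEAD }

  enum Action { IGNORE, SEND_SYNC, UPDATE_SAME_INCARNATION }

  /**
   * @param current status in membershipTable, or null if the member was already removed
   * @param detected status reported by the failure detector
   */
  static Action decide(Status current, Status detected) {
    if (current == null) return Action.IGNORE;     // member already removed
    if (current == detected) return Action.IGNORE; // status not changed
    if (detected == Status.ALIVE) {
      // ALIVE cannot override SUSPECT at the same incarnation, so send a SYNC and let
      // the member refute the suspicion itself with a higher incarnation
      return Action.SEND_SYNC;
    }
    // SUSPECT or DEAD: updateMembership(record with the same incarnation, FAILURE_DETECTOR_EVENT)
    return Action.UPDATE_SAME_INCARNATION;
  }

  public static void main(String[] args) {
    for (Status current : Status.values()) {
      for (Status detected : Status.values()) {
        System.out.println(current + " -> " + detected + ": " + decide(current, detected));
      }
    }
  }
}
```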

onMembershipGossip

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  /** Merges received membership gossip (not spreading gossip further). */
  private void onMembershipGossip(Message message) {
    if (MEMBERSHIP_GOSSIP.equals(message.qualifier())) {
      MembershipRecord record = message.data();
      LOGGER.debug("Received membership gossip: {}", record);
      updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP)
          .subscribe(null, this::onError);
    }
  }

  //......
}
  • onMembershipGossip handles only messages whose qualifier() is MEMBERSHIP_GOSSIP, passing the contained record to updateMembership with MembershipUpdateReason.MEMBERSHIP_GOSSIP

updateMembership

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  /**
   * Try to update membership table with the given record.
   *
   * @param r1 new membership record which compares with existing r0 record
   * @param reason indicating the reason for updating membership table
   */
  private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason reason) {
    return Mono.defer(
        () -> {
          Objects.requireNonNull(r1, "Membership record can't be null");
          // Get current record
          MembershipRecord r0 = membershipTable.get(r1.id());

          // Check if new record r1 overrides existing membership record r0
          if (!r1.isOverrides(r0)) {
            return Mono.empty();
          }

          // If received updated for local member then increase incarnation and spread Alive gossip
          if (r1.member().id().equals(localMember.id())) {
            int currentIncarnation = Math.max(r0.incarnation(), r1.incarnation());
            MembershipRecord r2 =
                new MembershipRecord(localMember, r0.status(), currentIncarnation + 1);

            membershipTable.put(localMember.id(), r2);

            LOGGER.debug(
                "Local membership record r0: {}, but received r1: {}, "
                    + "spread with increased incarnation r2: {}",
                r0,
                r1,
                r2);

            spreadMembershipGossip(r2)
                .subscribe(
                    null,
                    ex -> {
                      // on-op
                    });
            return Mono.empty();
          }

          // Update membership
          if (r1.isDead()) {
            membershipTable.remove(r1.id());
          } else {
            membershipTable.put(r1.id(), r1);
          }

          // Schedule/cancel suspicion timeout task
          if (r1.isSuspect()) {
            scheduleSuspicionTimeoutTask(r1);
          } else {
            cancelSuspicionTimeoutTask(r1.id());
          }

          // Emit membership and regardless of result spread gossip
          return emitMembershipEvent(r0, r1)
              .doFinally(
                  s -> {
                    // Spread gossip (unless already gossiped)
                    if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP
                        && reason != MembershipUpdateReason.INITIAL_SYNC) {
                      spreadMembershipGossip(r1)
                          .subscribe(
                              null,
                              ex -> {
                                // no-op
                              });
                    }
                  });
        });
  }

  private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
    return Mono.defer(
        () -> {
          Message msg = Message.withData(record).qualifier(MEMBERSHIP_GOSSIP).build();
          LOGGER.debug("Spead membreship: {} with gossip", msg);
          return gossipProtocol
              .spread(msg)
              .doOnError(
                  ex ->
                      LOGGER.debug(
                          "Failed to spread membership: {} with gossip, cause: {}",
                          msg,
                          ex.toString()))
              .then();
        });
  }

  private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
    long suspicionTimeout =
        ClusterMath.suspicionTimeout(
            config.getSuspicionMult(), membershipTable.size(), config.getPingInterval());
    suspicionTimeoutTasks.computeIfAbsent(
        record.id(),
        id ->
            scheduler.schedule(
                () -> onSuspicionTimeout(id), suspicionTimeout, TimeUnit.MILLISECONDS));
  }

  private void onSuspicionTimeout(String memberId) {
    suspicionTimeoutTasks.remove(memberId);
    MembershipRecord record = membershipTable.get(memberId);
    if (record != null) {
      LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", record);
      MembershipRecord deadRecord =
          new MembershipRecord(record.member(), DEAD, record.incarnation());
      updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT)
          .subscribe(null, this::onError);
    }
  }

  private void cancelSuspicionTimeoutTask(String memberId) {
    Disposable future = suspicionTimeoutTasks.remove(memberId);
    if (future != null && !future.isDisposed()) {
      future.dispose();
    }
  }

  private Mono<Void> emitMembershipEvent(MembershipRecord r0, MembershipRecord r1) {
    return Mono.defer(
        () -> {
          final Member member = r1.member();

          if (r1.isDead()) {
            members.remove(member.id());
            // removed
            return Mono.fromRunnable(
                () -> {
                  Map<String, String> metadata = metadataStore.removeMetadata(member);
                  sink.next(MembershipEvent.createRemoved(member, metadata));
                });
          }

          if (r0 == null && r1.isAlive()) {
            members.put(member.id(), member);
            // added
            return metadataStore
                .fetchMetadata(member)
                .doOnSuccess(
                    metadata -> {
                      metadataStore.updateMetadata(member, metadata);
                      sink.next(MembershipEvent.createAdded(member, metadata));
                    })
                .onErrorResume(TimeoutException.class, e -> Mono.empty())
                .then();
          }

          if (r0 != null && r0.incarnation() < r1.incarnation()) {
            // updated
            return metadataStore
                .fetchMetadata(member)
                .doOnSuccess(
                    metadata1 -> {
                      Map<String, String> metadata0 =
                          metadataStore.updateMetadata(member, metadata1);
                      sink.next(MembershipEvent.createUpdated(member, metadata0, metadata1));
                    })
                .onErrorResume(TimeoutException.class, e -> Mono.empty())
                .then();
          }

          return Mono.empty();
        });
  }

  //......
}
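  • updateMembership compares the incoming record r1 with the current record r0 from membershipTable and returns immediately unless r1.isOverrides(r0). If r1 describes the local member, it is not accepted; instead a new local record is stored with the local status r0.status() and incarnation max(r0, r1) + 1 and spread with spreadMembershipGossip, refuting the claim. For other members, a DEAD record is removed from membershipTable and any other record is stored; a SUSPECT record schedules a suspicion timeout task via scheduleSuspicionTimeoutTask, otherwise cancelSuspicionTimeoutTask cancels any pending one. Finally emitMembershipEvent runs and, unless the reason is MEMBERSHIP_GOSSIP or INITIAL_SYNC, r1 is spread with spreadMembershipGossip
  • scheduleSuspicionTimeoutTask computes suspicionTimeout with ClusterMath.suspicionTimeout(suspicionMult, membershipTable.size(), pingInterval) and, only if suspicionTimeoutTasks has no task for record.id() yet, schedules onSuspicionTimeout after that delay. onSuspicionTimeout removes the task from suspicionTimeoutTasks and, if the member is still in the table, declares it DEAD (same incarnation) through updateMembership with MembershipUpdateReason.SUSPICION_TIMEOUT. cancelSuspicionTimeoutTask likewise removes the task from suspicionTimeoutTasks and disposes its future
  • emitMembershipEvent keeps the members map and the metadata in metadataStore in step and publishes events to sink: for a DEAD record it removes the member, calls metadataStore.removeMetadata(member) and emits a removed event; for a newly seen ALIVE member (r0 == null) it adds the member, fetches its metadata, calls metadataStore.updateMetadata(member, metadata) and emits an added event; when r1 has a higher incarnation than r0 it refetches the metadata, updates it and emits an updated event with the old and new metadata. A TimeoutException while fetching metadata is swallowed, in which case no event is emitted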
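The control flow of updateMembership, together with the event classification in emitMembershipEvent, can be replayed in a single-threaded sketch. Everything here is hypothetical: Rec, Status and Reason are simplified stand-ins, suspicion tasks are a Set instead of scheduled Disposables, metadata fetching is left out, and the overrides rule is an assumed SWIM-style precedence (a DEAD record is final, DEAD overrides anything else, at equal incarnation only SUSPECT overrides ALIVE, otherwise the higher incarnation wins) because the article does not show MembershipRecord.isOverrides. The "ALIVE does not override SUSPECT" part is consistent with the TODO comment in onFailureDetectorEvent.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of updateMembership/emitMembershipEvent; not scalecube code.
final class UpdateMembershipSketch {
  enum Status { ALIVE, SUSPECT, DEAD }

  enum Reason { FAILURE_DETECTOR_EVENT, MEMBERSHIP_GOSSIP, SYNC, INITIAL_SYNC, SUSPICION_TIMEOUT }

  static final class Rec {
    final String id; final Status status; final int inc;
    Rec(String id, Status status, int inc) { this.id = id; this.status = status; this.inc = inc; }
  }

  /** ASSUMED SWIM-style precedence; the article does not show MembershipRecord.isOverrides. */
  static boolean overrides(Rec r1, Rec r0) {
    if (r0 == null) return r1.status == Status.ALIVE;  // unknown member: only ALIVE is accepted
    if (r0.status == Status.DEAD) return false;        // DEAD is final
    if (r1.status == Status.DEAD) return true;
    if (r1.inc == r0.inc) return r1.status == Status.SUSPECT && r0.status != Status.SUSPECT;
    return r1.inc > r0.inc;                            // otherwise the newer incarnation wins
  }

  final String localId;
  final Map<String, Rec> table = new HashMap<>();
  final Set<String> suspicionTasks = new HashSet<>(); // stands in for scheduled timeout tasks
  int gossipsSpread = 0;
  String lastEvent = null;                            // ADDED / UPDATED / REMOVED, or null

  UpdateMembershipSketch(String localId) {
    this.localId = localId;
    table.put(localId, new Rec(localId, Status.ALIVE, 0));
  }

  void update(Rec r1, Reason reason) {
    Rec r0 = table.get(r1.id);
    lastEvent = null;
    if (!overrides(r1, r0)) return;
    if (r1.id.equals(localId)) {
      // Refute: keep our own status, bump incarnation past both records, and gossip it
      table.put(localId, new Rec(localId, r0.status, Math.max(r0.inc, r1.inc) + 1));
      gossipsSpread++;
      return;
    }
    if (r1.status == Status.DEAD) table.remove(r1.id); else table.put(r1.id, r1);
    if (r1.status == Status.SUSPECT) suspicionTasks.add(r1.id); else suspicionTasks.remove(r1.id);
    // emitMembershipEvent classification (metadata handling omitted)
    if (r1.status == Status.DEAD) lastEvent = "REMOVED";
    else if (r0 == null && r1.status == Status.ALIVE) lastEvent = "ADDED";
    else if (r0 != null && r0.inc < r1.inc) lastEvent = "UPDATED";
    // Gossip unless the change already came from gossip or from the initial sync
    if (reason != Reason.MEMBERSHIP_GOSSIP && reason != Reason.INITIAL_SYNC) gossipsSpread++;
  }
}
```

Under these assumptions, a SUSPECT rumor about member B can only be cleared by an ALIVE record with a higher incarnation, which is exactly what the local-member branch produces when B itself receives the rumor.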

schedulePeriodicSync

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  private void schedulePeriodicSync() {
    int syncInterval = config.getSyncInterval();
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doSync, syncInterval, syncInterval, TimeUnit.MILLISECONDS));
  }

  private void doSync() {
    Optional<Address> addressOptional = selectSyncAddress();
    if (!addressOptional.isPresent()) {
      return;
    }

    Address address = addressOptional.get();
    Message message = prepareSyncDataMsg(SYNC, null);
    LOGGER.debug("Send Sync: {} to {}", message, address);
    transport
        .send(address, message)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send Sync: {} to {}, cause: {}", message, address, ex.toString()));
  }

  private Optional<Address> selectSyncAddress() {
    List<Address> addresses =
        Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address))
            .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new));
    Collections.shuffle(addresses);
    if (addresses.isEmpty()) {
      return Optional.empty();
    } else {
      int i = ThreadLocalRandom.current().nextInt(addresses.size());
      return Optional.of(addresses.get(i));
    }
  }

  private Message prepareSyncDataMsg(String qualifier, String cid) {
    List<MembershipRecord> membershipRecords = new ArrayList<>(membershipTable.values());
    SyncData syncData = new SyncData(membershipRecords, config.getSyncGroup());
    return Message.withData(syncData)
        .qualifier(qualifier)
        .correlationId(cid)
        .sender(localMember.address())
        .build();
  }

  //......
}
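  • schedulePeriodicSync registers doSync with scheduler.schedulePeriodically so that it runs every syncInterval (the task is added to actionsDisposables). doSync calls selectSyncAddress, which picks a random address from the de-duplicated union of seedMembers and the addresses of otherMembers(); it then builds a SYNC message carrying the whole membershipTable and the syncGroup with prepareSyncDataMsg and sends it via transport.send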
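selectSyncAddress and the periodic scheduling can be sketched with the JDK alone. In this hypothetical version addresses are plain strings and a ScheduledExecutorService plays the role of Reactor's Scheduler.schedulePeriodically; PeriodicSyncSketch and its methods are illustrative names. Since one random index into the de-duplicated list is already a uniform pick, the sketch drops the Collections.shuffle call, which appears redundant for the selection itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of selectSyncAddress/schedulePeriodicSync; not scalecube code.
final class PeriodicSyncSketch {

  /** Random address from the de-duplicated union of seeds and other members' addresses. */
  static Optional<String> selectSyncAddress(List<String> seeds, List<String> otherMembers, Random random) {
    Set<String> union = new LinkedHashSet<>(seeds);
    union.addAll(otherMembers);
    if (union.isEmpty()) {
      return Optional.empty(); // nobody to sync with yet
    }
    List<String> addresses = new ArrayList<>(union);
    return Optional.of(addresses.get(random.nextInt(addresses.size())));
  }

  /** Runs doSync every syncIntervalMs, starting after one interval. */
  static ScheduledFuture<?> schedulePeriodicSync(ScheduledExecutorService scheduler, long syncIntervalMs, Runnable doSync) {
    return scheduler.scheduleAtFixedRate(doSync, syncIntervalMs, syncIntervalMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> seeds = List.of("10.0.0.1:4801");
    List<String> others = List.of("10.0.0.1:4801", "10.0.0.2:4801");
    Random random = new Random();
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<?> task = schedulePeriodicSync(scheduler, 100, () ->
        selectSyncAddress(seeds, others, random).ifPresent(a -> System.out.println("Send Sync to " + a)));
    Thread.sleep(350);
    task.cancel(false); // comparable to disposing actionsDisposables in stop()
    scheduler.shutdown();
  }
}
```

Including the seed addresses in the candidate set means a node that has not yet learned any members still has someone to sync with.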

Summary

  • The MembershipProtocol interface defines the start, stop, listen, members, otherMembers and member methods; MembershipProtocolImpl implements it and defines the MembershipUpdateReason enum (FAILURE_DETECTOR_EVENT, MEMBERSHIP_GOSSIP, SYNC, INITIAL_SYNC, SUSPICION_TIMEOUT)
  • The MembershipProtocolImpl constructor subscribes onMessage to transport.listen(), onFailureDetectorEvent to failureDetector.listen() and onMembershipGossip to gossipProtocol.listen(). With no seed members, start schedules periodic sync right away (doSync every syncInterval); otherwise it first sends SYNC to every seed via transport.requestResponse, merges the first matching SyncAck with onSyncAck, and then schedules periodic sync. stop disposes the subscriptions, cancels all suspicionTimeoutTasks and completes the event stream
  • onMessage uses checkSyncGroup to accept only messages of its own syncGroup, then dispatches SYNC to onSync and SYNC_ACK to onSyncAck. onSync runs syncMembership and replies with SYNC_ACK on success; onSyncAck runs syncMembership without replying. syncMembership calls updateMembership for every incoming record that differs from the local table
  • onFailureDetectorEvent acts only on a status change: a change to ALIVE triggers a SYNC to that member, any other change goes through updateMembership with MembershipUpdateReason.FAILURE_DETECTOR_EVENT. onMembershipGossip calls updateMembership with MembershipUpdateReason.MEMBERSHIP_GOSSIP for messages whose qualifier is MEMBERSHIP_GOSSIP
  • updateMembership applies an incoming record only if it overrides the current one. A record about the local member is refuted by raising the local incarnation and spreading it via spreadMembershipGossip; otherwise DEAD records are removed from membershipTable, SUSPECT records get a suspicion timeout task (any other status cancels it), and finally emitMembershipEvent runs, followed by spreadMembershipGossip except for gossip and initial-sync updates
  • schedulePeriodicSync registers doSync to run every syncInterval; doSync uses selectSyncAddress to pick a random target, builds the SYNC message with prepareSyncDataMsg, and sends it with transport.send
In short, start in MembershipProtocolImpl registers the doSync task (run every syncInterval), which sends a SYNC message carrying the full set of membership records to a randomly selected member. When onMessage receives SYNC it runs syncMembership and replies with SYNC_ACK on success; on SYNC_ACK it also runs syncMembership. onFailureDetectorEvent and onMembershipGossip both go through updateMembership to update membershipTable, spreading the change with spreadMembershipGossip when necessary.

doc

  • MembershipProtocol