This article takes a look at the GossipProtocol in scalecube-cluster.

GossipProtocol

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocol.java

/**
 * Gossip Protocol component responsible for spreading information (gossips) over the cluster
 * members using infection-style dissemination algorithms. It provides reliable cross-cluster
 * broadcast.
 */
public interface GossipProtocol {

  /** Starts running gossip protocol. After started it begins to receive and send gossip messages */
  void start();

  /** Stops running gossip protocol and releases occupied resources. */
  void stop();

  /**
   * Spreads given message between cluster members.
   *
   * @return future result with gossip id once gossip fully spread.
   */
  Mono<String> spread(Message message);

  /** Listens for gossips from other cluster members. */
  Flux<Message> listen();
}
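  • The GossipProtocol interface defines four methods: start, stop, spread and listen. spread returns a Mono<String> that emits the gossip id once the gossip has been fully spread, and listen returns a Flux<Message> of gossips received from other cluster members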
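
The notable part of this contract is that the Mono returned by spread completes only once the gossip has been fully spread, not when it is first sent. A minimal JDK-only sketch of just that rule, assuming CompletableFuture as a stand-in for Reactor's Mono/MonoSink; the class PendingSpreads, its methods and the `localId + "-" + counter` id format are hypothetical and chosen for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of spread()'s completion rule (not the scalecube API):
// CompletableFuture<String> plays the role of Mono<String>/MonoSink<String>.
final class PendingSpreads {
  private final String localId;
  private long gossipCounter = 0;
  // gossipId -> future the caller of spread() is waiting on
  private final Map<String, CompletableFuture<String>> futures = new HashMap<>();

  PendingSpreads(String localId) {
    this.localId = localId;
  }

  // Like spread(): assign an id, remember the pending future, and return it unfinished.
  // The payload itself would be stored with the gossip; that part is omitted here.
  CompletableFuture<String> spread(String message) {
    String gossipId = localId + "-" + gossipCounter++; // hypothetical id scheme
    CompletableFuture<String> future = new CompletableFuture<>();
    futures.put(gossipId, future);
    return future;
  }

  // Like the tail of a sweep: once the gossip is dropped, hand its id to the waiting caller
  void onSwept(String gossipId) {
    CompletableFuture<String> future = futures.remove(gossipId);
    if (future != null) {
      future.complete(gossipId);
    }
  }

  int pendingCount() {
    return futures.size();
  }
}
```

Calling spread("hello") on a fresh PendingSpreads("node-a") returns a future that stays incomplete until onSwept("node-a-0") is called, after which it yields "node-a-0".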

GossipProtocolImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

public final class GossipProtocolImpl implements GossipProtocol {

  private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);

  // Qualifiers

  public static final String GOSSIP_REQ = "sc/gossip/req";

  // Injected

  private final Member localMember;
  private final Transport transport;
  private final GossipConfig config;

  // Local State

  private long currentPeriod = 0;
  private long gossipCounter = 0;
  private Map<String, GossipState> gossips = new HashMap<>();
  private Map<String, MonoSink<String>> futures = new HashMap<>();

  private List<Member> remoteMembers = new ArrayList<>();
  private int remoteMembersIndex = -1;

  // Disposables

  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Subject

  private final FluxProcessor<Message, Message> subject =
      DirectProcessor.<Message>create().serialize();

  private final FluxSink<Message> sink = subject.sink();

  // Scheduled

  private final Scheduler scheduler;

  /**
   * Creates new instance of gossip protocol with given memberId, transport and settings.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param membershipProcessor membership event processor
   * @param config gossip protocol settings
   * @param scheduler scheduler
   */
  public GossipProtocolImpl(
      Member localMember,
      Transport transport,
      Flux<MembershipEvent> membershipProcessor,
      GossipConfig config,
      Scheduler scheduler) {

    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.localMember = Objects.requireNonNull(localMember);
    this.scheduler = Objects.requireNonNull(scheduler);

    // Subscribe
    actionsDisposables.addAll(
        Arrays.asList(
            membershipProcessor //
                .publishOn(scheduler)
                .subscribe(this::onMemberEvent, this::onError),
            transport
                .listen()
                .publishOn(scheduler)
                .filter(this::isGossipReq)
                .subscribe(this::onGossipReq, this::onError)));
  }

  @Override
  public void start() {
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doSpreadGossip,
            config.getGossipInterval(),
            config.getGossipInterval(),
            TimeUnit.MILLISECONDS));
  }

  @Override
  public void stop() {
    // Stop accepting gossip requests and spreading gossips
    actionsDisposables.dispose();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Mono<String> spread(Message message) {
    return Mono.fromCallable(() -> message)
        .subscribeOn(scheduler)
        .flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink)));
  }

  @Override
  public Flux<Message> listen() {
    return subject.onBackpressureBuffer();
  }

  private void onMemberEvent(MembershipEvent event) {
    Member member = event.member();
    if (event.isRemoved()) {
      remoteMembers.remove(member);
    }
    if (event.isAdded()) {
      remoteMembers.add(member);
    }
  }

  private void onGossipReq(Message message) {
    long period = this.currentPeriod;
    GossipRequest gossipRequest = message.data();
    for (Gossip gossip : gossipRequest.gossips()) {
      GossipState gossipState = gossips.get(gossip.gossipId());
      if (gossipState == null) { // new gossip
        gossipState = new GossipState(gossip, period);
        gossips.put(gossip.gossipId(), gossipState);
        sink.next(gossip.message());
      }
      gossipState.addToInfected(gossipRequest.from());
    }
  }

  private boolean isGossipReq(Message message) {
    return GOSSIP_REQ.equals(message.qualifier());
  }

  private String createAndPutGossip(Message message) {
    long period = this.currentPeriod;
    Gossip gossip = new Gossip(generateGossipId(), message);
    GossipState gossipState = new GossipState(gossip, period);
    gossips.put(gossip.gossipId(), gossipState);
    return gossip.gossipId();
  }

  //......
}
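  • GossipProtocolImpl implements the GossipProtocol interface. It maintains gossips, a map from gossipId to GossipState, as well as the remoteMembers list
  • Its constructor subscribes to membershipProcessor and calls onMemberEvent, which adds or removes a member from remoteMembers according to the MembershipEvent. It also subscribes to transport.listen(), keeps only GOSSIP_REQ messages and calls onGossipReq, which merges the gossips carried by the GossipRequest into the local gossips map: a gossip seen for the first time gets a new GossipState (recording the current period) and its message is emitted to sink, i.e. to listen() subscribers; in every case the sender's member id (gossipRequest.from()) is added to that gossip's infected set
  • The start method runs doSpreadGossip every gossipInterval milliseconds; the spread method creates a Gossip through createAndPutGossip, puts it into gossips, and registers the caller's MonoSink in futures under the new gossip id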
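
The receive path of onGossipReq boils down to a merge with deliver-once semantics. A minimal JDK-only sketch of that merge, assuming gossips are plain String payloads keyed by id and one request may carry several of them; GossipReceiver and State are hypothetical simplifications of GossipProtocolImpl and GossipState, not scalecube classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of onGossipReq (not scalecube code).
final class GossipReceiver {
  // Stand-in for GossipState: payload, first-seen period, members known to have the gossip
  static final class State {
    final String payload;
    final long infectionPeriod;
    final Set<String> infected = new HashSet<>();

    State(String payload, long infectionPeriod) {
      this.payload = payload;
      this.infectionPeriod = infectionPeriod;
    }
  }

  long currentPeriod = 0;
  final Map<String, State> gossips = new HashMap<>();
  // Stand-in for sink.next(...): payloads handed to listen() subscribers
  final List<String> delivered = new ArrayList<>();

  // incoming: gossipId -> payload carried by one request sent by member `from`
  void onGossipReq(String from, Map<String, String> incoming) {
    long period = currentPeriod;
    for (Map.Entry<String, String> g : incoming.entrySet()) {
      State state = gossips.get(g.getKey());
      if (state == null) { // first time this node sees the gossip
        state = new State(g.getValue(), period);
        gossips.put(g.getKey(), state);
        delivered.add(g.getValue()); // delivered exactly once
      }
      state.infected.add(from); // the sender evidently already has it
    }
  }
}
```

Because a gossip is stored before its payload is delivered, a second copy arriving from another member only extends infected, so listen() sees no duplicate as long as the gossip has not been swept yet.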

doSpreadGossip

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

public final class GossipProtocolImpl implements GossipProtocol {

  //......

  private List<Member> remoteMembers = new ArrayList<>();
  private int remoteMembersIndex = -1;

  private void doSpreadGossip() {
    // Increment period
    long period = currentPeriod++;

    // Check any gossips exists
    if (gossips.isEmpty()) {
      return; // nothing to spread
    }

    try {
      // Spread gossips to randomly selected member(s)
      selectGossipMembers().forEach(member -> spreadGossipsTo(period, member));

      // Sweep gossips
      sweepGossips(period);
    } catch (Exception ex) {
      LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex);
    }
  }

  private void spreadGossipsTo(long period, Member member) {
    // Select gossips to send
    List<Gossip> gossips = selectGossipsToSend(period, member);
    if (gossips.isEmpty()) {
      return; // nothing to spread
    }

    // Send gossip request
    Address address = member.address();

    gossips
        .stream()
        .map(this::buildGossipRequestMessage)
        .forEach(
            message ->
                transport
                    .send(address, message)
                    .subscribe(
                        null,
                        ex ->
                            LOGGER.debug(
                                "Failed to send GossipReq[{}]: {} to {}, cause: {}",
                                period,
                                message,
                                address,
                                ex.toString())));
  }

  private List<Gossip> selectGossipsToSend(long period, Member member) {
    int periodsToSpread =
        ClusterMath.gossipPeriodsToSpread(config.getGossipRepeatMult(), remoteMembers.size() + 1);
    return gossips
        .values()
        .stream()
        .filter(
            gossipState -> gossipState.infectionPeriod() + periodsToSpread >= period) // max rounds
        .filter(gossipState -> !gossipState.isInfected(member.id())) // already infected
        .map(GossipState::gossip)
        .collect(Collectors.toList());
  }

  private List<Member> selectGossipMembers() {
    int gossipFanout = config.getGossipFanout();
    if (remoteMembers.size() < gossipFanout) { // select all
      return remoteMembers;
    } else { // select random members
      // Shuffle members initially and once reached top bound
      if (remoteMembersIndex < 0 || remoteMembersIndex + gossipFanout > remoteMembers.size()) {
        Collections.shuffle(remoteMembers);
        remoteMembersIndex = 0;
      }

      // Select members
      List<Member> selectedMembers =
          gossipFanout == 1
              ? Collections.singletonList(remoteMembers.get(remoteMembersIndex))
              : remoteMembers.subList(remoteMembersIndex, remoteMembersIndex + gossipFanout);

      // Increment index and return result
      remoteMembersIndex += gossipFanout;
      return selectedMembers;
    }
  }

  private Message buildGossipRequestMessage(Gossip gossip) {
    GossipRequest gossipRequest = new GossipRequest(gossip, localMember.id());
    return Message.withData(gossipRequest)
        .qualifier(GOSSIP_REQ)
        .sender(localMember.address())
        .build();
  }

  private void sweepGossips(long period) {
    // Select gossips to sweep
    int periodsToSweep =
        ClusterMath.gossipPeriodsToSweep(config.getGossipRepeatMult(), remoteMembers.size() + 1);
    Set<GossipState> gossipsToRemove =
        gossips
            .values()
            .stream()
            .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep)
            .collect(Collectors.toSet());

    // Check if anything selected
    if (gossipsToRemove.isEmpty()) {
      return; // nothing to sweep
    }

    // Sweep gossips
    LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);
    for (GossipState gossipState : gossipsToRemove) {
      gossips.remove(gossipState.gossip().gossipId());
      MonoSink<String> sink = futures.remove(gossipState.gossip().gossipId());
      if (sink != null) {
        sink.success(gossipState.gossip().gossipId());
      }
    }
  }

  //......
}
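  • doSpreadGossip first increments currentPeriod (the round works with the pre-increment value), returns early if gossips is empty, then calls selectGossipMembers, runs spreadGossipsTo for each selected member, and finally calls sweepGossips
  • selectGossipMembers picks gossipFanout members. If remoteMembers holds fewer than gossipFanout entries, it returns all of them; otherwise it takes a subList window starting at remoteMembersIndex. When remoteMembersIndex is less than 0 or remoteMembersIndex + gossipFanout > remoteMembers.size(), it first shuffles remoteMembers with Collections.shuffle and resets remoteMembersIndex to 0; after selecting, it advances remoteMembersIndex by gossipFanout
  • spreadGossipsTo first calls selectGossipsToSend to pick the gossips to send: those still within periodsToSpread (computed by ClusterMath.gossipPeriodsToSpread from gossipRepeatMult and the cluster size) of their infectionPeriod, and whose infected set does not yet contain the target member's id. It then builds one GOSSIP_REQ message per gossip with buildGossipRequestMessage and sends each one via transport.send
  • sweepGossips computes periodsToSweep (via ClusterMath.gossipPeriodsToSweep), removes from gossips every gossipState with period > gossipState.infectionPeriod() + periodsToSweep, and completes the matching MonoSink in futures (if any) with the gossipId, which is what finally resolves the Mono returned by spread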
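
The selection and retention rules of doSpreadGossip can be isolated into small pure functions. A sketch, assuming periodsToSpread and periodsToSweep are passed in as plain integers (in scalecube they come from ClusterMath, whose formulas are not reproduced here); SpreadRules and MemberSelector are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helpers mirroring the predicates of selectGossipsToSend and sweepGossips.
final class SpreadRules {
  private SpreadRules() {}

  // Send while still inside the spread window and the target is not known to have the gossip
  static boolean shouldSend(long infectionPeriod, int periodsToSpread, long period, boolean targetInfected) {
    return infectionPeriod + periodsToSpread >= period && !targetInfected;
  }

  // Drop the gossip once the sweep window has passed
  static boolean shouldSweep(long infectionPeriod, int periodsToSweep, long period) {
    return period > infectionPeriod + periodsToSweep;
  }
}

// Shuffle-then-slide member selection, modeled on selectGossipMembers
final class MemberSelector<T> {
  private final List<T> members;
  private final Random random;
  private int index = -1;

  MemberSelector(List<T> members, Random random) {
    this.members = new ArrayList<>(members); // private copy; the original shuffles remoteMembers in place
    this.random = random;
  }

  List<T> select(int fanout) {
    if (members.size() < fanout) {
      return new ArrayList<>(members); // fewer members than fanout: pick everyone
    }
    if (index < 0 || index + fanout > members.size()) {
      Collections.shuffle(members, random); // start a new pass in a new random order
      index = 0;
    }
    List<T> selected = new ArrayList<>(members.subList(index, index + fanout));
    index += fanout;
    return selected;
  }
}
```

Within one pass no member is picked twice, so with 6 members and fanout 2 three consecutive calls cover all of them. The same logic also shows a property of the original window: when the member count is not a multiple of gossipFanout, the tail of the current order is skipped before the next shuffle (5 members, fanout 2: the third call reshuffles because 4 + 2 > 5, so the member at index 4 was not picked in that pass).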

Summary

  • The GossipProtocol interface defines start, stop, spread and listen; GossipProtocolImpl implements it and maintains gossips, a map from gossipId to GossipState, together with the remoteMembers list
  • GossipProtocolImpl's constructor subscribes to membershipProcessor, where onMemberEvent adds or removes members from remoteMembers according to each MembershipEvent, and to transport.listen(), where GOSSIP_REQ messages are passed to onGossipReq; onGossipReq merges the request's gossips into the local gossips map, emits the message of any previously unseen gossip to sink, and adds the sending member's id to that gossip's infected set
  • start runs doSpreadGossip every gossipInterval; spread creates a Gossip through createAndPutGossip and puts it into gossips; doSpreadGossip increments currentPeriod, calls selectGossipMembers, runs spreadGossipsTo for each selected member, and finally calls sweepGossips
In short, GossipProtocolImpl registers two handlers. onMemberEvent listens for MembershipEvents and keeps the remoteMembers list up to date; onGossipReq receives the GOSSIP_REQ messages that other members send from their own doSpreadGossip and merges the carried gossips into the local gossips map. doSpreadGossip runs every gossipInterval, picks gossipFanout members from a shuffled remoteMembers list, and for each of them selects the gossips that still need to be sent. onGossipReq and spread write into gossips, while the periodic doSpreadGossip reads from gossips to decide what to send.
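
To see how these pieces fit together, here is a small synchronous-round simulation: in each period every node that holds gossips runs selection logic like doSpreadGossip, and all generated requests are then delivered with onGossipReq-style merging. It assumes a single global period counter, lossless delivery, a fixed periodsToSpread and no sweeping; GossipSimulation and its members are hypothetical, not scalecube code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical round-based simulation of infection-style dissemination (not scalecube code).
final class GossipSimulation {
  static final class Node {
    final int id;
    final List<Integer> remoteMembers = new ArrayList<>();
    int remoteMembersIndex = -1;
    final Map<String, Long> infectionPeriod = new HashMap<>(); // gossipId -> first-seen period
    final Map<String, Set<Integer>> infected = new HashMap<>(); // gossipId -> known holders
    Node(int id) { this.id = id; }
  }

  static final class Req {
    final int from, to;
    final String gossipId;
    Req(int from, int to, String gossipId) { this.from = from; this.to = to; this.gossipId = gossipId; }
  }

  final List<Node> nodes = new ArrayList<>();
  final int fanout;
  final int periodsToSpread; // assumed constant here; scalecube derives it via ClusterMath
  final Random random;

  GossipSimulation(int size, int fanout, int periodsToSpread, long seed) {
    this.fanout = fanout;
    this.periodsToSpread = periodsToSpread;
    this.random = new Random(seed);
    for (int i = 0; i < size; i++) nodes.add(new Node(i));
    for (Node n : nodes)
      for (Node m : nodes)
        if (m.id != n.id) n.remoteMembers.add(m.id);
  }

  // Like createAndPutGossip on a single node
  void originate(int nodeId, String gossipId, long period) {
    Node n = nodes.get(nodeId);
    n.infectionPeriod.put(gossipId, period);
    n.infected.put(gossipId, new HashSet<>());
  }

  // Shuffle-and-slide selection, modeled on selectGossipMembers
  private List<Integer> selectMembers(Node n) {
    List<Integer> members = n.remoteMembers;
    if (members.size() < fanout) return new ArrayList<>(members);
    if (n.remoteMembersIndex < 0 || n.remoteMembersIndex + fanout > members.size()) {
      Collections.shuffle(members, random);
      n.remoteMembersIndex = 0;
    }
    List<Integer> selected =
        new ArrayList<>(members.subList(n.remoteMembersIndex, n.remoteMembersIndex + fanout));
    n.remoteMembersIndex += fanout;
    return selected;
  }

  void runPeriod(long period) {
    List<Req> outbox = new ArrayList<>();
    for (Node n : nodes) {
      if (n.infectionPeriod.isEmpty()) continue; // nothing to spread, no selection
      for (int target : selectMembers(n)) {
        for (Map.Entry<String, Long> e : n.infectionPeriod.entrySet()) {
          boolean inWindow = e.getValue() + periodsToSpread >= period;
          boolean knownInfected = n.infected.get(e.getKey()).contains(target);
          if (inWindow && !knownInfected) outbox.add(new Req(n.id, target, e.getKey()));
        }
      }
    }
    for (Req r : outbox) { // merge on the receiver, like onGossipReq
      Node to = nodes.get(r.to);
      if (!to.infectionPeriod.containsKey(r.gossipId)) {
        to.infectionPeriod.put(r.gossipId, period);
        to.infected.put(r.gossipId, new HashSet<>());
      }
      to.infected.get(r.gossipId).add(r.from);
    }
  }

  int countInfected(String gossipId) {
    int count = 0;
    for (Node n : nodes) if (n.infectionPeriod.containsKey(gossipId)) count++;
    return count;
  }
}
```

With 19 nodes and a fanout of 3, each node has 18 remote members, i.e. exactly six windows of three, so the originating node alone reaches every other node during periods 0 to 5 (given periodsToSpread of at least 5); other infected nodes only speed this up.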
