This article looks at the ActiveGossiper in Apache Gossip.

AbstractActiveGossiper

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java

/**
 * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
 */
public abstract class AbstractActiveGossiper {

  protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);

  protected final GossipManager gossipManager;
  protected final GossipCore gossipCore;
  private final Histogram sharedDataHistogram;
  private final Histogram sendPerNodeDataHistogram;
  private final Histogram sendMembershipHistogram;
  private final Random random;
  private final GossipSettings gossipSettings;

  public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
    this.gossipManager = gossipManager;
    this.gossipCore = gossipCore;
    sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
    sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
    sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
    random = new Random();
    gossipSettings = gossipManager.getSettings();
  }

  public void init() {
  }

  public void shutdown() {
  }

  public final void sendShutdownMessage(LocalMember me, LocalMember target){
    if (target == null){
      return;
    }
    ShutdownMessage m = new ShutdownMessage();
    m.setNodeId(me.getId());
    m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
    gossipCore.sendOneWay(m, target.getUri());
  }

  //......

  /**
   * Performs the sending of the membership list, after we have incremented our own heartbeat.
   */
  protected void sendMembershipList(LocalMember me, LocalMember member) {
    if (member == null){
      return;
    }
    long startTime = System.currentTimeMillis();
    me.setHeartbeat(System.nanoTime());
    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
    message.setUuid(UUID.randomUUID().toString());
    message.getMembers().add(convert(me));
    for (LocalMember other : gossipManager.getMembers().keySet()) {
      message.getMembers().add(convert(other));
    }
    Response r = gossipCore.send(message, member.getUri());
    if (r instanceof ActiveGossipOk){
      //maybe count metrics here
    } else {
      LOGGER.debug("Message " + message + " generated response " + r);
    }
    sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
  }

  protected final Member convert(LocalMember member){
    Member gm = new Member();
    gm.setCluster(member.getClusterName());
    gm.setHeartbeat(member.getHeartbeat());
    gm.setUri(member.getUri().toASCIIString());
    gm.setId(member.getId());
    gm.setProperties(member.getProperties());
    return gm;
  }

  /**
   *
   * @param memberList
   *          An immutable list
   * @return The chosen LocalGossipMember to gossip with.
   */
  protected LocalMember selectPartner(List<LocalMember> memberList) {
    LocalMember member = null;
    if (memberList.size() > 0) {
      int randomNeighborIndex = random.nextInt(memberList.size());
      member = memberList.get(randomNeighborIndex);
    }
    return member;
  }
}
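  • The AbstractActiveGossiper constructor takes a gossipManager and a gossipCore (plus a MetricRegistry for its histograms); the class defines sendShutdownMessage, sendMembershipList, selectPartner and other methods
  • selectPartner picks a LocalMember at a random index (randomNeighborIndex) when memberList is non-empty, and returns null otherwise
  • sendMembershipList first sets me's heartbeat, then creates a UdpActiveGossipMessage whose members list starts with the current local member, followed by the keys of gossipManager.getMembers(), and finally sends it to the selected member through gossipCore.send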
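The random partner selection described above can be illustrated in isolation. The following is a minimal sketch, not the Apache Gossip class: PartnerSelectorSketch and its generic selectPartner are hypothetical names, and plain strings stand in for LocalMember.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical stand-alone sketch; not part of Apache Gossip.
class PartnerSelectorSketch {
  private final Random random;

  PartnerSelectorSketch(Random random) {
    this.random = random;
  }

  // Returns a uniformly random element, or null when the list is empty,
  // mirroring the null contract of selectPartner in the listing.
  <T> T selectPartner(List<T> members) {
    if (members.isEmpty()) {
      return null;
    }
    return members.get(random.nextInt(members.size()));
  }

  public static void main(String[] args) {
    PartnerSelectorSketch selector = new PartnerSelectorSketch(new Random());
    List<String> live = Arrays.asList("node-a", "node-b", "node-c");
    System.out.println(selector.selectPartner(live));
    System.out.println(selector.selectPartner(Collections.<String>emptyList()));
  }
}
```

Because the caller may receive null, both sendMembershipList and sendShutdownMessage in the listing return early when their target is null.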

ActiveGossipMessageHandler

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java

public class ActiveGossipMessageHandler implements MessageHandler {

  /**
   * @param gossipCore context.
   * @param gossipManager context.
   * @param base message reference.
   * @return boolean indicating success.
   */
  @Override
  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
    List<Member> remoteGossipMembers = new ArrayList<>();
    RemoteMember senderMember = null;
    UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
    for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
      URI u;
      try {
        u = new URI(activeGossipMessage.getMembers().get(i).getUri());
      } catch (URISyntaxException e) {
        GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
        continue;
      }
      RemoteMember member = new RemoteMember(
              activeGossipMessage.getMembers().get(i).getCluster(),
              u,
              activeGossipMessage.getMembers().get(i).getId(),
              activeGossipMessage.getMembers().get(i).getHeartbeat(),
              activeGossipMessage.getMembers().get(i).getProperties());
      if (i == 0) {
        senderMember = member;
      }
      if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) {
        UdpNotAMemberFault f = new UdpNotAMemberFault();
        f.setException("Not a member of this cluster " + i);
        f.setUriFrom(activeGossipMessage.getUriFrom());
        f.setUuid(activeGossipMessage.getUuid());
        GossipCore.LOGGER.warn(f);
        gossipCore.sendOneWay(f, member.getUri());
        continue;
      }
      remoteGossipMembers.add(member);
    }
    UdpActiveGossipOk o = new UdpActiveGossipOk();
    o.setUriFrom(activeGossipMessage.getUriFrom());
    o.setUuid(activeGossipMessage.getUuid());
    gossipCore.sendOneWay(o, senderMember.getUri());
    gossipCore.mergeLists(senderMember, remoteGossipMembers);
    return true;
  }
}
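  • When the target member receives a UdpActiveGossipMessage, ActiveGossipMessageHandler handles it: it converts each entry of activeGossipMessage.getMembers() into a RemoteMember (the first entry is treated as the sender) and adds it to remoteGossipMembers, answering entries from a different cluster with a UdpNotAMemberFault instead; it then replies to the sender with a UdpActiveGossipOk via gossipCore.sendOneWay and finally calls gossipCore.mergeLists(senderMember, remoteGossipMembers)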
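The handler's per-entry logic (first entry is the sender, entries from another cluster are rejected) can be sketched without any networking. This is a simplified illustration under stated assumptions: ClusterFilterSketch, Entry and Result are hypothetical types, and the rejected list merely stands in for the UdpNotAMemberFault replies the real handler sends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the handler's filtering loop; not Apache Gossip code.
class ClusterFilterSketch {
  // Minimal stand-in for a gossiped member entry.
  static final class Entry {
    final String cluster;
    final String id;

    Entry(String cluster, String id) {
      this.cluster = cluster;
      this.id = id;
    }
  }

  // Holds the sender plus the accepted and rejected entries.
  static final class Result {
    Entry sender;
    final List<Entry> accepted = new ArrayList<>();
    final List<Entry> rejected = new ArrayList<>();
  }

  static Result process(List<Entry> received, String myCluster) {
    Result result = new Result();
    for (int i = 0; i < received.size(); i++) {
      Entry e = received.get(i);
      if (i == 0) {
        result.sender = e; // the sender places itself first in the message
      }
      if (!e.cluster.equals(myCluster)) {
        result.rejected.add(e); // the real handler replies with a fault here
        continue;
      }
      result.accepted.add(e);
    }
    return result;
  }

  public static void main(String[] args) {
    Result r = process(Arrays.asList(
        new Entry("c1", "sender"), new Entry("c2", "stranger"), new Entry("c1", "peer")), "c1");
    System.out.println(r.sender.id + " " + r.accepted.size() + " " + r.rejected.size());
  }
}
```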

GossipCore

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java

public class GossipCore implements GossipCoreConstants {

  class LatchAndBase {
    private final CountDownLatch latch;
    private volatile Base base;

    LatchAndBase(){
      latch = new CountDownLatch(1);
    }
  }

  public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
  private final GossipManager gossipManager;
  private ConcurrentHashMap<String, LatchAndBase> requests;
  private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
  private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
  private final Meter messageSerdeException;
  private final Meter transmissionException;
  private final Meter transmissionSuccess;
  private final DataEventManager eventManager;

  public GossipCore(GossipManager manager, MetricRegistry metrics){
    this.gossipManager = manager;
    requests = new ConcurrentHashMap<>();
    perNodeData = new ConcurrentHashMap<>();
    sharedData = new ConcurrentHashMap<>();
    eventManager = new DataEventManager(metrics);
    metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
    metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  sharedData.size());
    metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
    messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
    transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
    transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
  }

  public void receive(Base base) {
    if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
      LOGGER.warn("received message can not be handled");
    }
  }

  /**
   * Sends a blocking message.
   * todo: move functionality to TransportManager layer.
   * @param message
   * @param uri
   * @throws RuntimeException if data can not be serialized or in transmission error
   */
  private void sendInternal(Base message, URI uri) {
    byte[] json_bytes;
    try {
      json_bytes = gossipManager.getProtocolManager().write(message);
    } catch (IOException e) {
      messageSerdeException.mark();
      throw new RuntimeException(e);
    }
    try {
      gossipManager.getTransportManager().send(uri, json_bytes);
      transmissionSuccess.mark();
    } catch (IOException e) {
      transmissionException.mark();
      throw new RuntimeException(e);
    }
  }

  public Response send(Base message, URI uri){
    if (LOGGER.isDebugEnabled()){
      LOGGER.debug("Sending " + message);
      LOGGER.debug("Current request queue " + requests);
    }

    final Trackable t;
    LatchAndBase latchAndBase = null;
    if (message instanceof Trackable){
      t = (Trackable) message;
      latchAndBase = new LatchAndBase();
      requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
    } else {
      t = null;
    }
    sendInternal(message, uri);
    if (latchAndBase == null){
      return null;
    }

    try {
      boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
      if (complete){
        return (Response) latchAndBase.base;
      } else{
        return null;
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      if (latchAndBase != null){
        requests.remove(t.getUuid() + "/" + t.getUriFrom());
      }
    }
  }

  /**
   * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
   * when the protocol for the message is not to wait for a response
   * @param message the message to send
   * @param u the uri to send it to
   */
  public void sendOneWay(Base message, URI u) {
    try {
      sendInternal(message, u);
    } catch (RuntimeException ex) {
      LOGGER.debug("Send one way failed", ex);
    }
  }

  public void handleResponse(String k, Base v) {
    LatchAndBase latch = requests.get(k);
    latch.base = v;
    latch.latch.countDown();
  }

  /**
   * Merge lists from remote members and update heartbeats
   *
   * @param senderMember
   * @param remoteList
   *
   */
  public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {
    if (LOGGER.isDebugEnabled()){
      debugState(senderMember, remoteList);
    }
    for (LocalMember i : gossipManager.getDeadMembers()) {
      if (i.getId().equals(senderMember.getId())) {
        LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
        i.recordHeartbeat(senderMember.getHeartbeat());
        i.setHeartbeat(senderMember.getHeartbeat());
        //TODO consider forcing an UP here
      }
    }
    for (Member remoteMember : remoteList) {
      if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
        continue;
      }
      LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
      remoteMember.getUri(),
      remoteMember.getId(),
      remoteMember.getHeartbeat(),
      remoteMember.getProperties(),
      gossipManager.getSettings().getWindowSize(),
      gossipManager.getSettings().getMinimumSamples(),
      gossipManager.getSettings().getDistribution());
      aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
      Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
      if (result != null){
        for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
          if (localMember.getKey().getId().equals(remoteMember.getId())){
            localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
            localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
            localMember.getKey().setProperties(remoteMember.getProperties());
          }
        }
      }
    }
    if (LOGGER.isDebugEnabled()){
      debugState(senderMember, remoteList);
    }
  }

  //......
}
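  • The GossipCore constructor takes a GossipManager (and a MetricRegistry); the class defines receive, send, sendOneWay, handleResponse, mergeLists and other methods
  • mergeLists converts each entry of the received remoteList into a LocalMember (skipping the local node itself) and merges it into gossipManager.getMembers() via putIfAbsent, with new members entering as GossipState.UP
  • When the member already exists, the merge updates that localMember's heartbeat and properties instead; the recordHeartbeat method ignores values less than or equal to latestHeartbeatMs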
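The merge rule above (insert unknown members as UP, leave the state of known members alone, refresh their heartbeat) can be shown with plain maps. This is a rough sketch under simplifying assumptions: MergeSketch is a hypothetical class, member ids are strings, states are plain strings, and the phi-detector bookkeeping done by recordHeartbeat is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the merge rule; not the GossipCore implementation.
class MergeSketch {
  final ConcurrentMap<String, String> states = new ConcurrentHashMap<>();
  final ConcurrentMap<String, Long> heartbeats = new ConcurrentHashMap<>();
  private final String myId;

  MergeSketch(String myId) {
    this.myId = myId;
  }

  // remote maps member id to the heartbeat value carried in the gossip message.
  void merge(Map<String, Long> remote) {
    for (Map.Entry<String, Long> e : remote.entrySet()) {
      if (e.getKey().equals(myId)) {
        continue; // never merge our own entry
      }
      // Unknown members enter as UP; a known member keeps its current state.
      states.putIfAbsent(e.getKey(), "UP");
      // The stored heartbeat is replaced by the gossiped value.
      heartbeats.put(e.getKey(), e.getValue());
    }
  }

  public static void main(String[] args) {
    MergeSketch local = new MergeSketch("me");
    local.states.put("b", "DOWN");
    local.heartbeats.put("b", 1L);
    Map<String, Long> remote = new LinkedHashMap<>();
    remote.put("me", 5L);
    remote.put("b", 7L);
    remote.put("c", 3L);
    local.merge(remote);
    System.out.println(local.states + " " + local.heartbeats);
  }
}
```

Note that, as in the listing, a known member is not forced back to UP by the merge; the state change is left to the periodic refresher.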

GossipManager

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java

public abstract class GossipManager {

  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);

  // this mapper is used for ring and user-data persistence only. NOT messages.
  public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
    private static final long serialVersionUID = 1L;
  {
    enableDefaultTyping();
    configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
  }};

  private final ConcurrentSkipListMap<LocalMember, GossipState> members;
  private final LocalMember me;
  private final GossipSettings settings;
  private final AtomicBoolean gossipServiceRunning;

  private TransportManager transportManager;
  private ProtocolManager protocolManager;

  private final GossipCore gossipCore;
  private final DataReaper dataReaper;
  private final Clock clock;
  private final ScheduledExecutorService scheduledServiced;
  private final MetricRegistry registry;
  private final RingStatePersister ringState;
  private final UserDataPersister userDataState;
  private final GossipMemberStateRefresher memberStateRefresher;

  private final MessageHandler messageHandler;
  private final LockManager lockManager;

  public GossipManager(String cluster,
                       URI uri, String id, Map<String, String> properties, GossipSettings settings,
                       List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
                       MessageHandler messageHandler) {
    this.settings = settings;
    this.messageHandler = messageHandler;

    clock = new SystemClock();
    me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
            settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
    gossipCore = new GossipCore(this, registry);
    this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
    dataReaper = new DataReaper(gossipCore, clock);
    members = new ConcurrentSkipListMap<>();
    for (Member startupMember : gossipMembers) {
      if (!startupMember.equals(me)) {
        LocalMember member = new LocalMember(startupMember.getClusterName(),
                startupMember.getUri(), startupMember.getId(),
                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
                settings.getMinimumSamples(), settings.getDistribution());
        //TODO should members start in down state?
        members.put(member, GossipState.DOWN);
      }
    }
    gossipServiceRunning = new AtomicBoolean(true);
    this.scheduledServiced = Executors.newScheduledThreadPool(1);
    this.registry = registry;
    this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
    this.userDataState = new UserDataPersister(
        gossipCore,
        GossipManager.buildPerNodeDataPath(this),
        GossipManager.buildSharedDataPath(this));
    this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
    readSavedRingState();
    readSavedDataState();
  }

  /**
   * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
   * thread and start the receiver thread.
   */
  public void init() {

    // protocol manager and transport managers are specified in settings.
    // construct them here via reflection.

    protocolManager = ReflectionUtils.constructWithReflection(
        settings.getProtocolManagerClass(),
        new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class },
        new Object[] { settings, me.getId(), this.getRegistry() }
    );

    transportManager = ReflectionUtils.constructWithReflection(
        settings.getTransportManagerClass(),
        new Class<?>[] { GossipManager.class, GossipCore.class},
        new Object[] { this, gossipCore }
    );

    // start processing gossip messages.
    transportManager.startEndpoint();
    transportManager.startActiveGossiper();

    dataReaper.init();
    if (settings.isPersistRingState()) {
      scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
    }
    if (settings.isPersistDataState()) {
      scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
    }
    memberStateRefresher.init();
    LOGGER.debug("The GossipManager is started.");
  }

  /**
   * Shutdown the gossip service.
   */
  public void shutdown() {
    gossipServiceRunning.set(false);
    lockManager.shutdown();
    gossipCore.shutdown();
    transportManager.shutdown();
    dataReaper.close();
    memberStateRefresher.shutdown();
    scheduledServiced.shutdown();
    try {
      scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.error(e);
    }
    scheduledServiced.shutdownNow();
  }

  //......
}
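  • GossipManager keeps members, a ConcurrentSkipListMap from LocalMember to GossipState (startup members are inserted as DOWN); its constructor also creates the RingStatePersister, UserDataPersister and GossipMemberStateRefresher
  • init calls transportManager.startEndpoint() and startActiveGossiper(); when persistence is enabled in the settings (isPersistRingState, isPersistDataState) it registers RingStatePersister and UserDataPersister as scheduled tasks via scheduleAtFixedRate (every 60 seconds); it also calls memberStateRefresher.init()
  • shutdown calls gossipCore.shutdown(), transportManager.shutdown(), memberStateRefresher.shutdown() and so on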

GossipMemberStateRefresher

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java

public class GossipMemberStateRefresher {
  public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);

  private final Map<LocalMember, GossipState> members;
  private final GossipSettings settings;
  private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
  private final Clock clock;
  private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
  private final ExecutorService listenerExecutor;
  private final ScheduledExecutorService scheduledExecutor;
  private final BlockingQueue<Runnable> workQueue;

  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
                                    GossipListener listener,
                                    BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
    this.members = members;
    this.settings = settings;
    listeners.add(listener);
    this.findPerNodeGossipData = findPerNodeGossipData;
    clock = new SystemClock();
    workQueue = new ArrayBlockingQueue<>(1024);
    listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
    scheduledExecutor = Executors.newScheduledThreadPool(1);
  }

  public void init() {
    scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
  }

  public void run() {
    try {
      runOnce();
    } catch (RuntimeException ex) {
      LOGGER.warn("scheduled state had exception", ex);
    }
  }

  public void runOnce() {
    for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
      boolean userDown = processOptimisticShutdown(entry);
      if (userDown)
        continue;

      Double phiMeasure = entry.getKey().detect(clock.nanoTime());
      GossipState requiredState;

      if (phiMeasure != null) {
        requiredState = calcRequiredState(phiMeasure);
      } else {
        requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
      }

      if (entry.getValue() != requiredState) {
        members.put(entry.getKey(), requiredState);
        /* Call listeners asynchronously */
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
      }
    }
  }

  public GossipState calcRequiredState(Double phiMeasure) {
    if (phiMeasure > settings.getConvictThreshold())
      return GossipState.DOWN;
    else
      return GossipState.UP;
  }

  public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
    long now = clock.nanoTime();
    long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
    if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
      return GossipState.DOWN;
    } else {
      return state;
    }
  }

  /**
   * If we have a special key the per-node data that means that the node has sent us
   * a pre-emptive shutdown message. We process this so node is seen down sooner
   *
   * @param l member to consider
   * @return true if node forced down
   */
  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
    PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
    if (m == null) {
      return false;
    }
    ShutdownMessage s = (ShutdownMessage) m.getPayload();
    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
      members.put(l.getKey(), GossipState.DOWN);
      if (l.getValue() == GossipState.UP) {
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
      }
      return true;
    }
    return false;
  }

  public void register(GossipListener listener) {
    listeners.add(listener);
  }

  public void shutdown() {
    scheduledExecutor.shutdown();
    try {
      scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdown();
    try {
      listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdownNow();
  }
}
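  • The init method of GossipMemberStateRefresher registers the refresher's run method as a scheduled task via scheduledExecutor.scheduleAtFixedRate (executed every 100 ms)
  • runOnce iterates over the members map passed in by GossipManager; for each entry it first checks for an optimistic shutdown message (processOptimisticShutdown), then calls LocalMember.detect to compute phiMeasure. If that value is not null it calls calcRequiredState, otherwise calcRequiredStateCleanupInterval, to obtain requiredState; if the state changed, it updates the map and asynchronously invokes gossipEvent on each GossipListener
  • calcRequiredState returns GossipState.DOWN when phiMeasure is greater than convictThreshold (default 10) and GossipState.UP otherwise; calcRequiredStateCleanupInterval returns GossipState.DOWN when the current time is greater than cleanupInterval + member.getHeartbeat(), and otherwise keeps the existing state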
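The two decision rules can be condensed into a few pure functions. The sketch below is illustrative only: StateCalcSketch and its method names are hypothetical, and it assumes that now, cleanupInterval and lastHeartbeat are all expressed in the same time unit.

```java
// Hypothetical sketch of the UP/DOWN decision; not the refresher itself.
class StateCalcSketch {
  enum State { UP, DOWN }

  // Convict the member only when phi is strictly above the threshold.
  static State fromPhi(double phi, double convictThreshold) {
    return phi > convictThreshold ? State.DOWN : State.UP;
  }

  // Fallback when no phi value is available: mark the member DOWN once its
  // last heartbeat is older than the cleanup interval, else keep the state.
  static State fromCleanupInterval(long now, long cleanupInterval, long lastHeartbeat, State current) {
    return now - cleanupInterval > lastHeartbeat ? State.DOWN : current;
  }

  // Combines both rules the way runOnce chooses between them.
  static State required(Double phi, double convictThreshold, long now,
                        long cleanupInterval, long lastHeartbeat, State current) {
    if (phi != null) {
      return fromPhi(phi, convictThreshold);
    }
    return fromCleanupInterval(now, cleanupInterval, lastHeartbeat, current);
  }

  public static void main(String[] args) {
    System.out.println(required(12.5, 10.0, 0L, 0L, 0L, State.UP));
    System.out.println(required(null, 10.0, 30_000L, 10_000L, 15_000L, State.UP));
  }
}
```

A phi exactly equal to the threshold keeps the member UP, since the comparison is strict.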

AbstractTransportManager

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java

/**
 * Manage the protcol threads (active and passive gossipers).
 */
public abstract class AbstractTransportManager implements TransportManager {

  public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);

  private final ExecutorService gossipThreadExecutor;
  private final AbstractActiveGossiper activeGossipThread;
  protected final GossipManager gossipManager;
  protected final GossipCore gossipCore;

  public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
    this.gossipManager = gossipManager;
    this.gossipCore = gossipCore;
    gossipThreadExecutor = Executors.newCachedThreadPool();
    activeGossipThread = ReflectionUtils.constructWithReflection(
      gossipManager.getSettings().getActiveGossipClass(),
        new Class<?>[]{
            GossipManager.class, GossipCore.class, MetricRegistry.class
        },
        new Object[]{
            gossipManager, gossipCore, gossipManager.getRegistry()
        });
  }

  // shut down threads etc.
  @Override
  public void shutdown() {
    gossipThreadExecutor.shutdown();
    if (activeGossipThread != null) {
      activeGossipThread.shutdown();
    }
    try {
      boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
      if (!result) {
        // common when blocking patterns are used to read data from a socket.
        LOGGER.warn("executor shutdown timed out");
      }
    } catch (InterruptedException e) {
      LOGGER.error(e);
    }
    gossipThreadExecutor.shutdownNow();
  }

  @Override
  public void startActiveGossiper() {
    activeGossipThread.init();
  }

  @Override
  public abstract void startEndpoint();
}
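  • startActiveGossiper in AbstractTransportManager calls activeGossipThread.init(); activeGossipThread is a subclass of AbstractActiveGossiper, built by reflection from the class name returned by getActiveGossipClass(). We look at SimpleActiveGossiper next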
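The listing does not show the body of ReflectionUtils.constructWithReflection, so the following is only a guess at the general technique using the standard java.lang.reflect API. ReflectionSketch and construct are hypothetical names, and java.lang.StringBuilder is used purely as a convenient class to instantiate.

```java
import java.lang.reflect.Constructor;

// Hypothetical helper in the spirit of constructWithReflection; the real
// ReflectionUtils implementation may differ.
class ReflectionSketch {
  // Looks up the public constructor matching the signature and invokes it.
  @SuppressWarnings("unchecked")
  static <T> T construct(String className, Class<?>[] signature, Object[] args) {
    try {
      Constructor<?> constructor = Class.forName(className).getConstructor(signature);
      return (T) constructor.newInstance(args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("cannot construct " + className, e);
    }
  }

  public static void main(String[] args) {
    StringBuilder sb = construct(
        "java.lang.StringBuilder",
        new Class<?>[] { String.class },
        new Object[] { "gossip" });
    System.out.println(sb.length());
  }
}
```

Configuring the class by name is what lets a different AbstractActiveGossiper subclass be swapped in through the settings without changing the transport manager.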

SimpleActiveGossiper

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java

/**
 * Base implementation gossips randomly to live nodes periodically gossips to dead ones
 *
 */
public class SimpleActiveGossiper extends AbstractActiveGossiper {

  private ScheduledExecutorService scheduledExecutorService;
  private final BlockingQueue<Runnable> workQueue;
  private ThreadPoolExecutor threadService;

  public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
                              MetricRegistry registry) {
    super(gossipManager, gossipCore, registry);
    scheduledExecutorService = Executors.newScheduledThreadPool(2);
    workQueue = new ArrayBlockingQueue<Runnable>(1024);
    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
  }

  @Override
  public void init() {
    super.init();
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      threadService.execute(() -> {
        sendToALiveMember();
      });
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(() -> {
      sendToDeadMember();
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            () -> sendPerNodeData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
            () -> sendSharedData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
  }

  @Override
  public void shutdown() {
    super.shutdown();
    scheduledExecutorService.shutdown();
    try {
      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
    sendShutdownMessage();
    threadService.shutdown();
    try {
      threadService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
    }
  }

  protected void sendToALiveMember(){
    LocalMember member = selectPartner(gossipManager.getLiveMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }

  protected void sendToDeadMember(){
    LocalMember member = selectPartner(gossipManager.getDeadMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }

  /**
   * sends an optimistic shutdown message to several clusters nodes
   */
  protected void sendShutdownMessage(){
    List<LocalMember> l = gossipManager.getLiveMembers();
    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
    for (int i = 0; i < sendTo; i++) {
      threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
    }
  }
}
  • SimpleActiveGossiper extends AbstractActiveGossiper and overrides init, where it uses scheduledExecutorService.scheduleAtFixedRate to register four scheduled tasks: sendToALiveMember (handed off to threadService), sendToDeadMember, sendPerNodeData and sendSharedData, each run every gossipInterval milliseconds
  • shutdown mainly calls scheduledExecutorService.shutdown(), sendShutdownMessage() and threadService.shutdown(); sendShutdownMessage() targets one member when fewer than three are live, and half of the live members otherwise
  • sendToALiveMember uses the parent's selectPartner to choose a live member from gossipManager.getLiveMembers() and then sends it the membership list via sendMembershipList; sendToDeadMember does the same with a dead member chosen from gossipManager.getDeadMembers()
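The pattern used for sendToALiveMember, a fixed-rate scheduler that only hands each round to a bounded worker pool, can be tried out on its own. The sketch below is a simplified illustration: GossipSchedulerSketch and runRounds are hypothetical names, the pool sizes are arbitrary, and counting down a latch stands in for the actual gossip send.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the scheduler-to-worker hand-off; not Apache Gossip code.
class GossipSchedulerSketch {
  // Returns true if the given number of rounds ran within five seconds.
  static boolean runRounds(int rounds, long intervalMillis) {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    // Bounded queue plus DiscardOldestPolicy: when the workers fall behind,
    // the oldest queued round is dropped instead of blocking the scheduler.
    ThreadPoolExecutor workers = new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(16), new ThreadPoolExecutor.DiscardOldestPolicy());
    CountDownLatch done = new CountDownLatch(rounds);
    try {
      // The scheduled task itself is cheap: it only submits the real work.
      scheduler.scheduleAtFixedRate(
          () -> workers.execute(done::countDown), 0, intervalMillis, TimeUnit.MILLISECONDS);
      return done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      scheduler.shutdownNow();
      workers.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(runRounds(3, 10));
  }
}
```

Keeping the scheduled task short matters because a slow send (gossipCore.send waits up to one second for a reply) would otherwise delay the following rounds on the scheduler thread.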

Summary

  • startActiveGossiper in AbstractTransportManager calls activeGossipThread.init(); activeGossipThread is a subclass of AbstractActiveGossiper, assumed here to be SimpleActiveGossiper
  • The init method of SimpleActiveGossiper uses scheduledExecutorService.scheduleAtFixedRate to register four scheduled tasks, sendToALiveMember, sendToDeadMember, sendPerNodeData and sendSharedData (each run every gossipInterval)
  • sendToALiveMember uses the parent's selectPartner to choose a live member from gossipManager.getLiveMembers() and then sends it the membership list via sendMembershipList; sendToDeadMember does the same with a dead member chosen from gossipManager.getDeadMembers()
  • AbstractActiveGossiper provides selectPartner and sendMembershipList; selectPartner picks a LocalMember at a random index (randomNeighborIndex) when memberList is non-empty; sendMembershipList first sets me's heartbeat, then creates a UdpActiveGossipMessage whose members list starts with the current local member, followed by the keys of gossipManager.getMembers(), and finally sends it to the selected member through gossipCore.send
  • ActiveGossipMessageHandler handles UdpActiveGossipMessage; it converts the entries of activeGossipMessage.getMembers() into RemoteMember objects and adds them to remoteGossipMembers, then replies to the sender with a UdpActiveGossipOk via gossipCore.sendOneWay, and finally calls gossipCore.mergeLists(senderMember, remoteGossipMembers)
  • mergeLists in GossipCore converts each entry of the received remoteList into a LocalMember and merges it into gossipManager.getMembers() via putIfAbsent; when the member already exists, the merge updates that localMember's heartbeat, and the recordHeartbeat method ignores values less than or equal to latestHeartbeatMs
  • GossipManager keeps members, a ConcurrentSkipListMap from LocalMember to GossipState; its constructor also creates the RingStatePersister, UserDataPersister and GossipMemberStateRefresher; init calls transportManager.startEndpoint() and startActiveGossiper(), registers RingStatePersister and UserDataPersister as scheduled tasks via scheduleAtFixedRate when persistence is enabled in the settings, and also calls memberStateRefresher.init()
  • The init method of GossipMemberStateRefresher registers the refresher as a scheduled task via scheduledExecutor.scheduleAtFixedRate (executed every 100 ms); runOnce iterates over the members passed in by GossipManager and calls LocalMember.detect on each to compute phiMeasure; if that value is not null it calls calcRequiredState, otherwise calcRequiredStateCleanupInterval, to obtain requiredState; if the state changed, it updates the map and asynchronously invokes gossipEvent on each GossipListener; calcRequiredState returns GossipState.DOWN when phiMeasure is greater than convictThreshold (default 10) and GossipState.UP otherwise; calcRequiredStateCleanupInterval returns GossipState.DOWN when the current time is greater than cleanupInterval + member.getHeartbeat(), and otherwise keeps the existing state
Sending the full membership list on every round, as sendMembershipList does, may become inefficient when the member list is very large.

doc

  • GossipCore
  • GossipManager
  • AbstractActiveGossiper
  • SimpleActiveGossiper
  • GossipMemberStateRefresher