乐趣区

聊聊apache-gossip的ActiveGossiper

本文主要研究一下 apache gossip 的 ActiveGossiper

AbstractActiveGossiper

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java

/**
 * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
 */
public abstract class AbstractActiveGossiper {protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);

  protected final GossipManager gossipManager;
  protected final GossipCore gossipCore;
  private final Histogram sharedDataHistogram;
  private final Histogram sendPerNodeDataHistogram;
  private final Histogram sendMembershipHistogram;
  private final Random random;
  private final GossipSettings gossipSettings;

  public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
    this.gossipManager = gossipManager;
    this.gossipCore = gossipCore;
    sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
    sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
    sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
    random = new Random();
    gossipSettings = gossipManager.getSettings();}

  public void init() {}

  public void shutdown() {}

  public final void sendShutdownMessage(LocalMember me, LocalMember target){if (target == null){return;}
    ShutdownMessage m = new ShutdownMessage();
    m.setNodeId(me.getId());
    m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
    gossipCore.sendOneWay(m, target.getUri());
  }

  //......

  /**
   * Performs the sending of the membership list, after we have incremented our own heartbeat.
   */
  protected void sendMembershipList(LocalMember me, LocalMember member) {if (member == null){return;}
    long startTime = System.currentTimeMillis();
    me.setHeartbeat(System.nanoTime());
    UdpActiveGossipMessage message = new UdpActiveGossipMessage();
    message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
    message.setUuid(UUID.randomUUID().toString());
    message.getMembers().add(convert(me));
    for (LocalMember other : gossipManager.getMembers().keySet()) {message.getMembers().add(convert(other));
    }
    Response r = gossipCore.send(message, member.getUri());
    if (r instanceof ActiveGossipOk){//maybe count metrics here} else {LOGGER.debug("Message" + message + "generated response" + r);
    }
    sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
  }

  protected final Member convert(LocalMember member){Member gm = new Member();
    gm.setCluster(member.getClusterName());
    gm.setHeartbeat(member.getHeartbeat());
    gm.setUri(member.getUri().toASCIIString());
    gm.setId(member.getId());
    gm.setProperties(member.getProperties());
    return gm;
  }

  /**
   *
   * @param memberList
   *          An immutable list
   * @return The chosen LocalGossipMember to gossip with.
   */
  protected LocalMember selectPartner(List<LocalMember> memberList) {
    LocalMember member = null;
    if (memberList.size() > 0) {int randomNeighborIndex = random.nextInt(memberList.size());
      member = memberList.get(randomNeighborIndex);
    }
    return member;
  }
}
  • AbstractActiveGossiper 的构造器需要传入 gossipManager 及 gossipCore;它定义了 sendShutdownMessage、sendMembershipList、selectPartner 等方法
  • selectPartner 方法在 memberList 不为空的情况下随机生成 randomNeighborIndex 选择出一个 LocalMember
  • sendMembershipList 方法首先设置 me 的 heartbeat,然后创建 UdpActiveGossipMessage,该 message 的 members 首先是当前的 localMember,然后再添加 gossipManager.getMembers(),最后通过 gossipCore.send 发送给选中的 member

ActiveGossipMessageHandler

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java

public class ActiveGossipMessageHandler implements MessageHandler {
  
  /**
   * @param gossipCore context.
   * @param gossipManager context.
   * @param base message reference.
   * @return boolean indicating success.
   */
  @Override
  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {List<Member> remoteGossipMembers = new ArrayList<>();
    RemoteMember senderMember = null;
    UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
    for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
      URI u;
      try {u = new URI(activeGossipMessage.getMembers().get(i).getUri());
      } catch (URISyntaxException e) {GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
        continue;
      }
      RemoteMember member = new RemoteMember(activeGossipMessage.getMembers().get(i).getCluster(),
              u,
              activeGossipMessage.getMembers().get(i).getId(),
              activeGossipMessage.getMembers().get(i).getHeartbeat(),
              activeGossipMessage.getMembers().get(i).getProperties());
      if (i == 0) {senderMember = member;}
      if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) {UdpNotAMemberFault f = new UdpNotAMemberFault();
        f.setException("Not a member of this cluster" + i);
        f.setUriFrom(activeGossipMessage.getUriFrom());
        f.setUuid(activeGossipMessage.getUuid());
        GossipCore.LOGGER.warn(f);
        gossipCore.sendOneWay(f, member.getUri());
        continue;
      }
      remoteGossipMembers.add(member);
    }
    UdpActiveGossipOk o = new UdpActiveGossipOk();
    o.setUriFrom(activeGossipMessage.getUriFrom());
    o.setUuid(activeGossipMessage.getUuid());
    gossipCore.sendOneWay(o, senderMember.getUri());
    gossipCore.mergeLists(senderMember, remoteGossipMembers);
    return true;
  }
}
  • 当目标 member 接收到 UdpActiveGossipMessage 的时候,由 ActiveGossipMessageHandler 来处理该消息;它首先从 activeGossipMessage.getMembers(),转换为 RemoteMember,添加到 remoteGossipMembers,之后通过 gossipCore.sendOneWay 给发送方回复 UdpActiveGossipOk,最后执行 gossipCore.mergeLists(senderMember, remoteGossipMembers)

GossipCore

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java

public class GossipCore implements GossipCoreConstants {

  class LatchAndBase {
    private final CountDownLatch latch;
    private volatile Base base;
    
    LatchAndBase(){latch = new CountDownLatch(1);
    }
    
  }
  public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
  private final GossipManager gossipManager;
  private ConcurrentHashMap<String, LatchAndBase> requests;
  private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
  private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
  private final Meter messageSerdeException;
  private final Meter transmissionException;
  private final Meter transmissionSuccess;
  private final DataEventManager eventManager;
  
  public GossipCore(GossipManager manager, MetricRegistry metrics){
    this.gossipManager = manager;
    requests = new ConcurrentHashMap<>();
    perNodeData = new ConcurrentHashMap<>();
    sharedData = new ConcurrentHashMap<>();
    eventManager = new DataEventManager(metrics);
    metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
    metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  sharedData.size());
    metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
    messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
    transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
    transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
  }

  public void receive(Base base) {if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {LOGGER.warn("received message can not be handled");
    }
  }

  /**
   * Sends a blocking message.
   * todo: move functionality to TransportManager layer.
   * @param message
   * @param uri
   * @throws RuntimeException if data can not be serialized or in transmission error
   */
  private void sendInternal(Base message, URI uri) {byte[] json_bytes;
    try {json_bytes = gossipManager.getProtocolManager().write(message);
    } catch (IOException e) {messageSerdeException.mark();
      throw new RuntimeException(e);
    }
    try {gossipManager.getTransportManager().send(uri, json_bytes);
      transmissionSuccess.mark();} catch (IOException e) {transmissionException.mark();
      throw new RuntimeException(e);
    }
  }

  public Response send(Base message, URI uri){if (LOGGER.isDebugEnabled()){LOGGER.debug("Sending" + message);
      LOGGER.debug("Current request queue" + requests);
    }

    final Trackable t;
    LatchAndBase latchAndBase = null;
    if (message instanceof Trackable){t = (Trackable) message;
      latchAndBase = new LatchAndBase();
      requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
    } else {t = null;}
    sendInternal(message, uri);
    if (latchAndBase == null){return null;}
    
    try {boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
      if (complete){return (Response) latchAndBase.base;
      } else{return null;}
    } catch (InterruptedException e) {throw new RuntimeException(e);
    } finally {if (latchAndBase != null){requests.remove(t.getUuid() + "/" + t.getUriFrom());
      }
    }
  }

  /**
   * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
   * when the protocol for the message is not to wait for a response
   * @param message the message to send
   * @param u the uri to send it to
   */
  public void sendOneWay(Base message, URI u) {
    try {sendInternal(message, u);
    } catch (RuntimeException ex) {LOGGER.debug("Send one way failed", ex);
    }
  }

  public void handleResponse(String k, Base v) {LatchAndBase latch = requests.get(k);
    latch.base = v;
    latch.latch.countDown();}

  /**
   * Merge lists from remote members and update heartbeats
   *
   * @param senderMember
   * @param remoteList
   *
   */
  public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {if (LOGGER.isDebugEnabled()){debugState(senderMember, remoteList);
    }
    for (LocalMember i : gossipManager.getDeadMembers()) {if (i.getId().equals(senderMember.getId())) {LOGGER.debug(gossipManager.getMyself() + "contacted by dead member" + senderMember.getUri());
        i.recordHeartbeat(senderMember.getHeartbeat());
        i.setHeartbeat(senderMember.getHeartbeat());
        //TODO consider forcing an UP here
      }
    }
    for (Member remoteMember : remoteList) {if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {continue;}
      LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
      remoteMember.getUri(),
      remoteMember.getId(),
      remoteMember.getHeartbeat(),
      remoteMember.getProperties(),
      gossipManager.getSettings().getWindowSize(),
      gossipManager.getSettings().getMinimumSamples(),
      gossipManager.getSettings().getDistribution());
      aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
      Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
      if (result != null){for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){if (localMember.getKey().getId().equals(remoteMember.getId())){localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
            localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
            localMember.getKey().setProperties(remoteMember.getProperties());
          }
        }
      }
    }
    if (LOGGER.isDebugEnabled()){debugState(senderMember, remoteList);
    }
  }

  //......

}
  • GossipCore 的构造器需要 GossipManager 参数,它定义了 receive、send、sendOneWay、handleResponse、mergeLists 等方法
  • mergeLists 方法主要是将接收到的 remoteList 转换为 LocalMember,然后通过的 putIfAbsent 方法与 gossipManager.getMembers()进行合并
  • 合并的同时会更新已有 localMember 的 heartbeat,recordHeartbeat 方法会忽略小于等于 latestHeartbeatMs 的值

GossipManager

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java

public abstract class GossipManager {public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
  
  // this mapper is used for ring and user-data persistence only. NOT messages.
  public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
    private static final long serialVersionUID = 1L;
  {enableDefaultTyping();
    configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
  }};

  private final ConcurrentSkipListMap<LocalMember, GossipState> members;
  private final LocalMember me;
  private final GossipSettings settings;
  private final AtomicBoolean gossipServiceRunning;
  
  private TransportManager transportManager;
  private ProtocolManager protocolManager;
  
  private final GossipCore gossipCore;
  private final DataReaper dataReaper;
  private final Clock clock;
  private final ScheduledExecutorService scheduledServiced;
  private final MetricRegistry registry;
  private final RingStatePersister ringState;
  private final UserDataPersister userDataState;
  private final GossipMemberStateRefresher memberStateRefresher;
  
  private final MessageHandler messageHandler;
  private final LockManager lockManager;

  public GossipManager(String cluster,
                       URI uri, String id, Map<String, String> properties, GossipSettings settings,
                       List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
                       MessageHandler messageHandler) {
    this.settings = settings;
    this.messageHandler = messageHandler;

    clock = new SystemClock();
    me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
            settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
    gossipCore = new GossipCore(this, registry);
    this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
    dataReaper = new DataReaper(gossipCore, clock);
    members = new ConcurrentSkipListMap<>();
    for (Member startupMember : gossipMembers) {if (!startupMember.equals(me)) {LocalMember member = new LocalMember(startupMember.getClusterName(),
                startupMember.getUri(), startupMember.getId(),
                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
                settings.getMinimumSamples(), settings.getDistribution());
        //TODO should members start in down state?
        members.put(member, GossipState.DOWN);
      }
    }
    gossipServiceRunning = new AtomicBoolean(true);
    this.scheduledServiced = Executors.newScheduledThreadPool(1);
    this.registry = registry;
    this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
    this.userDataState = new UserDataPersister(
        gossipCore,
        GossipManager.buildPerNodeDataPath(this),
        GossipManager.buildSharedDataPath(this));
    this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
    readSavedRingState();
    readSavedDataState();}

  /**
   * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
   * thread and start the receiver thread.
   */
  public void init() {
    
    // protocol manager and transport managers are specified in settings.
    // construct them here via reflection.
    
    protocolManager = ReflectionUtils.constructWithReflection(settings.getProtocolManagerClass(),
        new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class},
        new Object[] { settings, me.getId(), this.getRegistry()}
    );
    
    transportManager = ReflectionUtils.constructWithReflection(settings.getTransportManagerClass(),
        new Class<?>[] { GossipManager.class, GossipCore.class},
        new Object[] { this, gossipCore}
    );
    
    // start processing gossip messages.
    transportManager.startEndpoint();
    transportManager.startActiveGossiper();
    
    dataReaper.init();
    if (settings.isPersistRingState()) {scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
    }
    if (settings.isPersistDataState()) {scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
    }
    memberStateRefresher.init();
    LOGGER.debug("The GossipManager is started.");
  }

  /**
   * Shutdown the gossip service.
   */
  public void shutdown() {gossipServiceRunning.set(false);
    lockManager.shutdown();
    gossipCore.shutdown();
    transportManager.shutdown();
    dataReaper.close();
    memberStateRefresher.shutdown();
    scheduledServiced.shutdown();
    try {scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {LOGGER.error(e);
    }
    scheduledServiced.shutdownNow();}

  //......

}
  • GossipManager 使用 ConcurrentSkipListMap 维护了 LocalMember 与 GossipState 的映射的 members,同时该构造器创建了 RingStatePersister、UserDataPersister、GossipMemberStateRefresher
  • init 方法调用了 transportManager.startEndpoint()及 startActiveGossiper 方法,同时通过 scheduleAtFixedRate 注册了 RingStatePersister、UserDataPersister 这两个定时任务,另外还执行了 memberStateRefresher.init()
  • shutdown 方法执行了 gossipCore.shutdown()、transportManager.shutdown()、memberStateRefresher.shutdown()等

GossipMemberStateRefresher

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java

public class GossipMemberStateRefresher {public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);

  private final Map<LocalMember, GossipState> members;
  private final GossipSettings settings;
  private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
  private final Clock clock;
  private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
  private final ExecutorService listenerExecutor;
  private final ScheduledExecutorService scheduledExecutor;
  private final BlockingQueue<Runnable> workQueue;

  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
                                    GossipListener listener,
                                    BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
    this.members = members;
    this.settings = settings;
    listeners.add(listener);
    this.findPerNodeGossipData = findPerNodeGossipData;
    clock = new SystemClock();
    workQueue = new ArrayBlockingQueue<>(1024);
    listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
    scheduledExecutor = Executors.newScheduledThreadPool(1);
  }

  public void init() {scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
  }

  public void run() {
    try {runOnce();
    } catch (RuntimeException ex) {LOGGER.warn("scheduled state had exception", ex);
    }
  }

  public void runOnce() {for (Entry<LocalMember, GossipState> entry : members.entrySet()) {boolean userDown = processOptimisticShutdown(entry);
      if (userDown)
        continue;

      Double phiMeasure = entry.getKey().detect(clock.nanoTime());
      GossipState requiredState;

      if (phiMeasure != null) {requiredState = calcRequiredState(phiMeasure);
      } else {requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
      }

      if (entry.getValue() != requiredState) {members.put(entry.getKey(), requiredState);
        /* Call listeners asynchronously */
        for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
      }
    }
  }

  public GossipState calcRequiredState(Double phiMeasure) {if (phiMeasure > settings.getConvictThreshold())
      return GossipState.DOWN;
    else
      return GossipState.UP;
  }

  public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {long now = clock.nanoTime();
    long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
    if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {return GossipState.DOWN;} else {return state;}
  }

  /**
   * If we have a special key the per-node data that means that the node has sent us
   * a pre-emptive shutdown message. We process this so node is seen down sooner
   *
   * @param l member to consider
   * @return true if node forced down
   */
  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
    if (m == null) {return false;}
    ShutdownMessage s = (ShutdownMessage) m.getPayload();
    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {members.put(l.getKey(), GossipState.DOWN);
      if (l.getValue() == GossipState.UP) {for (GossipListener listener: listeners)
          listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
      }
      return true;
    }
    return false;
  }

  public void register(GossipListener listener) {listeners.add(listener);
  }

  public void shutdown() {scheduledExecutor.shutdown();
    try {scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdown();
    try {listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {LOGGER.debug("Issue during shutdown", e);
    }
    listenerExecutor.shutdownNow();}
}
  • GossipMemberStateRefresher 的 init 方法通过 scheduledExecutor.scheduleAtFixedRate 注册了 GossipMemberStateRefresher 的定时任务 ( 每隔 100ms 执行)
  • runOnce 方法遍历 GossipManager 传入的 members,然后挨个调用 LocalMember 的 detect 方法计算 phiMeasure,如果该值不为 null 则执行 calcRequiredState,否则执行 calcRequiredStateCleanupInterval 来计算 requiredState;如果 state 发生变更则更新然后异步回调 GossipListener 的 gossipEvent 方法
  • calcRequiredState 方法判断 phiMeasure 是否大于 convictThreshold(默认为 10),大于则返回 GossipState.DOWN,否则返回 GossipState.UP;calcRequiredStateCleanupInterval 方法则判断当前时间是否大于 cleanupInterval+member.getHeartbeat(),大于则返回 GossipState.DOWN,否则返回原有的 state

AbstractTransportManager

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java

/**
 * Manage the protcol threads (active and passive gossipers).
 */
public abstract class AbstractTransportManager implements TransportManager {public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
  
  private final ExecutorService gossipThreadExecutor;
  private final AbstractActiveGossiper activeGossipThread;
  protected final GossipManager gossipManager;
  protected final GossipCore gossipCore;
  
  public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
    this.gossipManager = gossipManager;
    this.gossipCore = gossipCore;
    gossipThreadExecutor = Executors.newCachedThreadPool();
    activeGossipThread = ReflectionUtils.constructWithReflection(gossipManager.getSettings().getActiveGossipClass(),
        new Class<?>[]{GossipManager.class, GossipCore.class, MetricRegistry.class},
        new Object[]{gossipManager, gossipCore, gossipManager.getRegistry()
        });
  }

  // shut down threads etc.
  @Override
  public void shutdown() {gossipThreadExecutor.shutdown();
    if (activeGossipThread != null) {activeGossipThread.shutdown();
    }
    try {boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
      if (!result) {
        // common when blocking patterns are used to read data from a socket.
        LOGGER.warn("executor shutdown timed out");
      }
    } catch (InterruptedException e) {LOGGER.error(e);
    }
    gossipThreadExecutor.shutdownNow();}

  @Override
  public void startActiveGossiper() {activeGossipThread.init();
  }

  @Override
  public abstract void startEndpoint();}
  • AbstractTransportManager 的 startActiveGossiper 会调用 activeGossipThread.init();这里 activeGossipThread 为 AbstractActiveGossiper 的子类,这里我们看下 SimpleActiveGossiper

SimpleActiveGossiper

incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java

/**
 * Base implementation gossips randomly to live nodes periodically gossips to dead ones
 *
 */
public class SimpleActiveGossiper extends AbstractActiveGossiper {

  private ScheduledExecutorService scheduledExecutorService;
  private final BlockingQueue<Runnable> workQueue;
  private ThreadPoolExecutor threadService;
  
  public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
                              MetricRegistry registry) {super(gossipManager, gossipCore, registry);
    scheduledExecutorService = Executors.newScheduledThreadPool(2);
    workQueue = new ArrayBlockingQueue<Runnable>(1024);
    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
            new ThreadPoolExecutor.DiscardOldestPolicy());
  }

  @Override
  public void init() {super.init();
    scheduledExecutorService.scheduleAtFixedRate(() -> {threadService.execute(() -> {sendToALiveMember();
      });
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(() -> {sendToDeadMember();
    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(() -> sendPerNodeData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(() -> sendSharedData(gossipManager.getMyself(),
                    selectPartner(gossipManager.getLiveMembers())),
            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
  }
  
  @Override
  public void shutdown() {super.shutdown();
    scheduledExecutorService.shutdown();
    try {scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {LOGGER.debug("Issue during shutdown", e);
    }
    sendShutdownMessage();
    threadService.shutdown();
    try {threadService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {LOGGER.debug("Issue during shutdown", e);
    }
  }

  protected void sendToALiveMember(){LocalMember member = selectPartner(gossipManager.getLiveMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }
  
  protected void sendToDeadMember(){LocalMember member = selectPartner(gossipManager.getDeadMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }
  
  /**
   * sends an optimistic shutdown message to several clusters nodes
   */
  protected void sendShutdownMessage(){List<LocalMember> l = gossipManager.getLiveMembers();
    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
    for (int i = 0; i < sendTo; i++) {threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
    }
  }
}
  • SimpleActiveGossiper 继承了 AbstractActiveGossiper,它覆盖了 init 方法,这里通过 scheduledExecutorService 的 scheduleAtFixedRate 注册了 sendToALiveMember、sendToDeadMember、sendPerNodeData、sendSharedData 四个定时任务 ( 每隔 gossipInterval 执行)
  • shutdown 方法主要是执行 scheduledExecutorService.shutdown()、sendShutdownMessage()、threadService.shutdown()
  • sendToALiveMember 首先通过父类的 selectPartner 方法来从 gossipManager.getLiveMembers()选择一个 liveMember,之后通过 sendMembershipList 来向它发送 membershipList 信息;sendToDeadMember 首先首先通过父类的 selectPartner 方法来从 gossipManager.getDeadMembers()选择一个 liveMember,之后通过 sendMembershipList 来向它发送 membershipList 信息

小结

  • AbstractTransportManager 的 startActiveGossiper 会调用 activeGossipThread.init();这里 activeGossipThread 为 AbstractActiveGossiper 的子类,这里假设为 SimpleActiveGossiper
  • SimpleActiveGossiper 的 init 方法,这里通过 scheduledExecutorService 的 scheduleAtFixedRate 注册了 sendToALiveMember、sendToDeadMember、sendPerNodeData、sendSharedData 四个定时任务 ( 每隔 gossipInterval 执行)
  • sendToALiveMember 首先通过父类的 selectPartner 方法来从 gossipManager.getLiveMembers()选择一个 liveMember,之后通过 sendMembershipList 来向它发送 membershipList 信息;sendToDeadMember 首先首先通过父类的 selectPartner 方法来从 gossipManager.getDeadMembers()选择一个 liveMember,之后通过 sendMembershipList 来向它发送 membershipList 信息
  • AbstractActiveGossiper 提供了 selectPartner、sendMembershipList 方法方法;selectPartner 方法在 memberList 不为空的情况下随机生成 randomNeighborIndex 选择出一个 LocalMember;sendMembershipList 方法首先设置 me 的 heartbeat,然后创建 UdpActiveGossipMessage,该 message 的 members 首先是当前的 localMember,然后再添加 gossipManager.getMembers(),最后通过 gossipCore.send 发送给选中的 member
  • ActiveGossipMessageHandler 用于处理 UdpActiveGossipMessage;它首先从 activeGossipMessage.getMembers(),转换为 RemoteMember,添加到 remoteGossipMembers,之后通过 gossipCore.sendOneWay 给发送方回复 UdpActiveGossipOk,最后执行 gossipCore.mergeLists(senderMember, remoteGossipMembers)
  • GossipCore 的 mergeLists 方法主要是将接收到的 remoteList 转换为 LocalMember,然后通过的 putIfAbsent 方法与 gossipManager.getMembers()进行合并;合并的同时会更新已有 localMember 的 heartbeat,recordHeartbeat 方法会忽略小于等于 latestHeartbeatMs 的值
  • GossipManager 使用 ConcurrentSkipListMap 维护了 LocalMember 与 GossipState 的映射的 members,同时该构造器创建了 RingStatePersister、UserDataPersister、GossipMemberStateRefresher;init 方法调用了 transportManager.startEndpoint()及 startActiveGossiper 方法,同时通过 scheduleAtFixedRate 注册了 RingStatePersister、UserDataPersister 这两个定时任务,另外还执行了 memberStateRefresher.init()
  • GossipMemberStateRefresher 的 init 方法通过 scheduledExecutor.scheduleAtFixedRate 注册了 GossipMemberStateRefresher 的定时任务 ( 每隔 100ms 执行 );runOnce 方法遍历 GossipManager 传入的 members,然后挨个调用 LocalMember 的 detect 方法计算 phiMeasure,如果该值不为 null 则执行 calcRequiredState,否则执行 calcRequiredStateCleanupInterval 来计算 requiredState;如果 state 发生变更则更新然后异步回调 GossipListener 的 gossipEvent 方法;calcRequiredState 方法判断 phiMeasure 是否大于 convictThreshold( 默认为 10),大于则返回 GossipState.DOWN,否则返回 GossipState.UP;calcRequiredStateCleanupInterval 方法则判断当前时间是否大于 cleanupInterval+member.getHeartbeat(),大于则返回 GossipState.DOWN,否则返回原有的 state

每次这样全量 sendMembershipList 在 memberList 非常多的情况下可能会有效率方面的问题

doc

  • GossipCore
  • GossipManager
  • AbstractActiveGossiper
  • SimpleActiveGossiper
  • GossipMemberStateRefresher
退出移动版