序本文主要研究一下apache gossip的ActiveGossiper
AbstractActiveGossiperincubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
/** * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner */public abstract class AbstractActiveGossiper { protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class); protected final GossipManager gossipManager; protected final GossipCore gossipCore; private final Histogram sharedDataHistogram; private final Histogram sendPerNodeDataHistogram; private final Histogram sendMembershipHistogram; private final Random random; private final GossipSettings gossipSettings; public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { this.gossipManager = gossipManager; this.gossipCore = gossipCore; sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time")); sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time")); sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time")); random = new Random(); gossipSettings = gossipManager.getSettings(); } public void init() { } public void shutdown() { } public final void sendShutdownMessage(LocalMember me, LocalMember target){ if (target == null){ return; } ShutdownMessage m = new ShutdownMessage(); m.setNodeId(me.getId()); m.setShutdownAtNanos(gossipManager.getClock().nanoTime()); gossipCore.sendOneWay(m, target.getUri()); } //...... /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ protected void sendMembershipList(LocalMember me, LocalMember member) { if (member == null){ return; } long startTime = System.currentTimeMillis(); me.setHeartbeat(System.nanoTime()); UdpActiveGossipMessage message = new UdpActiveGossipMessage(); message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); message.setUuid(UUID.randomUUID().toString()); message.getMembers().add(convert(me)); for (LocalMember other : gossipManager.getMembers().keySet()) { message.getMembers().add(convert(other)); } Response r = gossipCore.send(message, member.getUri()); if (r instanceof ActiveGossipOk){ //maybe count metrics here } else { LOGGER.debug("Message " + message + " generated response " + r); } sendMembershipHistogram.update(System.currentTimeMillis() - startTime); } protected final Member convert(LocalMember member){ Member gm = new Member(); gm.setCluster(member.getClusterName()); gm.setHeartbeat(member.getHeartbeat()); gm.setUri(member.getUri().toASCIIString()); gm.setId(member.getId()); gm.setProperties(member.getProperties()); return gm; } /** * * @param memberList * An immutable list * @return The chosen LocalGossipMember to gossip with. */ protected LocalMember selectPartner(List<LocalMember> memberList) { LocalMember member = null; if (memberList.size() > 0) { int randomNeighborIndex = random.nextInt(memberList.size()); member = memberList.get(randomNeighborIndex); } return member; }}AbstractActiveGossiper的构造器需要传入gossipManager及gossipCore;它定义了sendShutdownMessage、sendMembershipList、selectPartner等方法selectPartner方法在memberList不为空的情况下随机生成randomNeighborIndex选择出一个LocalMembersendMembershipList方法首先设置me的heartbeat,然后创建UdpActiveGossipMessage,该message的members首先是当前的localMember,然后再添加gossipManager.getMembers(),最后通过gossipCore.send发送给选中的memberActiveGossipMessageHandlerincubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
...