Preface
This article takes a look at the ActiveGossiper in Apache Gossip.
AbstractActiveGossiper
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
/**
* The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
*/
public abstract class AbstractActiveGossiper {protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);
protected final GossipManager gossipManager;
protected final GossipCore gossipCore;
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
private final Histogram sendMembershipHistogram;
private final Random random;
private final GossipSettings gossipSettings;
public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
random = new Random();
gossipSettings = gossipManager.getSettings();}
public void init() {}
public void shutdown() {}
public final void sendShutdownMessage(LocalMember me, LocalMember target){if (target == null){return;}
ShutdownMessage m = new ShutdownMessage();
m.setNodeId(me.getId());
m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
gossipCore.sendOneWay(m, target.getUri());
}
//......
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalMember me, LocalMember member) {if (member == null){return;}
long startTime = System.currentTimeMillis();
me.setHeartbeat(System.nanoTime());
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalMember other : gossipManager.getMembers().keySet()) {message.getMembers().add(convert(other));
}
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.debug("Message" + message + "generated response" + r);
}
sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
}
protected final Member convert(LocalMember member){Member gm = new Member();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setUri(member.getUri().toASCIIString());
gm.setId(member.getId());
gm.setProperties(member.getProperties());
return gm;
}
/**
*
* @param memberList
* An immutable list
* @return The chosen LocalGossipMember to gossip with.
*/
protected LocalMember selectPartner(List<LocalMember> memberList) {
LocalMember member = null;
if (memberList.size() > 0) {int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
}
return member;
}
}
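- The AbstractActiveGossiper constructor takes a gossipManager and a gossipCore (plus a MetricRegistry); the class defines methods such as sendShutdownMessage, sendMembershipList, and selectPartner
- selectPartner picks a LocalMember at random: when memberList is non-empty it generates a randomNeighborIndex and returns the member at that index, otherwise it returns null
- sendMembershipList first sets the heartbeat of me, then creates a UdpActiveGossipMessage whose members list starts with the current local member followed by every key of gossipManager.getMembers(), and finally sends it to the selected member via gossipCore.send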
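The random partner selection described above can be sketched in isolation. This is a minimal illustration, not the library class: PartnerSelectionSketch is a hypothetical name, and it is generic over the element type so that it runs without LocalMember on the classpath.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for the selectPartner logic; not part of Apache Gossip.
class PartnerSelectionSketch {
    private final Random random;

    PartnerSelectionSketch(Random random) {
        this.random = random;
    }

    // Returns a uniformly random element, or null when the list is empty,
    // which mirrors the null check callers such as sendMembershipList rely on.
    <T> T selectPartner(List<T> members) {
        if (members.isEmpty()) {
            return null;
        }
        return members.get(random.nextInt(members.size()));
    }

    public static void main(String[] args) {
        PartnerSelectionSketch sketch = new PartnerSelectionSketch(new Random());
        List<String> live = Arrays.asList("node-a", "node-b", "node-c");
        System.out.println("partner: " + sketch.selectPartner(live));
        System.out.println("empty list: " + sketch.selectPartner(Collections.<String>emptyList()));
    }
}
```

Returning null for an empty list keeps the scheduled tasks simple: the sender methods return early when no partner is available.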
ActiveGossipMessageHandler
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
public class ActiveGossipMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {List<Member> remoteGossipMembers = new ArrayList<>();
RemoteMember senderMember = null;
UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
URI u;
try {u = new URI(activeGossipMessage.getMembers().get(i).getUri());
} catch (URISyntaxException e) {GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
continue;
}
RemoteMember member = new RemoteMember(activeGossipMessage.getMembers().get(i).getCluster(),
u,
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat(),
activeGossipMessage.getMembers().get(i).getProperties());
if (i == 0) {senderMember = member;}
if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) {UdpNotAMemberFault f = new UdpNotAMemberFault();
f.setException("Not a member of this cluster" + i);
f.setUriFrom(activeGossipMessage.getUriFrom());
f.setUuid(activeGossipMessage.getUuid());
GossipCore.LOGGER.warn(f);
gossipCore.sendOneWay(f, member.getUri());
continue;
}
remoteGossipMembers.add(member);
}
UdpActiveGossipOk o = new UdpActiveGossipOk();
o.setUriFrom(activeGossipMessage.getUriFrom());
o.setUuid(activeGossipMessage.getUuid());
gossipCore.sendOneWay(o, senderMember.getUri());
gossipCore.mergeLists(senderMember, remoteGossipMembers);
return true;
}
}
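- When the target member receives a UdpActiveGossipMessage, ActiveGossipMessageHandler processes it: it converts each entry of activeGossipMessage.getMembers() into a RemoteMember and adds it to remoteGossipMembers (the first entry is treated as the sender, and entries whose cluster name differs from the local one are answered with a UdpNotAMemberFault and skipped), then replies to the sender with a UdpActiveGossipOk via gossipCore.sendOneWay, and finally calls gossipCore.mergeLists(senderMember, remoteGossipMembers)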
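The filtering step of the handler can be sketched without any networking. The class and field names below (ActiveGossipHandlingSketch, Entry, Result) are hypothetical and only model the two rules visible in the code above: the first entry is the sender, and entries from another cluster are not merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the handler's member filtering; not the library class.
class ActiveGossipHandlingSketch {
    static final class Entry {
        final String cluster;
        final String id;

        Entry(String cluster, String id) {
            this.cluster = cluster;
            this.id = id;
        }
    }

    static final class Result {
        final Entry sender;
        final List<Entry> accepted;

        Result(Entry sender, List<Entry> accepted) {
            this.sender = sender;
            this.accepted = accepted;
        }
    }

    static Result process(String myCluster, List<Entry> members) {
        Entry sender = null;
        List<Entry> accepted = new ArrayList<>();
        for (int i = 0; i < members.size(); i++) {
            Entry m = members.get(i);
            if (i == 0) {
                sender = m; // the sender always puts itself first
            }
            if (!m.cluster.equals(myCluster)) {
                continue; // the real handler sends a UdpNotAMemberFault here
            }
            accepted.add(m);
        }
        return new Result(sender, accepted);
    }

    public static void main(String[] args) {
        Result r = process("prod", Arrays.asList(
            new Entry("prod", "n1"), new Entry("test", "n2"), new Entry("prod", "n3")));
        System.out.println("sender=" + r.sender.id + ", accepted=" + r.accepted.size());
    }
}
```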
GossipCore
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
public class GossipCore implements GossipCoreConstants {
class LatchAndBase {
private final CountDownLatch latch;
private volatile Base base;
LatchAndBase(){latch = new CountDownLatch(1);
}
}
public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private final GossipManager gossipManager;
private ConcurrentHashMap<String, LatchAndBase> requests;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final Meter messageSerdeException;
private final Meter transmissionException;
private final Meter transmissionSuccess;
private final DataEventManager eventManager;
public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
eventManager = new DataEventManager(metrics);
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size());
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
}
public void receive(Base base) {if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {LOGGER.warn("received message can not be handled");
}
}
/**
* Sends a blocking message.
* todo: move functionality to TransportManager layer.
* @param message
* @param uri
* @throws RuntimeException if data can not be serialized or in transmission error
*/
private void sendInternal(Base message, URI uri) {byte[] json_bytes;
try {json_bytes = gossipManager.getProtocolManager().write(message);
} catch (IOException e) {messageSerdeException.mark();
throw new RuntimeException(e);
}
try {gossipManager.getTransportManager().send(uri, json_bytes);
transmissionSuccess.mark();} catch (IOException e) {transmissionException.mark();
throw new RuntimeException(e);
}
}
public Response send(Base message, URI uri){if (LOGGER.isDebugEnabled()){LOGGER.debug("Sending" + message);
LOGGER.debug("Current request queue" + requests);
}
final Trackable t;
LatchAndBase latchAndBase = null;
if (message instanceof Trackable){t = (Trackable) message;
latchAndBase = new LatchAndBase();
requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
} else {t = null;}
sendInternal(message, uri);
if (latchAndBase == null){return null;}
try {boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
if (complete){return (Response) latchAndBase.base;
} else{return null;}
} catch (InterruptedException e) {throw new RuntimeException(e);
} finally {if (latchAndBase != null){requests.remove(t.getUuid() + "/" + t.getUriFrom());
}
}
}
/**
* Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
* when the protocol for the message is not to wait for a response
* @param message the message to send
* @param u the uri to send it to
*/
public void sendOneWay(Base message, URI u) {
try {sendInternal(message, u);
} catch (RuntimeException ex) {LOGGER.debug("Send one way failed", ex);
}
}
public void handleResponse(String k, Base v) {LatchAndBase latch = requests.get(k);
latch.base = v;
latch.latch.countDown();}
/**
* Merge lists from remote members and update heartbeats
*
* @param senderMember
* @param remoteList
*
*/
public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {if (LOGGER.isDebugEnabled()){debugState(senderMember, remoteList);
}
for (LocalMember i : gossipManager.getDeadMembers()) {if (i.getId().equals(senderMember.getId())) {LOGGER.debug(gossipManager.getMyself() + "contacted by dead member" + senderMember.getUri());
i.recordHeartbeat(senderMember.getHeartbeat());
i.setHeartbeat(senderMember.getHeartbeat());
//TODO consider forcing an UP here
}
}
for (Member remoteMember : remoteList) {if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {continue;}
LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
remoteMember.getUri(),
remoteMember.getId(),
remoteMember.getHeartbeat(),
remoteMember.getProperties(),
gossipManager.getSettings().getWindowSize(),
gossipManager.getSettings().getMinimumSamples(),
gossipManager.getSettings().getDistribution());
aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
if (result != null){for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){if (localMember.getKey().getId().equals(remoteMember.getId())){localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setProperties(remoteMember.getProperties());
}
}
}
}
if (LOGGER.isDebugEnabled()){debugState(senderMember, remoteList);
}
}
//......
}
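- The GossipCore constructor takes a GossipManager (and a MetricRegistry); the class defines methods such as receive, send, sendOneWay, handleResponse, and mergeLists
- mergeLists converts each entry of the received remoteList into a LocalMember and merges it into gossipManager.getMembers() via putIfAbsent
- When the member already exists, the merge also updates the heartbeat and properties of the existing local member; recordHeartbeat ignores values that are less than or equal to latestHeartbeatMs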
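The merge can be sketched with a plain map from member id to heartbeat. This is a deliberately simplified model under stated assumptions: MergeListsSketch is a hypothetical class, it tracks only the newest heartbeat per id, and it leaves out the failure detector, properties, and dead-member handling of the real method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of mergeLists; not the library class.
class MergeListsSketch {
    private final String selfId;
    private final ConcurrentMap<String, Long> heartbeats = new ConcurrentHashMap<>();

    MergeListsSketch(String selfId) {
        this.selfId = selfId;
    }

    void merge(Map<String, Long> remote) {
        for (Map.Entry<String, Long> e : remote.entrySet()) {
            if (e.getKey().equals(selfId)) {
                continue; // never merge our own entry
            }
            Long previous = heartbeats.putIfAbsent(e.getKey(), e.getValue());
            if (previous != null) {
                // Already known: keep the newer heartbeat, ignore older or equal ones.
                heartbeats.merge(e.getKey(), e.getValue(), Long::max);
            }
        }
    }

    Long heartbeatOf(String id) {
        return heartbeats.get(id);
    }

    public static void main(String[] args) {
        MergeListsSketch local = new MergeListsSketch("self");
        Map<String, Long> remote = new HashMap<>();
        remote.put("self", 1L);
        remote.put("a", 10L);
        local.merge(remote);
        System.out.println("a=" + local.heartbeatOf("a") + ", self=" + local.heartbeatOf("self"));
    }
}
```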
GossipManager
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
public abstract class GossipManager {public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
// this mapper is used for ring and user-data persistence only. NOT messages.
public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
private static final long serialVersionUID = 1L;
{enableDefaultTyping();
configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}};
private final ConcurrentSkipListMap<LocalMember, GossipState> members;
private final LocalMember me;
private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning;
private TransportManager transportManager;
private ProtocolManager protocolManager;
private final GossipCore gossipCore;
private final DataReaper dataReaper;
private final Clock clock;
private final ScheduledExecutorService scheduledServiced;
private final MetricRegistry registry;
private final RingStatePersister ringState;
private final UserDataPersister userDataState;
private final GossipMemberStateRefresher memberStateRefresher;
private final MessageHandler messageHandler;
private final LockManager lockManager;
public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
MessageHandler messageHandler) {
this.settings = settings;
this.messageHandler = messageHandler;
clock = new SystemClock();
me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
gossipCore = new GossipCore(this, registry);
this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
dataReaper = new DataReaper(gossipCore, clock);
members = new ConcurrentSkipListMap<>();
for (Member startupMember : gossipMembers) {if (!startupMember.equals(me)) {LocalMember member = new LocalMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
//TODO should members start in down state?
members.put(member, GossipState.DOWN);
}
}
gossipServiceRunning = new AtomicBoolean(true);
this.scheduledServiced = Executors.newScheduledThreadPool(1);
this.registry = registry;
this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
this.userDataState = new UserDataPersister(
gossipCore,
GossipManager.buildPerNodeDataPath(this),
GossipManager.buildSharedDataPath(this));
this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
readSavedRingState();
readSavedDataState();}
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
*/
public void init() {
// protocol manager and transport managers are specified in settings.
// construct them here via reflection.
protocolManager = ReflectionUtils.constructWithReflection(settings.getProtocolManagerClass(),
new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class},
new Object[] { settings, me.getId(), this.getRegistry()}
);
transportManager = ReflectionUtils.constructWithReflection(settings.getTransportManagerClass(),
new Class<?>[] { GossipManager.class, GossipCore.class},
new Object[] { this, gossipCore}
);
// start processing gossip messages.
transportManager.startEndpoint();
transportManager.startActiveGossiper();
dataReaper.init();
if (settings.isPersistRingState()) {scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
}
if (settings.isPersistDataState()) {scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
}
memberStateRefresher.init();
LOGGER.debug("The GossipManager is started.");
}
/**
* Shutdown the gossip service.
*/
public void shutdown() {gossipServiceRunning.set(false);
lockManager.shutdown();
gossipCore.shutdown();
transportManager.shutdown();
dataReaper.close();
memberStateRefresher.shutdown();
scheduledServiced.shutdown();
try {scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {LOGGER.error(e);
}
scheduledServiced.shutdownNow();}
//......
}
- GossipManager keeps members, a ConcurrentSkipListMap from LocalMember to GossipState; its constructor also creates the RingStatePersister, UserDataPersister, and GossipMemberStateRefresher
- init calls transportManager.startEndpoint() and startActiveGossiper(), registers RingStatePersister and UserDataPersister as periodic tasks via scheduleAtFixedRate (every 60 seconds, and only when the corresponding persistence setting is enabled), and also calls memberStateRefresher.init()
- shutdown calls gossipCore.shutdown(), transportManager.shutdown(), memberStateRefresher.shutdown(), and so on
GossipMemberStateRefresher
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
public class GossipMemberStateRefresher {public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
private final Map<LocalMember, GossipState> members;
private final GossipSettings settings;
private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
private final Clock clock;
private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
private final ExecutorService listenerExecutor;
private final ScheduledExecutorService scheduledExecutor;
private final BlockingQueue<Runnable> workQueue;
public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
GossipListener listener,
BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
this.members = members;
this.settings = settings;
listeners.add(listener);
this.findPerNodeGossipData = findPerNodeGossipData;
clock = new SystemClock();
workQueue = new ArrayBlockingQueue<>(1024);
listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy());
scheduledExecutor = Executors.newScheduledThreadPool(1);
}
public void init() {scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
}
public void run() {
try {runOnce();
} catch (RuntimeException ex) {LOGGER.warn("scheduled state had exception", ex);
}
}
public void runOnce() {for (Entry<LocalMember, GossipState> entry : members.entrySet()) {boolean userDown = processOptimisticShutdown(entry);
if (userDown)
continue;
Double phiMeasure = entry.getKey().detect(clock.nanoTime());
GossipState requiredState;
if (phiMeasure != null) {requiredState = calcRequiredState(phiMeasure);
} else {requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
}
if (entry.getValue() != requiredState) {members.put(entry.getKey(), requiredState);
/* Call listeners asynchronously */
for (GossipListener listener: listeners)
listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
}
}
}
public GossipState calcRequiredState(Double phiMeasure) {if (phiMeasure > settings.getConvictThreshold())
return GossipState.DOWN;
else
return GossipState.UP;
}
public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {long now = clock.nanoTime();
long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {return GossipState.DOWN;} else {return state;}
}
/**
* If we have a special key the per-node data that means that the node has sent us
* a pre-emptive shutdown message. We process this so node is seen down sooner
*
* @param l member to consider
* @return true if node forced down
*/
public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
if (m == null) {return false;}
ShutdownMessage s = (ShutdownMessage) m.getPayload();
if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {members.put(l.getKey(), GossipState.DOWN);
if (l.getValue() == GossipState.UP) {for (GossipListener listener: listeners)
listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
}
return true;
}
return false;
}
public void register(GossipListener listener) {listeners.add(listener);
}
public void shutdown() {scheduledExecutor.shutdown();
try {scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {LOGGER.debug("Issue during shutdown", e);
}
listenerExecutor.shutdown();
try {listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {LOGGER.debug("Issue during shutdown", e);
}
listenerExecutor.shutdownNow();}
}
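- The init method of GossipMemberStateRefresher registers the refresher's own run method as a periodic task via scheduledExecutor.scheduleAtFixedRate (executed every 100 ms)
- runOnce iterates over the members passed in by GossipManager and calls LocalMember's detect method on each one to compute phiMeasure; if the value is not null it calls calcRequiredState, otherwise it calls calcRequiredStateCleanupInterval, to compute requiredState; if the state has changed, it updates the map and then asynchronously invokes the gossipEvent method of each GossipListener
- calcRequiredState checks whether phiMeasure is greater than convictThreshold (10 by default) and returns GossipState.DOWN if so, GossipState.UP otherwise; calcRequiredStateCleanupInterval checks whether the current time is greater than cleanupInterval + member.getHeartbeat() and returns GossipState.DOWN if so, otherwise the existing state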
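The two state calculations can be sketched as pure functions. This is a minimal sketch under assumptions: StateCalcSketch and its State enum are hypothetical names, and all timestamps are assumed to be in milliseconds so that the comparison is unit-consistent.

```java
// Hypothetical model of the refresher's state decision; not the library class.
class StateCalcSketch {
    enum State { UP, DOWN }

    // Phi above the conviction threshold means the member is considered down.
    static State fromPhi(double phi, double convictThreshold) {
        return phi > convictThreshold ? State.DOWN : State.UP;
    }

    // Fallback when no phi value is available: a heartbeat older than the
    // cleanup interval forces DOWN, otherwise the current state is kept.
    static State fromCleanupInterval(long nowMillis, long lastHeartbeatMillis,
                                     long cleanupIntervalMillis, State current) {
        return nowMillis - cleanupIntervalMillis > lastHeartbeatMillis ? State.DOWN : current;
    }

    static State requiredState(Double phi, double convictThreshold, long nowMillis,
                               long lastHeartbeatMillis, long cleanupIntervalMillis,
                               State current) {
        if (phi != null) {
            return fromPhi(phi, convictThreshold);
        }
        return fromCleanupInterval(nowMillis, lastHeartbeatMillis, cleanupIntervalMillis, current);
    }

    public static void main(String[] args) {
        System.out.println(requiredState(12.0, 10.0, 0L, 0L, 0L, State.UP));
        System.out.println(requiredState(null, 10.0, 20_000L, 15_000L, 10_000L, State.UP));
    }
}
```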
AbstractTransportManager
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
/**
* Manage the protcol threads (active and passive gossipers).
*/
public abstract class AbstractTransportManager implements TransportManager {public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
private final ExecutorService gossipThreadExecutor;
private final AbstractActiveGossiper activeGossipThread;
protected final GossipManager gossipManager;
protected final GossipCore gossipCore;
public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
gossipThreadExecutor = Executors.newCachedThreadPool();
activeGossipThread = ReflectionUtils.constructWithReflection(gossipManager.getSettings().getActiveGossipClass(),
new Class<?>[]{GossipManager.class, GossipCore.class, MetricRegistry.class},
new Object[]{gossipManager, gossipCore, gossipManager.getRegistry()
});
}
// shut down threads etc.
@Override
public void shutdown() {gossipThreadExecutor.shutdown();
if (activeGossipThread != null) {activeGossipThread.shutdown();
}
try {boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
if (!result) {
// common when blocking patterns are used to read data from a socket.
LOGGER.warn("executor shutdown timed out");
}
} catch (InterruptedException e) {LOGGER.error(e);
}
gossipThreadExecutor.shutdownNow();}
@Override
public void startActiveGossiper() {activeGossipThread.init();
}
@Override
public abstract void startEndpoint();}
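- AbstractTransportManager's startActiveGossiper calls activeGossipThread.init(); activeGossipThread is an instance of a subclass of AbstractActiveGossiper, constructed reflectively from the class named by getActiveGossipClass(). Next we look at SimpleActiveGossiper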
SimpleActiveGossiper
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
/**
* Base implementation gossips randomly to live nodes periodically gossips to dead ones
*
*/
public class SimpleActiveGossiper extends AbstractActiveGossiper {
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
MetricRegistry registry) {super(gossipManager, gossipCore, registry);
scheduledExecutorService = Executors.newScheduledThreadPool(2);
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy());
}
@Override
public void init() {super.init();
scheduledExecutorService.scheduleAtFixedRate(() -> {threadService.execute(() -> {sendToALiveMember();
});
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> {sendToDeadMember();
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> sendPerNodeData(gossipManager.getMyself(),
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> sendSharedData(gossipManager.getMyself(),
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {super.shutdown();
scheduledExecutorService.shutdown();
try {scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {LOGGER.debug("Issue during shutdown", e);
}
sendShutdownMessage();
threadService.shutdown();
try {threadService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {LOGGER.debug("Issue during shutdown", e);
}
}
protected void sendToALiveMember(){LocalMember member = selectPartner(gossipManager.getLiveMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
protected void sendToDeadMember(){LocalMember member = selectPartner(gossipManager.getDeadMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
/**
* sends an optimistic shutdown message to several clusters nodes
*/
protected void sendShutdownMessage(){List<LocalMember> l = gossipManager.getLiveMembers();
int sendTo = l.size() < 3 ? 1 : l.size() / 2;
for (int i = 0; i < sendTo; i++) {threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
}
}
}
- SimpleActiveGossiper extends AbstractActiveGossiper and overrides init, where it uses scheduledExecutorService.scheduleAtFixedRate to register four periodic tasks: sendToALiveMember, sendToDeadMember, sendPerNodeData, and sendSharedData (each executed every gossipInterval)
- shutdown mainly calls scheduledExecutorService.shutdown(), sendShutdownMessage(), and threadService.shutdown()
- sendToALiveMember first uses the parent class's selectPartner to pick a live member from gossipManager.getLiveMembers(), then sends it the membership list via sendMembershipList; sendToDeadMember does the same, except that it picks a dead member from gossipManager.getDeadMembers()
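The scheduling pattern used for sendToALiveMember (a scheduler that hands each round to a bounded worker pool) can be sketched as follows. GossipScheduleSketch and runRounds are hypothetical names, the pool sizes are arbitrary, and the gossip round itself is replaced by a counter so that the example stays self-contained.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the scheduler-plus-worker-pool pattern; not the library class.
class GossipScheduleSketch {
    // Returns true if `rounds` rounds completed before the timeout expired.
    static boolean runRounds(int rounds, long intervalMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // Bounded queue with DiscardOldestPolicy: when the workers fall behind,
        // stale rounds are dropped instead of blocking the scheduler thread.
        ThreadPoolExecutor workers = new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(16), new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch done = new CountDownLatch(rounds);
        try {
            // Each tick only enqueues work; a real round would select a partner and send.
            scheduler.scheduleAtFixedRate(() -> workers.execute(done::countDown),
                0, intervalMillis, TimeUnit.MILLISECONDS);
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runRounds(3, 10, 5000));
    }
}
```

Handing the round to a separate pool means a slow send (gossipCore.send waits up to one second for a reply) does not delay the next tick of the scheduler.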
Summary
- AbstractTransportManager's startActiveGossiper calls activeGossipThread.init(); activeGossipThread is an instance of a subclass of AbstractActiveGossiper, assumed here to be SimpleActiveGossiper
- SimpleActiveGossiper's init method uses scheduledExecutorService.scheduleAtFixedRate to register four periodic tasks: sendToALiveMember, sendToDeadMember, sendPerNodeData, and sendSharedData (each executed every gossipInterval)
- sendToALiveMember first uses the parent class's selectPartner to pick a live member from gossipManager.getLiveMembers(), then sends it the membership list via sendMembershipList; sendToDeadMember does the same, except that it picks a dead member from gossipManager.getDeadMembers()
- AbstractActiveGossiper provides the selectPartner and sendMembershipList methods; selectPartner generates a randomNeighborIndex to pick a LocalMember when memberList is non-empty; sendMembershipList first sets the heartbeat of me, then creates a UdpActiveGossipMessage whose members list starts with the current local member followed by every key of gossipManager.getMembers(), and finally sends it to the selected member via gossipCore.send
- ActiveGossipMessageHandler handles UdpActiveGossipMessage; it converts each entry of activeGossipMessage.getMembers() into a RemoteMember and adds it to remoteGossipMembers, then replies to the sender with a UdpActiveGossipOk via gossipCore.sendOneWay, and finally calls gossipCore.mergeLists(senderMember, remoteGossipMembers)
- GossipCore's mergeLists converts each entry of the received remoteList into a LocalMember and merges it into gossipManager.getMembers() via putIfAbsent; when the member already exists, the merge also updates the existing local member's heartbeat, and recordHeartbeat ignores values that are less than or equal to latestHeartbeatMs
- GossipManager keeps members, a ConcurrentSkipListMap from LocalMember to GossipState, and its constructor creates the RingStatePersister, UserDataPersister, and GossipMemberStateRefresher; init calls transportManager.startEndpoint() and startActiveGossiper(), registers RingStatePersister and UserDataPersister as periodic tasks via scheduleAtFixedRate, and also calls memberStateRefresher.init()
- The init method of GossipMemberStateRefresher registers the refresher's own run method as a periodic task via scheduledExecutor.scheduleAtFixedRate (executed every 100 ms); runOnce iterates over the members passed in by GossipManager and calls LocalMember's detect method on each one to compute phiMeasure; if the value is not null it calls calcRequiredState, otherwise calcRequiredStateCleanupInterval, to compute requiredState; if the state has changed, it updates the map and then asynchronously invokes the gossipEvent method of each GossipListener; calcRequiredState returns GossipState.DOWN when phiMeasure is greater than convictThreshold (10 by default) and GossipState.UP otherwise; calcRequiredStateCleanupInterval returns GossipState.DOWN when the current time is greater than cleanupInterval + member.getHeartbeat(), otherwise the existing state
Sending the full membership list on every round like this may become inefficient when the member list is very large.