Preface
This article takes a look at the MembershipProtocol of scalecube-cluster.
MembershipProtocol
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java
```java
/**
 * Cluster Membership Protocol component responsible for managing information about existing members
 * of the cluster.
 */
public interface MembershipProtocol {

  /**
   * Starts running cluster membership protocol. After started it begins to receive and send cluster
   * membership messages
   */
  Mono<Void> start();

  /** Stops running cluster membership protocol and releases occupied resources. */
  void stop();

  /** Listen changes in cluster membership. */
  Flux<MembershipEvent> listen();

  /**
   * Returns list of all members of the joined cluster. This will include all cluster members
   * including local member.
   *
   * @return all members in the cluster (including local one)
   */
  Collection<Member> members();

  /**
   * Returns list of all cluster members of the joined cluster excluding local member.
   *
   * @return all members in the cluster (excluding local one)
   */
  Collection<Member> otherMembers();

  /**
   * Returns local cluster member which corresponds to this cluster instance.
   *
   * @return local member
   */
  Member member();

  /**
   * Returns cluster member with given id or null if no member with such id exists at joined
   * cluster.
   *
   * @return member by id
   */
  Optional<Member> member(String id);

  /**
   * Returns cluster member by given address or null if no member with such address exists at joined
   * cluster.
   *
   * @return member by address
   */
  Optional<Member> member(Address address);
}
```
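- The MembershipProtocol interface defines the start, stop, listen, members, otherMembers, and member methods; member is overloaded to return the local member, or to look up a member by id or by Address.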
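To make the query semantics concrete, here is a minimal, dependency-free sketch of an in-memory view. SimpleMember and MembershipView are hypothetical names, not scalecube types. Addresses are plain strings here, so the address lookup is called memberByAddress to avoid clashing with the id overload. Unlike the original, otherMembers compares ids rather than calling Member.equals.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical stand-in for scalecube's Member: only an id and a "host:port" address string.
final class SimpleMember {
  final String id;
  final String address;

  SimpleMember(String id, String address) {
    this.id = id;
    this.address = address;
  }
}

// Hypothetical in-memory view mirroring the query methods of MembershipProtocol.
final class MembershipView {
  private final SimpleMember local;
  // LinkedHashMap keeps insertion order so iteration is deterministic.
  private final Map<String, SimpleMember> members = new LinkedHashMap<>();

  MembershipView(SimpleMember local) {
    this.local = local;
    members.put(local.id, local); // the local member is always part of members()
  }

  void add(SimpleMember m) {
    members.put(m.id, m);
  }

  // All members including the local one; returns a copy, as MembershipProtocolImpl does.
  Collection<SimpleMember> members() {
    return new ArrayList<>(members.values());
  }

  // All members except the local one.
  Collection<SimpleMember> otherMembers() {
    return members.values().stream()
        .filter(m -> !m.id.equals(local.id))
        .collect(Collectors.toList());
  }

  SimpleMember member() {
    return local;
  }

  // Lookup by id is a map get; unknown ids yield Optional.empty().
  Optional<SimpleMember> member(String id) {
    return Optional.ofNullable(members.get(id));
  }

  // Lookup by address is a linear scan, like member(Address) in MembershipProtocolImpl.
  Optional<SimpleMember> memberByAddress(String address) {
    return members.values().stream().filter(m -> m.address.equals(address)).findFirst();
  }
}
```

Keeping the local member inside the same map is what lets members() include it while otherMembers() only needs a filter.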
MembershipProtocolImpl
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
```java
public final class MembershipProtocolImpl implements MembershipProtocol {

  private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);

  private enum MembershipUpdateReason {
    FAILURE_DETECTOR_EVENT,
    MEMBERSHIP_GOSSIP,
    SYNC,
    INITIAL_SYNC,
    SUSPICION_TIMEOUT
  }

  // Qualifiers
  public static final String SYNC = "sc/membership/sync";
  public static final String SYNC_ACK = "sc/membership/syncAck";
  public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";

  private final Member localMember;

  // Injected
  private final Transport transport;
  private final MembershipConfig config;
  private final List<Address> seedMembers;
  private final FailureDetector failureDetector;
  private final GossipProtocol gossipProtocol;
  private final MetadataStore metadataStore;
  private final CorrelationIdGenerator cidGenerator;

  // State
  private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
  private final Map<String, Member> members = new HashMap<>();

  // Subject
  private final FluxProcessor<MembershipEvent, MembershipEvent> subject =
      DirectProcessor.<MembershipEvent>create().serialize();
  private final FluxSink<MembershipEvent> sink = subject.sink();

  // Disposables
  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Scheduled
  private final Scheduler scheduler;
  private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap<>();

  /**
   * Creates new instantiates of cluster membership protocol with given transport and config.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param failureDetector failure detector
   * @param gossipProtocol gossip protocol
   * @param metadataStore metadata store
   * @param config membership config parameters
   * @param scheduler scheduler
   * @param cidGenerator correlation id generator
   */
  public MembershipProtocolImpl(
      Member localMember,
      Transport transport,
      FailureDetector failureDetector,
      GossipProtocol gossipProtocol,
      MetadataStore metadataStore,
      MembershipConfig config,
      Scheduler scheduler,
      CorrelationIdGenerator cidGenerator) {

    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.failureDetector = Objects.requireNonNull(failureDetector);
    this.gossipProtocol = Objects.requireNonNull(gossipProtocol);
    this.metadataStore = Objects.requireNonNull(metadataStore);
    this.localMember = Objects.requireNonNull(localMember);
    this.scheduler = Objects.requireNonNull(scheduler);
    this.cidGenerator = Objects.requireNonNull(cidGenerator);

    // Prepare seeds
    seedMembers = cleanUpSeedMembers(config.getSeedMembers());

    // Init membership table with local member record
    membershipTable.put(localMember.id(), new MembershipRecord(localMember, ALIVE, 0));

    // fill in the table of members with local member
    members.put(localMember.id(), localMember);

    actionsDisposables.addAll(
        Arrays.asList(
            // Listen to incoming SYNC and SYNC ACK requests from other members
            transport
                .listen() //
                .publishOn(scheduler)
                .subscribe(this::onMessage, this::onError),

            // Listen to events from failure detector
            failureDetector
                .listen()
                .publishOn(scheduler)
                .subscribe(this::onFailureDetectorEvent, this::onError),

            // Listen to membership gossips
            gossipProtocol
                .listen()
                .publishOn(scheduler)
                .subscribe(this::onMembershipGossip, this::onError)));
  }

  @Override
  public Flux<MembershipEvent> listen() {
    return subject.onBackpressureBuffer();
  }

  @Override
  public Mono<Void> start() {
    // Make initial sync with all seed members
    return Mono.create(
        sink -> {
          // In case no members at the moment just schedule periodic sync
          if (seedMembers.isEmpty()) {
            schedulePeriodicSync();
            sink.success();
            return;
          }
          // If seed addresses are specified in config - send initial sync to those nodes
          LOGGER.debug("Making initial Sync to all seed members: {}", seedMembers);

          //noinspection unchecked
          Mono<Message>[] syncs =
              seedMembers
                  .stream()
                  .map(
                      address -> {
                        String cid = cidGenerator.nextCid();
                        return transport
                            .requestResponse(prepareSyncDataMsg(SYNC, cid), address)
                            .filter(this::checkSyncGroup);
                      })
                  .toArray(Mono[]::new);

          // Process initial SyncAck
          Flux.mergeDelayError(syncs.length, syncs)
              .take(1)
              .timeout(Duration.ofMillis(config.getSyncTimeout()), scheduler)
              .publishOn(scheduler)
              .flatMap(message -> onSyncAck(message, true))
              .doFinally(
                  s -> {
                    schedulePeriodicSync();
                    sink.success();
                  })
              .subscribe(
                  null,
                  ex -> LOGGER.info("Exception on initial SyncAck, cause: {}", ex.toString()));
        });
  }

  @Override
  public void stop() {
    // Stop accepting requests, events and sending sync
    actionsDisposables.dispose();

    // Cancel remove members tasks
    for (String memberId : suspicionTimeoutTasks.keySet()) {
      Disposable future = suspicionTimeoutTasks.get(memberId);
      if (future != null && !future.isDisposed()) {
        future.dispose();
      }
    }
    suspicionTimeoutTasks.clear();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Collection<Member> members() {
    return new ArrayList<>(members.values());
  }

  @Override
  public Collection<Member> otherMembers() {
    return new ArrayList<>(members.values())
        .stream()
        .filter(member -> !member.equals(localMember))
        .collect(Collectors.toList());
  }

  @Override
  public Member member() {
    return localMember;
  }

  @Override
  public Optional<Member> member(String id) {
    return Optional.ofNullable(members.get(id));
  }

  @Override
  public Optional<Member> member(Address address) {
    return new ArrayList<>(members.values())
        .stream()
        .filter(member -> member.address().equals(address))
        .findFirst();
  }

  //......
}
```
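- MembershipProtocolImpl implements the MembershipProtocol interface and defines the MembershipUpdateReason enum (FAILURE_DETECTOR_EVENT, MEMBERSHIP_GOSSIP, SYNC, INITIAL_SYNC, SUSPICION_TIMEOUT).
- The MembershipProtocolImpl constructor puts the local member into membershipTable and members, then subscribes to transport.listen() (handled by onMessage), failureDetector.listen() (handled by onFailureDetectorEvent) and gossipProtocol.listen() (handled by onMembershipGossip); all three streams are published on the protocol's scheduler.
- When seedMembers is empty, start only calls schedulePeriodicSync, which runs doSync every syncInterval milliseconds. Otherwise it sends SYNC to every seed member via transport.requestResponse, keeps only responses from the same syncGroup, takes the first one that arrives within syncTimeout and passes it to onSyncAck(message, true). Whether the initial sync succeeds, fails or times out, doFinally then calls schedulePeriodicSync and completes the returned Mono.
- stop disposes actionsDisposables (the three subscriptions plus the periodic sync task), disposes every pending future in suspicionTimeoutTasks and clears the map, and finally completes the event sink.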
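The control flow of start() can be modelled without Reactor. The sketch below is an illustration only: InitialSyncSketch is a hypothetical class, each seed's SYNC_ACK is represented by a CompletableFuture<String>, and a plain Predicate stands in for checkSyncGroup. It keeps the three properties described above: failed or foreign-group responses are skipped, only the first matching ack is applied, and periodic sync is scheduled in every outcome, including a timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical model of start(): each seed's SYNC_ACK is a CompletableFuture<String>.
final class InitialSyncSketch {
  volatile boolean periodicSyncScheduled = false;
  volatile String appliedAck = null;

  CompletableFuture<Void> start(
      List<CompletableFuture<String>> seedResponses,
      Predicate<String> checkSyncGroup,
      long syncTimeoutMillis) {
    if (seedResponses.isEmpty()) {
      periodicSyncScheduled = true; // no seeds: just schedule periodic sync
      return CompletableFuture.completedFuture(null);
    }
    CompletableFuture<String> first = new CompletableFuture<>();
    List<CompletableFuture<Void>> handled = new ArrayList<>();
    for (CompletableFuture<String> response : seedResponses) {
      // Failed responses and acks from another sync group never complete `first`.
      handled.add(
          response.thenAccept(
              ack -> {
                if (checkSyncGroup.test(ack)) {
                  first.complete(ack); // later matches are ignored, like take(1)
                }
              }));
    }
    // Once every response has been handled without a match, finish with "no ack".
    CompletableFuture.allOf(handled.toArray(new CompletableFuture<?>[0]))
        .whenComplete((ignored, ex) -> first.complete(null));
    return first
        .orTimeout(syncTimeoutMillis, TimeUnit.MILLISECONDS)
        .handle(
            (ack, ex) -> {
              if (ack != null) {
                appliedAck = ack; // stands in for onSyncAck(message, true)
              }
              periodicSyncScheduled = true; // doFinally: schedulePeriodicSync() in every case
              return null;
            });
  }
}
```

The allOf is built over the thenAccept stages rather than the raw responses so that "no match" is only declared after every ack has actually been checked.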
onMessage
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
```java
public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  private void onMessage(Message message) {
    if (checkSyncGroup(message)) {
      if (SYNC.equals(message.qualifier())) {
        onSync(message).subscribe(null, this::onError);
      }
      if (SYNC_ACK.equals(message.qualifier())) {
        if (message.correlationId() == null) { // filter out initial sync
          onSyncAck(message, false).subscribe(null, this::onError);
        }
      }
    }
  }

  private boolean checkSyncGroup(Message message) {
    if (message.data() instanceof SyncData) {
      SyncData syncData = message.data();
      return config.getSyncGroup().equals(syncData.getSyncGroup());
    }
    return false;
  }

  /** Merges incoming SYNC data, merges it and sending back merged data with SYNC_ACK. */
  private Mono<Void> onSync(Message syncMsg) {
    return Mono.defer(
        () -> {
          LOGGER.debug("Received Sync: {}", syncMsg);
          return syncMembership(syncMsg.data(), false)
              .doOnSuccess(
                  avoid -> {
                    Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
                    Address address = syncMsg.sender();
                    transport
                        .send(address, message)
                        .subscribe(
                            null,
                            ex ->
                                LOGGER.debug(
                                    "Failed to send SyncAck: {} to {}, cause: {}",
                                    message,
                                    address,
                                    ex.toString()));
                  });
        });
  }

  private Mono<Void> onSyncAck(Message syncAckMsg, boolean onStart) {
    return Mono.defer(
        () -> {
          LOGGER.debug("Received SyncAck: {}", syncAckMsg);
          return syncMembership(syncAckMsg.data(), onStart);
        });
  }

  private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
    return Mono.defer(
        () -> {
          MembershipUpdateReason reason =
              onStart ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC;

          return Mono.whenDelayError(
              syncData
                  .getMembership()
                  .stream()
                  .filter(r1 -> !r1.equals(membershipTable.get(r1.id())))
                  .map(r1 -> updateMembership(r1, reason))
                  .toArray(Mono[]::new));
        });
  }

  //......
}
```
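- onMessage first calls checkSyncGroup to make sure the message carries SyncData for this node's syncGroup. A SYNC message is handed to onSync; a SYNC_ACK is handed to onSyncAck only when its correlationId is null, because the acks of the initial sync carry a correlation id and are already consumed by requestResponse in start.
- onSync runs syncMembership and, on success, replies to the sender with a SYNC_ACK that carries the local membership table and the original correlationId. onSyncAck also calls syncMembership, but sends nothing back.
- syncMembership skips every record that equals the local entry in membershipTable and calls updateMembership for each remaining record, with reason INITIAL_SYNC during start and SYNC otherwise; Mono.whenDelayError waits for all updates and reports an error only after every one of them has finished.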
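The dispatch rules and the "only changed records" filter can be sketched as pure functions. This is a simplified model: SyncDispatchSketch and its Msg type are hypothetical, the sync group is a field on the message instead of being read from SyncData, and membership records are plain strings. The qualifier constants are copied from MembershipProtocolImpl.

```java
import java.util.*;
import java.util.stream.Collectors;

final class SyncDispatchSketch {
  static final String SYNC = "sc/membership/sync";
  static final String SYNC_ACK = "sc/membership/syncAck";

  // Hypothetical, simplified message: qualifier, correlation id (may be null) and sync group.
  static final class Msg {
    final String qualifier;
    final String correlationId;
    final String syncGroup;

    Msg(String qualifier, String correlationId, String syncGroup) {
      this.qualifier = qualifier;
      this.correlationId = correlationId;
      this.syncGroup = syncGroup;
    }
  }

  // Mirrors onMessage: returns the name of the handler the message would reach.
  static String dispatch(Msg m, String localSyncGroup) {
    if (!localSyncGroup.equals(m.syncGroup)) {
      return "ignored"; // checkSyncGroup failed
    }
    if (SYNC.equals(m.qualifier)) {
      return "onSync";
    }
    // Acks with a correlation id belong to the initial sync and are handled in start().
    if (SYNC_ACK.equals(m.qualifier) && m.correlationId == null) {
      return "onSyncAck";
    }
    return "ignored";
  }

  // Mirrors the filter in syncMembership: ids whose incoming record differs from the table.
  static List<String> changedRecords(Map<String, String> table, Map<String, String> incoming) {
    return incoming.entrySet().stream()
        .filter(e -> !e.getValue().equals(table.get(e.getKey())))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }
}
```

Only the ids returned by changedRecords would go on to updateMembership, which is why a periodic SYNC between two already converged nodes does almost no work.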
onFailureDetectorEvent
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
```java
public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  /** Merges FD updates and processes them. */
  private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
    MembershipRecord r0 = membershipTable.get(fdEvent.member().id());
    if (r0 == null) { // member already removed
      return;
    }
    if (r0.status() == fdEvent.status()) { // status not changed
      return;
    }
    LOGGER.debug("Received status change on failure detector event: {}", fdEvent);
    if (fdEvent.status() == ALIVE) {
      // TODO: Consider to make more elegant solution
      // Alive won't override SUSPECT so issue instead extra sync with member to force it spread
      // alive with inc + 1
      Message syncMsg = prepareSyncDataMsg(SYNC, null);
      Address address = fdEvent.member().address();
      transport
          .send(address, syncMsg)
          .subscribe(
              null,
              ex ->
                  LOGGER.debug(
                      "Failed to send {} to {}, cause: {}", syncMsg, address, ex.toString()));
    } else {
      MembershipRecord record =
          new MembershipRecord(r0.member(), fdEvent.status(), r0.incarnation());
      updateMembership(record, MembershipUpdateReason.FAILURE_DETECTOR_EVENT)
          .subscribe(null, this::onError);
    }
  }

  //......
}
```
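- onFailureDetectorEvent ignores the event when the member is no longer in membershipTable or when its status has not changed. If the new status is ALIVE, it does not touch the table but sends a SYNC message to fdEvent.member().address(): as the TODO in the code explains, ALIVE does not override SUSPECT, so the extra sync forces that member to spread ALIVE with incarnation + 1. For any other status it builds a record with the new status and the unchanged incarnation and calls updateMembership with MembershipUpdateReason.FAILURE_DETECTOR_EVENT.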
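The branches of onFailureDetectorEvent reduce to a small decision function. FdEventSketch, its Status enum and its Rec record are hypothetical stand-ins for scalecube's MemberStatus and MembershipRecord; the function only reports which action the real method would take.

```java
import java.util.*;

final class FdEventSketch {
  enum Status { ALIVE, SUSPECT, DEAD }

  // Hypothetical stand-in for MembershipRecord: status plus incarnation only.
  static final class Rec {
    final Status status;
    final int incarnation;

    Rec(Status status, int incarnation) {
      this.status = status;
      this.incarnation = incarnation;
    }
  }

  static String onFailureDetectorEvent(Map<String, Rec> table, String memberId, Status fdStatus) {
    Rec r0 = table.get(memberId);
    if (r0 == null) {
      return "ignore"; // member already removed
    }
    if (r0.status == fdStatus) {
      return "ignore"; // status not changed
    }
    if (fdStatus == Status.ALIVE) {
      // ALIVE cannot override SUSPECT, so ask the member to sync; seeing itself
      // suspected, it refutes by gossiping ALIVE with incarnation + 1.
      return "send SYNC to " + memberId;
    }
    // Any other status: keep the incarnation, change the status, reason FAILURE_DETECTOR_EVENT.
    return "updateMembership " + fdStatus + " inc=" + r0.incarnation;
  }
}
```

The asymmetry is deliberate: a node may lower its opinion of a peer on its own, but only the peer itself may raise its incarnation to clear a suspicion.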
onMembershipGossip
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
```java
public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  /** Merges received membership gossip (not spreading gossip further). */
  private void onMembershipGossip(Message message) {
    if (MEMBERSHIP_GOSSIP.equals(message.qualifier())) {
      MembershipRecord record = message.data();
      LOGGER.debug("Received membership gossip: {}", record);
      updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP)
          .subscribe(null, this::onError);
    }
  }

  //......
}
```
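- onMembershipGossip handles only messages whose qualifier is MEMBERSHIP_GOSSIP and calls updateMembership on the carried MembershipRecord with MembershipUpdateReason.MEMBERSHIP_GOSSIP; because of that reason, updateMembership does not re-spread the record (the gossip protocol already disseminates it).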
updateMembership
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
```java
public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  /**
   * Try to update membership table with the given record.
   *
   * @param r1 new membership record which compares with existing r0 record
   * @param reason indicating the reason for updating membership table
   */
  private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason reason) {
    return Mono.defer(
        () -> {
          Objects.requireNonNull(r1, "Membership record can't be null");
          // Get current record
          MembershipRecord r0 = membershipTable.get(r1.id());

          // Check if new record r1 overrides existing membership record r0
          if (!r1.isOverrides(r0)) {
            return Mono.empty();
          }

          // If received updated for local member then increase incarnation and spread Alive gossip
          if (r1.member().id().equals(localMember.id())) {
            int currentIncarnation = Math.max(r0.incarnation(), r1.incarnation());
            MembershipRecord r2 =
                new MembershipRecord(localMember, r0.status(), currentIncarnation + 1);

            membershipTable.put(localMember.id(), r2);

            LOGGER.debug(
                "Local membership record r0: {}, but received r1: {},"
                    + "spread with increased incarnation r2: {}",
                r0,
                r1,
                r2);

            spreadMembershipGossip(r2)
                .subscribe(
                    null,
                    ex -> {
                      // no-op
                    });
            return Mono.empty();
          }

          // Update membership
          if (r1.isDead()) {
            membershipTable.remove(r1.id());
          } else {
            membershipTable.put(r1.id(), r1);
          }

          // Schedule/cancel suspicion timeout task
          if (r1.isSuspect()) {
            scheduleSuspicionTimeoutTask(r1);
          } else {
            cancelSuspicionTimeoutTask(r1.id());
          }

          // Emit membership and regardless of result spread gossip
          return emitMembershipEvent(r0, r1)
              .doFinally(
                  s -> {
                    // Spread gossip (unless already gossiped)
                    if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP
                        && reason != MembershipUpdateReason.INITIAL_SYNC) {
                      spreadMembershipGossip(r1)
                          .subscribe(
                              null,
                              ex -> {
                                // no-op
                              });
                    }
                  });
        });
  }

  private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
    return Mono.defer(
        () -> {
          Message msg = Message.withData(record).qualifier(MEMBERSHIP_GOSSIP).build();
          LOGGER.debug("Spead membreship: {} with gossip", msg);
          return gossipProtocol
              .spread(msg)
              .doOnError(
                  ex ->
                      LOGGER.debug(
                          "Failed to spread membership: {} with gossip, cause: {}",
                          msg,
                          ex.toString()))
              .then();
        });
  }

  private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
    long suspicionTimeout =
        ClusterMath.suspicionTimeout(
            config.getSuspicionMult(), membershipTable.size(), config.getPingInterval());
    suspicionTimeoutTasks.computeIfAbsent(
        record.id(),
        id ->
            scheduler.schedule(
                () -> onSuspicionTimeout(id), suspicionTimeout, TimeUnit.MILLISECONDS));
  }

  private void onSuspicionTimeout(String memberId) {
    suspicionTimeoutTasks.remove(memberId);
    MembershipRecord record = membershipTable.get(memberId);
    if (record != null) {
      LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", record);
      MembershipRecord deadRecord =
          new MembershipRecord(record.member(), DEAD, record.incarnation());
      updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT)
          .subscribe(null, this::onError);
    }
  }

  private void cancelSuspicionTimeoutTask(String memberId) {
    Disposable future = suspicionTimeoutTasks.remove(memberId);
    if (future != null && !future.isDisposed()) {
      future.dispose();
    }
  }

  private Mono<Void> emitMembershipEvent(MembershipRecord r0, MembershipRecord r1) {
    return Mono.defer(
        () -> {
          final Member member = r1.member();

          if (r1.isDead()) {
            members.remove(member.id());
            // removed
            return Mono.fromRunnable(
                () -> {
                  Map<String, String> metadata = metadataStore.removeMetadata(member);
                  sink.next(MembershipEvent.createRemoved(member, metadata));
                });
          }

          if (r0 == null && r1.isAlive()) {
            members.put(member.id(), member);
            // added
            return metadataStore
                .fetchMetadata(member)
                .doOnSuccess(
                    metadata -> {
                      metadataStore.updateMetadata(member, metadata);
                      sink.next(MembershipEvent.createAdded(member, metadata));
                    })
                .onErrorResume(TimeoutException.class, e -> Mono.empty())
                .then();
          }

          if (r0 != null && r0.incarnation() < r1.incarnation()) {
            // updated
            return metadataStore
                .fetchMetadata(member)
                .doOnSuccess(
                    metadata1 -> {
                      Map<String, String> metadata0 =
                          metadataStore.updateMetadata(member, metadata1);
                      sink.next(MembershipEvent.createUpdated(member, metadata0, metadata1));
                    })
                .onErrorResume(TimeoutException.class, e -> Mono.empty())
                .then();
          }

          return Mono.empty();
        });
  }

  //......
}
```
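- updateMembership first looks up the current record r0 for the same member id and returns immediately unless the incoming record r1 overrides it (r1.isOverrides(r0)). If r1 concerns the local member, the node refutes it: it stores a new local record that keeps r0's status with incarnation max(r0.incarnation(), r1.incarnation()) + 1, spreads that record with spreadMembershipGossip and stops there. For any other member a DEAD record is removed from membershipTable and any other record is stored; a SUSPECT record schedules a suspicion timeout (scheduleSuspicionTimeoutTask), any other status cancels it (cancelSuspicionTimeoutTask). Finally emitMembershipEvent runs and, regardless of its outcome, the record is spread with spreadMembershipGossip unless the reason is MEMBERSHIP_GOSSIP or INITIAL_SYNC.
- scheduleSuspicionTimeoutTask computes suspicionTimeout with ClusterMath.suspicionTimeout from suspicionMult, the size of membershipTable and pingInterval, and registers a delayed onSuspicionTimeout task only if suspicionTimeoutTasks does not already hold a task for record.id(). onSuspicionTimeout removes the task from suspicionTimeoutTasks and, if the member is still in the table, declares it DEAD with the same incarnation via updateMembership with MembershipUpdateReason.SUSPICION_TIMEOUT. cancelSuspicionTimeoutTask likewise removes the task from suspicionTimeoutTasks and disposes its future.
- emitMembershipEvent keeps members and metadataStore in step with the table and publishes events: for a DEAD record it removes the member, calls metadataStore.removeMetadata(member) and emits REMOVED; for a previously unknown ALIVE member it adds the member, fetches its metadata, stores it with metadataStore.updateMetadata(member, metadata) and emits ADDED; when the incarnation has increased it fetches the metadata again, updates it and emits UPDATED with both the old and the new metadata. A TimeoutException while fetching metadata is swallowed, so in that case no ADDED or UPDATED event is emitted.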
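The sketch below replays the core of updateMembership and emitMembershipEvent on plain Java types. UpdateSketch and its nested types are hypothetical. The body of MembershipRecord.isOverrides is not shown in this article, so the isOverrides method here is an assumption written from the usual SWIM-style rules (DEAD is final, a higher incarnation wins, and at equal incarnation only SUSPECT overrides ALIVE); check the real MembershipRecord before relying on it. Suspicion timers, metadata fetching and gossip spreading are left out, except that the local-member refutation is recorded as a "GOSSIP" entry.

```java
import java.util.*;

final class UpdateSketch {
  enum Status { ALIVE, SUSPECT, DEAD }

  // Hypothetical stand-in for MembershipRecord.
  static final class Rec {
    final String id;
    final Status status;
    final int incarnation;

    Rec(String id, Status status, int incarnation) {
      this.id = id;
      this.status = status;
      this.incarnation = incarnation;
    }
  }

  // ASSUMPTION: SWIM-style override rules for illustration, not copied from scalecube.
  static boolean isOverrides(Rec r1, Rec r0) {
    if (r0 == null) return r1.status == Status.ALIVE; // unknown members join only as ALIVE
    if (r0.status == Status.DEAD) return false;        // DEAD is final
    if (r1.status == Status.DEAD) return true;
    if (r1.incarnation == r0.incarnation) {
      return r1.status == Status.SUSPECT && r0.status != Status.SUSPECT;
    }
    return r1.incarnation > r0.incarnation;
  }

  final String localId;
  final Map<String, Rec> table = new HashMap<>();
  final List<String> events = new ArrayList<>();

  UpdateSketch(String localId) {
    this.localId = localId;
    table.put(localId, new Rec(localId, Status.ALIVE, 0)); // like the constructor's initial record
  }

  void update(Rec r1) {
    Rec r0 = table.get(r1.id);
    if (!isOverrides(r1, r0)) return;
    if (r1.id.equals(localId)) {
      // Refute: keep our own status and bump the incarnation past both records.
      int inc = Math.max(r0.incarnation, r1.incarnation) + 1;
      table.put(localId, new Rec(localId, r0.status, inc));
      events.add("GOSSIP " + localId + " " + r0.status + " inc=" + inc);
      return;
    }
    if (r1.status == Status.DEAD) table.remove(r1.id);
    else table.put(r1.id, r1);
    // Same branch order as emitMembershipEvent.
    if (r1.status == Status.DEAD) events.add("REMOVED " + r1.id);
    else if (r0 == null && r1.status == Status.ALIVE) events.add("ADDED " + r1.id);
    else if (r0 != null && r0.incarnation < r1.incarnation) events.add("UPDATED " + r1.id);
  }
}
```

For example, the sequence b ALIVE/0, b SUSPECT/0, b ALIVE/0, b ALIVE/1, a SUSPECT/0 (about the local member "a"), b DEAD/1 produces ADDED b, UPDATED b, GOSSIP a ALIVE inc=1 and REMOVED b: the suspicion itself emits no event, and the stale ALIVE/0 is rejected.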
schedulePeriodicSync
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
```java
public final class MembershipProtocolImpl implements MembershipProtocol {

  //......

  private void schedulePeriodicSync() {
    int syncInterval = config.getSyncInterval();
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doSync, syncInterval, syncInterval, TimeUnit.MILLISECONDS));
  }

  private void doSync() {
    Optional<Address> addressOptional = selectSyncAddress();
    if (!addressOptional.isPresent()) {
      return;
    }

    Address address = addressOptional.get();
    Message message = prepareSyncDataMsg(SYNC, null);
    LOGGER.debug("Send Sync: {} to {}", message, address);
    transport
        .send(address, message)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send Sync: {} to {}, cause: {}", message, address, ex.toString()));
  }

  private Optional<Address> selectSyncAddress() {
    List<Address> addresses =
        Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address))
            .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new));
    Collections.shuffle(addresses);
    if (addresses.isEmpty()) {
      return Optional.empty();
    } else {
      int i = ThreadLocalRandom.current().nextInt(addresses.size());
      return Optional.of(addresses.get(i));
    }
  }

  private Message prepareSyncDataMsg(String qualifier, String cid) {
    List<MembershipRecord> membershipRecords = new ArrayList<>(membershipTable.values());
    SyncData syncData = new SyncData(membershipRecords, config.getSyncGroup());
    return Message.withData(syncData)
        .qualifier(qualifier)
        .correlationId(cid)
        .sender(localMember.address())
        .build();
  }

  //......
}
```
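- schedulePeriodicSync uses scheduler.schedulePeriodically to run doSync every syncInterval milliseconds, and adds the task to actionsDisposables. doSync calls selectSyncAddress, which merges the seed addresses with the addresses of the other known members, removes duplicates and picks one at random; if there is no candidate it does nothing. Otherwise it builds a SYNC message carrying the whole membershipTable with prepareSyncDataMsg (without a correlation id, so the reply is handled by onMessage) and sends it with transport.send.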
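Target selection can be reproduced with the JDK alone. In this sketch SyncTargetSketch is a hypothetical class and addresses are plain "host:port" strings instead of scalecube's Address; otherwise it follows selectSyncAddress step by step, including the shuffle.

```java
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SyncTargetSketch {
  // Seeds plus known peers, de-duplicated through a Set, then one picked at random.
  static Optional<String> selectSyncAddress(List<String> seeds, List<String> otherMemberAddresses) {
    List<String> addresses =
        Stream.concat(seeds.stream(), otherMemberAddresses.stream())
            .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new));
    if (addresses.isEmpty()) {
      return Optional.empty(); // doSync simply skips this round
    }
    // Kept from the original; a uniform random index alone already gives a uniform pick.
    Collections.shuffle(addresses);
    return Optional.of(addresses.get(ThreadLocalRandom.current().nextInt(addresses.size())));
  }
}
```

Because seeds stay in the candidate set even after the node has joined, a node that was partitioned from all known peers can still find its way back through a seed.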
Summary
- The MembershipProtocol interface defines the start, stop, listen, members, otherMembers, and member methods. MembershipProtocolImpl implements it and defines the MembershipUpdateReason enum (FAILURE_DETECTOR_EVENT, MEMBERSHIP_GOSSIP, SYNC, INITIAL_SYNC, SUSPICION_TIMEOUT).
- The MembershipProtocolImpl constructor subscribes to transport.listen() (onMessage), failureDetector.listen() (onFailureDetectorEvent) and gossipProtocol.listen() (onMembershipGossip). When there are seed members, start sends them an initial SYNC via transport.requestResponse and hands the first SYNC_ACK to onSyncAck; in every case it then calls schedulePeriodicSync, which runs doSync every syncInterval. stop disposes the subscriptions and every pending future in suspicionTimeoutTasks.
- onMessage checks the syncGroup with checkSyncGroup, then dispatches SYNC to onSync and SYNC_ACK to onSyncAck. onSync calls syncMembership and replies to the sender with SYNC_ACK on success; onSyncAck calls syncMembership without replying. syncMembership calls updateMembership for every record in syncData's membership that differs from the local one.
- onFailureDetectorEvent checks whether the member's status has changed: a change to ALIVE sends SYNC to fdEvent.member().address(), any other change calls updateMembership with MembershipUpdateReason.FAILURE_DETECTOR_EVENT. onMembershipGossip calls updateMembership with MembershipUpdateReason.MEMBERSHIP_GOSSIP for messages whose qualifier is MEMBERSHIP_GOSSIP.
- updateMembership applies an incoming record only if it overrides the current one. A record about the local member is refuted by gossiping the local record with a higher incarnation; otherwise DEAD records are removed from membershipTable, SUSPECT records schedule a suspicion timeout (other statuses cancel it), and finally emitMembershipEvent runs and the record is spread with spreadMembershipGossip.
- schedulePeriodicSync registers doSync to run every syncInterval; doSync uses selectSyncAddress to pick a random address among the seeds and known members as the SYNC target, builds the message with prepareSyncDataMsg and sends it with transport.send.

In short, the start method of MembershipProtocolImpl registers the doSync task (run every syncInterval), which sends a SYNC message carrying all membershipRecords to a randomly chosen member. When onMessage receives SYNC it runs syncMembership and replies with SYNC_ACK on success; on SYNC_ACK it also runs syncMembership. onFailureDetectorEvent and onMembershipGossip both go through updateMembership to update membershipTable, spreading the change with spreadMembershipGossip when necessary.
doc
- MembershipProtocol