Preface
This article takes a look at GossipProtocol in scalecube-cluster.
GossipProtocol
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocol.java
/**
 * Gossip Protocol component responsible for spreading information (gossips) over the cluster
 * members using infection-style dissemination algorithms. It provides reliable cross-cluster
 * broadcast.
 */
public interface GossipProtocol {

  /** Starts running gossip protocol. After started it begins to receive and send gossip messages */
  void start();

  /** Stops running gossip protocol and releases occupied resources. */
  void stop();

  /**
   * Spreads given message between cluster members.
   *
   * @return future result with gossip id once gossip fully spread.
   */
  Mono<String> spread(Message message);

  /** Listens for gossips from other cluster members. */
  Flux<Message> listen();
}
- The GossipProtocol interface defines four methods: start, stop, spread, and listen.
GossipProtocolImpl
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
public final class GossipProtocolImpl implements GossipProtocol {

  private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);

  // Qualifiers
  public static final String GOSSIP_REQ = "sc/gossip/req";

  // Injected
  private final Member localMember;
  private final Transport transport;
  private final GossipConfig config;

  // Local State
  private long currentPeriod = 0;
  private long gossipCounter = 0;
  private Map<String, GossipState> gossips = new HashMap<>();
  private Map<String, MonoSink<String>> futures = new HashMap<>();
  private List<Member> remoteMembers = new ArrayList<>();
  private int remoteMembersIndex = -1;

  // Disposables
  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Subject
  private final FluxProcessor<Message, Message> subject =
      DirectProcessor.<Message>create().serialize();
  private final FluxSink<Message> sink = subject.sink();

  // Scheduled
  private final Scheduler scheduler;

  /**
   * Creates new instance of gossip protocol with given memberId, transport and settings.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param membershipProcessor membership event processor
   * @param config gossip protocol settings
   * @param scheduler scheduler
   */
  public GossipProtocolImpl(
      Member localMember,
      Transport transport,
      Flux<MembershipEvent> membershipProcessor,
      GossipConfig config,
      Scheduler scheduler) {
    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.localMember = Objects.requireNonNull(localMember);
    this.scheduler = Objects.requireNonNull(scheduler);

    // Subscribe
    actionsDisposables.addAll(
        Arrays.asList(
            membershipProcessor //
                .publishOn(scheduler)
                .subscribe(this::onMemberEvent, this::onError),
            transport
                .listen()
                .publishOn(scheduler)
                .filter(this::isGossipReq)
                .subscribe(this::onGossipReq, this::onError)));
  }

  @Override
  public void start() {
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doSpreadGossip,
            config.getGossipInterval(),
            config.getGossipInterval(),
            TimeUnit.MILLISECONDS));
  }

  @Override
  public void stop() {
    // Stop accepting gossip requests and spreading gossips
    actionsDisposables.dispose();
    // Stop publishing events
    sink.complete();
  }

  @Override
  public Mono<String> spread(Message message) {
    return Mono.fromCallable(() -> message)
        .subscribeOn(scheduler)
        .flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink)));
  }

  @Override
  public Flux<Message> listen() {
    return subject.onBackpressureBuffer();
  }

  private void onMemberEvent(MembershipEvent event) {
    Member member = event.member();
    if (event.isRemoved()) {
      remoteMembers.remove(member);
    }
    if (event.isAdded()) {
      remoteMembers.add(member);
    }
  }

  private void onGossipReq(Message message) {
    long period = this.currentPeriod;
    GossipRequest gossipRequest = message.data();
    for (Gossip gossip : gossipRequest.gossips()) {
      GossipState gossipState = gossips.get(gossip.gossipId());
      if (gossipState == null) { // new gossip
        gossipState = new GossipState(gossip, period);
        gossips.put(gossip.gossipId(), gossipState);
        sink.next(gossip.message());
      }
      gossipState.addToInfected(gossipRequest.from());
    }
  }

  private boolean isGossipReq(Message message) {
    return GOSSIP_REQ.equals(message.qualifier());
  }

  private String createAndPutGossip(Message message) {
    long period = this.currentPeriod;
    Gossip gossip = new Gossip(generateGossipId(), message);
    GossipState gossipState = new GossipState(gossip, period);
    gossips.put(gossip.gossipId(), gossipState);
    return gossip.gossipId();
  }

  //......
}
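- GossipProtocolImpl implements the GossipProtocol interface. It maintains a map named gossips from gossipId to GossipState, as well as the remoteMembers list.
- Its constructor subscribes to membershipProcessor, which triggers onMemberEvent; that method adds a member to, or removes it from, remoteMembers according to the MembershipEvent. The constructor also subscribes to transport.listen(), filters for GossipReq messages, and triggers onGossipReq. That method merges the gossips carried by the GossipRequest into the local gossips map: for a gossip it has not seen before, it creates a GossipState and emits the gossip's message to sink, and in every case it adds the requesting member's id to that gossip's infected set.
- start schedules doSpreadGossip to run every gossipInterval milliseconds. spread calls createAndPutGossip to create a Gossip and put it into gossips, and stores the MonoSink of the returned Mono in futures under the new gossip id.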
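The merge rule in onGossipReq can be illustrated without Reactor or the scalecube types. The following is a minimal sketch, not the library's code: GossipMergeSketch, State, onGossip, and the delivered list (standing in for sink.next) are hypothetical names introduced here, and gossips and members are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for the gossip bookkeeping; not the library's classes. */
class GossipMergeSketch {

  /** Per-gossip state: the period it was first seen and the members known to have it. */
  static final class State {
    final long infectionPeriod;
    final Set<String> infected = new HashSet<>();

    State(long infectionPeriod) {
      this.infectionPeriod = infectionPeriod;
    }
  }

  final Map<String, State> gossips = new HashMap<>();
  final List<String> delivered = new ArrayList<>(); // stands in for sink.next(...)
  long currentPeriod = 0;

  /** Mirrors the loop body of onGossipReq: deliver once, always record the sender. */
  void onGossip(String gossipId, String payload, String fromMemberId) {
    State state = gossips.get(gossipId);
    if (state == null) { // first time this gossip is seen locally
      state = new State(currentPeriod);
      gossips.put(gossipId, state);
      delivered.add(payload);
    }
    state.infected.add(fromMemberId); // the sender already has it
  }

  public static void main(String[] args) {
    GossipMergeSketch node = new GossipMergeSketch();
    node.onGossip("g-1", "hello", "member-A");
    node.onGossip("g-1", "hello", "member-B"); // same gossip arriving from another member
    System.out.println(node.delivered); // the payload was delivered only once
    System.out.println(node.gossips.get("g-1").infected); // both senders are recorded
  }
}
```

Recording the sender even for an already known gossip is what later lets selectGossipsToSend skip members that are known to hold it.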
doSpreadGossip
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
public final class GossipProtocolImpl implements GossipProtocol {

  //......

  private List<Member> remoteMembers = new ArrayList<>();
  private int remoteMembersIndex = -1;

  private void doSpreadGossip() {
    // Increment period
    long period = currentPeriod++;

    // Check any gossips exists
    if (gossips.isEmpty()) {
      return; // nothing to spread
    }

    try {
      // Spread gossips to randomly selected member(s)
      selectGossipMembers().forEach(member -> spreadGossipsTo(period, member));

      // Sweep gossips
      sweepGossips(period);
    } catch (Exception ex) {
      LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex);
    }
  }

  private void spreadGossipsTo(long period, Member member) {
    // Select gossips to send
    List<Gossip> gossips = selectGossipsToSend(period, member);
    if (gossips.isEmpty()) {
      return; // nothing to spread
    }

    // Send gossip request
    Address address = member.address();

    gossips
        .stream()
        .map(this::buildGossipRequestMessage)
        .forEach(
            message ->
                transport
                    .send(address, message)
                    .subscribe(
                        null,
                        ex ->
                            LOGGER.debug(
                                "Failed to send GossipReq[{}]: {} to {}, cause: {}",
                                period,
                                message,
                                address,
                                ex.toString())));
  }

  private List<Gossip> selectGossipsToSend(long period, Member member) {
    int periodsToSpread =
        ClusterMath.gossipPeriodsToSpread(config.getGossipRepeatMult(), remoteMembers.size() + 1);
    return gossips
        .values()
        .stream()
        .filter(gossipState -> gossipState.infectionPeriod() + periodsToSpread >= period) // max rounds
        .filter(gossipState -> !gossipState.isInfected(member.id())) // already infected
        .map(GossipState::gossip)
        .collect(Collectors.toList());
  }

  private List<Member> selectGossipMembers() {
    int gossipFanout = config.getGossipFanout();
    if (remoteMembers.size() < gossipFanout) { // select all
      return remoteMembers;
    } else { // select random members
      // Shuffle members initially and once reached top bound
      if (remoteMembersIndex < 0 || remoteMembersIndex + gossipFanout > remoteMembers.size()) {
        Collections.shuffle(remoteMembers);
        remoteMembersIndex = 0;
      }

      // Select members
      List<Member> selectedMembers =
          gossipFanout == 1
              ? Collections.singletonList(remoteMembers.get(remoteMembersIndex))
              : remoteMembers.subList(remoteMembersIndex, remoteMembersIndex + gossipFanout);

      // Increment index and return result
      remoteMembersIndex += gossipFanout;
      return selectedMembers;
    }
  }

  private Message buildGossipRequestMessage(Gossip gossip) {
    GossipRequest gossipRequest = new GossipRequest(gossip, localMember.id());
    return Message.withData(gossipRequest)
        .qualifier(GOSSIP_REQ)
        .sender(localMember.address())
        .build();
  }

  private void sweepGossips(long period) {
    // Select gossips to sweep
    int periodsToSweep =
        ClusterMath.gossipPeriodsToSweep(config.getGossipRepeatMult(), remoteMembers.size() + 1);
    Set<GossipState> gossipsToRemove =
        gossips
            .values()
            .stream()
            .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep)
            .collect(Collectors.toSet());

    // Check if anything selected
    if (gossipsToRemove.isEmpty()) {
      return; // nothing to sweep
    }

    // Sweep gossips
    LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);
    for (GossipState gossipState : gossipsToRemove) {
      gossips.remove(gossipState.gossip().gossipId());
      MonoSink<String> sink = futures.remove(gossipState.gossip().gossipId());
      if (sink != null) {
        sink.success(gossipState.gossip().gossipId());
      }
    }
  }

  //......
}
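- doSpreadGossip first increments currentPeriod and returns immediately if gossips is empty. Otherwise it calls selectGossipMembers, calls spreadGossipsTo for each selected member, and finally calls sweepGossips.
- selectGossipMembers returns all of remoteMembers when there are fewer than gossipFanout of them. Otherwise it picks gossipFanout members by taking a subList of remoteMembers starting at remoteMembersIndex. When remoteMembersIndex is less than 0, or remoteMembersIndex + gossipFanout > remoteMembers.size(), it first calls Collections.shuffle(remoteMembers) and resets remoteMembersIndex to 0; after selecting, it adds gossipFanout to remoteMembersIndex. The randomness therefore comes from the shuffle, and between shuffles the method walks the list in consecutive windows.
- spreadGossipsTo first calls selectGossipsToSend, which keeps only the gossips that satisfy infectionPeriod() + periodsToSpread >= period and that the target member is not yet known to be infected with. It then builds one GOSSIP_REQ message per gossip via buildGossipRequestMessage and sends it with transport.send; a send failure is only logged at debug level.
- sweepGossips computes periodsToSweep and removes from gossips every gossipState for which period > gossipState.infectionPeriod() + periodsToSweep. For each removed gossip it also removes the matching MonoSink from futures and completes it with the gossip id, which is when the Mono returned by spread emits.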
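The shuffle-then-window selection described above can be tried in isolation. This is a minimal sketch under stated assumptions: FanoutSelectorSketch and next are hypothetical names, members are plain strings instead of Member objects, and the selection is copied into a new list, whereas the original returns remoteMembers itself or a subList view of it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the selection strategy used by selectGossipMembers. */
class FanoutSelectorSketch {
  private final List<String> members;
  private final int fanout;
  private int index = -1; // -1 forces a shuffle on first use, as in the original

  FanoutSelectorSketch(List<String> members, int fanout) {
    this.members = new ArrayList<>(members);
    this.fanout = fanout;
  }

  /** Returns the members to gossip with in the current period. */
  List<String> next() {
    if (members.size() < fanout) {
      return new ArrayList<>(members); // fewer members than fanout: select all
    }
    // Shuffle initially and whenever the next window would run past the end
    if (index < 0 || index + fanout > members.size()) {
      Collections.shuffle(members);
      index = 0;
    }
    List<String> selected = new ArrayList<>(members.subList(index, index + fanout));
    index += fanout;
    return selected;
  }

  public static void main(String[] args) {
    FanoutSelectorSketch selector =
        new FanoutSelectorSketch(Arrays.asList("a", "b", "c", "d", "e", "f"), 3);
    Set<String> seen = new HashSet<>();
    seen.addAll(selector.next()); // first window after the initial shuffle
    seen.addAll(selector.next()); // second, disjoint window of the same shuffle
    System.out.println(seen.size()); // all six members were covered before reshuffling
  }
}
```

Compared with drawing a fresh random sample every period, walking a shuffled list in windows guarantees that no member is picked twice until the list is reshuffled. When the list size is not a multiple of the fanout, the members left over at the tail are skipped in that pass.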
Summary
- The GossipProtocol interface defines start, stop, spread, and listen. GossipProtocolImpl implements it and maintains the gossips map (gossipId to GossipState) together with the remoteMembers list.
- GossipProtocolImpl registers two handlers in its constructor. onMemberEvent listens for MembershipEvent and keeps the remoteMembers list up to date. onGossipReq handles the GossipReq messages sent by other members' doSpreadGossip and merges their gossips into the local gossips map, emitting each new message to sink and recording the sender as infected.
- start runs doSpreadGossip every gossipInterval. Each run increments currentPeriod, selects up to gossipFanout members via selectGossipMembers, sends each of them the gossips chosen by selectGossipsToSend, and finally calls sweepGossips.
onGossipReq and spread are the two paths that add entries to gossips, while doSpreadGossip, triggered every gossipInterval, picks the gossips still to be spread from that map and sends them.
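To see how a single gossip moves through the periodic loop, the two period comparisons from selectGossipsToSend and sweepGossips can be evaluated side by side. This is a sketch under stated assumptions: GossipWindowSketch, shouldSpread, and shouldSweep are hypothetical names, and the values 3 and 8 are placeholders rather than what ClusterMath would actually return.

```java
/** Hypothetical sketch of the period checks applied to one gossip. */
class GossipWindowSketch {

  /** Same condition as the "max rounds" filter in selectGossipsToSend. */
  static boolean shouldSpread(long infectionPeriod, int periodsToSpread, long period) {
    return infectionPeriod + periodsToSpread >= period;
  }

  /** Same condition as the filter in sweepGossips. */
  static boolean shouldSweep(long infectionPeriod, int periodsToSweep, long period) {
    return period > infectionPeriod + periodsToSweep;
  }

  public static void main(String[] args) {
    int periodsToSpread = 3; // placeholder, not a ClusterMath result
    int periodsToSweep = 8; // placeholder, not a ClusterMath result
    long infectionPeriod = 10; // period in which the gossip was first seen locally

    for (long period = 10; period <= 20; period++) {
      String phase;
      if (shouldSweep(infectionPeriod, periodsToSweep, period)) {
        phase = "swept"; // in the real code the entry is removed at this point
      } else if (shouldSpread(infectionPeriod, periodsToSpread, period)) {
        phase = "spreading";
      } else {
        phase = "retained"; // no longer sent, but still known to the node
      }
      System.out.println(period + ": " + phase);
    }
  }
}
```

With these placeholder values the gossip is sent during periods 10 to 13, merely kept in the map during periods 14 to 18, and removed from period 19 on. While the entry is retained, onGossipReq still finds it in gossips, so a late copy from another member is not delivered to listeners a second time.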
doc
- Gossip
- GossipProtocol
- GossipProtocolImpl