共计 9039 个字符,预计需要花费 23 分钟才能阅读完成。
public Vote lookForLeader() throws InterruptedException {
try {self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {self.start_fle = Time.currentElapsedTime();
}
try {HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id =" + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){if(manager.haveDelivered()){sendNotifications();
} else {manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out:" + notTimeout);
}
else if(validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view for a replica in the voting view.
*/
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();} else if (n.electionEpoch < logicalclock.get()) {if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer:" + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {synchronized(this){logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {if (!validVoter(n.leader)) {LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {if(self.jmxLeaderElectionBean != null){MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
选举过程:
proposedLeader(选中为 leader 的节点 id);
proposedZxid(选中为 leader 的 lastZxid);
proposedEpoch(选中为 leader 的以后年代);
1. 所有节点首先将选票投给本人
2. 调用 sendNotifications 办法, 将本人以后选票发送给其余节点
private void sendNotifications() {for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
if(LOG.isDebugEnabled()){LOG.debug("Sending Notification:" + proposedLeader + "(n.leader), 0x" +
Long.toHexString(proposedZxid) + "(n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
"(n.round)," + sid + "(recipient)," + self.getId() +
"(myid), 0x" + Long.toHexString(proposedEpoch) + "(n.peerEpoch)");
}
sendqueue.offer(notmsg);
}
}
这里能够看出其实就是将以后选票封装成 ToSend 构造体, 而后放到 sendqueue 这个阻塞队列里, 期待 Sender 线程 (前面讲音讯流转时解说) 从队列中拉取要发送的音讯, 而后发送给其余节点.
3. 而后所有节点进入到 while 循环中, 直到选举实现, 也就是 PeerState!=Looking, 咱们从 while 循环中能够看出首先是从 recvqueue 这个阻塞队列中取拉取其余节点发个过去的投票信息,recvqueue 队列中存储的构造体为 Notification:
属性 | 形容 |
---|---|
version | 单元 2 |
leader | 选举的 leaderId |
zxid | 选举的 leader 的 zxid |
electionEpoch | 以后选举轮次 |
state | 发送者的节点状态 |
sid | 发送者 id |
peerEpoch | 选举的 leader 节点 epoch |
此时须要对 Notification 内容有效性做一些校验, 并做一些解决:
1)Notification 为空, 呈现这种状况须要判断其余节点的发送队列中的音讯是否某一个曾经全副发送进来, 如果存在这种状况, 则须要将本身的选票再次发送给所有其余节点。
2)判断发送者的 sid 和选举 leader 的 sid 是否在无效视图中, 如果是有效的 sid, 则间接抛弃这张选票。
以后选票确定是无效时, 须要对发送的节点状态进行一些独自的解决:
①发送者节点状态也为 Looking 时:
1)当 electionEpoch>logicallock(以后选票的选举轮次大于以后节点的选举轮次):
则以后节点清空 recvset(投票箱)中的所有选票, 并判断以后选票是否优于本身, 如果优于本身, 则更新本身选票为以后选票的投票内容, 否则就投票给本身.
如何判断选票优于以后选票:
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id:" + newId + ", proposed id:" + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){return false;}
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
- 首先时判断新选票的选举的 leader 的 epoch 是否大于以后节点选票的 epoch (epoch 越大阐明经验的选举次数越大, 状态更新)
- 如果 epoch 雷同, 则判断新选票的选举的 leader 的 zxid 是否大于以后节点选票的 zxid(zxid 越大, 阐明节点的事务数据存储越多)
- 如果 zxid 雷同, 则判断以后新选票选举的 leader 的 sid 是否大于以后节点选票的 sid(sid 判断策略则是作为兜底策略)
2)当 electionEpoch<logicallock(以后选票的选举轮次小于以后节点的选举轮次)
呈现这种状况, 阐明以后选票曾经不属于此次选举轮次中的选票, 则间接疏忽掉此选票
3) 当 electionEpoch=logicallock(以后选票的选举轮次等于以后节点的选举轮次)
此时阐明, 新选票和本身选票处于一个投票轮次中, 则直接判断新选票是否优于以后选票, 如果优于以后选票, 则更新以后选票内容为新选票内容, 并将本身新选举内容发送给其余节点.
最初将新选票放入到 recvset 投票箱中, 并判断投票箱中的投票是否有超过一半曾经和本身的选票内容统一, 如果未超过一半则再次从新进行下面选举流程, 如果曾经达到一半, 则进行最初的判断, 把 recvqueue 中的投票信息全副取出来进行判断, 判断是否还存在优与以后本身选票的投票音讯, 如果有的话, 则将以后选票从新放入 recvqueue 中, 从新进行选举流程, 没有的话则间接完结选举.
// 判断投票箱中, 是否曾经超过一半节点的投票内容和以后节点投票内容统一
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
// 最初判断其余节点发送过去的投票信息是否还存在优于以后节点投票信息的音讯
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
// 阐明队列音讯为空或队列中的投票音讯曾经不会对以后节点的投票内容产生扭转
if (n == null) {
// 变更以后节点状态, 如果选举的 leader 时本身, 则状态变更未 Leading, 否则未 Folloing 或 Observing
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
// 保留最终投票内容
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
②发送者节点状态也为 Observing 时:
Observer 角色不参加选举过程, 则来自 Observer 的选票间接疏忽掉.
③发送者节点状态也为 Following 和 Leading 时:
投票音讯流转过程:
在 FastLeaderElection 的构造方法中, 会结构进去一个 Messenger 实例, 该实例中会新开启 2 个线程, 并始终轮询从 FastLeaderElection#sendQuene 中和 QuorumCnxManager#recvQueue 中拉取承受到的音讯和须要发送的音讯.
发送端:
FastLeaderElection 发送选票 (ToSend)
-> 存储在 FastLeaderElection 的 sendQueue 队列中
->FastLeaderElection.Messenger.WorkerSender#run 轮询从 sendQueue 拉取音讯并转换成 ByteBuffer
->QuorumCnxManager#toSend 办法
1) 如果要发送的 sid 为以后节点, 则间接放入 QuorumCnxManager#recvQueue 承受队列中
2) 放入 QuorumCnxManager#queueSendMap()队列中,ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>
->QuorumCnxManager.SendWorker#run 办法从 queueSendMap 中拉取要发送的音讯, 通过 Socket 发送给其余节点, 上一节, 讲诉了和其余每个节点都保护了一个 SendWorker 线程
承受端:
->QuorumCnxManager.RecvWorker#run 办法通过 Socket 阻塞读取其余节点发送的音讯, 转换成 Message 音讯体, 并放入 QuorumCnxManager#recvQueue 阻塞队列中, 上一节, 讲诉了和其余每个节点都保护了一个 RecvWorker 线程
->FastLeaderElection.Messenger.WorkerReceiver 轮询通过 QuorumCnxManager#pollRecvQueue 办法从 QuorumCnxManager#recvQueue 队列中拉取音讯并转换成 Notification, 并 Notification 放入 FastLeaderElection#recvqueue 队列中.
->FastLeaderElection#lookForLeader 办法从 FastLeaderElection#recvqueue 拉取选票信息并解决.