Leader election is built around the Election interface and its implementation classes.
Of these, AuthFastLeaderElection and LeaderElection have been deprecated since version 3.4.0.
Election source code analysis
    public interface Election {
        public Vote lookForLeader() throws InterruptedException;
        public void shutdown();
    }
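Notes:
Election is the parent interface for leader election. It defines two methods, lookForLeader and shutdown: lookForLeader searches for a Leader, and shutdown closes the election, for example by closing the connections between servers.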
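Now look at the default implementation class, FastLeaderElection:
FastLeaderElection implements the Election interface and overrides the lookForLeader and shutdown methods that the interface defines.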
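Before analyzing the source code, a few concepts need introducing:
External vote: a vote sent by another server.
Internal vote: the server's own current vote.
Election round: the round of the ZooKeeper server's Leader election, i.e. the logical clock.
PK: a comparison of the internal vote against an external vote to decide whether the internal vote needs to change.
Vote management
sendqueue: the vote send queue, which holds votes waiting to be sent.
recvqueue: the vote receive queue, which holds external votes that have been received.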
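The two queues can be pictured with a minimal sketch. It assumes the JDK's `LinkedBlockingQueue`; the `Ballot` and `VoteQueues` classes and their helper methods are hypothetical stand-ins for illustration, not ZooKeeper's own types.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a vote message; not a ZooKeeper class.
class Ballot {
    final long leader; // id of the proposed leader
    final long zxid;   // last transaction id of the proposed leader
    final long sid;    // id of the server that cast this vote

    Ballot(long leader, long zxid, long sid) {
        this.leader = leader;
        this.zxid = zxid;
        this.sid = sid;
    }
}

class VoteQueues {
    // Votes waiting to be sent to peers.
    final LinkedBlockingQueue<Ballot> sendqueue = new LinkedBlockingQueue<>();
    // External votes received from peers.
    final LinkedBlockingQueue<Ballot> recvqueue = new LinkedBlockingQueue<>();

    // Queue our vote for sending (hypothetical helper).
    void offerToSend(Ballot b) {
        sendqueue.offer(b);
    }

    // Wait up to timeoutMs for an external vote; null means none arrived.
    Ballot pollReceived(long timeoutMs) {
        try {
            return recvqueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }
}
```

A timed `poll` is what lets the election loop notice that no external vote has arrived and react to it.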
The lookForLeader function
When a ZooKeeper server detects that its own state has changed to LOOKING, it triggers Leader election by calling the lookForLeader method.
    public Vote lookForLeader() throws InterruptedException {
        synchronized (this) {
            // Increment the logical clock first: every new round of leader
            // election must advance it
            logicalclock.incrementAndGet();
            // Update (initialize) our own vote
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        // Send our (freshly updated) vote to the other servers
        sendNotifications();
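After that, each server keeps polling the recvqueue for external votes. If a server finds that it cannot obtain any external vote, it immediately checks whether it still has valid connections to the other servers in the cluster. If it is not connected, it establishes the connections at once; if the connections already exist, it sends its current internal vote again. The flow is as follows: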
    /*
     * Sends more notifications if haven't received enough.
     * Otherwise processes new notification.
     */
    if (n == null) {
        // No vote received: check whether our own votes have all been
        // delivered; if not, the connection to the other servers was lost
        if (manager.haveDelivered()) {
            // Already delivered, so send them again
            sendNotifications();
        } else {
            // Disconnected, so reconnect
            manager.connectAll();
        }

        /*
         * Exponential backoff
         */
        int tmpTimeOut = notTimeout * 2;
        notTimeout = (tmpTimeOut < maxNotificationInterval ?
                tmpTimeOut : maxNotificationInterval);
        LOG.info("Notification time out: " + notTimeout);
    }
    // A vote was received: first check that the voter and the proposed
    // leader are both valid
    else if (validVoter(n.sid) && validVoter(n.leader)) {
        /*
         * Only proceed if the vote comes from a replica in the current or next
         * voting view for a replica in the current or next voting view.
         */
        switch (n.state) {
        case LOOKING:
            // If notification > current, replace and send messages out
            // Compare the external vote's election round with our own
            if (n.electionEpoch > logicalclock.get()) {
                // The external round is newer: adopt it as our own round
                logicalclock.set(n.electionEpoch);
                // Clear the votes collected so far
                recvset.clear();
                // Compare the external vote with our initial vote
                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                        getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                } else {
                    updateProposal(getInitId(),
                            getInitLastLoggedZxid(),
                            getPeerEpoch());
                }
                sendNotifications();
            } else if (n.electionEpoch < logicalclock.get()) {
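The exponential backoff in the excerpt above can be isolated as a small sketch. The starting timeout of 200 ms and the cap of 60000 ms are assumptions chosen for illustration, and `NotificationBackoff` is a hypothetical class name rather than part of ZooKeeper.

```java
class NotificationBackoff {
    // Assumed starting timeout and upper bound, in milliseconds.
    static final int INITIAL_TIMEOUT_MS = 200;
    static final int MAX_INTERVAL_MS = 60000;

    // Double the timeout, but never let it exceed the cap.
    static int next(int currentTimeoutMs) {
        int doubled = currentTimeoutMs * 2;
        return (doubled < MAX_INTERVAL_MS) ? doubled : MAX_INTERVAL_MS;
    }

    // Timeout in effect after the given number of consecutive empty polls.
    static int afterMisses(int misses) {
        int timeout = INITIAL_TIMEOUT_MS;
        for (int i = 0; i < misses; i++) {
            timeout = next(timeout);
        }
        return timeout;
    }
}
```

Under these assumed values the timeout grows 200, 400, 800, and so on, and stays at the cap once doubling would pass it, so an isolated server polls less and less often instead of flooding its peers.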
First, look at the totalOrderPredicate method:
    /**
     * Check if a pair (server id, zxid) succeeds our
     * current vote.
     *
     * @param id    Server identifier
     * @param zxid  Last zxid observed by the issuer of this vote
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
            long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" +
                Long.toHexString(curZxid));
        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        // Compare the epoch first; if equal, compare the transaction id (zxid);
        // if that is also equal, compare the server id
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) ||
                ((newZxid == curZxid) && (newId > curId)))));
    }
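To make the ordering concrete, here is a minimal standalone sketch of the same three-level comparison. `VoteOrder.beats` is a hypothetical helper, and it deliberately leaves out the quorum-weight check because that needs a live QuorumVerifier.

```java
class VoteOrder {
    // True when the new (epoch, zxid, id) triple beats the current one.
    // Epoch is compared first, then zxid, then server id.
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }
}
```

For example, a vote with a higher epoch wins even if its zxid and server id are lower, and two identical votes never beat each other, so a server does not keep re-sending a vote it already holds.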
Now back to the branch we left off at:
            } else if (n.electionEpoch < logicalclock.get()) {
                // The external vote's round is older than ours: discard it
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                            + Long.toHexString(n.electionEpoch)
                            + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                }
                break;
            }
            // Same round: compare the votes, and if the external vote wins
            else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                // Update our vote
                updateProposal(n.leader, n.zxid, n.peerEpoch);
                // Send the updated vote
                sendNotifications();
            }
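The three-way decision on the election round can be summarized in a small sketch. `RoundCheck` and its `Action` enum are hypothetical names introduced only to label the three branches; they do not exist in ZooKeeper.

```java
class RoundCheck {
    // The three possible reactions to an external vote's election round.
    enum Action { ADOPT_ROUND, DISCARD, COMPARE_VOTES }

    // Classify an external vote by comparing its round with our own.
    static Action classify(long externalRound, long ownRound) {
        if (externalRound > ownRound) {
            // We are behind: adopt the newer round and clear collected votes.
            return Action.ADOPT_ROUND;
        }
        if (externalRound < ownRound) {
            // The vote is stale: ignore it.
            return Action.DISCARD;
        }
        // Same round: run the PK between the two votes.
        return Action.COMPARE_VOTES;
    }
}
```

Only votes from the same round are compared directly, which keeps leftovers from an earlier election from influencing the current one.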
After the external vote has been evaluated:
    // Record the vote
    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

    // Check whether a leader can now be elected
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {

        // Verify if there is any change in the proposed leader
        while ((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null) {
            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                recvqueue.put(n);
                break;
            }
        }

        /*
         * This predicate is true once we don't read any new
         * relevant message from the reception queue
         */
        if (n == null) {
            self.setPeerState((proposedLeader == self.getId()) ?
                    ServerState.LEADING : learningState());

            Vote endVote = new Vote(proposedLeader,
                    proposedZxid, proposedEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
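The wait-and-drain step in the excerpt above can be sketched on its own. This is a simplification under stated assumptions: each vote is reduced to a single leader id compared numerically, and `FinalizeWaitSketch.settled` is a hypothetical helper, not ZooKeeper code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FinalizeWaitSketch {
    // Drain late votes for up to waitMs each. Returns true if none of them
    // beats the proposed leader, false if a better vote showed up (it is put
    // back on the queue so the main loop can process it).
    static boolean settled(LinkedBlockingQueue<Long> recvqueue,
                           long proposedLeader, long waitMs) {
        try {
            Long n;
            while ((n = recvqueue.poll(waitMs, TimeUnit.MILLISECONDS)) != null) {
                if (n > proposedLeader) {
                    recvqueue.put(n); // hand the better vote back
                    return false;
                }
            }
            return true; // queue stayed quiet: the result is stable
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

The short wait gives a late but better vote a chance to arrive before the server commits to a state.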
Now look at termPredicate:
    /**
     * Termination predicate. Given a set of votes, determines if have
     * sufficient to declare the end of the election round.
     *
     * @param votes
     *            Set of votes
     * @param vote
     *            Identifier of the vote received last
     */
    private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        /*
         * First make the views consistent. Sometimes peers will have different
         * zxids for a server depending on timing.
         */
        // Iterate over the collected votes
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            // Acknowledge every voter whose vote equals the given vote
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }

        // Decide whether a leader can be chosen, i.e. whether a quorum
        // (more than half) has been reached
        return voteSet.hasAllQuorums();
    }
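The "more than half" idea behind termPredicate can be shown with a simplified sketch. It assumes a plain majority rule in place of ZooKeeper's QuorumVerifier and reduces each vote to the proposed leader's id; `MajorityCheck` is a hypothetical class.

```java
import java.util.Map;

class MajorityCheck {
    // votes maps each voter's server id to the leader id it proposed.
    // Returns true when the ballots for proposedLeader form a strict
    // majority of an ensemble with ensembleSize voting members.
    static boolean hasMajority(Map<Long, Long> votes,
                               long proposedLeader, int ensembleSize) {
        int acks = 0;
        for (Map.Entry<Long, Long> entry : votes.entrySet()) {
            if (entry.getValue() == proposedLeader) {
                acks++;
            }
        }
        return acks > ensembleSize / 2;
    }
}
```

In a three-server ensemble, two matching votes are enough; in a five-server ensemble, two are not.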
At this point, the leader election is complete.