乐趣区

关于java:zookeeper选举源码分析

对于 Leader 选举,就是 Election 接口和其实现类

AuthFastLeaderElection,LeaderElection 其在 3.4.0 之后的版本中曾经不倡议使⽤。
Election 源码剖析

public interface Election {public Vote lookForLeader() throws InterruptedException;
public void shutdown();}

阐明:
选举的⽗接⼝为 Election,其定义了 lookForLeader 和 shutdown 两个⽅法,lookForLeader 示意寻找 Leader,shutdown 则示意敞开,如敞开服务端之间的连贯。
看默认实现类 FastLeaderElection:
FastLeaderElection 实现了 Election 接⼝,重写了接⼝中定义的 lookForLeader ⽅法和 shutdown
⽅法。
在源码剖析之前,咱们⾸先介绍⼏个概念:
内部投票:特指其余服务器发来的投票。
外部投票:服务器⾃身以后的投票。
选举轮次:ZooKeeper 服务器 Leader 选举的轮次,即 logical clock(逻辑时钟)。
PK:指对外部投票和内部投票进⾏⼀个对⽐来确定是否须要变更外部投票。选票治理
sendqueue:选票发送队列,⽤于保留待发送的选票。
recvqueue:选票接管队列,⽤于保留接管到的内部投票。

lookForLeader 函数
当 ZooKeeper 服务器检测到以后服务器状态变成 LOOKING 时,就会触发 Leader 选举,即调⽤
lookForLeader ⽅法来进⾏ Leader 选举。

public Vote lookForLeader() throws InterruptedException {synchronized(this){
// ⾸先会将逻辑时钟⾃增,每进⾏⼀轮新的 leader 选举,都须要更新逻辑时钟
logicalclock++;
// 更新选票(初始化选票)updateProposal(getInitId(), getInitLastLoggedZxid(),
getPeerEpoch());
}
LOG.info("New election. My id =" + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// 向其余服务器发送⾃⼰的选票(已更新的选票)sendNotifications();

之后每台服务器会一直地从 recvqueue 队列中获取内部选票。如果服务器发现⽆法获取到任何内部投
票,就⽴即确认⾃⼰是否和集群中其余服务器放弃着无效的连贯,如果没有连贯,则⻢上建⽴连贯,如
果曾经建⽴了连贯,则再次发送⾃⼰以后的外部投票,其流程如下:
/*

             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){
                // 没有接管到选票,就查看是不是本人的选票都曾经投递了,没投递就阐明和服务器断开了连贯
                if(manager.haveDelivered()){
                    // 曾经投递了就再投一遍
                    sendNotifications();} else {
                    // 断开了就连贯
                    manager.connectAll();}

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out:" + notTimeout);
            }
            // 收到了选票,则先校验投票的投票者和 leader 是否无效
            else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    // 比拟内部投票的轮次和本身的轮次大小
                    if (n.electionEpoch > logicalclock.get()) {
                        // 内部大于外部,则将本身轮次设为内部投票
                        logicalclock.set(n.electionEpoch);
                        // 清空本身投票
                        recvset.clear();
                        // 比拟投票
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();} else if (n.electionEpoch < logicalclock.get()) {

先看 totalOrderPredicate 办法:

   /**
     * Check if a pair (server id, zxid) succeeds our
     * current vote.
     *
     * @param id    Server identifier
     * @param zxid  Last zxid observed by the issuer of this vote
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id:" + newId + ", proposed id:" + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){return false;}

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        // 先比拟轮次 id,一样再比拟事务 id,一样再比拟服务器 id
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

再回到刚刚的分支:

    } else if (n.electionEpoch < logicalclock.get()) {
                            // 内部投票小于外部间接抛弃
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                        }   // 比拟投票,若内部投票更大
                        else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            // 更新投票信息
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            // 发送投票
                            sendNotifications();}

判断完内部投票后:

   // 归档投票信息
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        // 判断是否有 leader 能够选出
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

查看 termPredicate:

/**
 * Termination predicate. Given a set of votes, determines if have
 * sufficient to declare the end of the election round.
 * 
 * @param votes
 *            Set of votes
 * @param vote
 *            Identifier of the vote received last
 */
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    // 遍历投票汇合
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        // 找出所有等于此次投票的记录
        if (vote.equals(entry.getValue())) {voteSet.addAck(entry.getKey());
        }
    }

     // 判断是否能够找出 leader,即是否过半
    return voteSet.hasAllQuorums();}

至此,leader 的选举就实现了。

退出移动版