乐趣区

关于zookeeper:无微不至之Zookeeper源码深度讲解2核心流程梳理

一. 源码仓库:

zookeeper
基于分支 3.4.14 分支在 windows 系统启动流程进行剖析。

二. 流程剖析:

  1. 源码入口
    通过 zkServer.cmd 可执行文件内容能够看出 zookeeper 的服务端是通过 org.apache.zookeeper.server.quorum.QuorumPeerMain 这个类的 main 作为入口来启动服务端程序的.main 办法传入的是咱们 zoo.cfg 文件的地址, 而后通过解析 zoo.cfg 文件, 将 key,value 的配置信息转换成 QuorumPeerConfig 的对象, 转换细节能够看 QuorumPeerConfig.parse 办法, 其中转换后的外围配置参数有:
参数名 参数形容
dataLogDir 事务日志存储门路
dataDir 快照存储门路
electionType 选举算法, 目前只反对 3 - 疾速选举算法
myid 以后服务 id
tickTime 工夫单位
initLimit
syncLimit 事务存储门路
minSessionTimeout 最小会话超时工夫
maxSessionTimeout 最大会话超时工夫
peerType 角色类型 -OBSERVER,PARTICIPANT
clientPort 客户端连贯端口
clientPortAddress 客户端连贯 Host
snapRetainCount 快照保留个数, 最小为 3
purgeInterval 快照革除距离
server.sid hostName:port(通信端口):electionPort(选举端口):peerType
maxClientCnxns 最大客户端连接数

拿到解析后的参数后, 能够通过是否配置了 server.id 参数来决定是否集群启动还是单机启动, 单机启动运行通过 ZooKeeperServerMain#main 办法启动, 集群启动则还是在 QuorumPeerMain#runFromConfig 办法进行解决的, 这里咱们就间接解说集群模式, 因为集群模式比单机模式多了集群间的通信相干的解决, 如 Leader 选举, 数据同步, 申请转发等.

    public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {LOG.warn("Unable to register log4j JMX control", e);
      }
  
      LOG.info("Starting quorum peer");
      try {ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());

          quorumPeer = getQuorumPeer();

          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());

          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }

          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();

          quorumPeer.start();
          quorumPeer.join();} catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

能够从代码片段中能够看出, 新创建出了一个 QuorumPeer 对象, 其实这就是 OOP 思维, 以后实例代表着集群的一个节点, 而后将 QuorumPeerConfig 从新设置给 QuorumPeer 对象, 在这里呈现几个新的类:

类名 类形容
FileTxnSnapLog 长久化外围类别, 包含快照, 事务日志操作
ServerCnxnFactory 3 服务端网络解决外围类, 其实现蕴含 NIO 和 Netty 两种实现
ZKDatabase 内存操作外围类, 通过树结构存储

在设置了参数之后, 接下来调用了 QuorumPeer#initialize 办法, 在这个办法里次要是一些鉴权类的对象实例化。外围还是 QuorumPeer#start 办法:

        loadDataBase();// 将数据从快照和事务日志加载到内存中
        cnxnFactory.start();        // 网络服务启动
        startLeaderElection(); // 选举工作筹备
        super.start(); 

loadDataBase:
在这个办法里次要是通过委托给 ZKDatabase#loadDataBase 进行加载工作的

    public long loadDataBase() throws IOException {long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }
    public long restore(DataTree dt, Map<Long, Integer> sessions, 
            PlayBackListener listener) throws IOException {snapLog.deserialize(dt, sessions); // 数据反序列化
        return fastForwardFromEdits(dt, sessions, listener);
    }
 public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // 找到无效的 100 个快照文件, 降序
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {return -1L;}
        File snap = null;
        boolean foundValid = false;
        for (int i = 0; i < snapList.size(); i++) {snap = snapList.get(i);
            InputStream snapIS = null;
            CheckedInputStream crcIn = null;
            try {LOG.info("Reading snapshot" + snap);
                snapIS = new BufferedInputStream(new FileInputStream(snap));
                crcIn = new CheckedInputStream(snapIS, new Adler32());
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                // 真正序列化的中央
                deserialize(dt,sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                // 校验快照文件的完整性
                if (val != checkSum) {throw new IOException("CRC corruption in snapshot :" + snap);
                }
                foundValid = true;
                break;
            } catch(IOException e) {LOG.warn("problem reading snap file" + snap, e);
            } finally {if (snapIS != null) 
                    snapIS.close();
                if (crcIn != null) 
                    crcIn.close();} 
        }
        if (!foundValid) {throw new IOException("Not able to find valid snapshots in" + snapDir);
        }
        // 快照文件命名为 snapshot.lastZxid
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }

在 ZkDataBase 里有一下几个外围属性:

表列 A 表列 B
DataTree dataTree 存储树结构
FileTxnSnapLog snapLog 事务快照长久化类别
,ConcurrentHashMap<Long, Integer> sessionsWithTimeouts 会话治理,sessionId

在 loadDataBase 办法中, 能够看到调用的 snapLog#restore 办法, 进入到 restore 办法中能够看到调用到的是 FileTxnSnapLog#deserialize 进行反序化, 而后保留到传入的 dt,sessions 参数中, 能够定位到 FileTxnSnapLog#deserialize(DataTree dt, Map<Long, Integer> sessions,

        InputArchive ia)的这个重载办法来看下, 如何对快照文件进行反序列化的:
    public void deserialize(DataTree dt, Map<Long, Integer> sessions,
            InputArchive ia) throws IOException {FileHeader header = new FileHeader();
        header.deserialize(ia, "fileheader");
        if (header.getMagic() != SNAP_MAGIC) {
            throw new IOException("mismatching magic headers"
                    + header.getMagic() + 
                    "!=" + FileSnap.SNAP_MAGIC);
        }
        

首先通过文件输出流的包装类 InputArchive 进行读取, 调用的是 FileHeader#deserialize 办法:

  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {a_.startRecord(tag);
    magic=a_.readInt("magic");
    version=a_.readInt("version");
    dbid=a_.readLong("dbid");
    a_.endRecord(tag);
}

FileHeader 实现 Record 接口, 其实前面所有须要的序列化和反序列化的都实现了这个接口, 通过传进来的输出流对象来自定义本人的序列化和反序列化细节.
在这里能够看到 FileHeader 的存储构造为:

属性值 占用大小 形容
magic 4 字节 魔法数字
version 4 字节 版本号
version 8 字节 数据库 id

通过 FileHedare#deserialize 办法后, 曾经从文件流读取了 16 个字节, 接下来调用的是 SerializeUtils#deserializeSnapshot(dt,ia,sessions)进行其余内容的加载,

    public static void deserializeSnapshot(DataTree dt,InputArchive ia,
            Map<Long, Integer> sessions) throws IOException {
        // 会话数量
        int count = ia.readInt("count");
        while (count > 0) {
            // 会话 id
            long id = ia.readLong("id");
            // 会话超时工夫
            int to = ia.readInt("timeout");
            sessions.put(id, to);
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                        "loadData --- session in archive:" + id
                        + "with timeout:" + to);
            }
            count--;
        }
        dt.deserialize(ia, "tree");
    }

能够看到首先是从流外面读取了 4 个字节的 count 属性, 也就是会话数量, 接着再遍历读取了 8 个字节 sessionId(会话 id)和 4 个字节的 timeout(会话超时工夫), 再赋值个给了 sessions(也就是 ZkDataBase 的 sessionsWithTimeouts 属性), 最初调用的是 DataTree#deserialize 进行真正存储内容的反序列化工作:

    public void deserialize(InputArchive ia, String tag) throws IOException {aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        String path = ia.readString("path");
        while (!path.equals("/")) {DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {root = node;} else {String parentPath = path.substring(0, lastSlash);
                node.parent = nodes.get(parentPath);
                if (node.parent == null) {
                    throw new IOException("Invalid Datatree, unable to find" +
                            "parent" + parentPath + "of path" + path);
                }
                node.parent.addChild(path.substring(lastSlash + 1));
                long eowner = node.stat.getEphemeralOwner();
                if (eowner != 0) {HashSet<String> list = ephemerals.get(eowner);
                    if (list == null) {list = new HashSet<String>();
                        ephemerals.put(eowner, list);
                    }
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        nodes.put("/", root);

        setupQuota();

        aclCache.purgeUnused();}

  1. 网络传输(NIO)
    zookeeper 与客户端建设连贯与申请与响应的数据传输都是通过 ServerCnxnFactory 这个类的实现类进行解决的, 咱们这里间接通过 NIO 的实现类 NIOServerCnxnFactory 来进行解说, 再 QuorumPeer 的 start 办法里咱们看到调用 NIOServerCnxnFactory#start 办法.
    public void start() {
        // ensure thread is started once and only once
        if (thread.getState() == Thread.State.NEW) {thread.start();
        }
    }

再 start 办法里咱们看到就简略调用了 Thread#start 办法启动线程. 至于 thread 办法是在哪里进行初始化的, 我能够定位到 NIOServerCnxnFactory#configure 办法里:

    public void configure(InetSocketAddress addr, int maxcc) throws IOException {configureSaslLogin();
        // 初始化线程对象
        thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
        thread.setDaemon(true);
        // 设置最大连接数参数
        maxClientCnxns = maxcc;
        // 初始化 Socket 相干配置
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port" + addr);
        ss.socket().bind(addr);
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_ACCEPT);
    }
  1. 选举
    在进启动了网络传输服务之后, 就开始筹备着选举前的一些筹备工作, 咱们能够从 QuorumPeer#start 办法中的 QuorumPeer#startLeaderElection()调用进行一个选举的切入点:

     synchronized public void startLeaderElection() {
         try {
    // 设置初始化投票
             currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
         } catch(IOException e) {RuntimeException re = new RuntimeException(e.getMessage());
             re.setStackTrace(e.getStackTrace());
             throw re;
         }
         for (QuorumServer p : getView().values()) {if (p.id == myid) {
                 myQuorumAddr = p.addr;
                 break;
             }
         }
         if (myQuorumAddr == null) {throw new RuntimeException("My id" + myid + "not in the peer list");
         }
         if (electionType == 0) {
             try {udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                 // 启动响应线程
                 responder = new ResponderThread();
                 responder.start();} catch (SocketException e) {throw new RuntimeException(e);
             }
         }
         // 依据配置的选举算法进行一些初始化工作
         this.electionAlg = createElectionAlgorithm(electionType);
     }

    从 startLeaderElection 这个办法中能够看出, 次要是将初始化投票设置为本身,sid 为本身 serverId,zxid 为通过快照和事务日志加载后的最大 lastZxid, 还有 peerEpoch(选举年代)也就是以后本身的选举年代, 而后就是启动了 ReponseThread 这个响应线程, 外围逻辑还是在 createElectionAlgorithm 这个办法中, 咱们能够跟进去看一下具体的代码逻辑:

   protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
                
        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
// 已过期
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
// 已过期
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
// 创立连贯管理器
            qcm = createCnxnManager();
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                // 启动监听其余节点的连贯申请
                listener.start();
// 实例化疾速选举算法外围类
                le = new FastLeaderElection(this, qcm);
            } else {LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

从上述代码中, 能够看出次要工作是实例化了一个 QuorumCnxManager 这个对象, 也就是通过这个对象中的 Listener 这个类来解决和其余节点的连贯申请, 调用了 Listener#start 办法理论是运行到了 Listener#run 办法代码中:

        public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    // 实例化 ServerSocket
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (listenOnAllIPs) {int port = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {addr = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr;
                    }
                    LOG.info("My election bind port:" + addr.toString());
                    setName(view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.toString());
                    ss.bind(addr);
                    while (!shutdown) {
                        // 阻塞期待其余节点申请连贯
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request"
                                + client.getRemoteSocketAddress());

                        if (quorumSaslAuthEnabled) {receiveConnectionAsync(client);
                        } else {
                            // 承受申请外围逻辑
                            receiveConnection(client);
                        }

                        numRetries = 0;
                    }
                } catch (IOException e) {LOG.error("Exception while listening", e);
                    numRetries++;
                    try {ss.close();
                        Thread.sleep(1000);
                    } catch (IOException ie) {LOG.error("Error closing server socket", ie);
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping." +
                                  "Ignoring exception", ie);
                    }
                }
            }
            LOG.info("Leaving listener");
            if (!shutdown) {
                LOG.error("As I'm leaving the listener thread, "+"I won't be able to participate in leader"
                        + "election any longer:"
                        + view.get(QuorumCnxManager.this.mySid).electionAddr);
            }
        }

该办法次要是应用 jdk 的阻塞 io 与其余节点建设连贯, 不理解的能够去自行补充一下 jdk 的 socket 编程基础知识, 在第二个 while 循环中的 ss.accept()代码是会始终阻塞期待其余节点申请连贯, 当其余节点建设连贯后, 就会返回一个 Socket 实例, 而后将 Socket 实例传入 receiveConnection 办法中, 而后咱们就能够和其余节点进行通信了, 具体 receiveConnection 代码逻辑如下:

    public void receiveConnection(final Socket sock) {
        DataInputStream din = null;
        try {
// 将输出流进行屡次包装
            din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));

// 真正解决连贯
            handleConnection(sock, din);
        } catch (IOException e) {LOG.error("Exception handling connection, addr: {}, closing server connection",
                     sock.getRemoteSocketAddress());
            closeSocket(sock);
        }
    }

将 io 输出流包装后, 进一步调用了 handleConnection 进行连贯的解决:

    private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
        Long sid = null;
        try {
            // 阻塞期待另外一个节点发送建设申请的第一个包
            // 先读取 8 个字节, 又可能 sid(服务 id), 也有可能是 protocolVersion(协定版本)sid = din.readLong();
// 读取到的是协定版本
            if (sid < 0) {
// 进一步读取 8 个字节, 就是真正的 sid
                sid = din.readLong();
// 读取 4 个字节, 也就是读取到的是残余的其余内容的字节数
                int num_remaining_bytes = din.readInt();
// 进行字数校验
                if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                    closeSocket(sock);
                    return;
                }
                byte[] b = new byte[num_remaining_bytes];

            // 一次性将所有剩下的字节内容读取到 b 这个字节数组中
                int num_read = din.read(b);
                if (num_read != num_remaining_bytes) {LOG.error("Read only" + num_read + "bytes out of" + num_remaining_bytes + "sent by server" + sid);
                }
            }
            if (sid == QuorumPeer.OBSERVER_ID) {sid = observerCounter.getAndDecrement();
                LOG.info("Setting arbitrary identifier to observer:" + sid);
            }
        } catch (IOException e) {closeSocket(sock);
            LOG.warn("Exception reading or writing challenge:" + e.toString());
            return;
        }

        LOG.debug("Authenticating learner server.id: {}", sid);
        authServer.authenticate(sock, din);
        // 如果读取的 sid 小于以后节点的 sid, 则敞开之前建设过的连贯
        if (sid < this.mySid) {SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {sw.finish();
            }
            LOG.debug("Create new connection to server:" + sid);
            closeSocket(sock);
            // 敞开之前的连贯后, 由以后节点发动连贯申请
            connectOne(sid);

        } else {
            // 发送线程
            SendWorker sw = new SendWorker(sock, sid);
            // 承受线程
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            SendWorker vsw = senderWorkerMap.get(sid);
            if(vsw != null)
                vsw.finish();
            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            // 启动发送线程
            sw.start();
            // 启动承受线程
            rw.start();
            return;
        }
    }

从这段代码中能够看出, 建设申请只能由 sid 大的一方发动, 由 sid 小的一方承受, 如当初有 sid=1,sid=2,sid= 3 三个节点, 那么只能由 2 这个节点发动连贯申请,1 这个这个节点解决连贯申请. 这样就保障了单方只放弃着一条连贯, 因为 Socket 是全双工模式, 反对单方进行通信.Socket 能够通过 ss.accept 获取到, 还能够通过以后办法的 connectOne 这个办法去和 sid 较小的节点进行连贯:

    synchronized public void connectOne(long sid){
// 就是判断 sendWorkerMap 中是否蕴含了以后 sid
        if (!connectedToPeer(sid)){
            InetSocketAddress electionAddr;
            if (view.containsKey(sid)) {
            // 拿到之前配置的 server.id 的选举地址
                electionAddr = view.get(sid).electionAddr;
            } else {LOG.warn("Invalid server id:" + sid);
                return;
            }
            try {LOG.debug("Opening channel to server" + sid);
// 实例化 Socket 对象
                Socket sock = new Socket();
                setSockOpts(sock);
                // 进行连贯
                sock.connect(view.get(sid).electionAddr, cnxTO);
                LOG.debug("Connected to server" + sid);
                if (quorumSaslAuthEnabled) {initiateConnectionAsync(sock, sid);
                } else {
                    // 同步初始化连贯, 也就是将以后本身的一些信息发送给其余节点
                    initiateConnection(sock, sid);
                }
            } catch (UnresolvedAddressException e) {
                LOG.warn("Cannot open channel to" + sid
                        + "at election address" + electionAddr, e);
                if (view.containsKey(sid)) {view.get(sid).recreateSocketAddresses();}
                throw e;
            } catch (IOException e) {
                LOG.warn("Cannot open channel to" + sid
                        + "at election address" + electionAddr,
                        e);
                if (view.containsKey(sid)) {view.get(sid).recreateSocketAddresses();}
            }
        } else {LOG.debug("There is a connection already for server" + sid);
        }
    }
    public void initiateConnection(final Socket sock, final Long sid) {
        try {startConnection(sock, sid);
        } catch (IOException e) {LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
                     new Object[] { sid, sock.getRemoteSocketAddress() }, e);
            closeSocket(sock);
            return;
        }
    }
    private boolean startConnection(Socket sock, Long sid)
            throws IOException {
        DataOutputStream dout = null;
        DataInputStream din = null;
        try {dout = new DataOutputStream(sock.getOutputStream());
            // 将本身 sid 发送给其余节点
            dout.writeLong(this.mySid);
            dout.flush();
            din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {LOG.warn("Ignoring exception reading or writing challenge:", e);
            closeSocket(sock);
            return false;
        }
        // authenticate learner
        authLearner.authenticate(sock, view.get(sid).hostname);
        if (sid > this.mySid) {
            LOG.info("Have smaller server identifier, so dropping the" +
                     "connection: (" + sid + "," + this.mySid + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } else {
            // 以下逻辑就和通过 ss.accept 拿到 socket 对象之后一样的逻辑
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            SendWorker vsw = senderWorkerMap.get(sid);
            if(vsw != null)
                vsw.finish();
            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            sw.start();
            rw.start();
            return true;    
            
        }
        return false;
    }

从以上几个办法中能够看出, 在通过 ServerSocket.accpet 和 socket.connect 拿到了 Socket 对象之后, 实例化进去一个 SendWorker 和一个 RecvWorker 这个对象, 并调用了各自的 start 办法去启动两个线程, 其实就是通过这 2 个线程去实现和其余节点的申请和响应的数据传输工作, 一 个节点保护一个 SendWorker、一个 RecvWorker 和通过 queueSendMap 来存储一个队列来进行通信的。
具体前面这 3 个对象是如何发挥作用的, 会在选举细节中具体解说. 实现这一系列的选举筹备工作后, 咱们回到 QuorumPeer#start 办法中, 接下来 QuorumPeer#start 办法调用 super.start()办法, 因为 QuorumPeer 这个对象继承了 ZooKeeperThread, 而 ZooKeeperThread 又继承了 jdk 的 Thread 类, 所以调用了 super.start 之后, 就会独自开拓一个线程去执行 QuorumPeer#run 办法, 也就是真正进行选举的中央:

    public void run() {setName("QuorumPeer" + "[myid=" + getId() + "]" +
                cnxnFactory.getLocalAddress());
        LOG.debug("Starting quorum peer");
        //1.jmx 拓展点
        try {jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for(QuorumServer s: getView().values()){
                ZKMBeanInfo p;
                if (getId() == s.id) {p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {p = new RemotePeerBean(s);
                    try {MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }
        2.// 选举逻辑
        try {
            /*
             * Main loop
             */
            while (running) {switch (getPeerState()) {
                //1.Looking 状态
                case LOOKING:
                    LOG.info("LOOKING");
                    // 开启只读模式
                    if (Boolean.getBoolean("readonlymode.enabled")) {LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                                logFactory, this,
                                new ZooKeeperServer.BasicDataTreeBuilder(),
                                this.zkDb);
                        Thread roZkMgr = new Thread() {public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {roZk.startup();
                                    }
                                } catch (InterruptedException e) {LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {LOG.warn("Unexpected exception",e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();}
                    } else {
                        try {setBCVote(null);
                            // 调用 ElectionAlg#lookForLeader 办法, 而后返回选举后的投票信息
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                // 选举完结,observer 角色进如到此处
                case OBSERVING:
                    try {LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e);                        
                    } finally {observer.shutdown();
                        setObserver(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                // 选举完结,Follower 角色进入到此
                case FOLLOWING:
                    try {LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e);
                    } finally {follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                // 选举完结,Leader 角色进入到此
                case LEADING:
                    LOG.info("LEADING");
                    try {setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {LOG.warn("Unexpected exception",e);
                    } finally {if (leader != null) {leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                }
            }
        } finally {LOG.warn("QuorumPeer main thread exited");
            try {MBeanRegistry.getInstance().unregisterAll();} catch (Exception e) {LOG.warn("Failed to unregister with JMX", e);
            }
            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
        }
    }

咱们能够从上诉代码中的 MainLoop 处开始看, 进入 while 循环后, 因为以后节点还是 looking 状态, 苏所以进入到 looking 分支, 在这个分支中能够看到首先判断以后节点是否是只读模式, 因为以后不解说只读模式, 所以间接进入到另外一个分支:

                        setBCVote(null);
                        // 调用 ElectionAlg#lookForLeader 办法, 而后返回选举后的投票信息
                        setCurrentVote(makeLEStrategy().lookForLeader());

makeLEStrategy 办法返回的其实就是咱们在 QuorumPeer#startLeaderElection 办法中实例话进去的 FastLeaderElection 实例, 而后调用 FastLeaderElection#lookForLeader 办法进行 Leader 选举:

  public Vote lookForLeader() throws InterruptedException {
        try {self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {self.start_fle = Time.currentElapsedTime();
        }
        try {HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =" + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){if(manager.haveDelivered()){sendNotifications();
                    } else {manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out:" + notTimeout);
                }
                else if(validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view for a replica in the voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();} else if (n.electionEpoch < logicalclock.get()) {if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();}

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock.get(),
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer:" + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock.get()){
                            recvset.put(n.sid, new Vote(n.leader,
                                                          n.zxid,
                                                          n.electionEpoch,
                                                          n.peerEpoch));
                           
                            if(ooePredicate(recvset, outofelection, n)) {self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, 
                                        n.zxid, 
                                        n.electionEpoch, 
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify
                         * a majority is following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.version,
                                                            n.leader,
                                                            n.zxid,
                                                            n.electionEpoch,
                                                            n.peerEpoch,
                                                            n.state));
           
                        if(ooePredicate(outofelection, outofelection, n)) {synchronized(this){logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                                    n.zxid,
                                                    n.electionEpoch,
                                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                n.state, n.sid);
                        break;
                    }
                } else {if (!validVoter(n.leader)) {LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {if(self.jmxLeaderElectionBean != null){MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }

未完待续 …….

退出移动版