一.源码仓库:

zookeeper
基于分支3.4.14分支在windows系统启动流程进行剖析。

二.流程剖析:

  1. 源码入口
    通过zkServer.cmd可执行文件内容能够看出zookeeper的服务端是通过org.apache.zookeeper.server.quorum.QuorumPeerMain这个类的main作为入口来启动服务端程序的.main办法传入的是咱们zoo.cfg文件的地址,而后通过解析zoo.cfg文件,将key,value的配置信息转换成QuorumPeerConfig的对象,转换细节能够看QuorumPeerConfig.parse办法,其中转换后的外围配置参数有:
参数名参数形容
dataLogDir事务日志存储门路
dataDir快照存储门路
electionType选举算法,目前只反对3-疾速选举算法
myid以后服务id
tickTime工夫单位
initLimit
syncLimit事务存储门路
minSessionTimeout最小会话超时工夫
maxSessionTimeout最大会话超时工夫
peerType角色类型-OBSERVER,PARTICIPANT
clientPort客户端连贯端口
clientPortAddress客户端连贯Host
snapRetainCount快照保留个数,最小为3
purgeInterval快照革除距离
server.sidhostName:port(通信端口):electionPort(选举端口):peerType
maxClientCnxns最大客户端连接数

拿到解析后的参数后,能够通过是否配置了server.id参数来决定是否集群启动还是单机启动,单机启动运行通过ZooKeeperServerMain#main办法启动,集群启动则还是在QuorumPeerMain#runFromConfig办法进行解决的,这里咱们就间接解说集群模式,因为集群模式比单机模式多了集群间的通信相干的解决,如Leader选举,数据同步,申请转发等.

    public void runFromConfig(QuorumPeerConfig config) throws IOException {      try {          ManagedUtil.registerLog4jMBeans();      } catch (JMException e) {          LOG.warn("Unable to register log4j JMX control", e);      }        LOG.info("Starting quorum peer");      try {          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();          cnxnFactory.configure(config.getClientPortAddress(),                                config.getMaxClientCnxns());          quorumPeer = getQuorumPeer();          quorumPeer.setQuorumPeers(config.getServers());          quorumPeer.setTxnFactory(new FileTxnSnapLog(                  new File(config.getDataLogDir()),                  new File(config.getDataDir())));          quorumPeer.setElectionType(config.getElectionAlg());          quorumPeer.setMyid(config.getServerId());          quorumPeer.setTickTime(config.getTickTime());          quorumPeer.setInitLimit(config.getInitLimit());          quorumPeer.setSyncLimit(config.getSyncLimit());          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());          quorumPeer.setCnxnFactory(cnxnFactory);          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());          quorumPeer.setClientPortAddress(config.getClientPortAddress());          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));          quorumPeer.setLearnerType(config.getPeerType());          quorumPeer.setSyncEnabled(config.getSyncEnabled());          // sets quorum sasl authentication configurations          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);          if(quorumPeer.isQuorumSaslAuthEnabled()){              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);          }          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);          quorumPeer.initialize();          quorumPeer.start();          quorumPeer.join();      } catch (InterruptedException e) {          // warn, but generally this is ok          LOG.warn("Quorum Peer interrupted", e);      }    }

能够从代码片段中能够看出,新创建出了一个QuorumPeer对象,其实这就是OOP思维,以后实例代表着集群的一个节点,而后将QuorumPeerConfig从新设置给QuorumPeer对象,在这里呈现几个新的类:

类名类形容
FileTxnSnapLog长久化外围类别,包含快照,事务日志操作
ServerCnxnFactory 3服务端网络解决外围类,其实现蕴含NIO和Netty两种实现
ZKDatabase内存操作外围类,通过树结构存储

在设置了参数之后,接下来调用了QuorumPeer#initialize办法,在这个办法里次要是一些鉴权类的对象实例化。外围还是QuorumPeer#start办法:

        loadDataBase();//将数据从快照和事务日志加载到内存中        cnxnFactory.start();        //网络服务启动        startLeaderElection(); //选举工作筹备        super.start(); 

loadDataBase:
在这个办法里次要是通过委托给ZKDatabase#loadDataBase进行加载工作的

    public long loadDataBase() throws IOException {        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);        initialized = true;        return zxid;    }
    public long restore(DataTree dt, Map<Long, Integer> sessions,             PlayBackListener listener) throws IOException {        snapLog.deserialize(dt, sessions); //数据反序列化        return fastForwardFromEdits(dt, sessions, listener);    }
 public long deserialize(DataTree dt, Map<Long, Integer> sessions)            throws IOException {        //找到无效的100个快照文件,降序        List<File> snapList = findNValidSnapshots(100);        if (snapList.size() == 0) {            return -1L;        }        File snap = null;        boolean foundValid = false;        for (int i = 0; i < snapList.size(); i++) {            snap = snapList.get(i);            InputStream snapIS = null;            CheckedInputStream crcIn = null;            try {                LOG.info("Reading snapshot " + snap);                snapIS = new BufferedInputStream(new FileInputStream(snap));                crcIn = new CheckedInputStream(snapIS, new Adler32());                InputArchive ia = BinaryInputArchive.getArchive(crcIn);                //真正序列化的中央                deserialize(dt,sessions, ia);                long checkSum = crcIn.getChecksum().getValue();                long val = ia.readLong("val");                //校验快照文件的完整性                if (val != checkSum) {                    throw new IOException("CRC corruption in snapshot :  " + snap);                }                foundValid = true;                break;            } catch(IOException e) {                LOG.warn("problem reading snap file " + snap, e);            } finally {                if (snapIS != null)                     snapIS.close();                if (crcIn != null)                     crcIn.close();            }         }        if (!foundValid) {            throw new IOException("Not able to find valid snapshots in " + snapDir);        }        //快照文件命名为snapshot.lastZxid        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);        return dt.lastProcessedZxid;    }

在ZkDataBase里有一下几个外围属性:

表列 A表列 B
DataTree dataTree存储树结构
FileTxnSnapLog snapLog事务快照长久化类别
,ConcurrentHashMap<Long, Integer> sessionsWithTimeouts会话治理,sessionId

在loadDataBase办法中,能够看到调用的snapLog#restore办法,进入到restore办法中能够看到调用到的是FileTxnSnapLog#deserialize进行反序化,而后保留到传入的dt,sessions参数中,能够定位到FileTxnSnapLog#deserialize(DataTree dt, Map<Long, Integer> sessions,

        InputArchive ia)的这个重载办法来看下,如何对快照文件进行反序列化的:
    public void deserialize(DataTree dt, Map<Long, Integer> sessions,            InputArchive ia) throws IOException {        FileHeader header = new FileHeader();        header.deserialize(ia, "fileheader");        if (header.getMagic() != SNAP_MAGIC) {            throw new IOException("mismatching magic headers "                    + header.getMagic() +                     " !=  " + FileSnap.SNAP_MAGIC);        }        

首先通过文件输出流的包装类InputArchive进行读取,调用的是FileHeader#deserialize办法:

  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {    a_.startRecord(tag);    magic=a_.readInt("magic");    version=a_.readInt("version");    dbid=a_.readLong("dbid");    a_.endRecord(tag);}

FileHeader实现Record接口,其实前面所有须要的序列化和反序列化的都实现了这个接口,通过传进来的输出流对象来自定义本人的序列化和反序列化细节.
在这里能够看到FileHeader的存储构造为:

属性值占用大小形容
magic4字节魔法数字
version4字节版本号
version8字节数据库id

通过FileHedare#deserialize办法后,曾经从文件流读取了16个字节,接下来调用的是 SerializeUtils#deserializeSnapshot(dt,ia,sessions)进行其余内容的加载,

    public static void deserializeSnapshot(DataTree dt,InputArchive ia,            Map<Long, Integer> sessions) throws IOException {        //会话数量        int count = ia.readInt("count");        while (count > 0) {            //会话id            long id = ia.readLong("id");            //会话超时工夫            int to = ia.readInt("timeout");            sessions.put(id, to);            if (LOG.isTraceEnabled()) {                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,                        "loadData --- session in archive: " + id                        + " with timeout: " + to);            }            count--;        }        dt.deserialize(ia, "tree");    }

能够看到首先是从流外面读取了4个字节的count属性,也就是会话数量,接着再遍历读取了8个字节sessionId(会话id)和4个字节的timeout(会话超时工夫),再赋值个给了sessions(也就是ZkDataBase的sessionsWithTimeouts属性),最初调用的是DataTree#deserialize进行真正存储内容的反序列化工作:

    public void deserialize(InputArchive ia, String tag) throws IOException {        aclCache.deserialize(ia);        nodes.clear();        pTrie.clear();        String path = ia.readString("path");        while (!path.equals("/")) {            DataNode node = new DataNode();            ia.readRecord(node, "node");            nodes.put(path, node);            synchronized (node) {                aclCache.addUsage(node.acl);            }            int lastSlash = path.lastIndexOf('/');            if (lastSlash == -1) {                root = node;            } else {                String parentPath = path.substring(0, lastSlash);                node.parent = nodes.get(parentPath);                if (node.parent == null) {                    throw new IOException("Invalid Datatree, unable to find " +                            "parent " + parentPath + " of path " + path);                }                node.parent.addChild(path.substring(lastSlash + 1));                long eowner = node.stat.getEphemeralOwner();                if (eowner != 0) {                    HashSet<String> list = ephemerals.get(eowner);                    if (list == null) {                        list = new HashSet<String>();                        ephemerals.put(eowner, list);                    }                    list.add(path);                }            }            path = ia.readString("path");        }        nodes.put("/", root);        setupQuota();        aclCache.purgeUnused();    }

  1. 网络传输(NIO)
    zookeeper与客户端建设连贯与申请与响应的数据传输都是通过ServerCnxnFactory这个类的实现类进行解决的,咱们这里间接通过NIO的实现类NIOServerCnxnFactory来进行解说,再QuorumPeer的start办法里咱们看到调用NIOServerCnxnFactory#start办法.
    public void start() {        // ensure thread is started once and only once        if (thread.getState() == Thread.State.NEW) {            thread.start();        }    }

再start办法里咱们看到就简略调用了Thread#start办法启动线程.至于thread办法是在哪里进行初始化的,我能够定位到NIOServerCnxnFactory#configure办法里:

    public void configure(InetSocketAddress addr, int maxcc) throws IOException {        configureSaslLogin();        //初始化线程对象        thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);        thread.setDaemon(true);        //设置最大连接数参数        maxClientCnxns = maxcc;        //初始化Socket相干配置        this.ss = ServerSocketChannel.open();        ss.socket().setReuseAddress(true);        LOG.info("binding to port " + addr);        ss.socket().bind(addr);        ss.configureBlocking(false);        ss.register(selector, SelectionKey.OP_ACCEPT);    }
  1. 选举
    在进启动了网络传输服务之后,就开始筹备着选举前的一些筹备工作,咱们能够从QuorumPeer#start办法中的QuorumPeer#startLeaderElection()调用进行一个选举的切入点:

     synchronized public void startLeaderElection() {     try {//设置初始化投票         currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());     } catch(IOException e) {         RuntimeException re = new RuntimeException(e.getMessage());         re.setStackTrace(e.getStackTrace());         throw re;     }     for (QuorumServer p : getView().values()) {         if (p.id == myid) {             myQuorumAddr = p.addr;             break;         }     }     if (myQuorumAddr == null) {         throw new RuntimeException("My id " + myid + " not in the peer list");     }     if (electionType == 0) {         try {             udpSocket = new DatagramSocket(myQuorumAddr.getPort());             //启动响应线程             responder = new ResponderThread();             responder.start();         } catch (SocketException e) {             throw new RuntimeException(e);         }     }     //依据配置的选举算法进行一些初始化工作     this.electionAlg = createElectionAlgorithm(electionType); }

    从startLeaderElection这个办法中能够看出,次要是将初始化投票设置为本身,sid为本身serverId,zxid为通过快照和事务日志加载后的最大lastZxid,还有peerEpoch(选举年代)也就是以后本身的选举年代,而后就是启动了ReponseThread这个响应线程,外围逻辑还是在createElectionAlgorithm这个办法中,咱们能够跟进去看一下具体的代码逻辑:

   protected Election createElectionAlgorithm(int electionAlgorithm){        Election le=null;                        //TODO: use a factory rather than a switch        switch (electionAlgorithm) {        case 0:            le = new LeaderElection(this);            break;        case 1://已过期            le = new AuthFastLeaderElection(this);            break;        case 2://已过期            le = new AuthFastLeaderElection(this, true);            break;        case 3://创立连贯管理器            qcm = createCnxnManager();            QuorumCnxManager.Listener listener = qcm.listener;            if(listener != null){                //启动监听其余节点的连贯申请                listener.start();//实例化疾速选举算法外围类                le = new FastLeaderElection(this, qcm);            } else {                LOG.error("Null listener when initializing cnx manager");            }            break;        default:            assert false;        }        return le;    }

从上述代码中,能够看出次要工作是实例化了一个QuorumCnxManager这个对象,也就是通过这个对象中的Listener这个类来解决和其余节点的连贯申请,调用了Listener#start办法理论是运行到了Listener#run办法代码中:

        public void run() {            int numRetries = 0;            InetSocketAddress addr;            while((!shutdown) && (numRetries < 3)){                try {                    //实例化ServerSocket                    ss = new ServerSocket();                    ss.setReuseAddress(true);                    if (listenOnAllIPs) {                        int port = view.get(QuorumCnxManager.this.mySid)                            .electionAddr.getPort();                        addr = new InetSocketAddress(port);                    } else {                        addr = view.get(QuorumCnxManager.this.mySid)                            .electionAddr;                    }                    LOG.info("My election bind port: " + addr.toString());                    setName(view.get(QuorumCnxManager.this.mySid)                            .electionAddr.toString());                    ss.bind(addr);                    while (!shutdown) {                        //阻塞期待其余节点申请连贯                        Socket client = ss.accept();                        setSockOpts(client);                        LOG.info("Received connection request "                                + client.getRemoteSocketAddress());                        if (quorumSaslAuthEnabled) {                            receiveConnectionAsync(client);                        } else {                            //承受申请外围逻辑                            receiveConnection(client);                        }                        numRetries = 0;                    }                } catch (IOException e) {                    LOG.error("Exception while listening", e);                    numRetries++;                    try {                        ss.close();                        Thread.sleep(1000);                    } catch (IOException ie) {                        LOG.error("Error closing server socket", ie);                    } catch (InterruptedException ie) {                        LOG.error("Interrupted while sleeping. " +                                  "Ignoring exception", ie);                    }                }            }            LOG.info("Leaving listener");            if (!shutdown) {                LOG.error("As I'm leaving the listener thread, "                        + "I won't be able to participate in leader "                        + "election any longer: "                        + view.get(QuorumCnxManager.this.mySid).electionAddr);            }        }

该办法次要是应用jdk的阻塞io与其余节点建设连贯,不理解的能够去自行补充一下jdk的socket编程基础知识,在第二个while循环中的ss.accept()代码是会始终阻塞期待其余节点申请连贯,当其余节点建设连贯后,就会返回一个Socket实例,而后将Socket实例传入receiveConnection办法中,而后咱们就能够和其余节点进行通信了,具体receiveConnection代码逻辑如下:

    public void receiveConnection(final Socket sock) {        DataInputStream din = null;        try {//将输出流进行屡次包装            din = new DataInputStream(                    new BufferedInputStream(sock.getInputStream()));//真正解决连贯            handleConnection(sock, din);        } catch (IOException e) {            LOG.error("Exception handling connection, addr: {}, closing server connection",                     sock.getRemoteSocketAddress());            closeSocket(sock);        }    }

将io输出流包装后,进一步调用了handleConnection进行连贯的解决:

    private void handleConnection(Socket sock, DataInputStream din)            throws IOException {        Long sid = null;        try {            // 阻塞期待另外一个节点发送建设申请的第一个包            //先读取8个字节,又可能sid(服务id),也有可能是protocolVersion(协定版本)            sid = din.readLong();//读取到的是协定版本            if (sid < 0) {//进一步读取8个字节,就是真正的sid                sid = din.readLong();//读取4个字节,也就是读取到的是残余的其余内容的字节数                int num_remaining_bytes = din.readInt();//进行字数校验                if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {                    LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);                    closeSocket(sock);                    return;                }                byte[] b = new byte[num_remaining_bytes];            //一次性将所有剩下的字节内容读取到b这个字节数组中                int num_read = din.read(b);                if (num_read != num_remaining_bytes) {                    LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);                }            }            if (sid == QuorumPeer.OBSERVER_ID) {                sid = observerCounter.getAndDecrement();                LOG.info("Setting arbitrary identifier to observer: " + sid);            }        } catch (IOException e) {            closeSocket(sock);            LOG.warn("Exception reading or writing challenge: " + e.toString());            return;        }        LOG.debug("Authenticating learner server.id: {}", sid);        authServer.authenticate(sock, din);        //如果读取的sid小于以后节点的sid,则敞开之前建设过的连贯        if (sid < this.mySid) {            SendWorker sw = senderWorkerMap.get(sid);            if (sw != null) {                sw.finish();            }            LOG.debug("Create new connection to server: " + sid);            closeSocket(sock);            //敞开之前的连贯后,由以后节点发动连贯申请            connectOne(sid);        } else {            //发送线程            SendWorker sw = new SendWorker(sock, sid);            //承受线程            RecvWorker rw = new RecvWorker(sock, din, sid, sw);            sw.setRecv(rw);            SendWorker vsw = senderWorkerMap.get(sid);            if(vsw != null)                vsw.finish();            senderWorkerMap.put(sid, sw);            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));            //启动发送线程            sw.start();            //启动承受线程            rw.start();            return;        }    }

从这段代码中能够看出,建设申请只能由sid大的一方发动,由sid小的一方承受,如当初有sid=1,sid=2,sid=3三个节点,那么只能由2这个节点发动连贯申请,1这个这个节点解决连贯申请.这样就保障了单方只放弃着一条连贯,因为Socket是全双工模式,反对单方进行通信.Socket能够通过ss.accept获取到,还能够通过以后办法的connectOne这个办法去和sid较小的节点进行连贯:

    synchronized public void connectOne(long sid){//就是判断sendWorkerMap中是否蕴含了以后sid        if (!connectedToPeer(sid)){            InetSocketAddress electionAddr;            if (view.containsKey(sid)) {            //拿到之前配置的server.id的选举地址                electionAddr = view.get(sid).electionAddr;            } else {                LOG.warn("Invalid server id: " + sid);                return;            }            try {                LOG.debug("Opening channel to server " + sid);//实例化Socket对象                Socket sock = new Socket();                setSockOpts(sock);                //进行连贯                sock.connect(view.get(sid).electionAddr, cnxTO);                LOG.debug("Connected to server " + sid);                if (quorumSaslAuthEnabled) {                    initiateConnectionAsync(sock, sid);                } else {                    //同步初始化连贯,也就是将以后本身的一些信息发送给其余节点                    initiateConnection(sock, sid);                }            } catch (UnresolvedAddressException e) {                LOG.warn("Cannot open channel to " + sid                        + " at election address " + electionAddr, e);                if (view.containsKey(sid)) {                    view.get(sid).recreateSocketAddresses();                }                throw e;            } catch (IOException e) {                LOG.warn("Cannot open channel to " + sid                        + " at election address " + electionAddr,                        e);                if (view.containsKey(sid)) {                    view.get(sid).recreateSocketAddresses();                }            }        } else {            LOG.debug("There is a connection already for server " + sid);        }    }
    public void initiateConnection(final Socket sock, final Long sid) {        try {            startConnection(sock, sid);        } catch (IOException e) {            LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",                     new Object[] { sid, sock.getRemoteSocketAddress() }, e);            closeSocket(sock);            return;        }    }
    private boolean startConnection(Socket sock, Long sid)            throws IOException {        DataOutputStream dout = null;        DataInputStream din = null;        try {            dout = new DataOutputStream(sock.getOutputStream());            //将本身sid发送给其余节点            dout.writeLong(this.mySid);            dout.flush();            din = new DataInputStream(                    new BufferedInputStream(sock.getInputStream()));        } catch (IOException e) {            LOG.warn("Ignoring exception reading or writing challenge: ", e);            closeSocket(sock);            return false;        }        // authenticate learner        authLearner.authenticate(sock, view.get(sid).hostname);        if (sid > this.mySid) {            LOG.info("Have smaller server identifier, so dropping the " +                     "connection: (" + sid + ", " + this.mySid + ")");            closeSocket(sock);            // Otherwise proceed with the connection        } else {            //以下逻辑就和通过ss.accept拿到socket对象之后一样的逻辑            SendWorker sw = new SendWorker(sock, sid);            RecvWorker rw = new RecvWorker(sock, din, sid, sw);            sw.setRecv(rw);            SendWorker vsw = senderWorkerMap.get(sid);            if(vsw != null)                vsw.finish();            senderWorkerMap.put(sid, sw);            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));            sw.start();            rw.start();            return true;                        }        return false;    }

从以上几个办法中能够看出,在通过ServerSocket.accpet和socket.connect拿到了Socket对象之后,实例化进去一个SendWorker和一个RecvWorker这个对象,并调用了各自的start办法去启动两个线程,其实就是通过这2个线程去实现和其余节点的申请和响应的数据传输工作,一个节点保护一个SendWorker、一个RecvWorker和通过queueSendMap来存储一个队列来进行通信的。
具体前面这3个对象是如何发挥作用的,会在选举细节中具体解说.实现这一系列的选举筹备工作后,咱们回到QuorumPeer#start办法中,接下来QuorumPeer#start办法调用super.start()办法,因为QuorumPeer这个对象继承了ZooKeeperThread,而ZooKeeperThread又继承了jdk的Thread类,所以调用了super.start之后,就会独自开拓一个线程去执行QuorumPeer#run办法,也就是真正进行选举的中央:

    public void run() {        setName("QuorumPeer" + "[myid=" + getId() + "]" +                cnxnFactory.getLocalAddress());        LOG.debug("Starting quorum peer");        //1.jmx拓展点        try {            jmxQuorumBean = new QuorumBean(this);            MBeanRegistry.getInstance().register(jmxQuorumBean, null);            for(QuorumServer s: getView().values()){                ZKMBeanInfo p;                if (getId() == s.id) {                    p = jmxLocalPeerBean = new LocalPeerBean(this);                    try {                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);                    } catch (Exception e) {                        LOG.warn("Failed to register with JMX", e);                        jmxLocalPeerBean = null;                    }                } else {                    p = new RemotePeerBean(s);                    try {                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);                    } catch (Exception e) {                        LOG.warn("Failed to register with JMX", e);                    }                }            }        } catch (Exception e) {            LOG.warn("Failed to register with JMX", e);            jmxQuorumBean = null;        }        2.//选举逻辑        try {            /*             * Main loop             */            while (running) {                switch (getPeerState()) {                //1.Looking状态                case LOOKING:                    LOG.info("LOOKING");                    //开启只读模式                    if (Boolean.getBoolean("readonlymode.enabled")) {                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(                                logFactory, this,                                new ZooKeeperServer.BasicDataTreeBuilder(),                                this.zkDb);                        Thread roZkMgr = new Thread() {                            public void run() {                                try {                                    // lower-bound grace period to 2 secs                                    sleep(Math.max(2000, tickTime));                                    if (ServerState.LOOKING.equals(getPeerState())) {                                        roZk.startup();                                    }                                } catch (InterruptedException e) {                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");                                } catch (Exception e) {                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);                                }                            }                        };                        try {                            roZkMgr.start();                            setBCVote(null);                            setCurrentVote(makeLEStrategy().lookForLeader());                        } catch (Exception e) {                            LOG.warn("Unexpected exception",e);                            setPeerState(ServerState.LOOKING);                        } finally {                            // If the thread is in the the grace period, interrupt                            // to come out of waiting.                            roZkMgr.interrupt();                            roZk.shutdown();                        }                    } else {                        try {                            setBCVote(null);                            //调用ElectionAlg#lookForLeader办法,而后返回选举后的投票信息                            setCurrentVote(makeLEStrategy().lookForLeader());                        } catch (Exception e) {                            LOG.warn("Unexpected exception", e);                            setPeerState(ServerState.LOOKING);                        }                    }                    break;                //选举完结,observer角色进如到此处                case OBSERVING:                    try {                        LOG.info("OBSERVING");                        setObserver(makeObserver(logFactory));                        observer.observeLeader();                    } catch (Exception e) {                        LOG.warn("Unexpected exception",e );                                            } finally {                        observer.shutdown();                        setObserver(null);                        setPeerState(ServerState.LOOKING);                    }                    break;                //选举完结,Follower角色进入到此                case FOLLOWING:                    try {                        LOG.info("FOLLOWING");                        setFollower(makeFollower(logFactory));                        follower.followLeader();                    } catch (Exception e) {                        LOG.warn("Unexpected exception",e);                    } finally {                        follower.shutdown();                        setFollower(null);                        setPeerState(ServerState.LOOKING);                    }                    break;                //选举完结,Leader角色进入到此                case LEADING:                    LOG.info("LEADING");                    try {                        setLeader(makeLeader(logFactory));                        leader.lead();                        setLeader(null);                    } catch (Exception e) {                        LOG.warn("Unexpected exception",e);                    } finally {                        if (leader != null) {                            leader.shutdown("Forcing shutdown");                            setLeader(null);                        }                        setPeerState(ServerState.LOOKING);                    }                    break;                }            }        } finally {            LOG.warn("QuorumPeer main thread exited");            try {                MBeanRegistry.getInstance().unregisterAll();            } catch (Exception e) {                LOG.warn("Failed to unregister with JMX", e);            }            jmxQuorumBean = null;            jmxLocalPeerBean = null;        }    }

咱们能够从上诉代码中的MainLoop处开始看,进入while循环后,因为以后节点还是looking状态,苏所以进入到looking分支,在这个分支中能够看到首先判断以后节点是否是只读模式,因为以后不解说只读模式,所以间接进入到另外一个分支:

                        setBCVote(null);                        //调用ElectionAlg#lookForLeader办法,而后返回选举后的投票信息                        setCurrentVote(makeLEStrategy().lookForLeader());

makeLEStrategy办法返回的其实就是咱们在QuorumPeer#startLeaderElection办法中实例话进去的FastLeaderElection实例,而后调用FastLeaderElection#lookForLeader办法进行Leader选举:

  public Vote lookForLeader() throws InterruptedException {        try {            self.jmxLeaderElectionBean = new LeaderElectionBean();            MBeanRegistry.getInstance().register(                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);        } catch (Exception e) {            LOG.warn("Failed to register with JMX", e);            self.jmxLeaderElectionBean = null;        }        if (self.start_fle == 0) {           self.start_fle = Time.currentElapsedTime();        }        try {            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();            int notTimeout = finalizeWait;            synchronized(this){                logicalclock.incrementAndGet();                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());            }            LOG.info("New election. My id =  " + self.getId() +                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));            sendNotifications();            /*             * Loop in which we exchange notifications until we find a leader             */            while ((self.getPeerState() == ServerState.LOOKING) &&                    (!stop)){                /*                 * Remove next notification from queue, times out after 2 times                 * the termination time                 */                Notification n = recvqueue.poll(notTimeout,                        TimeUnit.MILLISECONDS);                /*                 * Sends more notifications if haven't received enough.                 * Otherwise processes new notification.                 */                if(n == null){                    if(manager.haveDelivered()){                        sendNotifications();                    } else {                        manager.connectAll();                    }                    /*                     * Exponential backoff                     */                    int tmpTimeOut = notTimeout*2;                    notTimeout = (tmpTimeOut < maxNotificationInterval?                            tmpTimeOut : maxNotificationInterval);                    LOG.info("Notification time out: " + notTimeout);                }                else if(validVoter(n.sid) && validVoter(n.leader)) {                    /*                     * Only proceed if the vote comes from a replica in the                     * voting view for a replica in the voting view.                     */                    switch (n.state) {                    case LOOKING:                        // If notification > current, replace and send messages out                        if (n.electionEpoch > logicalclock.get()) {                            logicalclock.set(n.electionEpoch);                            recvset.clear();                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {                                updateProposal(n.leader, n.zxid, n.peerEpoch);                            } else {                                updateProposal(getInitId(),                                        getInitLastLoggedZxid(),                                        getPeerEpoch());                            }                            sendNotifications();                        } else if (n.electionEpoch < logicalclock.get()) {                            if(LOG.isDebugEnabled()){                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"                                        + Long.toHexString(n.electionEpoch)                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));                            }                            break;                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,                                proposedLeader, proposedZxid, proposedEpoch)) {                            updateProposal(n.leader, n.zxid, n.peerEpoch);                            sendNotifications();                        }                        if(LOG.isDebugEnabled()){                            LOG.debug("Adding vote: from=" + n.sid +                                    ", proposed leader=" + n.leader +                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));                        }                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));                        if (termPredicate(recvset,                                new Vote(proposedLeader, proposedZxid,                                        logicalclock.get(), proposedEpoch))) {                            // Verify if there is any change in the proposed leader                            while((n = recvqueue.poll(finalizeWait,                                    TimeUnit.MILLISECONDS)) != null){                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,                                        proposedLeader, proposedZxid, proposedEpoch)){                                    recvqueue.put(n);                                    break;                                }                            }                            /*                             * This predicate is true once we don't read any new                             * relevant message from the reception queue                             */                            if (n == null) {                                self.setPeerState((proposedLeader == self.getId()) ?                                        ServerState.LEADING: learningState());                                Vote endVote = new Vote(proposedLeader,                                                        proposedZxid,                                                        logicalclock.get(),                                                        proposedEpoch);                                leaveInstance(endVote);                                return endVote;                            }                        }                        break;                    case OBSERVING:                        LOG.debug("Notification from observer: " + n.sid);                        break;                    case FOLLOWING:                    case LEADING:                        /*                         * Consider all notifications from the same epoch                         * together.                         */                        if(n.electionEpoch == logicalclock.get()){                            recvset.put(n.sid, new Vote(n.leader,                                                          n.zxid,                                                          n.electionEpoch,                                                          n.peerEpoch));                                                       if(ooePredicate(recvset, outofelection, n)) {                                self.setPeerState((n.leader == self.getId()) ?                                        ServerState.LEADING: learningState());                                Vote endVote = new Vote(n.leader,                                         n.zxid,                                         n.electionEpoch,                                         n.peerEpoch);                                leaveInstance(endVote);                                return endVote;                            }                        }                        /*                         * Before joining an established ensemble, verify                         * a majority is following the same leader.                         */                        outofelection.put(n.sid, new Vote(n.version,                                                            n.leader,                                                            n.zxid,                                                            n.electionEpoch,                                                            n.peerEpoch,                                                            n.state));                                   if(ooePredicate(outofelection, outofelection, n)) {                            synchronized(this){                                logicalclock.set(n.electionEpoch);                                self.setPeerState((n.leader == self.getId()) ?                                        ServerState.LEADING: learningState());                            }                            Vote endVote = new Vote(n.leader,                                                    n.zxid,                                                    n.electionEpoch,                                                    n.peerEpoch);                            leaveInstance(endVote);                            return endVote;                        }                        break;                    default:                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",                                n.state, n.sid);                        break;                    }                } else {                    if (!validVoter(n.leader)) {                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);                    }                    if (!validVoter(n.sid)) {                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);                    }                }            }            return null;        } finally {            try {                if(self.jmxLeaderElectionBean != null){                    MBeanRegistry.getInstance().unregister(                            self.jmxLeaderElectionBean);                }            } catch (Exception e) {                LOG.warn("Failed to unregister with JMX", e);            }            self.jmxLeaderElectionBean = null;            LOG.debug("Number of connection processing threads: {}",                    manager.getConnectionThreadCount());        }    }

未完待续.......