1. Source repository:
zookeeper
This analysis walks through the server startup flow on Windows, based on the 3.4.14 branch.
2. Startup flow analysis:
- Source entry point
From the contents of the zkServer.cmd script we can see that the ZooKeeper server is started through the main method of org.apache.zookeeper.server.quorum.QuorumPeerMain. main is passed the path of our zoo.cfg file; the file is then parsed and its key/value configuration is converted into a QuorumPeerConfig object (see QuorumPeerConfig.parse for the details). The core configuration parameters after parsing are listed below; a sample zoo.cfg follows the table.
Parameter | Description |
---|---|
dataLogDir | transaction log storage path |
dataDir | snapshot storage path |
electionType | election algorithm; currently only 3 (fast leader election) is supported |
myid | id of the current server |
tickTime | basic time unit (ms) |
initLimit | number of ticks allowed for a follower's initial connection and sync with the leader |
syncLimit | number of ticks a follower may fall behind the leader for sync/heartbeat |
minSessionTimeout | minimum session timeout |
maxSessionTimeout | maximum session timeout |
peerType | role type: OBSERVER or PARTICIPANT |
clientPort | client connection port |
clientPortAddress | client connection host |
snapRetainCount | number of snapshots to retain; minimum 3 |
purgeInterval | snapshot purge interval (hours) |
server.sid | hostName:port (quorum communication port):electionPort (election port):peerType |
maxClientCnxns | maximum number of client connections |
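To make the parameters concrete, here is a minimal three-node zoo.cfg; the values, paths and hostnames are illustrative placeholders rather than anything prescribed by the source:

    # basic time unit in milliseconds
    tickTime=2000
    # ticks allowed for a follower's initial connection/sync with the leader
    initLimit=10
    # ticks a follower may fall behind the leader
    syncLimit=5
    # snapshot and transaction log locations (placeholder paths)
    dataDir=/var/lib/zookeeper/data
    dataLogDir=/var/lib/zookeeper/log
    # client connections
    clientPort=2181
    maxClientCnxns=60
    # keep 3 snapshots, purge once per hour
    autopurge.snapRetainCount=3
    autopurge.purgeInterval=1
    # server.<myid>=host:quorumPort:electionPort
    server.1=zk1:2888:3888
    server.2=zk2:2888:3888
    server.3=zk3:2888:3888

Note that myid itself is not set in zoo.cfg; each server reads it from the myid file under dataDir.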
Once the configuration has been parsed, the presence of server.id entries decides between cluster startup and standalone startup. Standalone mode is started via ZooKeeperServerMain#main, while cluster mode is handled in QuorumPeerMain#runFromConfig. Here we go straight to cluster mode, because compared with standalone mode it adds the inter-node handling: leader election, data synchronization, request forwarding, and so on.
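For orientation, the standalone-vs-cluster decision in QuorumPeerMain#initializeAndRun looks roughly like the sketch below (simplified; the real method also wires up the DatadirCleanupManager purge task and error handling):

    // Sketch of the standalone-vs-quorum decision (3.4.x), not the verbatim source.
    // `args` are the command-line arguments; args[0] is the zoo.cfg path.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);                 // parse zoo.cfg into QuorumPeerConfig
    }
    if (args.length == 1 && config.getServers().size() > 0) {
        runFromConfig(config);                 // cluster mode: server.N entries present
    } else {
        // no quorum configured -> standalone mode
        ZooKeeperServerMain.main(args);
    }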

    public void runFromConfig(QuorumPeerConfig config) throws IOException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        LOG.info("Starting quorum peer");
        try {
            ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());

            quorumPeer = getQuorumPeer();

            quorumPeer.setQuorumPeers(config.getServers());
            quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    new File(config.getDataLogDir()),
                    new File(config.getDataDir())));
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
            quorumPeer.setClientPortAddress(config.getClientPortAddress());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());

            // sets quorum sasl authentication configurations
            quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
            if (quorumPeer.isQuorumSaslAuthEnabled()) {
                quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
                quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
                quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
                quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
                quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
            }

            quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            quorumPeer.initialize();

            quorumPeer.start();
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
        }
    }
From this code snippet we can see that a new QuorumPeer object is created; this is plain OOP: the instance represents one node of the cluster, and the values from QuorumPeerConfig are copied onto the QuorumPeer object. A few new classes appear here:
Class | Description |
---|---|
FileTxnSnapLog | core persistence class, covering snapshot and transaction log operations |
ServerCnxnFactory | core server-side networking class, with both NIO and Netty implementations |
ZKDatabase | core in-memory storage class, backed by a tree structure |
After the parameters are set, QuorumPeer#initialize is called; it mainly instantiates some authentication-related objects. The core is the QuorumPeer#start method:

    loadDataBase();          // load data from snapshots and the transaction log into memory
    cnxnFactory.start();     // start the network service
    startLeaderElection();   // prepare for the leader election
    super.start();
loadDataBase:
This method mainly delegates the loading work to ZKDatabase#loadDataBase:

    public long loadDataBase() throws IOException {
        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts,
                commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }

    public long restore(DataTree dt, Map<Long, Integer> sessions,
            PlayBackListener listener) throws IOException {
        snapLog.deserialize(dt, sessions);   // deserialize the snapshot data
        return fastForwardFromEdits(dt, sessions, listener);
    }

    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // find up to 100 valid snapshot files, sorted in descending order
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0; i < snapList.size(); i++) {
            snap = snapList.get(i);
            InputStream snapIS = null;
            CheckedInputStream crcIn = null;
            try {
                LOG.info("Reading snapshot " + snap);
                snapIS = new BufferedInputStream(new FileInputStream(snap));
                crcIn = new CheckedInputStream(snapIS, new Adler32());
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                // where the actual deserialization happens
                deserialize(dt, sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                // verify the integrity of the snapshot file
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch (IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            } finally {
                if (snapIS != null)
                    snapIS.close();
                if (crcIn != null)
                    crcIn.close();
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        // snapshot files are named snapshot.<lastZxid>
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }
ZKDatabase has the following core fields:
Field | Description |
---|---|
DataTree dataTree | the in-memory tree structure |
FileTxnSnapLog snapLog | snapshot and transaction log persistence class |
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts | session management: sessionId → session timeout |
In loadDataBase we can see the call to snapLog#restore. Inside restore, the snapshot class FileSnap (the snapLog field of FileTxnSnapLog) does the deserialization, filling the dt and sessions arguments that were passed in. We can drill down into the overload FileSnap#deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) to see how a single snapshot file is deserialized:

    public void deserialize(DataTree dt, Map<Long, Integer> sessions,
            InputArchive ia) throws IOException {
        FileHeader header = new FileHeader();
        header.deserialize(ia, "fileheader");
        if (header.getMagic() != SNAP_MAGIC) {
            throw new IOException("mismatching magic headers "
                    + header.getMagic() + " !=  " + FileSnap.SNAP_MAGIC);
        }
        SerializeUtils.deserializeSnapshot(dt, ia, sessions);
    }
First the file header is read via InputArchive, the wrapper around the file input stream, by calling FileHeader#deserialize:

    public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(tag);
        magic = a_.readInt("magic");
        version = a_.readInt("version");
        dbid = a_.readLong("dbid");
        a_.endRecord(tag);
    }
FileHeader implements the Record interface; in fact everything that later needs to be serialized or deserialized implements this interface, and defines its own serialization/deserialization details against the archive object that is passed in.
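For reference, the org.apache.jute.Record interface that FileHeader (and later DataNode and friends) implements boils down to a pair of callbacks against an archive, roughly:

    // The jute serialization contract: each record writes/reads itself
    // against an OutputArchive/InputArchive under a tag.
    public interface Record {
        void serialize(OutputArchive archive, String tag) throws IOException;
        void deserialize(InputArchive archive, String tag) throws IOException;
    }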
Here we can see that the on-disk layout of FileHeader is:
Field | Size | Description |
---|---|---|
magic | 4 bytes | magic number |
version | 4 bytes | version number |
dbid | 8 bytes | database id |
After FileHeader#deserialize, 16 bytes have been read from the file stream. Next, SerializeUtils#deserializeSnapshot(dt, ia, sessions) is called to load the remaining content:

    public static void deserializeSnapshot(DataTree dt, InputArchive ia,
            Map<Long, Integer> sessions) throws IOException {
        // number of sessions
        int count = ia.readInt("count");
        while (count > 0) {
            // session id
            long id = ia.readLong("id");
            // session timeout
            int to = ia.readInt("timeout");
            sessions.put(id, to);
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                        "loadData --- session in archive: " + id
                        + " with timeout: " + to);
            }
            count--;
        }
        dt.deserialize(ia, "tree");
    }
We can see that a 4-byte count (the number of sessions) is read from the stream first, then the loop reads an 8-byte sessionId and a 4-byte timeout for each session and puts them into sessions (i.e. the sessionsWithTimeouts field of ZKDatabase). Finally DataTree#deserialize is called to deserialize the actual stored content:

    public void deserialize(InputArchive ia, String tag) throws IOException {
        aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        String path = ia.readString("path");
        while (!path.equals("/")) {
            DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
                aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                root = node;
            } else {
                String parentPath = path.substring(0, lastSlash);
                node.parent = nodes.get(parentPath);
                if (node.parent == null) {
                    throw new IOException("Invalid Datatree, unable to find "
                            + "parent " + parentPath + " of path " + path);
                }
                node.parent.addChild(path.substring(lastSlash + 1));
                long eowner = node.stat.getEphemeralOwner();
                if (eowner != 0) {
                    HashSet<String> list = ephemerals.get(eowner);
                    if (list == null) {
                        list = new HashSet<String>();
                        ephemerals.put(eowner, list);
                    }
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        nodes.put("/", root);
        setupQuota();
        aclCache.purgeUnused();
    }
- Network transport (NIO)
Connection establishment with clients and the request/response data transport are both handled by implementations of ServerCnxnFactory. Here we look at the NIO implementation, NIOServerCnxnFactory. In QuorumPeer#start we saw the call to NIOServerCnxnFactory#start:

    public void start() {
        // ensure thread is started once and only once
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
In start we see that it simply calls Thread#start to launch the thread. As for where the thread field is initialized, we can find that in NIOServerCnxnFactory#configure:

    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        configureSaslLogin();

        // initialize the thread object
        thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
        thread.setDaemon(true);
        // set the maximum number of client connections
        maxClientCnxns = maxcc;
        // initialize the server socket
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port " + addr);
        ss.socket().bind(addr);
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_ACCEPT);
    }
- Leader election
After the network transport service has been started, the node does some preparation ahead of the election. Our entry point into the election is the call to QuorumPeer#startLeaderElection() inside QuorumPeer#start:

    synchronized public void startLeaderElection() {
        try {
            // cast the initial vote for this node itself
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        } catch (IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                // start the responder thread
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        // initialization work for the configured election algorithm
        this.electionAlg = createElectionAlgorithm(electionType);
    }
From startLeaderElection we can see that the initial vote is cast for the node itself: sid is its own serverId, zxid is the largest lastZxid recovered from the snapshot and transaction log, and peerEpoch is the node's current election epoch. The ResponderThread response thread is started (only when electionType is 0). The core logic sits in createElectionAlgorithm; let's step into it and look at the code:

    protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            // deprecated
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            // deprecated
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            // create the connection manager
            qcm = createCnxnManager();
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                // start listening for connection requests from the other nodes
                listener.start();
                // instantiate the core class of the fast leader election algorithm
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }
From the code above, the main work is to instantiate a QuorumCnxManager object; its Listener class is what handles connection requests from the other nodes. Calling Listener#start eventually runs the code in Listener#run:

    public void run() {
        int numRetries = 0;
        InetSocketAddress addr;
        while ((!shutdown) && (numRetries < 3)) {
            try {
                // instantiate the ServerSocket
                ss = new ServerSocket();
                ss.setReuseAddress(true);
                if (listenOnAllIPs) {
                    int port = view.get(QuorumCnxManager.this.mySid)
                        .electionAddr.getPort();
                    addr = new InetSocketAddress(port);
                } else {
                    addr = view.get(QuorumCnxManager.this.mySid)
                        .electionAddr;
                }
                LOG.info("My election bind port: " + addr.toString());
                setName(view.get(QuorumCnxManager.this.mySid)
                        .electionAddr.toString());
                ss.bind(addr);
                while (!shutdown) {
                    // block waiting for other nodes to request a connection
                    Socket client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + client.getRemoteSocketAddress());
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        // core logic for accepting the connection
                        receiveConnection(client);
                    }
                    numRetries = 0;
                }
            } catch (IOException e) {
                LOG.error("Exception while listening", e);
                numRetries++;
                try {
                    ss.close();
                    Thread.sleep(1000);
                } catch (IOException ie) {
                    LOG.error("Error closing server socket", ie);
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping. "
                            + "Ignoring exception", ie);
                }
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error("As I'm leaving the listener thread, "
                    + "I won't be able to participate in leader "
                    + "election any longer: "
                    + view.get(QuorumCnxManager.this.mySid).electionAddr);
        }
    }
This method uses the JDK's blocking I/O to establish connections with the other nodes (if this is unfamiliar, brush up on the basics of JDK socket programming). In the inner while loop, ss.accept() blocks until another node requests a connection; once the connection is established it returns a Socket instance, which is passed into receiveConnection, and from that point on the two nodes can communicate. The receiveConnection logic is as follows:

    public void receiveConnection(final Socket sock) {
        DataInputStream din = null;
        try {
            // wrap the input stream in several layers
            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));
            // the actual connection handling
            handleConnection(sock, din);
        } catch (IOException e) {
            LOG.error("Exception handling connection, addr: {}, closing server connection",
                    sock.getRemoteSocketAddress());
            closeSocket(sock);
        }
    }
After wrapping the I/O input stream, handleConnection is called to actually handle the connection:

    private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
        Long sid = null;
        try {
            // block waiting for the first packet the other node sends when
            // establishing the connection; the first 8 bytes may be the
            // sid (server id) or the protocolVersion
            sid = din.readLong();
            if (sid < 0) {
                // what we just read was the protocol version,
                // so read another 8 bytes to get the real sid
                sid = din.readLong();
                // read 4 bytes: the number of remaining bytes to follow
                int num_remaining_bytes = din.readInt();
                // sanity-check the byte count
                if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                    LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                    closeSocket(sock);
                    return;
                }
                byte[] b = new byte[num_remaining_bytes];
                // read all the remaining bytes into the byte array b at once
                int num_read = din.read(b);
                if (num_read != num_remaining_bytes) {
                    LOG.error("Read only " + num_read + " bytes out of "
                            + num_remaining_bytes + " sent by server " + sid);
                }
            }
            if (sid == QuorumPeer.OBSERVER_ID) {
                sid = observerCounter.getAndDecrement();
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        } catch (IOException e) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + e.toString());
            return;
        }

        LOG.debug("Authenticating learner server.id: {}", sid);
        authServer.authenticate(sock, din);

        // if the remote sid is smaller than this node's sid,
        // close any connection that was established before
        if (sid < this.mySid) {
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            // after closing the previous connection,
            // this node initiates the connection instead
            connectOne(sid);
        } else {
            // sender thread
            SendWorker sw = new SendWorker(sock, sid);
            // receiver thread
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            if (vsw != null)
                vsw.finish();

            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid,
                    new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

            // start the sender thread
            sw.start();
            // start the receiver thread
            rw.start();

            return;
        }
    }
From this code we can see that a connection may only be initiated by the side with the larger sid and accepted by the side with the smaller sid. For example, with three nodes sid=1, sid=2 and sid=3, the connection between nodes 1 and 2 can only be initiated by node 2, and node 1 handles the connection request. This guarantees that each pair of nodes keeps exactly one connection, and since a Socket is full duplex, that single connection supports communication in both directions. A Socket can be obtained via ss.accept(), or the current node can actively connect to nodes with a smaller sid via the connectOne method used above:

    synchronized public void connectOne(long sid) {
        // connectedToPeer simply checks whether senderWorkerMap already contains this sid
        if (!connectedToPeer(sid)) {
            InetSocketAddress electionAddr;
            if (view.containsKey(sid)) {
                // look up the election address configured via server.<id>
                electionAddr = view.get(sid).electionAddr;
            } else {
                LOG.warn("Invalid server id: " + sid);
                return;
            }
            try {
                LOG.debug("Opening channel to server " + sid);
                // instantiate the Socket
                Socket sock = new Socket();
                setSockOpts(sock);
                // connect to the peer's election address
                sock.connect(view.get(sid).electionAddr, cnxTO);
                LOG.debug("Connected to server " + sid);
                if (quorumSaslAuthEnabled) {
                    initiateConnectionAsync(sock, sid);
                } else {
                    // initialize the connection synchronously, i.e. send
                    // this node's own information to the other node
                    initiateConnection(sock, sid);
                }
            } catch (UnresolvedAddressException e) {
                LOG.warn("Cannot open channel to " + sid
                        + " at election address " + electionAddr, e);
                if (view.containsKey(sid)) {
                    view.get(sid).recreateSocketAddresses();
                }
                throw e;
            } catch (IOException e) {
                LOG.warn("Cannot open channel to " + sid
                        + " at election address " + electionAddr, e);
                if (view.containsKey(sid)) {
                    view.get(sid).recreateSocketAddresses();
                }
            }
        } else {
            LOG.debug("There is a connection already for server " + sid);
        }
    }

    public void initiateConnection(final Socket sock, final Long sid) {
        try {
            startConnection(sock, sid);
        } catch (IOException e) {
            LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
                    new Object[] { sid, sock.getRemoteSocketAddress() }, e);
            closeSocket(sock);
            return;
        }
    }

    private boolean startConnection(Socket sock, Long sid) throws IOException {
        DataOutputStream dout = null;
        DataInputStream din = null;
        try {
            dout = new DataOutputStream(sock.getOutputStream());
            // send this node's sid to the other node
            dout.writeLong(this.mySid);
            dout.flush();

            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        // authenticate learner
        authLearner.authenticate(sock, view.get(sid).hostname);

        if (sid > this.mySid) {
            LOG.info("Have smaller server identifier, so dropping the "
                    + "connection: (" + sid + ", " + this.mySid + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } else {
            // from here the logic is the same as after
            // obtaining the socket via ss.accept()
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            if (vsw != null)
                vsw.finish();

            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid,
                    new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

            sw.start();
            rw.start();

            return true;
        }
        return false;
    }
From the methods above we can see that once the Socket has been obtained via ServerSocket.accept or socket.connect, a SendWorker and a RecvWorker are instantiated and their start methods launch two threads; these two threads carry out the request/response data transfer with the other node. For each peer the node maintains one SendWorker, one RecvWorker, and a per-sid send queue stored in queueSendMap.
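To show how the SendWorker, RecvWorker and queueSendMap cooperate, here is a simplified sketch modeled on QuorumCnxManager#toSend (trimmed to the essentials, not the verbatim source): a message addressed to this node is looped straight back into the receive queue, while a message for a peer is put on that peer's send queue and a connection is ensured so that a SendWorker exists to drain it.

    // Sketch: queue a message for server `sid`; the SendWorker registered for
    // that sid drains its queue, while the RecvWorker feeds incoming messages
    // into a receive queue consumed by FastLeaderElection.
    public void toSend(Long sid, ByteBuffer b) {
        if (this.mySid == sid) {
            // a message to ourselves is looped back to the receive queue directly
            b.position(0);
            addToRecvQueue(new Message(b.duplicate(), sid));
        } else {
            // otherwise put it on the per-sid send queue and make sure a
            // connection (and thus a SendWorker/RecvWorker pair) exists
            queueSendMap.putIfAbsent(sid,
                    new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            addToSendQueue(queueSendMap.get(sid), b);
            connectOne(sid);
        }
    }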
How these three objects are actually used will be covered in detail when we walk through the election itself. With this series of election preparations finished, we return to QuorumPeer#start, which next calls super.start(). Because QuorumPeer extends ZooKeeperThread, and ZooKeeperThread in turn extends the JDK Thread class, calling super.start() spawns a separate thread that executes QuorumPeer#run, which is where the election really happens:

    public void run() {
        setName("QuorumPeer" + "[myid=" + getId() + "]"
                + cnxnFactory.getLocalAddress());

        LOG.debug("Starting quorum peer");
        // 1. JMX extension point
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for (QuorumServer s : getView().values()) {
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    p = new RemotePeerBean(s);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        // 2. election logic
        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                // 1. LOOKING state
                case LOOKING:
                    LOG.info("LOOKING");

                    // read-only mode enabled
                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                                logFactory, this,
                                new ZooKeeperServer.BasicDataTreeBuilder(),
                                this.zkDb);

                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            setBCVote(null);
                            // call ElectionAlg#lookForLeader and store the vote it returns
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                // after the election, an observer enters this branch
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                    } finally {
                        observer.shutdown();
                        setObserver(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                // after the election, a follower enters this branch
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                // after the election, the leader enters this branch
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                }
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            try {
                MBeanRegistry.getInstance().unregisterAll();
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
        }
    }
We can start reading the code above from the main loop. After entering the while loop, the node is still in the LOOKING state, so it enters the LOOKING branch. There it first checks whether read-only mode is enabled; since we are not covering read-only mode here, we go straight to the other branch:

    setBCVote(null);
    // call ElectionAlg#lookForLeader and store the vote it returns
    setCurrentVote(makeLEStrategy().lookForLeader());
makeLEStrategy returns the FastLeaderElection instance that was created while preparing the election in QuorumPeer#startLeaderElection, and its lookForLeader method is then called to carry out the leader election:

    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
            self.start_fle = Time.currentElapsedTime();
        }
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized (this) {
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval ?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view for a replica in the voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while ((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING : learningState());

                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid,
                                        logicalclock.get(),
                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if (n.electionEpoch == logicalclock.get()) {
                            recvset.put(n.sid, new Vote(n.leader,
                                    n.zxid, n.electionEpoch, n.peerEpoch));

                            if (ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING : learningState());

                                Vote endVote = new Vote(n.leader,
                                        n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify
                         * a majority is following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.version,
                                n.leader, n.zxid, n.electionEpoch,
                                n.peerEpoch, n.state));

                        if (ooePredicate(outofelection, outofelection, n)) {
                            synchronized (this) {
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING : learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                    n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }
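The comparison that drives the loop above is totalOrderPredicate, which decides whether a received (leader, zxid, peerEpoch) proposal beats the node's current one. Its ordering rule in 3.4 is epoch first, then zxid, then server id; a sketch of that logic (paraphrased, not the verbatim source):

    // Sketch of FastLeaderElection#totalOrderPredicate: a new vote wins if it has
    // a higher epoch, or the same epoch and a higher zxid, or the same epoch and
    // zxid and a higher server id. (The real method also ignores servers whose
    // quorum weight is 0.)
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                          long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid) && (newId > curId))));
    }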
To be continued...