public Vote lookForLeader() throws InterruptedException { try { self.jmxLeaderElectionBean = new LeaderElectionBean(); MBeanRegistry.getInstance().register( self.jmxLeaderElectionBean, self.jmxLocalPeerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); self.jmxLeaderElectionBean = null; } if (self.start_fle == 0) { self.start_fle = Time.currentElapsedTime(); } try { HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; synchronized(this){ logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); sendNotifications(); /* * Loop in which we exchange notifications until we find a leader */ while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time */ Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ if(n == null){ if(manager.haveDelivered()){ sendNotifications(); } else { manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } else if(validVoter(n.sid) && validVoter(n.leader)) { /* * Only proceed if the vote comes from a replica in the * voting view for a replica in the voting view. */ switch (n.state) { case LOOKING: // If notification > current, replace and send messages out if (n.electionEpoch > logicalclock.get()) { logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock.get())); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } if(LOG.isDebugEnabled()){ LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } } break; case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: /* * Consider all notifications from the same epoch * together. */ if(n.electionEpoch == logicalclock.get()){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(ooePredicate(recvset, outofelection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } /* * Before joining an established ensemble, verify * a majority is following the same leader. */ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock.set(n.electionEpoch); self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; } } else { if (!validVoter(n.leader)) { LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid); } if (!validVoter(n.sid)) { LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid); } } } return null; } finally { try { if(self.jmxLeaderElectionBean != null){ MBeanRegistry.getInstance().unregister( self.jmxLeaderElectionBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } self.jmxLeaderElectionBean = null; LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount()); } }
选举过程:
proposedLeader(选中为leader的节点id); proposedZxid(选中为leader的lastZxid); proposedEpoch(选中为leader的以后年代);
1.所有节点首先将选票投给本人
2.调用sendNotifications办法,将本人以后选票发送给其余节点
private void sendNotifications() { for (QuorumServer server : self.getVotingView().values()) { long sid = server.id; ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } sendqueue.offer(notmsg); } }
这里能够看出其实就是将以后选票封装成ToSend构造体,而后放到sendqueue这个阻塞队列里,期待Sender线程(前面讲音讯流转时解说)从队列中拉取要发送的音讯,而后发送给其余节点.
3.而后所有节点进入到while循环中,直到选举实现,也就是PeerState!=Looking,咱们从while循环中能够看出首先是从recvqueue这个阻塞队列中取拉取其余节点发个过去的投票信息,recvqueue队列中存储的构造体为Notification:
属性 | 形容 |
---|---|
version | 单元 2 |
leader | 选举的leaderId |
zxid | 选举的leader的zxid |
electionEpoch | 以后选举轮次 |
state | 发送者的节点状态 |
sid | 发送者id |
peerEpoch | 选举的leader节点epoch |
此时须要对Notification内容有效性做一些校验,并做一些解决:
1)Notification为空,呈现这种状况须要判断其余节点的发送队列中的音讯是否某一个曾经全副发送进来,如果存在这种状况,则须要将本身的选票再次发送给所有其余节点。
2)判断发送者的sid和选举leader的sid是否在无效视图中,如果是有效的sid,则间接抛弃这张选票。
以后选票确定是无效时,须要对发送的节点状态进行一些独自的解决:
①发送者节点状态也为Looking时:
1)当electionEpoch>logicallock(以后选票的选举轮次大于以后节点的选举轮次):
则以后节点清空recvset(投票箱)中的所有选票,并判断以后选票是否优于本身,如果优于本身,则更新本身选票为以后选票的投票内容,否则就投票给本身.
如何判断选票优于以后选票:
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
- 首先时判断新选票的选举的leader的epoch是否大于以后节点选票的epoch (epoch越大阐明经验的选举次数越大,状态更新)
- 如果epoch雷同,则判断新选票的选举的leader的zxid是否大于以后节点选票的zxid(zxid越大,阐明节点的事务数据存储越多)
- 如果zxid雷同,则判断以后新选票选举的leader的sid是否大于以后节点选票的sid(sid判断策略则是作为兜底策略)
2)当electionEpoch<logicallock(以后选票的选举轮次小于以后节点的选举轮次)
呈现这种状况,阐明以后选票曾经不属于此次选举轮次中的选票,则间接疏忽掉此选票
3)当electionEpoch=logicallock(以后选票的选举轮次等于以后节点的选举轮次)
此时阐明,新选票和本身选票处于一个投票轮次中,则直接判断新选票是否优于以后选票,如果优于以后选票,则更新以后选票内容为新选票内容,并将本身新选举内容发送给其余节点.
最初将新选票放入到recvset投票箱中,并判断投票箱中的投票是否有超过一半曾经和本身的选票内容统一,如果未超过一半则再次从新进行下面选举流程,如果曾经达到一半,则进行最初的判断,把recvqueue中的投票信息全副取出来进行判断,判断是否还存在优与以后本身选票的投票音讯,如果有的话,则将以后选票从新放入recvqueue中,从新进行选举流程,没有的话则间接完结选举.
//判断投票箱中,是否曾经超过一半节点的投票内容和以后节点投票内容统一 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader//最初判断其余节点发送过去的投票信息是否还存在优于以后节点投票信息的音讯 while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue *///阐明队列音讯为空或队列中的投票音讯曾经不会对以后节点的投票内容产生扭转 if (n == null) {//变更以后节点状态,如果选举的leader时本身,则状态变更未Leading,否则未Folloing或Observing self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); //保留最终投票内容 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } }
②发送者节点状态也为Observing时:
Observer角色不参加选举过程,则来自Observer的选票间接疏忽掉.
③发送者节点状态也为Following和Leading时:
投票音讯流转过程:
在FastLeaderElection的构造方法中,会结构进去一个Messenger实例,该实例中会新开启2个线程,并始终轮询从FastLeaderElection#sendQuene中和QuorumCnxManager#recvQueue中拉取承受到的音讯和须要发送的音讯.
发送端:
FastLeaderElection发送选票(ToSend)
->存储在FastLeaderElection的sendQueue队列中
->FastLeaderElection.Messenger.WorkerSender#run轮询从sendQueue拉取音讯并转换成ByteBuffer
->QuorumCnxManager#toSend办法
1)如果要发送的sid为以后节点,则间接放入QuorumCnxManager#recvQueue承受队列中
2)放入QuorumCnxManager#queueSendMap()队列中,ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>
->QuorumCnxManager.SendWorker#run办法从queueSendMap中拉取要发送的音讯,通过Socket发送给其余节点,上一节,讲诉了和其余每个节点都保护了一个SendWorker线程
承受端:
->QuorumCnxManager.RecvWorker#run办法通过Socket阻塞读取其余节点发送的音讯,转换成Message音讯体,并放入QuorumCnxManager#recvQueue阻塞队列中,上一节,讲诉了和其余每个节点都保护了一个RecvWorker线程
->FastLeaderElection.Messenger.WorkerReceiver轮询通过QuorumCnxManager#pollRecvQueue办法从QuorumCnxManager#recvQueue队列中拉取音讯并转换成Notification,并Notification放入FastLeaderElection#recvqueue队列中.
->FastLeaderElection#lookForLeader办法从FastLeaderElection#recvqueue拉取选票信息并解决.