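Author: Liang Jichao, JD Logistics
ZooKeeper is a distributed service framework that mainly addresses common data problems in distributed applications, such as cluster management and state synchronization. To solve them, ZooKeeper relies on Leader election to guarantee strong data consistency and stability. Starting from a cluster configuration, this article walks through the Leader election source code so that readers can see how blocking I/O (BIO) communication, multiple threads, and multi-layer queues combine into a high-performance architecture.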
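01 Leader election mechanism
Leader election uses a majority (more-than-half) voting algorithm.
Each ZooKeeper server is called a node. Every participant node has a vote and sends it to every other node that has voting rights. Once one node has collected votes from more than half of the voting nodes, it becomes the Leader and the remaining nodes become Followers.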
02 Cluster configuration for Leader election
- Copy zoo_sample.cfg to zoo1.cfg, zoo2.cfg, zoo3.cfg and zoo4.cfg.
- Edit each file as follows:
【plain】# zoo1.cfg
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
# zoo2.cfg
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
# zoo3.cfg
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
# zoo4.cfg
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
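- Format: server.<sid, matching the myid file>=<ip>:<data sync port>:<election port>:<role>
- participant is the default role and may be omitted; it takes part in elections. observer does not take part in elections.
- Create a myid file in each of /export/data/zookeeper-1, /export/data/zookeeper-2, /export/data/zookeeper-3 and /export/data/zookeeper-4, containing 1, 2, 3 and 4 respectively. The value is the node's sid (Server ID).
- Start three ZooKeeper instances:
- bin/zkServer.sh start conf/zoo1.cfg
- bin/zkServer.sh start conf/zoo2.cfg
- bin/zkServer.sh start conf/zoo3.cfg
- On startup, each instance reads the configuration file passed as its start argument. From it the instance learns its own sid as a server and how many instances in the cluster take part in the election.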
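To make the server.N format concrete, here is a minimal parsing sketch. The ServerLine class and its field names are hypothetical and not part of ZooKeeper; the sketch handles only the simple ip:port:port[:role] form used in the files above and assumes a missing role means participant.

```java
import java.util.Locale;

/** Hypothetical helper (not a ZooKeeper class): splits one "server.N=..." line into fields. */
final class ServerLine {
    final long sid;
    final String host;
    final int syncPort;
    final int electionPort;
    final boolean observer;

    private ServerLine(long sid, String host, int syncPort, int electionPort, boolean observer) {
        this.sid = sid;
        this.host = host;
        this.syncPort = syncPort;
        this.electionPort = electionPort;
        this.observer = observer;
    }

    static ServerLine parse(String line) {
        String[] kv = line.trim().split("=", 2);
        if (kv.length != 2 || !kv[0].startsWith("server.")) {
            throw new IllegalArgumentException("not a server line: " + line);
        }
        long sid = Long.parseLong(kv[0].substring("server.".length()));
        String[] parts = kv[1].split(":");
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException("expected ip:port:port[:role]: " + line);
        }
        // The role is optional; anything other than "observer" is treated as participant here.
        boolean observer = parts.length == 4
                && parts[3].toLowerCase(Locale.ROOT).equals("observer");
        return new ServerLine(sid, parts[0],
                Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), observer);
    }
}
```

For example, ServerLine.parse("server.4=127.0.0.1:2004:3004:observer") yields sid 4 with observer set, while the server.1 line yields a participant.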
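03 Leader election process
Figure 1: Voting flow from the first round to the second round
Preconditions:
A vote has the format vote(sid, zxid, epoch):
- sid is the Server ID, the unique identifier of each server, taken from the myid file;
- zxid is the data transaction ID;
- epoch is the election epoch. To keep the explanation simple it is fixed at 1 (the first election) and omitted from the votes below.
Start nodes sid=1 and sid=2 in that order.
First round of voting:
- sid=1: its initial vote is for itself, so it sends vote(1,0) to sid=2;
- sid=2: its initial vote is for itself, so it sends vote(2,0) to sid=1;
- sid=1: receives vote(2,0) from sid=2 and compares it with its own vote(1,0). The zxid is compared first: a larger zxid means newer data, so the vote with the larger zxid wins; if the zxids are equal, the larger sid wins. The result is vote(2,0), so sid=1 changes its vote to vote(2,0);
- sid=2: receives vote(1,0) from sid=1 and compares it with its own vote(2,0) in the same way. The result is vote(2,0), so its vote is unchanged;
- The first round ends.
Second round of voting:
- sid=1: its current vote is vote(2,0), which it sends to sid=2;
- sid=2: its current vote is vote(2,0), which it sends to sid=1;
- sid=1: receives vote(2,0) from sid=2, which matches its own vote(2,0). Under the majority rule, 3 nodes take part in the election and 2 of them now hold the same vote, so sid=2 is elected Leader and sid=1 becomes a Follower;
- sid=2: receives vote(2,0) from sid=1, which matches its own vote(2,0). Under the majority rule sid=2 is elected Leader and takes the Leader role itself.
When sid=3 starts afterwards, the cluster has already elected a Leader. sid=1 and sid=2 send their Leader vote back to sid=3, and the majority result is still sid=2 as Leader.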
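The two rounds above can be replayed in a few lines. This is a simplified sketch rather than ZooKeeper code: the TwoNodeElection class, its Vote type and the elect method are invented for illustration, the epoch is left out as in the walkthrough, and only the two started nodes exchange votes.

```java
/** Hypothetical replay of the two-node walkthrough; not ZooKeeper code. */
final class TwoNodeElection {

    /** A simplified vote: the proposed leader's sid and zxid. */
    static final class Vote {
        final long sid;
        final long zxid;

        Vote(long sid, long zxid) {
            this.sid = sid;
            this.zxid = zxid;
        }

        /** Larger zxid wins; on equal zxid the larger sid wins. */
        boolean beats(Vote other) {
            return zxid > other.zxid || (zxid == other.zxid && sid > other.sid);
        }
    }

    /** Returns the elected sid given the zxids of node 1 and node 2. */
    static long elect(long zxid1, long zxid2, int votingMembers) {
        Vote v1 = new Vote(1, zxid1); // node 1 starts by voting for itself
        Vote v2 = new Vote(2, zxid2); // node 2 starts by voting for itself
        // Each pass models one exchange: both nodes send their current vote
        // and adopt the received vote if it beats their own.
        while (v1.sid != v2.sid) {
            Vote receivedBy1 = v2;
            Vote receivedBy2 = v1;
            if (receivedBy1.beats(v1)) v1 = receivedBy1;
            if (receivedBy2.beats(v2)) v2 = receivedBy2;
        }
        int agreeing = 2; // both started nodes now hold the same vote
        if (agreeing <= votingMembers / 2) {
            throw new IllegalStateException("no quorum");
        }
        return v1.sid;
    }
}
```

With equal zxids, elect(0, 0, 3) returns 2, matching the walkthrough. If node 1 held newer data, for instance elect(5, 0, 3), node 1 would win instead.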
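3.1 Multi-layer queue architecture for Leader election
Under the hood, ZooKeeper election is split into an election application layer and a message transport layer. The first-layer (application) queues receive and send all votes in one place. The second-layer (transport) queues are split by server sid so that sending to one server does not affect sending to the others. For example, a failed send to one machine does not block sends to healthy servers.
Figure 2: Interaction between the upper and lower queue layers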
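The isolation that per-sid queues provide can be shown with a small sketch. The TwoLayerQueues class, the Outgoing type and the dispatch method are hypothetical names; payloads are plain strings, and a full per-peer queue simply rejects new entries, which is an assumption made to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical two-layer queue sketch; not ZooKeeper code. */
final class TwoLayerQueues {

    /** A message addressed to one peer. */
    static final class Outgoing {
        final long sid;
        final String payload;

        Outgoing(long sid, String payload) {
            this.sid = sid;
            this.payload = payload;
        }
    }

    // Layer 1: one application-level queue for all outgoing messages.
    final BlockingQueue<Outgoing> sendQueue = new LinkedBlockingQueue<Outgoing>();
    // Layer 2: one bounded transport queue per target sid.
    final Map<Long, BlockingQueue<String>> perSid =
            new ConcurrentHashMap<Long, BlockingQueue<String>>();

    /** Drains layer 1 into the per-sid layer-2 queues; returns how many messages were rejected. */
    int dispatch(final int capacityPerSid) {
        int rejected = 0;
        Outgoing m;
        while ((m = sendQueue.poll()) != null) {
            BlockingQueue<String> q = perSid.computeIfAbsent(
                    m.sid, k -> new ArrayBlockingQueue<String>(capacityPerSid));
            // A full queue for one peer affects only that peer's messages.
            if (!q.offer(m.payload)) {
                rejected++;
            }
        }
        return rejected;
    }
}
```

If three messages are queued for an unreachable sid 2 and one for sid 3, a capacity of 1 rejects two of the sid 2 messages while the sid 3 message is still queued for delivery.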
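04 Locating the entry class
The startup class can be found by reading zkServer.sh:
org.apache.zookeeper.server.quorum.QuorumPeerMain
05 Election flow in the source code
Figure 3: Election implementation flowchart
- Load the configuration file: QuorumPeerConfig.parse(path);
The configuration relevant to Leader election is:
- Read the myid file under dataDir and set the sid of the current instance, which identifies it as a voter. The myid variable seen below is the current node's own sid.
- Set peerType, which determines whether the current instance takes part in the election.
- new QuorumMaj() parses the entries with the server. prefix and loads the cluster members: allMembers (all members), votingMembers (members that take part in the election) and observingMembers (observers). It sets half to votingMembers.size() / 2.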
【Java】public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        // Read the instance entries whose keys start with "server."
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            long sid = Long.parseLong(key.substring(dot + 1));
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT) {
                // A PARTICIPANT instance takes part in the election
                votingMembers.put(Long.valueOf(sid), qs);
            } else {
                // Observer member
                observingMembers.put(Long.valueOf(sid), qs);
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    // Majority threshold: a quorum needs more than half of the voting members
    half = votingMembers.size() / 2;
}
- QuorumPeerMain.runFromConfig(config) starts the service;
- QuorumPeer.startLeaderElection() starts the election service;
- Set the current vote: new Vote(sid, zxid, epoch)
【Java】synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // First round: the node votes for itself by default
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    //........
}
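- Create the election connection manager, QuorumCnxManager;
- Initialize recvQueue<Message(sid, ByteBuffer)>, the vote receive queue (second-layer transport queue);
- Initialize queueSendMap<sid, queue>, the per-sid vote send queues (second-layer transport queues);
- Initialize senderWorkerMap<sid, SendWorker>, the container of vote-sending worker threads; an entry means a connection to that sid exists;
- Initialize the election listener thread class QuorumCnxManager.Listener.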
【Java】//QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self,
final long mySid,
Map<Long,QuorumPeer.QuorumServer> view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
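boolean quorumSaslAuthEnabled) {
// Vote receive queue (second-layer transport queue)
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
// Per-sid vote send queues (second-layer transport queues)
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
// Container of vote-sending worker threads; an entry means a connection to that sid exists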
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){this.cnxTO = Integer.parseInt(cnxToValue);
}
this.self = self;
this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);
// Starts listener thread that waits for connection requests
// The listener thread accepts election connection requests from other nodes
listener = new Listener();
listener.setName("QuorumPeerListener");
}
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start(); // Start the election listener thread
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;}
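- Start the election listener thread QuorumCnxManager.Listener;
- It creates a ServerSocket and waits for connections from nodes whose sid is larger than its own; each connection is stored in senderWorkerMap<sid, SendWorker>;
- Only nodes with sid > self.sid are allowed to connect in.
【Java】// After listener.start() above, execution enters this method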
public void run() {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
while((!shutdown) && (numRetries < 3)){
try {ss = new ServerSocket();
ss.setReuseAddress(true);
if (self.getQuorumListenOnAllIPs()) {int port = self.getElectionAddress().getPort();
addr = new InetSocketAddress(port);
} else {
// Resolve hostname for this server in case the
// underlying ip address has changed.
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress();}
LOG.info("My election bind port:" + addr.toString());
setName(addr.toString());
ss.bind(addr);
while (!shutdown) {client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request"
+ client.getRemoteSocketAddress());
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {receiveConnectionAsync(client);
} else {
// Accept the connection
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {if (shutdown) {break;}
LOG.error("Exception while listening", e);
numRetries++;
try {ss.close();
Thread.sleep(1000);
} catch (IOException ie) {LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping." +
"Ignoring exception", ie);
}
closeSocket(client);
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "+"I won't be able to participate in leader"
+ "election any longer:"
+ self.getElectionAddress());
} else if (ss != null) {
// Clean up for shutdown.
try {ss.close();
} catch (IOException ie) {
// Don't log an error for shutdown.
LOG.debug("Error closing server socket", ie);
}
}
}
// Call path: receiveConnection() -> handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din)
throws IOException {
//... omitted
if (sid < self.getId()) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: {}", sid);
closeSocket(sock);
if (electionAddr != null) {connectOne(sid, electionAddr);
} else {connectOne(sid);
}
} else { // Otherwise start worker threads to receive data.
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {vsw.finish();
}
// Store the connection as <sid, SendWorker>
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid,
new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();}
}
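- Create the FastLeaderElection service;
- Initialize the vote send queue sendqueue (first-layer queue)
- Initialize the vote receive queue recvqueue (first-layer queue)
- Create the WorkerSender thread
- Create the WorkerReceiver thread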
【Java】//FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    // Send queue sendqueue (first-layer queue)
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Receive queue recvqueue (first-layer queue)
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}
//new Messenger(manager)
Messenger(QuorumCnxManager manager) {
    // Create the WorkerSender thread
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    // Create the WorkerReceiver thread
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
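- Start the WorkerSender and WorkerReceiver threads.
The WorkerSender thread loops, polling elements from the first-layer sendqueue:
- each sendqueue element carries vote information; see the ToSend class;
- if the target sid equals the node's own sid, the message is put directly into recvQueue;
- otherwise the element is moved into the second-layer transport queue queueSendMap<sid, queue>.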
【Java】//FastLeaderElection.Messenger.WorkerSender
class WorkerSender extends ZooKeeperThread {
    //...
    public void run() {
        while (!stop) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                // Send the vote
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }
}
//QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
if (this.mySid == sid) {b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
// Move the message into the second-layer transport queue queueSendMap<sid, queue>
if (oldq != null) {addToSendQueue(oldq, b);
} else {addToSendQueue(bq, b);
}
connectOne(sid);
}
}
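The body of addToSendQueue is not shown above. One plausible policy for a small bounded per-peer queue is to discard the oldest entry when the queue is full, so that a slow peer eventually receives the most recent vote rather than a stale one. The sketch below assumes that policy; the LatestWins class and its method are hypothetical and are not copied from ZooKeeper.

```java
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical "keep the newest" helper for a bounded queue; an assumed policy, not ZooKeeper code. */
final class LatestWins {

    /** Adds item to the queue, discarding the oldest entry first if the queue is full. */
    static <T> void addDroppingOldest(ArrayBlockingQueue<T> queue, T item) {
        if (queue.remainingCapacity() == 0) {
            // Discard the stale entry; poll() returns null if a consumer already drained it.
            queue.poll();
        }
        // With several concurrent producers this offer could still fail; ignored in this sketch.
        queue.offer(item);
    }
}
```

With a capacity of 1, adding vote(1,0) and then vote(2,0) leaves only vote(2,0) in the queue.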
The WorkerReceiver thread loops, polling elements from the second-layer transport queue recvQueue and moving them into the first-layer recvqueue.
【Java】//WorkerReceiver
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
// Poll the second-layer transport queue recvQueue in a loop
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
// The current protocol and two previous generations all send at least 28 bytes
if (response.buffer.capacity() < 28) {LOG.error("Got a short response:" + response.buffer.capacity());
continue;
}
//...
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
// Move the element from the second-layer transport queue into the first-layer recvqueue
recvqueue.offer(n);
//...
}
}
//...
}
06 Core election logic
- Start the QuorumPeer thread
Leader election voting starts with makeLEStrategy().lookForLeader();
sendNotifications() sends the vote to the other nodes by placing it in sendqueue, which is processed by the WorkerSender thread.
【Java】//QuorumPeer.run
//...
try {reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();}
// makeLEStrategy().lookForLeader() runs the election and sends the votes
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
//...
//FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
//...
// Send the vote to the other nodes
sendNotifications();
//...
}
private void sendNotifications() {
// Iterate over the voting nodes
for (long sid : self.getCurrentAndNextConfigVoters()) {QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){LOG.debug("Sending Notification:" + proposedLeader + "(n.leader), 0x" +
Long.toHexString(proposedZxid) + "(n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
"(n.round)," + sid + "(recipient)," + self.getId() +
"(myid), 0x" + Long.toHexString(proposedEpoch) + "(n.peerEpoch)");
}
// Enqueue the vote
sendqueue.offer(notmsg);
}
}
class WorkerSender extends ZooKeeperThread {
//...
public void run() {while (!stop) {
try {
// Take the next queued vote
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
process(m);
} catch (InterruptedException e) {break;}
}
LOG.info("WorkerSender is down");
}
//...
}
The election loop polls recvqueue to obtain the votes sent by other nodes:
【Java】public Vote lookForLeader() throws InterruptedException {
//...
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
// Take the next received vote
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
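if(n == null){
if(manager.haveDelivered()){
// Queued messages have been delivered but no reply arrived: send the notifications again
sendNotifications();
} else {
// Nothing has been delivered yet: manager.connectAll() tries to connect to the other nodes
manager.connectAll();
}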
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out:" + notTimeout);
}
//....
}
//...
}
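The exponential backoff in the loop above doubles the wait time after every empty poll and caps it at a maximum. A minimal sketch of that schedule follows; the NotificationBackoff class is hypothetical, and the values 200 and 1000 in the usage note are illustrative, not ZooKeeper's actual constants.

```java
/** Hypothetical illustration of the doubling-with-cap timeout; not ZooKeeper code. */
final class NotificationBackoff {

    /** Next wait time: double the current one, capped at maxInterval. */
    static int next(int current, int maxInterval) {
        int doubled = current * 2;
        return doubled < maxInterval ? doubled : maxInterval;
    }

    /** The first `steps` wait times, starting from `initial`. */
    static int[] schedule(int initial, int maxInterval, int steps) {
        int[] result = new int[steps];
        int current = initial;
        for (int i = 0; i < steps; i++) {
            result[i] = current;
            current = next(current, maxInterval);
        }
        return result;
    }
}
```

Starting at 200 ms with a cap of 1000 ms, five steps give 200, 400, 800, 1000, 1000.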
【Java】//manager.connectAll()->connectOne(sid)->initiateConnection(...)->startConnection(...)
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Sending id and challenge
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {LOG.warn("Ignoring exception reading or writing challenge:", e);
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
}
// If lost the challenge, then drop the new connection
// Ensure there is only one connection between any two nodes in the cluster
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the" +
"connection: (" + sid + "," + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}
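As the code above shows, the outgoing Socket and its SendWorker and RecvWorker threads are kept, and stored in senderWorkerMap<sid, SendWorker>, only when the target's sid is smaller than self.sid; when sid > self.sid the socket is closed. Together with the sid < self.sid branch of handleConnection in the listener step, this guarantees that any two nodes in the cluster share exactly one connection, always opened by the node with the larger sid.
Figure 4: Connections between nodes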
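In startConnection above, the socket is closed when the target sid is larger than the local sid, and handleConnection redials when the incoming sid is smaller. The net effect can be checked with a small sketch; the ConnectionRule class is a hypothetical illustration, not ZooKeeper code, and assumes every node tries to dial every other node.

```java
/** Hypothetical model of the "larger sid dials smaller sid" rule; not ZooKeeper code. */
final class ConnectionRule {

    /** True when the node with sid `from` keeps a connection it opened to the node with sid `to`. */
    static boolean keepsOutgoing(long from, long to) {
        return from > to;
    }

    /** Counts surviving channels among sids 1..n when every node dials every other node. */
    static int channels(int n) {
        int count = 0;
        for (long a = 1; a <= n; a++) {
            for (long b = 1; b <= n; b++) {
                if (a != b && keepsOutgoing(a, b)) {
                    count++;
                }
            }
        }
        return count;
    }
}
```

For three nodes this leaves three channels (2 to 1, 3 to 1, 3 to 2), which is one per pair rather than two.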
【Java】public Vote lookForLeader() throws InterruptedException {
//...
if (n.electionEpoch > logicalclock.get()) {
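// The notification's election epoch is ahead of ours: adopt it and clear the recvset vote pool
// Then update the current proposal and send the votes again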
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();} else if (n.electionEpoch < logicalclock.get()) {if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
// Same election epoch: the received vote beat the current vote, so it replaces it
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();}
//...
}
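In the code above, votes are polled from recvqueue in a loop and the election proceeds as follows:
- Compare the election epoch of the received vote with the current election epoch
- If the received epoch is larger, update the current vote and send the votes again
- If the received epoch is smaller, the received vote is ignored
- If the epochs are equal, the received vote competes against the current vote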
【Java】// Compare the received vote against the current vote
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id:" + newId + ", proposed id:" + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }
    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
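The totalOrderPredicate method above works as follows:
- it returns true if the candidate's epoch is greater than the current epoch;
- it returns true if the epochs are equal and the candidate's zxid is greater than the current zxid;
- it returns true if the epochs and zxids are equal and the candidate's sid is greater than the current sid;
- when it returns true, the current vote is replaced by the winning vote and the new vote is sent out again.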
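The three rules amount to a lexicographic ordering on (epoch, zxid, sid). The sketch below expresses that ordering with a Comparator; the VoteOrder and Candidate names are hypothetical, and the weight check that totalOrderPredicate performs first is deliberately left out.

```java
import java.util.Comparator;

/** Hypothetical restatement of the vote ordering as a Comparator; not ZooKeeper code. */
final class VoteOrder {

    /** The three fields that decide which vote wins. */
    static final class Candidate {
        final long epoch;
        final long zxid;
        final long sid;

        Candidate(long epoch, long zxid, long sid) {
            this.epoch = epoch;
            this.zxid = zxid;
            this.sid = sid;
        }
    }

    // Compare by epoch first, then zxid, then sid.
    static final Comparator<Candidate> ORDER =
            Comparator.<Candidate>comparingLong(c -> c.epoch)
                    .thenComparingLong(c -> c.zxid)
                    .thenComparingLong(c -> c.sid);

    /** True when the incoming vote should replace the current one. */
    static boolean wins(Candidate incoming, Candidate current) {
        return ORDER.compare(incoming, current) > 0;
    }
}
```

A vote with a higher epoch wins even against a larger zxid and sid, and two identical votes do not replace each other because the comparison is strict.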
【Java】public Vote lookForLeader() throws InterruptedException {
//...
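// Record the vote received from each node
// key: sid of the node that sent the vote; value: the vote, including the Leader sid it recommends
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// Start the majority check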
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
// A Leader has been elected: set this node's state depending on whether it is the Leader
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
//...
}
/**
* Termination predicate. Given a set of votes, determines if have
* sufficient to declare the end of the election round.
*
* @param votes
* Set of votes
* @param vote
* Identifier of the vote received last (the vote that won the comparison)
*/
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
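// votes comes from recvset, which stores the vote each node currently recommends
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
// If this node's vote equals the proposed vote, record it in voteSet
if (vote.equals(entry.getValue())) {
// Record the sid of the node that cast the matching vote
voteSet.addAck(entry.getKey());
}
}
// Check whether the matching votes form a majority
return voteSet.hasAllQuorums();
}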
//QuorumMaj#containsQuorum
public boolean containsQuorum(Set<Long> ackSet) {return (ackSet.size() > half);
}
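In the code above, recvset stores the vote recommended by each sid.
Round 1: sid1: vote(1,0,1), sid2: vote(2,0,1);
Round 2: sid1: vote(2,0,1), sid2: vote(2,0,1).
In the end vote(2,0,1) is the proposed Leader vote, and comparing it against the recvset vote pool finds 2 matching votes. Since 3 nodes take part in the election and both sid1 and sid2 vote for sid2, the more-than-half requirement is met and sid2 is confirmed as Leader.
- setPeerState updates the role of the current node;
- if proposedLeader equals the node's own sid, the node becomes Leader;
- otherwise it becomes Follower or Observer;
- currentVote is updated to the Leader's vote, vote(2,0,1).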
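The majority count in this example can be reproduced with a short sketch. It is a simplification: the MajorityCheck class is hypothetical, votes are reduced to the recommended Leader sid (the real termPredicate compares whole Vote objects), and half is computed as votingMembers / 2 as in QuorumMaj.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical majority check over a simplified recvset; not ZooKeeper code. */
final class MajorityCheck {

    /**
     * @param votes          voter sid -> Leader sid that voter currently recommends
     * @param proposedLeader the Leader sid being checked
     * @param votingMembers  number of participants in the cluster
     */
    static boolean hasQuorum(Map<Long, Long> votes, long proposedLeader, int votingMembers) {
        int half = votingMembers / 2;
        Set<Long> acks = new HashSet<Long>();
        for (Map.Entry<Long, Long> e : votes.entrySet()) {
            if (e.getValue().longValue() == proposedLeader) {
                acks.add(e.getKey()); // remember which voter agreed
            }
        }
        // A quorum needs strictly more than half of the voting members.
        return acks.size() > half;
    }

    /** Builds the recvset of the example: what sid1 and sid2 each recommend. */
    static Map<Long, Long> recvset(long voteOfSid1, long voteOfSid2) {
        Map<Long, Long> votes = new HashMap<Long, Long>();
        votes.put(1L, voteOfSid1);
        votes.put(2L, voteOfSid2);
        return votes;
    }
}
```

With three voting members, the round 1 state (sid1 votes 1, sid2 votes 2) gives no quorum for sid 2, while the round 2 state (both vote 2) does.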
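07 Summary
From this walkthrough of the Leader election source code we can see that:
- Nodes vote for each other over BIO network communication while keeping only one channel between each pair of nodes, which reduces network resource usage. This shows that BIO still has a clear place in the development of distributed middleware.
- On top of BIO, threads and in-memory message queues are combined into a multi-layer queue architecture in which each layer is served by different threads, which speeds up the election.
- It offers valuable practical experience in combining BIO with multithreading.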