关于java:图解源码Zookeeper37源码剖析Session的管理机制Leader选举投票规则集群数据同步流程

36次阅读

共计 16694 个字符,预计需要花费 42 分钟才能阅读完成。

Zookeeper3.7 源码分析

能力指标

  • 把握 Zookeeper 中 Session 的管理机制
  • 能基于 Client 进行 Debug 测试 Session 创立 / 刷新操作
  • 能搭建 Zookeeper 集群源码配置
  • 把握集群环境下 Leader 选举启动过程
  • 能说出 Zookeeper 选举过程中的概念
  • 能说出 Zookeeper 选举投票规定
  • 能画出 Zookeeper 集群数据同步流程

1 Session 源码剖析

客户端创立 Socket 连贯后,会尝试连贯,如果连贯胜利胜利会调用到 primeConnection 办法用来发送 ConnectRequest 连贯申请,这里便是设置 session 会话,对于客户端创立会话咱们就不在这里做解说了,咱们间接解说服务端 Session 会话解决流程。

1.1 服务端 Session 属性剖析

Zookeeper 服务端会话操作如下图:

在服务端通过 SessionTrackerImplExpiryQueue来保留 Session 会话信息。

SessionTrackerImpl有以下属性:

1:sessionsById 用来存储 ConcurrentHashMap<Long, SessionImpl> {sessionId:SessionImpl} 2:sessionExpiryQueue ExpiryQueue<SessionImpl> 生效队列
3:sessionsWithTimeout ConcurrentMap<Long, Integer> 存储的是{sessionId: sessionTimeout} 
4:nextSessionId 下一个 sessionId

ExpiryQueue生效队列有以下属性:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session 实例对象,生效工夫。2:expiryMap ConcurrentHashMap<Long, Set<E>> 存储的是{time: set<SessionImp>} 生效工夫,以后生效工夫的 Session 对象汇合。3:nextExpirationTime 下一次生效工夫 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 以后零碎工夫毫秒值 ms=System.nanoTime() / 1000000。nextExpirationTime= 以后零碎工夫毫秒值 +expirationInterval(生效距离)。4:expirationInterval 生效距离,默认是 10s,能够通过 sessionlessCnxnTimeout 批改。即是通过配置文件的 tickTime 批改。

1.2 Session 创立

咱们接着上一章的案例持续剖析,如果客户端发动申请后,后端如何辨认是第一次创立申请?在之前的案例源码 NIOServerCnxn.readPayload() 中有所体现,NIOServerCnxn.readPayload()局部要害源码如下:

此时如果 initialized=false,示意第一次连贯 须要创立Session(createSession),此处调用readConnectRequest() 后,在 readConnectRequest() 办法中会将 initialized 设置为 true,只有在解决完连贯申请之后才会把initialized 设置为true,才能够解决客户端其余命令。

下面办法还调用了 processConnectRequest 解决连贯申请, processConnectRequest 第一次从申请中获取的 sessionId=0, 此时会把创立Session 作为一个业务,会调用 createSession() 办法,processConnectRequest 办法局部要害代码如下:

创立会话调用 createSession(),该办法会首先创立一个 sessionId,并把该 sessionId 作为会话 ID 创立一个创立 session 会话的申请,并将该申请交给业务链作为一个业务解决,createSession() 源码如下:

下面办法用到的 sessionTracker.createSession(timeout) 做了 2 个操作别离是创立 sessionId 和配置 sessionId 的跟踪信息,办法源码如下:

会话信息的跟踪其实就是将会话信息增加到队列中,任何中央能够依据会话 ID 找到会话信息,trackSession办法实现了 Session 创立、Session 队列存储、Session过期队列存储,trackSession办法源码如下:

PrepRequestProcessorrun办法中调用pRequest2Txn,要害代码如下:

SyncRequestProcessor 对 txn(创立 session 的操作)进行长久化,在 FinalRequestProcessor 会对 Session 进行提交,其实就是把 Session 的 ID 和 Timeout 存到 sessionsWithTimeout 中去。

因为 FinalRequestProcessor 中调用链路太简单,咱们把调用链路写进去,大家能够依照这个程序跟踪:

1:FinalRequestProcessor.applyRequest()
        办法代码:ProcessTxnResult rc = zks.processTxn(request);
        
2:ZooKeeperServer.processTxn(org.apache.zookeeper.server.Request)
        办法代码:processTxnForSessionEvents(request, hdr, request.getTxn());

下面调用链路中 processTxnForSessionEvents(request, hdr, request.getTxn()); 办法代码如下:

下面办法次要解决了 OpCode.createSession 并且将 sessionId、TimeOut 提交到 sessionsWithTimeout 中,而提交到 sessionsWithTimeout 的办法 SessionTrackerImpl.commitSession() 代码如下:

1.3 Session 刷新

服务端无论承受什么申请命令 (增删或 ping 等申请) 都会更新 Session 的过期工夫。咱们做增删或者 ping 命令的时候,都会通过 RequestThrottlerRequestThrottler 的 run 办法中调用 zks.submitRequestNow(),而zks.submitRequestNow(request) 中调用了touch(si.cnxn);,该办法源码如下:

touchSession()办法更新 sessionExpiryQueue 生效队列中的生效工夫,源码如下:

update()办法会在以后工夫的根底上减少 timeout,并更新生效工夫为 newExpiryTime,要害源码如下:

1.4 Session 过期

SessionTrackerImpl是一个线程类,继承了ZooKeeperCriticalThread,咱们能够看它的 run 办法,它首先获取了下一个会话过期工夫,并休眠期待会话过期工夫到期,而后获取过期的客户端会话汇合并循环敞开,源码如下:

下面办法中调用了sessionExpiryQueue.poll(),该办法代码次要是获取过期工夫对应的客户端会话汇合,源码如下:

下面的 setSessionClosing() 办法其实是把 Session 会话的 isClosing 状态设置为了 true, 办法源码如下:

而让客户端生效的办法 expirer.expire(s); 其实也是一个业务操作,次要调用了 ZooKeeperServer.expire() 办法,而该办法获取 SessionId 后,又创立了一个 OpCode.closeSession 的申请,并交给业务链解决,咱们查看 ZooKeeperServer.expire() 办法源码如下:

PrepRequestProcessor.pRequest2Txn() 办法中 OpCode.closeSession 操作里最初局部代理明确将会话 Session 的 isClosing 设置为了 true,源码如下:

业务链解决对象 FinalRequestProcessor.processRequest() 办法调用了 ZooKeeperServer.processTxn(),并且在processTxn() 办法中执行了 processTxnForSessionEvents,而processTxnForSessionEvents() 办法正好移除了会话信息,办法源码如下:

移除会话的办法 SessionTrackerImpl.removeSession() 会移除会话 ID 以及过期会话对象,源码如下:

1.5 Zookeeper 会话测试

为了让 Zookeeper 的会话了解更粗浅,咱们对会话流程做一个测试,首先测试会话创立,再测试会话刷新。

1)会话创立测试

咱们关上 NIOServerCnxn.readPayload() 办法,跟踪首次创立会话,调试状况如下:

此时会建设近程连贯并创立 SessionID,咱们调试到 NIOServerCnxn.readConnectRequest() 办法,此时建设链接,并且失去的 sessionId=0。

当 sessionId= 0 时,会执行 Session 创立,Session 创立会调用 SessionTrackerImpl.createSession() 办法实现会话创立,并将会话存入跟踪队列,DEBUG 测试如下:

会话创立代码如下:

跟踪测试后,控制台输入如下信息:

AcceptThread---------- 链接服务的 IP:127.0.0.1
1:会话未连贯,筹备首次连贯会话.....
2: 建设近程连贯......
2:第 1 次连贯的 sessionId=0
应用 SessionTrackerImpl 创立会话,并将会话退出跟踪队列中
3:sessionId=0, 此时创立 sessionId=72061099907219458

2)会话刷新测试

咱们执行 get /zookeeper 指令,而后首先跟踪到 RequestThrottler.run() 办法,执行如下:

执行程序达到ZooKeeperServer.touch(),行将开始筹备刷新会话了,咱们测试成果如下:

调用 SessionTrackerImpl.touchSession() 的时候会先判断会话是否为空、会话是否曾经敞开,如果都没有,才执行刷新会话操作,DEBUG 跟踪如下:

刷新会话其实就是会话工夫减少,减少会话工夫 DEBUG 跟踪如下:

测试后成果如下:

a. 以后申请并未过期,不须要删除,筹备刷新会话
b. 筹备调用 SessionTrackerImpl.touchSession()刷新会话
c. 会话不为空,会话也未敞开,筹备调用 updateSessionExpiry()刷新会话
d. 残余过期工夫:54572178, 减少过期工夫:30000, 刷新会话后过期工夫:54604000

2 Zookeeper 集群启动流程

咱们先搭建 Zookeeper 集群,再来剖析选举算法。

2.1 Zookeeper 集群配置

如上图:

1: 创立 zoo1.cfg、zoo2.cfg、zoo3.cfg
2: 创立 zkdata1、zkdata2、zkdata3
3: 创立 3 个 myid,值别离为 1、2、3

配置 3 个启动类,如下图:

2.2 集群启动流程剖析

如上图,上图是 Zookeeper 单机 / 集群启动流程,每个细节所做的事件都在上图有阐明,咱们接下来依照流程图对源码进行剖析。

程序启动,运行流程启动集群模式,如下图:

quorumPeer.start()启动服务,如下代码:

quorumPeer.start()办法代码如下:

quorumPeer.start()办法启动的次要步骤:

1:loadDataBase()加载数据。2:startServerCnxnFactory 用来开启 acceptThread、SelectorThread 和 workerPool 线程池。3: 开启 Leader 选举 startLeaderElection。4: 开启 JVM 监控线程 startJvmPauseMonitor。5: 调用父类 super.start(); 进行 Leader 选举。

startLeaderElection()开启 Leader 选举办法做了 2 件事,首先创立初始化选票选本人,接着创立选举投票形式,源码如下:

createElectionAlgorithm()创立选举算法只有第 3 种,其余 2 种均已废除,办法源码如下:

这个办法创立了以下三个对象:

①、创立 QuorumCnxManager 对象

②、QuorumCnxManager.Listener

③、FastLeaderElection

3 Zookeeper 集群 Leader 选举

3.1 Paxos 算法介绍

Zookeeper 选举次要依赖于 FastLeaderElection 算法,其余算法均已淘汰,但 FastLeaderElection 算法又是典型的 Paxos 算法,所以咱们要先学习下 Paxos 算法,这样更有助于把握 FastLeaderElection 算法。

1)Paxos 介绍

分布式事务中常见的事务模型有 2PC 和 3PC,无论是 2PC 提交还是 3PC 提交都无奈彻底解决分布式的一致性问题以及无奈解决太过激进及容错性不好。Google Chubby 的作者 Mike Burrows 说过,世上只有一种一致性算法,那就是 Paxos,所有其余一致性算法都是 Paxos 算法的不完整版。Paxos 算法是公认的艰涩,很难讲清楚,然而工程上也很难实现,所以有很多 Paxos 算法的工程实现,如 Chubby,Raft,ZAB,微信的 PhxPaxos 等。这一篇会介绍这个公认为难于了解然而卓有成效的 Paxos 算法。Paxos 算法是莱斯利·兰伯特(Leslie Lamport)1990 年提出的一种基于消息传递的一致性算法,它曾就此发表了《The Part-Time Parliament》,《Paxos Made Simple》,因为采纳故事的形式来解释此算法,感觉还是很难了解。

2)Paxos 算法背景
Paxos 算法是基于消息传递且具备高度容错个性的一致性算法,是目前公认的解决分布式一致性问题最无效的算法之一,其解决的问题就是在分布式系统中如何就某个值(决定)达成统一。
面试的时候:不要把这个 Paxos 算法达到的目标和分布式事务分割起来,而是针对 Zookeeper 这样的 master-slave 集群对某个决定达成统一,也就是正本之间写或者 leader 选举达成统一。我感觉这个算法和广义的分布式事务不是一样的。
在常见的分布式系统中,总会产生诸如机器宕机或网络异样(包含音讯的提早、失落、反复、乱序,还有网络分区)(也就是会产生异样的分布式系统)等状况。Paxos 算法须要解决的问题就是如何在一个可能产生上述异样的分布式系统中,疾速且正确地在集群外部对某个数据的值达成统一。也能够了解成分布式系统中达成状态的一致性。

3)Paxos 算法了解

Paxos 算法是分布式一致性算法用来解决一个分布式系统如何就某个值 (决定) 达成统一的问题。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态统一,每个节点都执行雷同的操作序列,那么他们最初能失去一个统一的状态。为保障每个节点执行雷同的命令序列,须要在每一条指令上执行一个”一致性算法”以保障每个节点看到的指令统一。
分布式系统中个别是通过多副原本保障可靠性,而多个正本之间会存在数据不统一的状况。所以必须有一个一致性算法来保证数据的统一,形容如下:
  如果在分布式系统中初始是各个节点的数据是统一的,每个节点都程序执行系列操作,而后每个节点最终的数据还是统一的。
  Paxos 算法就是解决这种分布式场景中的一致性问题。对于个别的开发人员来说,只须要晓得 paxos 是一个分布式选举算法 即可。多个节点之间存在两种通信模型:共享内存(Shared memory)、消息传递(Messages passing),Paxos 是基于消息传递的通信模型的。

4)Paxos 相干概念

在 Paxos 算法中,有三种角色:

  • Proposer
  • Acceptor
  • Learners

在具体的实现中,一个过程可能同时充当多种角色。比方一个过程可能既是 Proposer 又是 Acceptor 又是 Learner。Proposer 负责提出提案,Acceptor 负责对提案作出裁决(accept 与否),learner 负责学习提案后果。
还有一个很重要的概念叫提案(Proposal)。最终要达成统一的 value 就在提案里。只有 Proposer 发的提案被 Acceptor 承受(半数以上的 Acceptor 批准才行),Proposer 就认为该提案里的 value 被选定了。Acceptor 通知 Learner 哪个 value 被选定,Learner 就认为那个 value 被选定。只有 Acceptor 承受了某个提案,Acceptor 就认为该提案里的 value 被选定了。
为了防止单点故障,会有一个 Acceptor 汇合,Proposer 向 Acceptor 汇合发送提案,Acceptor 汇合中的每个成员都有可能批准该提案且每个 Acceptor 只能批准一个提案,只有当一半以上的成员批准了一个提案,就认为该提案被选定了。

3.2 QuorumPeer 工作流程

QuorumCnxManager:每台服务器在启动的过程中,会启动一个QuorumPeer,负责各台服务器之间的底层 Leader 选举过程中的网络通信对应的类就是QuorumCnxManager

Zookeeper对于每个节点 QuorumPeer 的设计相当的灵便,QuorumPeer次要包含四个组件:客户端申请接收器(ServerCnxnFactory)、数据引擎(ZKDatabase)、选举器(Election)、外围性能组件(Leader/Follower/Observer)。

1:ServerCnxnFactory 负责保护与客户端的连贯(接管客户端的申请并发送相应的响应);(1001 行)2:ZKDatabase 负责存储 / 加载 / 查找数据(基于目录树结构的 KV+ 操作日志 + 客户端 Session);(129 行)3:Election 负责选举集群的一个 Leader 节点;(998 行)4:Leader/Follower/Observer 确认是 QuorumPeer 节点应该实现的外围职责;(1270 行)

QuorumPeer工作流程比较复杂,如下图:

QuorumPeer 工作流程:

1: 初始化配置
2: 加载以后存在的数据
3: 启动网络通信组件
4: 启动控制台
5: 开启选举协调者,并执行选举(这个过程是会继续,并不是一次操作就完结了)

3.3 QuorumCnxManager 源码剖析

QuorumCnxManager外部保护了一系列的队列,用来保留接管到的、待发送的音讯以及音讯的发送器,除接管队列以外,其余队列都依照 SID 分组造成队列汇合,如一个集群中除了本身还有 3 台机器,那么就会为这 3 台机器别离创立一个发送队列,互不烦扰。

QuorumCnxManager.Listener:为了可能互相投票,Zookeeper 集群中的所有机器都须要建设起网络连接。QuorumCnxManager 在启动时会创立一个 ServerSocket 来监听 Leader 选举的通信端口。开启监听后,Zookeeper 可能一直地接管到来自其余服务器地创立连贯申请,在接管到其余服务器地 TCP 连贯申请时,会进行解决。为了防止两台机器之间反复地创立 TCP 连贯,Zookeeper 只容许 SID 大的服务器被动和其余机器建设连贯,否则断开连接。在接管到创立连贯申请后,服务器通过比照本人和近程服务器的 SID 值来判断是否接管连贯申请,如果以后服务器发现自己的 SID 更大,那么会断开以后连贯,而后本人被动和近程服务器将连贯(本人作为“客户端”)。一旦连贯建设,就会依据近程服务器的 SID 来创立相应的音讯发送器 SendWorker 和音讯发送器 RecvWorker,并启动。

QuorumCnxManager.Listener监听启动能够查看 QuorumCnxManager.Listenerrun办法,源代码如下,能够断点调试看到此时监听的正是咱们所说的投票端口:

下面是监听器,各个服务之间进行通信咱们须要开启 ListenerHandler 线程,在 QuorumCnxManager.Listener.ListenerHandler 的 run 办法中有一个办法 acceptConnections() 调用,该办法就是用于承受每次选举投票的信息,如果只有一个节点或者没有投票信息的时候,此时办法会阻塞,一旦执行选举,程序会往下执行,咱们能够先启动 1 台服务,再启动第 2 台、第 3 台,此时会收到有客户端参加投票链接,程序会往下执行,源码如下:

咱们启动 2 台服务,成果如下:

下面尽管能证实投票拜访了以后监听的端口,但怎么晓得是哪台服务呢?咱们能够沿着 receiveConnection() 源码持续钻研,源码如下:

receiveConnection()办法只是获取了数据流,并没做非凡解决,并且调用了 handleConnection() 办法,该办法源码如下:

通过网络连接获取数据 sid,获取 sid 示意是哪一台连过去的,咱们能够打印输出 sid,测试输入如下数据:

参加投票的 MyID=2
参加投票的 MyID=3

3.4 FastLeaderElection 算法源码剖析

Zookeeper 集群中,次要分为三者角色,而每一个节点同时只能表演一种角色,这三种角色别离是:

(1)Leader 承受所有 Follower 的提案申请并对立协调发动提案的投票,负责与所有的 Follower 进行外部的数据交换(同步);

(2)Follower 间接为客户端提供服务并参加提案的投票,同时与 Leader 进行数据交换(同步);

(3)Observer 间接为客户端服务但并不参加提案的投票,同时也与 Leader 进行数据交换(同步);

FastLeaderElection 选举算法是规范的 Fast Paxos 算法实现,可解决 LeaderElection 选举算法收敛速度慢的问题。

创立 FastLeaderElection 只须要 new FastLeaderElection() 即可,如下代码:

创立 FastLeaderElection 会调用 starter() 办法,该办法会创立 sendqueuerecvqueue 队列、Messenger对象,其中 Messenger 对象的作用十分要害,办法源码如下:

创立 Messenger 的时候,会创立 WorkerSender 并封装成 wsThread 线程,创立 WorkerReceiver 并封装成 wrThread 线程,看名字就很容易了解,wsThread用于发送数据,wrThread用于接收数据,Messenger创立源码如下:

创立完 FastLeaderElection 后接着会调用它的 start() 办法启动选举算法,代码如下:

启动选举算法会调用 start()办法,start()办法如下:

public void start() {this.messenger.start();
}

下面会执行 messager.start(),也就是如下办法,也就意味着wsThreadwrThread线程都将启动,源码如下:

void start() {this.wsThread.start();
    this.wrThread.start();}

wsThreadWorkerSender 封装而来,此时会调用 WorkerSenderrun办法,run 办法会调用 process() 办法,源码如下:

process办法调用了 managertoSend办法,此时是把对应的 sid 作为了音讯发送进来,这里其实是发送投票信息,源码如下:

void process(ToSend m) {ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
    manager.toSend(m.sid, requestBuffer);
}

投票能够投本人,也能够投他人,如果是选票选本人,只须要把投票信息增加到 recvQueue 中即可,源码如下:

WorkerReceiver.run 办法中会从 recvQueue 中获取 Message,并把发送给其余服务的投票封装到sendqueue 队列中,交给 WorkerSender 发送解决,源码如下:

3.5 Zookeeper 选举投票分析

选举是个很简单的过程,要思考很多场景,而且选举过程中有很多概念须要了解。

3.5.1 选举概念

1)ZK 服务状态:

public enum ServerState {
    // 代表没有以后集群中没有 Leader, 此时是投票选举状态
    LOOKING,  
    // 代表曾经是随同者状态
    FOLLOWING,
    // 代表曾经是领导者状态
    LEADING,
    // 代表曾经是观察者状态(观察者不参加投票过程)OBSERVING
}

2)服务角色:

//Learner 是随从服务和观察者的统称
public enum LearnerType {
    // 随从者角色
    PARTICIPANT,
    // 观察者角色
    OBSERVER
}

3)投票音讯播送:

public static class Notification {
    int version;
    
    // 被举荐 leader 的 ID
     long leader;
    
      // 被举荐 leader 的 zxid
      long zxid;
     
     // 投票轮次
     long electionEpoch;
     
     // 以后投票者的服务状态(LOOKING)QuorumPeer.ServerState state;
     // 以后投票者的 ID
     long sid;
     //QuorumVerifier 作为集群验证器,次要实现判断一组 server 在
     // 已给定的配置的 server 列表中,是否可能形成集群
     QuorumVerifier qv;
     
     // 被举荐 leader 的投票轮次
     long peerEpoch;
    
}

4)选票模型:

public class Vote {
    // 投票版本号,作为一个标识 
    private final int version;
    // 以后服务的 ID
    private final long id;
    // 以后服务事务 ID
    private final long zxid;
    // 以后服务投票的轮次
    private final long electionEpoch;
    // 被推举服务器的投票轮次
    private final long peerEpoch;
    // 以后服务器所处的状态
    private final ServerState state;

}

5)音讯发送对象:

public static class ToSend {
    // 反对的音讯类型
    enum mType {
        crequest, // 申请
        challenge, // 确认
        notification,// 告诉
        ack // 确认回执
    }
   
    ToSend(mType type, long leader, long zxid, long electionEpoch, ServerState state, long sid, long peerEpoch, byte[] configData) {

        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.state = state;
        this.sid = sid;
        this.peerEpoch = peerEpoch;
        this.configData = configData;
    }

    /*
     * Proposed leader in the case of notification
     * 被投票推举为 leader 的服务 ID 
     */ long leader;

    /*
     * id contains the tag for acks, and zxid for notifications
     * 
     */ long zxid;

    /*
     * Epoch
     * 投票轮次
     */ long electionEpoch;

    /*
     * Current state;
     * 服务状态
     */ QuorumPeer.ServerState state;

    /*
     * Address of recipient
     * 音讯接管方服务 ID
     */ long sid;

    /*
     * Used to send a QuorumVerifier (configuration info)
     */ byte[] configData = dummyData;

    /*
     * Leader epoch
     */ long peerEpoch;

}
3.5.2 选举过程

QuorumPeer 自身是个线程,在集群启动的时候会执行 quorumPeer.start();,此时会调用它重写的start() 办法,最初会调用父类的 start() 办法,所以该线程会启动执行,因而会执行它的 run 办法,而 run 办法正是选举流程的入口,咱们看 run 办法要害源码如下:

所有节点初始状态都为 LOOKING,会进入到选举流程,选举流程首先要获取算法,获取算法的办法是 makeLEStrategy(),该办法返回的是FastLeaderElection 实例,外围选举流程是 FastLeaderElection 中的 lookForLeader() 办法。

/****
 * 获取选举算法
 */
@SuppressWarnings("deprecation")
protected Election makeLEStrategy() {return electionAlg;}

lookForLeader()是选举过程的要害流程,源码剖析如下:

下面多个中央都用到了过半数以上的办法 hasAllQuorums() 该办法用到了 QuorumMaj 类,代码如下:

QuorumMaj构造函数中体现了过半数以上的操作,代码如下:

3.5.3 投票规定

咱们来看一下选票 PK 的办法totalOrderPredicate(),该办法其实就是 Leader 选举规定,规定有如下三个:

1: 比拟 epoche(zxid 高 32bit),如果其余节点的 epoche 比本人的大,选举 epoch 大的节点(理由:epoch 示意年代,epoch 越大示意数据越新)代码:(newEpoch > curEpoch);2: 比拟 zxid,如果 epoche 雷同,就比拟两个节点的 zxid 的大小,选举 zxid 大的节点(理由:zxid 示意节点所提交事务最大的 id,zxid 越大代表该节点的数据越残缺)代码:(newEpoch == curEpoch) && (newZxid > curZxid);3: 比拟 serviceId,如果 epoch 和 zxid 都相等,就比拟服务的 serverId,选举 serviceId 大的节点(理由:serviceId 示意机器性能,他是在配置 zookeeper 集群时确定的,所以咱们配置 zookeeper 集群的时候能够把服务性能更高的集群的 serverId 设置大些,让性能好的机器负责 leader 角色)代码:(newEpoch == curEpoch) && ((newZxid == curZxid) && (newId > curId))。

源码如下:

4 Zookeeper 集群数据同步

所有事务操作都将由 leader 执行,并且会把数据同步到其余节点,比方 follower、observer,咱们能够剖析 leader 和 follower 的操作行为即可剖析出数据同步流程。

4.1 Zookeeper 同步流程阐明

整体流程:

1: 当角色确立之后,leader 调用 leader.lead(); 办法运行,创立一个接管连贯的 LearnerCnxAcceptor 线程,在 LearnerCnxAcceptor 线程外部又建设一个阻塞的 LearnerCnxAcceptorHandler 线程期待 Learner 端的连贯。Learner 端以 follower 为例,follower 调用 follower.followLeader(); 办法首先查找 leader 的 Socket 服务端,而后建设连贯。当 follower 建设连贯后,leader 端会建设一个 LearnerHandler 线程绝对应,用来解决 follower 与 leader 的数据包传输。2:follower 端封装以后 zk 服务器的 Zxid 和 Leader.FOLLOWERINFO 的 LearnerInfo 数据包发送给 leader

3:leader 端这时处于 getEpochToPropose 办法的阻塞期间,须要失去 Learner 端超过一半的服务器发送 Epoch

4:getEpochToPropose 解阻塞之后,LearnerHandler 线程会把超过一半的 Epoch 与 leader 比拟失去最新的 newLeaderZxid,并封装成 Leader.LEADERINFO 包发送给 Learner 端

5:Learner 端失去最新的 Epoch,会更新以后服务器的 Epoch。并把以后服务器所处的 lastLoggedZxid 地位封装成 Leader.ACKEPOCH 发送给 leader

6: 此时 leader 端处于 waitForEpochAck 办法的阻塞期间,须要失去 Learner 端超过一半的服务器发送 EpochACK

7: 当 waitForEpochAck 阻塞之后便能够在 LearnerHandler 线程内决定用那种形式进行同步。如果 Learner 端的 lastLoggedZxid>leader 端的,Learner 端将会被删除多余的局部。如果小于 leader 端的,将会以不同形式进行同步 

8:leader 端发送 Leader.NEWLEADER 数据包给 Learner 端(6、7 步骤都是另开一个线程来发送这些数据包)9:Learner 端同步之后,会在一个 while 循环内解决各种 leader 端发送数据包,包含两阶段提交的 Leader.PROPOSAL、Leader.COMMIT、Leader.INFORM 等。在同步数据后会解决 Leader.NEWLEADER 数据包,而后发送 Leader.ACK 给 leader 端 

10: 此时 leader 端处于 waitForNewLeaderAck 阻塞期待超过一半节点发送 ACK。

咱们回到 QuorumPeer.run() 办法,依据确认的不同角色执行不同操作开展剖析。

4.2 Zookeeper Follower 同步流程

Follower 次要连贯 Leader 实现数据同步,咱们看看 Follower 做的事,咱们依然沿着 QuorumPeer.run()开展学习,要害代码如下:

创立 Follower 的办法比较简单,代码如下:

咱们看一下整个 Follower 在数据同步中做的所有操作follower.followLeader();,源码如下图:

下面源码中的 follower.followLeader() 办法次要做了如下几件事:

1: 寻找 Leader
2: 和 Leader 创立链接
3: 向 Leader 注册 Follower,会将以后 Follower 节点信息发送给 Leader 节点
4: 和 Leader 同步历史数据
5: 读取 Leader 发送的数据包
6: 同步 Leader 数据包

咱们对 follower.followLeader() 调用的其余办法进行分析,其中 findLeader() 是寻找以后 Leader 节点的,源代码如下:

followLeader()中调用了 registerWithLeader(Leader.FOLLOWERINFO); 该办法是向 Leader 注册 Follower,会将以后 Follower 节点信息发送给 Leader 节点,Follower 节点信息发给 Leader 是必须的,是 Leader 同步数据个根底,源码如下:

followLeader()中最初读取数据包执行同步的办法中调用了readPacket(qp);,这个办法就是读取 Leader 的数据包的封装,源码如下:

4.3 Zookeeper Leader 同步流程

咱们查看 QuorumPeer.run() 办法的 LEADING 局部,能够看到先创立了 Leader 对象,并设置了 Leader,而后调用了 leader.lead()leader.lead() 是执行的外围业务流程,源码如下:

leader.lead()办法是 Leader 执行的外围业务流程,源码如下:

leader.lead()办法会执行如下几个操作:

1: 从快照和事务日志中加载数据
2: 创立一个线程,接管 Follower/Observer 的连贯
3: 期待超过一半的 (Follower 和 Observer) 连贯,再持续往下执行程序
4: 期待超过一半的 (Follower 和 Observer) 获取了新的 epoch,并且返回了 Leader.ACKEPOCH,再持续往下执行程序
5: 期待超过一半的 (Follower 和 Observer) 进行数据同步胜利,并且返回了 Leader.ACK,再持续往下执行程序
6: 数据同步实现,开启 zkServer,并且同时开启申请调用链接收申请执行
7: 进行一个死循环,每次休眠 self.tickTime / 2,和对所有的 (Observer/Follower) 发动心跳检测
8: 集群中没有过半 Follower 在集群中,调用 shutdown 敞开一些对象,从新选举

lead()办法中会创立 LearnerCnxAcceptor,该对象是一个线程,次要用于接管 followers 的连贯,这里加了 CountDownLatch 依据配置的同步的地址的数量(例如:server.2=127.0.0.1:12881:13881 配置同步的端口是 12881 只有一个),LearnerCnxAcceptor 的 run 办法源码如下:

LearnerCnxAcceptor的 run 办法中创立了 LearnerCnxAcceptorHandler 对象,在接管到链接后,就会调用 LearnerCnxAcceptorHandler,而LearnerCnxAcceptorHandler 是一个线程,它的 run 办法中调用了 acceptConnections() 办法,源码如下:

acceptConnections()办法会在这里阻塞接管 followers 的连贯,当有连贯过去会生成一个 socket 对象。而后依据以后 socket 生成一个 LearnerHandler 线程,每个 Learner 者都会开启一个 LearnerHandler 线程,办法源码如下:

LearnerHandler.run 这里就是读取或写数据包与 Learner 替换数据包。如果没有数据包读取,则会阻塞以后办法ia.readRecord(qp, "packet");,源码如下:

咱们再回到 leader.lead() 办法,其中调用了 getEpochToPropose() 办法,该办法是判断 connectingFollowers 发给 leader 端的 Epoch 是否过半,如果过半则会解阻塞,不过半会始终阻塞着,直到 Follower 把本人的 Epoch 数据包发送过去并合乎过半机制,源码如下:

lead() 办法中,当发送的 Epoch 过半之后,把以后 zxid 设置到 zk,并期待 EpochAck,要害源码如下:

waitForEpochAck()办法也会期待超过一半的 (Follower 和 Observer) 获取了新的 epoch,并且返回了 Leader.ACKEPOCH,才会解除阻塞,否则会始终阻塞。期待 EpochAck 解阻塞后,把失去最新的 epoch 更新到以后服务,设置以后 leader 节点的 zab 状态是SYNCHRONIZATION,办法源码如下:

lead()办法中还须要期待超过一半的 (Follower 和 Observer) 进行数据同步胜利,并且返回了 Leader.ACK,程序才会解除阻塞,如下代码:

下面所有流程都走完之后,就证实数据曾经同步胜利了,会执行 startZkServer();

4.4 LearnerHandler 数据同步操作

LearnerHandler线程是对应于 Learner 连贯 Leader 端后,建设的一个与 Learner 端替换数据的线程。每一个 Learner 端都会创立一个 LearnerHandler线程。

咱们具体解说 LearnerHandler.run() 办法。

readRecord读取数据包 一直从 learner 节点读数据,如果没读到将会阻塞readRecord

如果数据包类型不是 Leader.FOLLOWERINFO 或 Leader.OBSERVERINFO 将会返回,因为咱们这里自身就是 Leader 节点,读数据必定是读非 Leader 节点数据。

获取 learnerInfoData 来获取 sid 和版本信息。

获取 followerInfo 和 lastAcceptedEpoch,信息如下:

把 Leader.NEWLEADER 数据包放入到 queuedPackets,并向其余节点发送,源码如下:

本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源

正文完
 0