Zookeeper3.7源码分析
能力指标
- 把握Zookeeper中Session的管理机制
- 能基于Client进行Debug测试Session创立/刷新操作
- 能搭建Zookeeper集群源码配置
- 把握集群环境下Leader选举启动过程
- 能说出Zookeeper选举过程中的概念
- 能说出Zookeeper选举投票规定
- 能画出Zookeeper集群数据同步流程
1 Session源码剖析
客户端创立Socket
连贯后,会尝试连贯,如果连贯胜利胜利会调用到primeConnection
办法用来发送ConnectRequest
连贯申请,这里便是设置session
会话 ,对于客户端创立会话咱们就不在这里做解说了,咱们间接解说服务端Session
会话解决流程。
1.1 服务端Session属性剖析
Zookeeper服务端会话操作如下图:
在服务端通过SessionTrackerImpl
和ExpiryQueue
来保留Session会话信息。
SessionTrackerImpl
有以下属性:
1:sessionsById 用来存储ConcurrentHashMap<Long, SessionImpl> {sessionId:SessionImpl} 2:sessionExpiryQueue ExpiryQueue<SessionImpl>生效队列3:sessionsWithTimeout ConcurrentMap<Long, Integer>存储的是{sessionId: sessionTimeout} 4:nextSessionId 下一个sessionId
ExpiryQueue
生效队列有以下属性:
1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,生效工夫。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 生效工夫,以后生效工夫的Session对象汇合。3:nextExpirationTime 下一次生效工夫 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 以后零碎工夫毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=以后零碎工夫毫秒值+expirationInterval(生效距离)。4:expirationInterval 生效距离,默认是10s,能够通过sessionlessCnxnTimeout批改。即是通过配置文件的tickTime批改。
1.2 Session创立
咱们接着上一章的案例持续剖析,如果客户端发动申请后,后端如何辨认是第一次创立申请?在之前的案例源码NIOServerCnxn.readPayload()
中有所体现,NIOServerCnxn.readPayload()
局部要害源码如下:
此时如果initialized=false
,示意第一次连贯 须要创立Session(createSession)
,此处调用readConnectRequest()
后,在readConnectRequest()
办法中会将initialized
设置为true
,只有在解决完连贯申请之后才会把initialized
设置为true
,才能够解决客户端其余命令。
下面办法还调用了processConnectRequest
解决连贯申请, processConnectRequest
第一次从申请中获取的sessionId=0
,此时会把创立Session
作为一个业务,会调用createSession()
办法,processConnectRequest
办法局部要害代码如下:
创立会话调用createSession()
,该办法会首先创立一个sessionId,并把该sessionId作为会话ID创立一个创立session会话的申请,并将该申请交给业务链作为一个业务解决,createSession()
源码如下:
下面办法用到的sessionTracker.createSession(timeout)
做了2个操作别离是创立sessionId和配置sessionId的跟踪信息,办法源码如下:
会话信息的跟踪其实就是将会话信息增加到队列中,任何中央能够依据会话ID找到会话信息,trackSession
办法实现了Session创立、Session队列存储、Session
过期队列存储,trackSession
办法源码如下:
在PrepRequestProcessor
的run
办法中调用pRequest2Txn
,要害代码如下:
在SyncRequestProcessor
对txn(创立session的操作)进行长久化,在FinalRequestProcessor
会对Session进行提交,其实就是把Session
的ID和Timeout
存到sessionsWithTimeout
中去。
因为FinalRequestProcessor
中调用链路太简单,咱们把调用链路写进去,大家能够依照这个程序跟踪:
1:FinalRequestProcessor.applyRequest() 办法代码:ProcessTxnResult rc = zks.processTxn(request); 2:ZooKeeperServer.processTxn(org.apache.zookeeper.server.Request) 办法代码:processTxnForSessionEvents(request, hdr, request.getTxn());
下面调用链路中processTxnForSessionEvents(request, hdr, request.getTxn());
办法代码如下:
下面办法次要解决了OpCode.createSession
并且将sessionId、TimeOut
提交到sessionsWithTimeout
中,而提交到sessionsWithTimeout
的办法SessionTrackerImpl.commitSession()
代码如下:
1.3 Session刷新
服务端无论承受什么申请命令(增删或ping等申请)都会更新Session的过期工夫 。咱们做增删或者ping命令的时候,都会通过RequestThrottler
,RequestThrottler
的run办法中调用zks.submitRequestNow()
,而zks.submitRequestNow(request)
中调用了touch(si.cnxn);
,该办法源码如下:
touchSession()
办法更新sessionExpiryQueue生效队列中的生效工夫,源码如下:
update()
办法会在以后工夫的根底上减少timeout,并更新生效工夫为newExpiryTime,要害源码如下:
1.4 Session过期
SessionTrackerImpl
是一个线程类,继承了ZooKeeperCriticalThread
,咱们能够看它的run办法,它首先获取了下一个会话过期工夫,并休眠期待会话过期工夫到期,而后获取过期的客户端会话汇合并循环敞开,源码如下:
下面办法中调用了sessionExpiryQueue.poll()
,该办法代码次要是获取过期工夫对应的客户端会话汇合,源码如下:
下面的setSessionClosing()
办法其实是把Session会话的isClosing
状态设置为了true,办法源码如下:
而让客户端生效的办法expirer.expire(s);
其实也是一个业务操作,次要调用了ZooKeeperServer.expire()
办法,而该办法获取SessionId后,又创立了一个OpCode.closeSession
的申请,并交给业务链解决,咱们查看ZooKeeperServer.expire()
办法源码如下:
在PrepRequestProcessor.pRequest2Txn()
办法中OpCode.closeSession
操作里最初局部代理明确将会话Session的isClosing设置为了true,源码如下:
业务链解决对象FinalRequestProcessor.processRequest()
办法调用了ZooKeeperServer.processTxn()
,并且在processTxn()
办法中执行了processTxnForSessionEvents
,而processTxnForSessionEvents()
办法正好移除了会话信息,办法源码如下:
移除会话的办法SessionTrackerImpl.removeSession()
会移除会话ID以及过期会话对象,源码如下:
1.5 Zookeeper会话测试
为了让Zookeeper的会话了解更粗浅,咱们对会话流程做一个测试,首先测试会话创立,再测试会话刷新。
1)会话创立测试
咱们关上NIOServerCnxn.readPayload()
办法,跟踪首次创立会话,调试状况如下:
此时会建设近程连贯并创立SessionID,咱们调试到NIOServerCnxn.readConnectRequest()
办法,此时建设链接,并且失去的sessionId=0。
当sessionId=0时,会执行Session创立,Session创立会调用SessionTrackerImpl.createSession()
办法实现会话创立,并将会话存入跟踪队列,DEBUG测试如下:
会话创立代码如下:
跟踪测试后,控制台输入如下信息:
AcceptThread----------链接服务的IP:127.0.0.11:会话未连贯,筹备首次连贯会话.....2:建设近程连贯......2:第1次连贯的sessionId=0应用SessionTrackerImpl创立会话,并将会话退出跟踪队列中3:sessionId=0,此时创立sessionId=72061099907219458
2)会话刷新测试
咱们执行get /zookeeper
指令,而后首先跟踪到RequestThrottler.run()
办法,执行如下:
执行程序达到ZooKeeperServer.touch()
,行将开始筹备刷新会话了,咱们测试成果如下:
调用SessionTrackerImpl.touchSession()
的时候会先判断会话是否为空、会话是否曾经敞开,如果都没有,才执行刷新会话操作,DEBUG跟踪如下:
刷新会话其实就是会话工夫减少,减少会话工夫DEBUG跟踪如下:
测试后成果如下:
a.以后申请并未过期,不须要删除,筹备刷新会话b.筹备调用SessionTrackerImpl.touchSession()刷新会话c.会话不为空,会话也未敞开,筹备调用updateSessionExpiry()刷新会话d.残余过期工夫:54572178,减少过期工夫:30000,刷新会话后过期工夫:54604000
2 Zookeeper集群启动流程
咱们先搭建Zookeeper集群,再来剖析选举算法。
2.1 Zookeeper集群配置
如上图:
1:创立zoo1.cfg、zoo2.cfg、zoo3.cfg2:创立zkdata1、zkdata2、zkdata33:创立3个myid,值别离为1、2、3
配置3个启动类,如下图:
2.2 集群启动流程剖析
如上图,上图是Zookeeper单机/集群启动流程,每个细节所做的事件都在上图有阐明,咱们接下来依照流程图对源码进行剖析。
程序启动,运行流程启动集群模式,如下图:
quorumPeer.start()
启动服务,如下代码:
quorumPeer.start()
办法代码如下:
quorumPeer.start()
办法启动的次要步骤:
1:loadDataBase()加载数据。2:startServerCnxnFactory 用来开启acceptThread、SelectorThread和workerPool线程池。3:开启Leader选举startLeaderElection。4:开启JVM监控线程startJvmPauseMonitor。5:调用父类super.start();进行Leader选举。
startLeaderElection()
开启Leader选举办法做了2件事,首先创立初始化选票选本人,接着创立选举投票形式,源码如下:
createElectionAlgorithm()
创立选举算法只有第3种,其余2种均已废除,办法源码如下:
这个办法创立了以下三个对象:
①、创立QuorumCnxManager对象
②、QuorumCnxManager.Listener
③、FastLeaderElection
3 Zookeeper集群Leader选举
3.1 Paxos算法介绍
Zookeeper选举次要依赖于FastLeaderElection算法,其余算法均已淘汰,但FastLeaderElection算法又是典型的Paxos算法,所以咱们要先学习下Paxos算法,这样更有助于把握FastLeaderElection算法。
1)Paxos介绍
分布式事务中常见的事务模型有2PC和3PC,无论是2PC提交还是3PC提交都无奈彻底解决分布式的一致性问题以及无奈解决太过激进及容错性不好。Google Chubby的作者Mike Burrows说过,世上只有一种一致性算法,那就是Paxos,所有其余一致性算法都是Paxos算法的不完整版。Paxos算法是公认的艰涩,很难讲清楚,然而工程上也很难实现,所以有很多Paxos算法的工程实现,如Chubby, Raft,ZAB,微信的PhxPaxos等。这一篇会介绍这个公认为难于了解然而卓有成效的Paxos算法。Paxos算法是莱斯利·兰伯特(Leslie Lamport)1990年提出的一种基于消息传递的一致性算法,它曾就此发表了《The Part-Time Parliament》,《Paxos Made Simple》,因为采纳故事的形式来解释此算法,感觉还是很难了解。
2)Paxos算法背景
Paxos算法是基于消息传递且具备高度容错个性的一致性算法,是目前公认的解决分布式一致性问题最无效的算法之一,其解决的问题就是在分布式系统中如何就某个值(决定)达成统一。
面试的时候:不要把这个Paxos算法达到的目标和分布式事务分割起来,而是针对Zookeeper这样的master-slave集群对某个决定达成统一,也就是正本之间写或者leader选举达成统一。我感觉这个算法和广义的分布式事务不是一样的。
在常见的分布式系统中,总会产生诸如机器宕机或网络异样(包含音讯的提早、失落、反复、乱序,还有网络分区)(也就是会产生异样的分布式系统)等状况。Paxos算法须要解决的问题就是如何在一个可能产生上述异样的分布式系统中,疾速且正确地在集群外部对某个数据的值达成统一。也能够了解成分布式系统中达成状态的一致性。
3)Paxos算法了解
Paxos 算法是分布式一致性算法用来解决一个分布式系统如何就某个值(决定)达成统一的问题。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态统一,每个节点都执行雷同的操作序列,那么他们最初能失去一个统一的状态。为保障每个节点执行雷同的命令序列,须要在每一条指令上执行一个”一致性算法”以保障每个节点看到的指令统一。
分布式系统中个别是通过多副原本保障可靠性,而多个正本之间会存在数据不统一的状况。所以必须有一个一致性算法来保证数据的统一,形容如下:
如果在分布式系统中初始是各个节点的数据是统一的,每个节点都程序执行系列操作,而后每个节点最终的数据还是统一的。
Paxos算法就是解决这种分布式场景中的一致性问题。对于个别的开发人员来说,只须要晓得paxos是一个分布式选举算法即可。多个节点之间存在两种通信模型:共享内存(Shared memory)、消息传递(Messages passing),Paxos是基于消息传递的通信模型的。
4)Paxos相干概念
在Paxos算法中,有三种角色:
- Proposer
- Acceptor
- Learners
在具体的实现中,一个过程可能同时充当多种角色。比方一个过程可能既是Proposer又是Acceptor又是Learner。Proposer负责提出提案,Acceptor负责对提案作出裁决(accept与否),learner负责学习提案后果。
还有一个很重要的概念叫提案(Proposal)。最终要达成统一的value就在提案里。只有Proposer发的提案被Acceptor承受(半数以上的Acceptor批准才行),Proposer就认为该提案里的value被选定了。Acceptor通知Learner哪个value被选定,Learner就认为那个value被选定。只有Acceptor承受了某个提案,Acceptor就认为该提案里的value被选定了。
为了防止单点故障,会有一个Acceptor汇合,Proposer向Acceptor汇合发送提案,Acceptor汇合中的每个成员都有可能批准该提案且每个Acceptor只能批准一个提案,只有当一半以上的成员批准了一个提案,就认为该提案被选定了。
3.2 QuorumPeer工作流程
QuorumCnxManager:每台服务器在启动的过程中,会启动一个QuorumPeer
,负责各台服务器之间的底层Leader选举过程中的网络通信对应的类就是QuorumCnxManager
。
Zookeeper
对于每个节点QuorumPeer
的设计相当的灵便,QuorumPeer
次要包含四个组件:客户端申请接收器(ServerCnxnFactory
)、数据引擎(ZKDatabase
)、选举器(Election
)、外围性能组件(Leader/Follower/Observer
)。
1:ServerCnxnFactory负责保护与客户端的连贯(接管客户端的申请并发送相应的响应);(1001行)2:ZKDatabase负责存储/加载/查找数据(基于目录树结构的KV+操作日志+客户端Session);(129行)3:Election负责选举集群的一个Leader节点;(998行)4:Leader/Follower/Observer确认是QuorumPeer节点应该实现的外围职责;(1270行)
QuorumPeer
工作流程比较复杂,如下图:
QuorumPeer工作流程:
1:初始化配置2:加载以后存在的数据3:启动网络通信组件4:启动控制台5:开启选举协调者,并执行选举(这个过程是会继续,并不是一次操作就完结了)
3.3 QuorumCnxManager源码剖析
QuorumCnxManager
外部保护了一系列的队列,用来保留接管到的、待发送的音讯以及音讯的发送器,除接管队列以外,其余队列都依照SID分组造成队列汇合,如一个集群中除了本身还有3台机器,那么就会为这3台机器别离创立一个发送队列,互不烦扰。
QuorumCnxManager.Listener :为了可能互相投票,Zookeeper集群中的所有机器都须要建设起网络连接。QuorumCnxManager在启动时会创立一个ServerSocket来监听Leader选举的通信端口。开启监听后,Zookeeper可能一直地接管到来自其余服务器地创立连贯申请,在接管到其余服务器地TCP连贯申请时,会进行解决。为了防止两台机器之间反复地创立TCP连贯,Zookeeper只容许SID大的服务器被动和其余机器建设连贯,否则断开连接。在接管到创立连贯申请后,服务器通过比照本人和近程服务器的SID值来判断是否接管连贯申请,如果以后服务器发现自己的SID更大,那么会断开以后连贯,而后本人被动和近程服务器将连贯(本人作为“客户端”)。一旦连贯建设,就会依据近程服务器的SID来创立相应的音讯发送器SendWorker和音讯发送器RecvWorker,并启动。
QuorumCnxManager.Listener
监听启动能够查看QuorumCnxManager.Listener
的run
办法,源代码如下,能够断点调试看到此时监听的正是咱们所说的投票端口:
下面是监听器,各个服务之间进行通信咱们须要开启ListenerHandler
线程,在QuorumCnxManager.Listener.ListenerHandler
的run办法中有一个办法acceptConnections()
调用,该办法就是用于承受每次选举投票的信息,如果只有一个节点或者没有投票信息的时候,此时办法会阻塞,一旦执行选举,程序会往下执行,咱们能够先启动1台服务,再启动第2台、第3台,此时会收到有客户端参加投票链接,程序会往下执行,源码如下:
咱们启动2台服务,成果如下:
下面尽管能证实投票拜访了以后监听的端口,但怎么晓得是哪台服务呢?咱们能够沿着receiveConnection()
源码持续钻研,源码如下:
receiveConnection()
办法只是获取了数据流,并没做非凡解决,并且调用了handleConnection()
办法,该办法源码如下:
通过网络连接获取数据sid,获取sid示意是哪一台连过去的,咱们能够打印输出sid,测试输入如下数据:
参加投票的MyID=2参加投票的MyID=3
3.4 FastLeaderElection算法源码剖析
在Zookeeper
集群中,次要分为三者角色,而每一个节点同时只能表演一种角色,这三种角色别离是:
(1)Leader
承受所有Follower的提案申请并对立协调发动提案的投票,负责与所有的Follower进行外部的数据交换(同步);
(2)Follower
间接为客户端提供服务并参加提案的投票,同时与Leader
进行数据交换(同步);
(3)Observer
间接为客户端服务但并不参加提案的投票,同时也与Leader
进行数据交换(同步);
FastLeaderElection
选举算法是规范的 Fast Paxos
算法实现,可解决 LeaderElection
选举算法收敛速度慢的问题。
创立FastLeaderElection
只须要new FastLeaderElection()
即可,如下代码:
创立FastLeaderElection
会调用starter()
办法,该办法会创立sendqueue
、recvqueue
队列、Messenger
对象,其中Messenger
对象的作用十分要害,办法源码如下:
创立Messenger的时候,会创立WorkerSender
并封装成wsThread
线程,创立WorkerReceiver
并封装成wrThread
线程,看名字就很容易了解,wsThread
用于发送数据,wrThread
用于接收数据,Messenger
创立源码如下:
创立完FastLeaderElection
后接着会调用它的start()
办法启动选举算法,代码如下:
启动选举算法会调用start()办法,start()办法如下:
public void start() { this.messenger.start();}
下面会执行messager.start()
,也就是如下办法,也就意味着wsThread
和wrThread
线程都将启动,源码如下:
void start() { this.wsThread.start(); this.wrThread.start();}
wsThread
由WorkerSender
封装而来,此时会调用WorkerSender
的run
办法,run办法会调用process()
办法,源码如下:
process
办法调用了manager
的toSend
办法,此时是把对应的sid作为了音讯发送进来,这里其实是发送投票信息,源码如下:
void process(ToSend m) { ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData); manager.toSend(m.sid, requestBuffer);}
投票能够投本人,也能够投他人,如果是选票选本人,只须要把投票信息增加到recvQueue
中即可,源码如下:
在WorkerReceiver.run
办法中会从recvQueue
中获取Message
,并把发送给其余服务的投票封装到sendqueue
队列中,交给WorkerSender
发送解决,源码如下:
3.5 Zookeeper选举投票分析
选举是个很简单的过程,要思考很多场景,而且选举过程中有很多概念须要了解。
3.5.1 选举概念
1)ZK服务状态:
public enum ServerState { //代表没有以后集群中没有Leader,此时是投票选举状态 LOOKING, //代表曾经是随同者状态 FOLLOWING, //代表曾经是领导者状态 LEADING, //代表曾经是观察者状态(观察者不参加投票过程) OBSERVING}
2)服务角色:
//Learner 是随从服务和观察者的统称public enum LearnerType { //随从者角色 PARTICIPANT, //观察者角色 OBSERVER}
3)投票音讯播送:
public static class Notification { int version; //被举荐leader的ID long leader; //被举荐leader的zxid long zxid; //投票轮次 long electionEpoch; //以后投票者的服务状态 (LOOKING) QuorumPeer.ServerState state; //以后投票者的ID long sid; //QuorumVerifier作为集群验证器,次要实现判断一组server在 //已给定的配置的server列表中,是否可能形成集群 QuorumVerifier qv; //被举荐leader的投票轮次 long peerEpoch; }
4)选票模型:
public class Vote { //投票版本号,作为一个标识 private final int version; //以后服务的ID private final long id; //以后服务事务ID private final long zxid; //以后服务投票的轮次 private final long electionEpoch; //被推举服务器的投票轮次 private final long peerEpoch; //以后服务器所处的状态 private final ServerState state;}
5)音讯发送对象:
public static class ToSend { //反对的音讯类型 enum mType { crequest, //申请 challenge, //确认 notification,//告诉 ack //确认回执 } ToSend(mType type, long leader, long zxid, long electionEpoch, ServerState state, long sid, long peerEpoch, byte[] configData) { this.leader = leader; this.zxid = zxid; this.electionEpoch = electionEpoch; this.state = state; this.sid = sid; this.peerEpoch = peerEpoch; this.configData = configData; } /* * Proposed leader in the case of notification * 被投票推举为leader的服务ID */ long leader; /* * id contains the tag for acks, and zxid for notifications * */ long zxid; /* * Epoch * 投票轮次 */ long electionEpoch; /* * Current state; * 服务状态 */ QuorumPeer.ServerState state; /* * Address of recipient * 音讯接管方服务ID */ long sid; /* * Used to send a QuorumVerifier (configuration info) */ byte[] configData = dummyData; /* * Leader epoch */ long peerEpoch;}
3.5.2 选举过程
QuorumPeer自身是个线程,在集群启动的时候会执行quorumPeer.start();
,此时会调用它重写的start()
办法,最初会调用父类的start()
办法,所以该线程会启动执行,因而会执行它的run办法,而run办法正是选举流程的入口,咱们看run办法要害源码如下:
所有节点初始状态都为LOOKING,会进入到选举流程,选举流程首先要获取算法,获取算法的办法是makeLEStrategy()
,该办法返回的是FastLeaderElection
实例,外围选举流程是FastLeaderElection
中的lookForLeader()
办法。
/**** * 获取选举算法 */@SuppressWarnings("deprecation")protected Election makeLEStrategy() { return electionAlg;}
lookForLeader()
是选举过程的要害流程,源码剖析如下:
下面多个中央都用到了过半数以上的办法hasAllQuorums()
该办法用到了QuorumMaj
类,代码如下:
QuorumMaj
构造函数中体现了过半数以上的操作,代码如下:
3.5.3 投票规定
咱们来看一下选票PK的办法totalOrderPredicate()
,该办法其实就是Leader选举规定,规定有如下三个:
1:比拟 epoche(zxid高32bit),如果其余节点的epoche比本人的大,选举 epoch大的节点(理由:epoch 示意年代,epoch越大示意数据越新)代码:(newEpoch > curEpoch);2:比拟 zxid, 如果epoche雷同,就比拟两个节点的zxid的大小,选举 zxid大的节点(理由:zxid 示意节点所提交事务最大的id,zxid越大代表该节点的数据越残缺)代码:(newEpoch == curEpoch) && (newZxid > curZxid);3:比拟 serviceId,如果 epoch和zxid都相等,就比拟服务的serverId,选举 serviceId大的节点(理由: serviceId 示意机器性能,他是在配置zookeeper集群时确定的,所以咱们配置zookeeper集群的时候能够把服务性能更高的集群的serverId设置大些,让性能好的机器负责leader角色)代码 :(newEpoch == curEpoch) && ((newZxid == curZxid) && (newId > curId))。
源码如下:
4 Zookeeper集群数据同步
所有事务操作都将由leader执行,并且会把数据同步到其余节点,比方follower、observer,咱们能够剖析leader和follower的操作行为即可剖析出数据同步流程。
4.1 Zookeeper同步流程阐明
整体流程:
1:当角色确立之后,leader调用leader.lead();办法运行,创立一个接管连贯的LearnerCnxAcceptor线程,在LearnerCnxAcceptor线程外部又建设一个阻塞的LearnerCnxAcceptorHandler线程期待Learner端的连贯。Learner端以follower为例,follower调用follower.followLeader();办法首先查找leader的Socket服务端,而后建设连贯。当follower建设连贯后,leader端会建设一个LearnerHandler线程绝对应,用来解决follower与leader的数据包传输。 2:follower端封装以后zk服务器的Zxid和Leader.FOLLOWERINFO的LearnerInfo数据包发送给leader3:leader端这时处于getEpochToPropose办法的阻塞期间,须要失去Learner端超过一半的服务器发送Epoch4:getEpochToPropose解阻塞之后,LearnerHandler线程会把超过一半的Epoch与leader比拟失去最新的newLeaderZxid,并封装成Leader.LEADERINFO包发送给Learner端5:Learner端失去最新的Epoch,会更新以后服务器的Epoch。并把以后服务器所处的lastLoggedZxid地位封装成Leader.ACKEPOCH发送给leader6:此时leader端处于waitForEpochAck办法的阻塞期间,须要失去Learner端超过一半的服务器发送EpochACK7:当waitForEpochAck阻塞之后便能够在LearnerHandler线程内决定用那种形式进行同步。如果Learner端的lastLoggedZxid>leader端的,Learner端将会被删除多余的局部。如果小于leader端的,将会以不同形式进行同步 8:leader端发送Leader.NEWLEADER数据包给Learner端(6、7步骤都是另开一个线程来发送这些数据包)9:Learner端同步之后,会在一个while循环内解决各种leader端发送数据包,包含两阶段提交的Leader.PROPOSAL、Leader.COMMIT、Leader.INFORM等。在同步数据后会解决Leader.NEWLEADER数据包,而后发送Leader.ACK给leader端 10:此时leader端处于waitForNewLeaderAck阻塞期待超过一半节点发送ACK。
咱们回到QuorumPeer.run()
办法,依据确认的不同角色执行不同操作开展剖析。
4.2 Zookeeper Follower同步流程
Follower次要连贯Leader实现数据同步,咱们看看Follower做的事,咱们依然沿着QuorumPeer.run()开展学习,要害代码如下:
创立Follower的办法比较简单,代码如下:
咱们看一下整个Follower在数据同步中做的所有操作follower.followLeader();
,源码如下图:
下面源码中的follower.followLeader()
办法次要做了如下几件事:
1:寻找Leader2:和Leader创立链接3:向Leader注册Follower,会将以后Follower节点信息发送给Leader节点4:和Leader同步历史数据5:读取Leader发送的数据包6:同步Leader数据包
咱们对follower.followLeader()
调用的其余办法进行分析,其中findLeader()
是寻找以后Leader节点的,源代码如下:
followLeader()
中调用了registerWithLeader(Leader.FOLLOWERINFO);
该办法是向Leader注册Follower,会将以后Follower节点信息发送给Leader节点,Follower节点信息发给Leader是必须的,是Leader同步数据个根底,源码如下:
followLeader()
中最初读取数据包执行同步的办法中调用了readPacket(qp);
,这个办法就是读取Leader的数据包的封装,源码如下:
4.3 Zookeeper Leader同步流程
咱们查看QuorumPeer.run()
办法的LEADING局部,能够看到先创立了Leader对象,并设置了Leader,而后调用了leader.lead()
,leader.lead()
是执行的外围业务流程,源码如下:
leader.lead()
办法是Leader执行的外围业务流程,源码如下:
leader.lead()
办法会执行如下几个操作:
1:从快照和事务日志中加载数据2:创立一个线程,接管Follower/Observer的连贯3:期待超过一半的(Follower和Observer)连贯,再持续往下执行程序4:期待超过一半的(Follower和Observer)获取了新的epoch,并且返回了Leader.ACKEPOCH,再持续往下执行程序5:期待超过一半的(Follower和Observer)进行数据同步胜利,并且返回了Leader.ACK,再持续往下执行程序6:数据同步实现,开启zkServer,并且同时开启申请调用链接收申请执行7:进行一个死循环,每次休眠self.tickTime / 2,和对所有的(Observer/Follower)发动心跳检测8:集群中没有过半Follower在集群中,调用shutdown敞开一些对象,从新选举
lead()
办法中会创立LearnerCnxAcceptor
,该对象是一个线程,次要用于接管followers的连贯,这里加了CountDownLatch依据配置的同步的地址的数量(例如:server.2=127.0.0.1:12881:13881 配置同步的端口是12881只有一个),LearnerCnxAcceptor
的run办法源码如下:
LearnerCnxAcceptor
的run办法中创立了LearnerCnxAcceptorHandler
对象,在接管到链接后,就会调用LearnerCnxAcceptorHandler
,而LearnerCnxAcceptorHandler
是一个线程,它的run办法中调用了acceptConnections()
办法,源码如下:
acceptConnections()
办法会在这里阻塞接管followers的连贯,当有连贯过去会生成一个socket对象。而后依据以后socket生成一个LearnerHandler线程 ,每个Learner者都会开启一个LearnerHandler线程,办法源码如下:
LearnerHandler.run
这里就是读取或写数据包与Learner替换数据包。如果没有数据包读取,则会阻塞以后办法ia.readRecord(qp, "packet");
,源码如下:
咱们再回到leader.lead()
办法,其中调用了getEpochToPropose()
办法,该办法是判断connectingFollowers发给leader端的Epoch是否过半,如果过半则会解阻塞,不过半会始终阻塞着,直到Follower把本人的Epoch数据包发送过去并合乎过半机制,源码如下:
在lead()
办法中,当发送的Epoch过半之后,把以后zxid设置到zk,并期待EpochAck,要害源码如下:
waitForEpochAck()
办法也会期待超过一半的(Follower和Observer)获取了新的epoch,并且返回了Leader.ACKEPOCH,才会解除阻塞,否则会始终阻塞。期待EpochAck解阻塞后,把失去最新的epoch更新到以后服务,设置以后leader节点的zab状态是SYNCHRONIZATION
,办法源码如下:
lead()
办法中还须要期待超过一半的(Follower和Observer)进行数据同步胜利,并且返回了Leader.ACK,程序才会解除阻塞,如下代码:
下面所有流程都走完之后,就证实数据曾经同步胜利了,会执行startZkServer();
4.4 LearnerHandler数据同步操作
LearnerHandler
线程是对应于Learner
连贯Leader
端后,建设的一个与Learner
端替换数据的线程。每一个Learner
端都会创立一个 LearnerHandler
线程。
咱们具体解说LearnerHandler.run()
办法。
readRecord
读取数据包 一直从learner
节点读数据,如果没读到将会阻塞readRecord
。
如果数据包类型不是Leader.FOLLOWERINFO或Leader.OBSERVERINFO将会返回,因为咱们这里自身就是Leader节点,读数据必定是读非Leader节点数据。
获取learnerInfoData
来获取sid和版本信息。
获取followerInfo和lastAcceptedEpoch,信息如下:
把Leader.NEWLEADER数据包放入到queuedPackets,并向其余节点发送,源码如下:
本文由传智教育博学谷 - 狂野架构师教研团队公布,转载请注明出处!
如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源