共计 3552 个字符,预计需要花费 9 分钟才能阅读完成。
1 zk 的 leader 和 follower 之间的连贯
(1)leader 选举胜利后,状态扭转,执行 lead 办法,lead() 办法其实就是启动了个 LearnCnxAcceptor 线程,为了承受其余 follower/observer 的申请。用的是传统的 bio,
a LearnerCnxAcceptor 线程 start 启动
b LearnerCnxAcceptor 线程调用 accept,期待其余 follower 连贯
c 其余 follower 连贯建设,则生成这个 follower 专用的 LearnerHandler 线程, 并 start
d LearnerHandler 线程代表了对 follower 的申请收发,通过 while 循环阻塞读取 follower 的申请,申请包含注册,proposal 的 ack,ping 等等。e 读取申请后,通过 jute 的反序列化和定制的协定规定,解析申请数据并解决。
(2)follower 身份确立后,状态扭转,执行 Follwer 的 followLeader 办法,做这几个事:
a findLeader, 找到 leader 是哪个机器
b connectToLeader, 对 leader 发动连贯,此时 leader 如果连贯建设会生成 LearnerHandler
c 向 leader 发动注册,次要是 follower 信息。d while 循环,阻塞式的读取和解决 leader 发来的申请,proposal commit 等操作。
leader 和 follower 次要通过 jute 来进行序列化 / 反序列化,数据用单词开始完结,如 type xxxxx type, 代表 type 的内容。
2 客户端和服务端的连贯建设
(1)客户端启动,其实就是 new Zookeeper 实例,构造方法中会初始化一个 ClientCnxn 组件,用于和 server 端连贯通信。clientCnx 启动,run 办法里 while 循环,再循环中一直收回 ping 和数据包
a 如果没连贯,ClientCnxnSocketNIO 的 registerAndConnect 创立连贯,b 连贯建设后,判断上次发动 ping 的工夫距离,通过 sendping 和服务端发动心跳
c 基于底层封装的 clientCnxnSocket,通过 nio 的读写事件遍历 selectkey 对读写事件进行解决。
(2)服务端就是 quorumpeer 启动的时候启动起来的 NIOServerCnxnFactory, 应用的通用的 nio,通过 configure 计算应用的线程数,实际上是 2 *cpu 核数的 worker 线程和一个固定的 accept 建设连接线程。因为客户端不肯定特地多,所以一个 accept 也够用。接下来 accept 的 run 承受 accept 事件,而后注册 read 事件,应用 select 解决具体读写事件。
3 session 治理
(1)session 的创立流程:
a zookeeper 客户端创立后,会定时 ping 到 server,申请入本人的队列,而后由 nio 组件生产队列通过下面说的连贯发到 server 端,b server 端通过启动的 NIOServerCnxnFactory 网络组件承受申请,拆包转化。c 如果第一次申请,没有 session,会进入 processConnectRequest, 应用 sessionTraker 组件创立惟一 sessionid。d 而后把这次 session 存到 sessionid 为 key 的 map 中,并进行 touch 分桶,治理 seesion 申明周期。前面每次申请都会 touch 一下分到新的过期桶。e 这次 session 申请 submit 到解决链中。f 解决链一层层走上来,走完写回后果到客户端。创立 session 的话解决链做的不多,解决链出要给前面 proposal 和 commit 二次提交用。
(2)sessionid:通过机器标识(配置的 myid)+ 工夫戳位移,失去惟一 id。分桶策略:ExpirationTime 是指该会话最近一次可能超时的工夫点,ExpirationTime = CurrentTime + SessionTimeout,sessionTracker 的 run 会定时查看会话,其工夫距离为 ExpirationInterval(默认配置的 ticktime)。
(3)session 的状态轮转
session 的次要作用是判断长期节点是否还保留。
a Connected/Reconnected->Suspended
表明心跳超时,Session 可能将会生效(长期节点可能会隐没);
b Suspended->Lost
表明曾经连贯上 Server 和服务器确认了 Session 超时,或者客服端长期无奈连贯到服务端,此事 Session 曾经齐全无奈再应用(长期节点隐没了);
c Suspended->Reconnected
从新连贯上 Server, 并且 Session 可能持续应用(长期节点还存在);
d Suspended->Lost->Reconnected
从新连贯上 Server, 然而 Session 曾经从新创立(长期节点须要从新创立)
(4)session 的 touch 激活
client 收回激活申请的机会:
a 客户端向服务端发送申请,包含读写申请,就会触发会话激活。b 客户端发现在 sessionTimeout/ 3 工夫内尚未与服务端进行任何通信,就会被动发动 ping 申请,服务端收到该申请后,就会触发会话激活。
(5)session 的检测和清理
sessionTracker 的 run 办法定期检测,
a 设置 session 状态位敞开,革除过期的桶
b 提交一个 closeSession 的本地申请到解决链,和失常申请一样进行一个 2pc 的删除,删除内存长期节点。session 保留了本人创立的所有长期节点,间接删十分不便。删了当前还会触发 watcher。c 移除会话,把 session 对象移除 sessionTracker.removeSession(sessionId);
d 从 NIOServerCnxFactory 里,依照规范的 nio 敞开流程敞开 session 对应的连贯。
4 2pc 提交流程剖析
从 client 客户端的 create 开始,发送到 zkserver 的 leader 机器。leader 有本人的 proposal 解决链,对所有 follower 发送 proposal,同时本人用一个 proposal 的 ack 汇合收集返回的 ack,过半后触发 commit
a 客户端 create,申请进入客户端的发送队列 outgoingqueue,有 clientcnx 的 sendthread 线程发送申请进来,达到 zkserver 服务器的 NIOServerCnxnFactory 组件,提交给解决链。如果申请到了 follower,follower 实现转发。b 解决链达到 ProposalRequestProcessor, 开启 2.1,进入 proposal 办法,放入 outstandingProposals 队列,遍历所有 leaner 组件(这里 learner 能够看作 follower),发送 proposal 到每个 follower
c follower 接管到 proposal 申请后,对数据包进行解决,先提交到 Synprocessor 线程进行刷内存数据库和刷盘,这里看到数据是写入内存的,刷盘是数据包队列为空或累计了 1000 条事务数据后再刷盘。而后通过 SendAckRequestProcessor 返回 ack,这些 processor 名字很容易了解。d leader 收到 proposal 的 ack 后,边收集边统计,如果 ack 过半,则 trycommit,本人先 commit,而后异步发送 commit 到所有 follower。leader 本人 commit 完就能对外应用了,不必等 commit 的 ack,leader 本人 commit 是通过 FinalRequestProcessor, 写入内存数据库 zkDatabase,反馈给客户端后果。e follower 收到 commit,本人更新数据,逻辑和 leader 的 commit 一样。
还有些细节:
(1)leader 接管写申请的时候,第一个 processor,PreRequestProcessor 里生成的事务 id,zxid 怎么生成?其实就是个原子 long 递增 hzxid.incrementAndGet();
(2)全量数据快照的存储:SyncRequestProcessor 里,一直读事务申请存内存的时候,会判断是否须要落整体快照(依据配置大小来判断),需要的话就开启写盘线程,通过序列化把内存树写入磁盘。
(3)qurumPeerMain 启动的时候,有个 DatadirCleanupManager,启动 PurgeTask 清理文件,因为快照都是全量的,后面的快照能够删,通过这个线程能够清理。
(4)leader 和 follower 的解决链: