1、3.4.5 源码编译
1、下载源码,执行以上批改
首先
build.xml 更改
<property name="ivy.url"
value="https://repo1.maven.org/maven2/org/apache/ivy/ivy" />
ivysettings.xml 更改
<property name="repo.maven.org"
value="https://repo1.maven.org/maven2/" override="false"/>
<property name="repo.jboss.org"
value="https://repository.jboss.org/nexus/content/groups/public-jboss/" override="false"/>
<property name="repo.sun.org"
value="https://download.java.net/maven/2/" override="false"/>
其实都是把 http 改为 https,否则这会无法访问,在 2020 年 1 月之后就不能够拜访了
2、执行 ant eclipse
3、import 指定 eclipse
4、/Users/xxx/IdeaProjects/zookeeper 间接根目录下执行 ant 即可
2、Zookeeper 三种角色 & 节点类型
Leader、Follower 和 Observer
只有 Leader 能够写数据,其余节点只能同步数据,Follower 参加选举
Observer 不参加选举
所以只能一个 leader 进行写,单机写入最多每秒上万 QPS,这是没法扩大的,所以 zk 适宜是写少读多的场景
长久化节点、长期节点、程序节点(是和长久化节点和长期节点配合应用)
3、ZAB(Zookeeper Atomic Broadcast)
ZAB = 2PC + 过半写磁盘日志,proposal(os cache) + 过半 ack + commit(forceSync 磁盘)内存数据结构
4、程序一致性
程序一致性,其实是最终一致性。因为 leader 肯定会保障所有的 proposal 同步到 follower 上都是依照程序来走的,起码程序不会乱
5、Zookeeper 集群部署 & 外围参数
4 核 8G 的机器,一般来说,每秒并发搞到 1000 是能够的
8 核 16G 的机器,每秒并发搞到几千是能够的
16 核 32G 的机器,每秒并发搞到上万或者是几万都是有可能的
tickTime : 默认 2s,其余一些参数会以这个 tickTime 为基准来进行设置
dataDir : 数据快照
dataLogDir : 日志数据,proposal 日志文件
snapCount : 默认 10 万个事务,存储一次快照
initLimit : 默认是 10,10 * tickTime,20s,启动结束之后,follower 和 leader 之间的数据同步,如果超过这个工夫,leader 间接对外提供服务了
syncLimit : 默认值是 5,5 * 2 = 10s,leader 和 follower 之间会进行心跳,如果超过 10s 没有心跳,leader 就把这个 follower 给踢出去了,认为他曾经死了
maxClientCnxns : 每个客户端对 zk 的连贯最大是 60 个
jute.maxbuffer : 一个 znode 最多能够存储 1MB 数据
server.x = zk01:2888:3888
3888 是用来 leader 选举的,2888 是用来数据同步的
客户端反对非交互式
./zkCli.sh -server localhost:2181 get /zookeeper2
6、读写锁
[写锁,写锁,读锁,读锁,写锁,读锁,读锁,读锁]
写锁只关怀后面一个节点,对后面的这个节点做监听,读锁,应该计算排在离本人最近的一把写锁
所谓的羊群效应,就是不能我开释锁了,把所有不是监听我的锁都惊醒了,curator 读锁应该是判断的离本人最后面的一个写锁
7、leader 选举
三台机器 myid 1,2,3
如果是空的集群,他们 zxid 都是 0,如果是 1,2,3 顺次启动,则第二台机器入选为 leader,当 quorum=(3/2+1)=2,半数的机器的时候才会进行选举
leader 选举
第一轮投票
myid= 0 的机器,投票(0,0),(myid,zxid),播送给集群其余节点
myid= 1 的机器,投票(1,0),(myid,zxid),播送给集群其余节点
myid= 0 的机器,接管到(1,0),和投出去(0,0) 比照,因为不一样,须要从新调整抉择 (1,0)
myid= 1 的机器,接管到(0,0),和投出去(1,0) 比照,不须要从新调整持续(1,0)
第二轮投票
myid= 0 的机器,投票(1,0),(myid,zxid),播送给集群其余节点
myid= 1 的机器,投票(1,0),(myid,zxid),播送给集群其余节点
myid= 0 的机器,接管到(1,0)
myid= 1 的机器,接管到(1,0)
投票统一了,都抉择了 myid= 1 的这台机器,所以 leader 就产生了,最初就算 myid= 3 启动了,因为 leader 曾经存在,所以只能 follower 追随
8、客户端源码
8.1、创立一个 Zookeeper 对象,Zookeeper 对象中蕴含了一个 ClientCnxn,这个对象不简略,代表一个客户端
8.2、ClientCnxn 三个重要的组件,
第一个组件是 ClientCnxnSocketNIO,应用 NIO 进行网络连接
第二个组件是 SendThread 线程 用于客户端申请的发送
第三个组件是 EventThread 线程 用于客户端接管 watcher 的回调
8.3、客户端命令会放入到 outgoingQueue 队列中,判断是否 finished,同时本人 wait 住,同时唤醒 SendThread selector,让其干活,进行 read/write
8.4、一旦 SendThread run 将 outgoingQueue 中的 Packet 写出去之后,会放入到 pendingQueue 中(阐明已发送进来,然而还没有收到响应,一旦响应结束,才会革除 pendingQueue 中的 Packet)
8.5、SendThread read 事件触发,之前 outgoingQueue 发送到 zk server 收到了响应
读取后果如果没有回调监听,间接示意 finished=true,nofityAll packet,让客户端操作实现
如果有监听,比如说创立节点的时候指定了一个 Watch,会标记 finished=true,同时将 Packet 放入到 waitingEvents(在 EventThread 中)
8.6、EventThread 线程 run 办法会阻塞获取 waitingEvents 事件,解决监听回调 watcher.process(pair.event)和回调函数 processResult
9、session id 的生成
Session 生成算法
以后工夫左移 24,又挪动 8 位
myid 左移 56
前两者相或,说白了,其实就是不单单用零碎工夫作为 SessionId,要加 myid(其实说白就是机器 id)能力保障唯一性
session 进行分桶设计,这样的益处是批量进行超时,不做无用功,比如说 4 和 5 个这个工夫在同一个桶外面
(4/2 + 1) * 2 = 6
(5/2 + 1) * 2 = 6
(8/2 + 1) * 2 = 10
(9/2 + 1) * 2 = 10
SessionTrackerImpl 是一个线程,有后盾线程,一直 check 线程超时,check 的时候也是有逻辑的
tickTime 默认 2s roundToInterval = (time / expirationInterval + 1) * expirationInterval 等于下一次 超时工夫 nextExpirationTime
ClientCnxn 客户端周期性 sendPing,通过 ClientCnxnSocketNIO 发送,维持和 NIOServerCnxn 也就是 leader 的心跳,每次申请也会进行 touchSession
其实也是依据下面的 roundToInterval 算进去的,所以下面的 nextExpirationTime 工夫必定比要超时的桶工夫小 tickTime 倍数
所以超时 check 就是逻辑就是如果以后工夫不到,就间接 wait(nextExpirationTime – currentTime),而后间接超时 session 就能够,要晓得不论是 ping 还是
touchSession 都会让 session 往前桶走,所以相当于 touch 是前走,超时是后追
10、Leader 选举流程(3888 端口)
1、QuorumPeerMain 是入口
2、解析 zoo.cfg
3、创立并启动 DatadirCleanupManager(用来周期性的清理 log 和 snapshot)
4、创立网络通信组件,默认是 NIOServerCnxnFactory
5、创立 QuourmPeer 线程,是一个线程,这个可不简略,在 QuorumPeerMain 中会 start(是办法,不是线程 start) QuourmPeer 线程,外面做了很多要害事件
6、从快照文件中复原数据,复原 log 和 snapshot
7、启动 NIOServerCnxnFactory 网络连接,是一个线程,让 NIO 开始监听干活,NIOServerCnxnFactory 实现多路复用,ACCEPT、READ 和 WRITE 事件,之后是
责任链的链条解决对应的是 LeaderZooKeeperServer
PreRequestProcessor -> ProposqlProcessor -> CommitProcessor -> toBeAppliedProcessor -> FinalProcessor
8、开始选举,两个事件
8.1、QuorumCnxManager.Listener 创立 3888,选举服务端端口监听,监听客户端的连贯,创立 SendWorker 和 RecvWorker 线程,并启动
8.2、创立 FastLeaderElection 的时候不得了,starter -> new Messenger 构造方法中,启动了 WorkerSender 和 WorkerReceiver 线程
9、启动 QuorumPeer 线程,走 run 办法,因为状态是 LOOKING,走的是 FastLeaderElection.lookForLeader
sendNotifications 给其余节点发送告诉的时候,会创立和 Listener 的连贯,这里其实有一个问题,Zookeeper 选举网络互连的核心思想是 : 只有大的 server id 能力连贯小的 server id,否则就算连贯上也会被 close
然而在 3.4.5 版本中 sendNotifications 就开始和其余的 peer 开始建设连贯,意思是说,我能够小的连贯大的,会在 Listener,也就是服务端判断,如果过去的 server id 比我小,断开
socket 连贯,其实应该在建设网络连接的时候判断,而不是建设结束了再 close
一直从 recvqueue 拿票
归票,这里有一个点须要留神 recvset 这个外面其实是蕴含了其余 peer 节点的票和本人的票 (怎么会有本人的票呢?没有看到放进去啊?)
其实在 sendNotifications 的时候会判断,org.apache.zookeeper.server.quorum.QuorumCnxManager#toSend
这段代码很重要,就是本人给本人发送,间接放入到 recvQueue 中,就不走网络通信了
还有一层意义是本人不和本人网络连接,这样最终其实就放入到了
if (self.getId() == sid) {b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
}
org.apache.zookeeper.server.quorum.FastLeaderElection#termPredicate 走
org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum
public boolean containsQuorum(HashSet`<Long>` set){
// half = 1
return (set.size() > half);
}
票的外围数据结构 : (epoch, zxid, myid),上返回后一一比拟,投票还有一个 logicalclock,能够不在同一个周期中进行选举
选举算法
sendQueue -> WorkerSender -> queueSendMap -> SendWorker -> RecvWorker -> recvQueue -> WorkerReceive -> recvqueue
归票的时候是会从 recvqueue 外面拿票
11、数据同步
选举结束 leader 节点进入 LEADING 状态,follower 节点将进入 FOLLOWING 状态,此时集群中节点将数据进行同步操作,以保证数据一致性。只有
数据同步实现,能力对外提供服务
leader :
会启动一个 LearnerCnxAcceptor,用来监听 follower 对其的网络连接,针对每一个 follower 的连贯都会创立一个 LearnerHandler 线程服务于他
follower :
connectToLeader : 和 leader 建设网络连接
syncWithLeader 同步数据
次要要做的事件是三个:
1、leader 向 follower 发送 LEADERINFO 信息,告知 follower 新的 epoch 值
follower 接管解析 LEADERINFO 信息,若 new epoch 值大于 current accepted epoch 值则更新 acceptedEpoch
follower 向 leader 发送 ACKEPOCH 信息,反馈 leader 已收到新的 epoch 值,并附带 follower 节点的 last zxid
2、LearnerHandler 中 leader 在收到过半的 ACKEPOCH 信息之后将进入数据同步阶段
1 和 2 其实在做一个事件,就是咱们版本一样了哈,能够同步数据了
3、同步策略
全量同步 SNAP
若 follower 的 peerLastZxid 小于 leader 的 minCommittedLog 或者 leader 节点上不存在提案缓存队列时,将采纳 SNAP 全量同步形式。该模式下 leader 首先会向 follower 发送 SNAP 报文,随后从内存数据库中获取全量数据序列化传输给 follower,follower 在接管全量数据后会进行反序列化加载到内存数据库中
TRUNC(回滚同步)
若 follower 的 peerLastZxid 大于 leader 的 maxCommittedLog,则告知 follower 回滚至 maxCommittedLog;该场景能够认为是 TRUNC+DIFF 的简化模式
RUNC+DIFF(先回滚再差异化同步)
在上文 DIFF 差异化同步时会存在一个非凡场景就是 尽管 follower 的 peerLastZxid 介于 maxCommittedLog,minCommittedLog 两者之间,然而 follower 的 peerLastZxid 在 leader 节点中不存在;此时 leader 需告知 follower 先回滚到 peerLastZxid 的前一个 zxid, 回滚后再进行差异化同步
最初 UPTODATE 对外服务
12、链条化解决
org.apache.zookeeper.server.NIOServerCnxn#readRequest,NIO 服务端读取到客户端申请,甩到 processor 链条中进行解决
leader :
makeLeader -> new LeaderZooKeeperServer -> 创立一个责任链链条
PreRequestProcessor -> ProposalRequestProcessor 之后分两条分支
1、CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
2、SyncRequestprocessor -> AckRequestProcessor
follower :
两条线
存在意义是,其实客户端也是能够将申请发送给 follower 的,zk 为了保障分布式数据一致性,应用 ZAB 协定,在客户端发动一次写申请的时候,假如申请到的是
follower,follower 不会间接解决这个申请,而是转发给 leader,由 leader 发动投票决定申请是最终是否执行胜利
FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor 这个链条有分为两个
如果 client 申请间接打到 follower 上,会转发给 leader,走的是 FollowerRequestProcessor 之后的链条
如果是 leader 的 commit 申请走的是 CommitProcessor -> FinalRequestProcessor 的链条
SyncRequestProcessor -> SendAckRequestProcessor 这个链条是承受 leader 的 proposal 申请
13、Watcher 机制
大抵流程是 Client 向 ZK 中注册 Watcher,如果注册胜利的话,会将对应的 Watcher 存储在本地。当 ZK 服务器端触发 Watcher 事件之后,会向客户端发送告诉,
客户端会从 ClientWatchManager 中取出对应的 Watcher 进行回调
1、客户端注册 Watcher
2、服务端解决 Watcher
3、客户端回调 Watcher
客户的 Watcher 是由 ZKWatchManager 治理的
创立 zookeeper 客户端的时候给一个默认的 Watcher
getData,getChildren,exist
ServerCnxn 是一个要害,默认实现是 NIOServerCnxn,代表了一个客户端和服务的连贯,ServerCnxn 实现了 Watcher 接口
FinalRequestProcessor#processRequest
比如说 getData 的时候,如果加了 Watcher 的话,其实传递过去就是一个 boolean 值,是否加了 Watcher,如果服务端会加到 WatchManager 中
在 setData 的时候,就是进行 dataWatches.triggerWatch(path,EventType.NodeDataChanged)传递回去,其实是通过 NIOServerCnxn 传递回去的
NIOServerCnxn 也是一个网络连接,间接发送给客户端
客户端间接应用 EventThread 进行解决,其实也是在 getData 的时候客户端判断有 watcher,在本地保留
org.apache.zookeeper.ClientCnxn#finishPacket
// TODO 请解释分明这里,其实很简略,就是比如说我要注册一个 Watcher 对吧,怎么也得让我这次注册胜利之后,// TODO 才有可能是服务端告诉吧,所以这就是要做的注册 Watcher
if (p.watchRegistration != null) {
// 最初回到 WatchRegistration 将对应的 Watcher 注册到对应的 Map<String,Set<Watcher>> 中
p.watchRegistration.register(p.replyHeader.getErr());
}
org.apache.zookeeper.ClientCnxn.SendThread#readResponse 如果有 Watche 告诉,会放入到
EventThread#queueEvent 封装 WatcherSetEventPair(就是从 ZKWatchManager 中移除,放入到他外面的 watchers 中) 的,
而后将 WatcherSetEventPair 放入 waitingEvents 队列中,EventThread 线程 run 遍历 WatcherSetEventPair 中的 watchers,调用 process 办法
Watcher 的个性
1、一次性 : 物理是客户端还是服务端,一旦 Watcher 触发、都会将其从存储中移除
2、客户端串行执行 : 出啊性同步执行的过程,千万不要因为一个 Watcher 而影响整个客户端回调 Watcher
3、轻量 : WatchedEvent 是告诉机制中最小的告诉单元,只蕴含三局部内容 : 告诉状态、工夫类型、节点门路。而不会将节点的内容以告诉的形式告知
客户端,而是须要客户端接管到告诉之后,被动去服务端获取数据
DataTree
ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
DataNode 是由层级 children 的
SASL 次要利用于跨节点通信时的认证与数据加密
14、Follower 申请转发
如感兴趣,点赞加关注哦!