数据发布订阅 / 配置中心
实现配置信息的集中式管理和数据的动态更新
实现配置中心有两种模式:push、pull。
长轮训
zookeeper 采用的是推拉相结合的方式。客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化,那么服务器端就会向客户端
发送 watcher 事件通知。客户端收到通知后,主动到服务器端获取更新后的数据
数据量比较小
数据内容在运行时会发生动态变更
集群中的各个机器共享配置
负载均衡
请求 / 数据分摊多个计算机单元上
分布式锁
通常实现分布式锁有几种方式
redis。setNX 存在则会返回 0,不存在
数据方式去实现
创建一个表,通过索引唯一的方式
create table (id , methodname …) methodname 增加唯一索引
insert 一条数据 XXX delete 语句删除这条记录
mysql for update 行锁,杜占锁
zookeeper 实现排他锁
利用路径唯一
共享锁(读锁)
locks 当中是有序节点,控制使用权限,每一个客户端写一个节点之后,获取到最小节点,获取数据,有写的操作,优先处理写的节点,利用节点特性
实现共享锁,使用 java api 的方式
package zk.lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class DistributeLock {
// 根节点
private static final String ROOT_LOCKS = “/LOCKS”;
private ZooKeeper zooKeeper;
// 节点的数据
private static final byte[] data = {1, 2};
// 会话超时时间
private int sessionTimeOut;
// 记录锁节点 id
private String lockID;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public DistributeLock() throws IOException, InterruptedException {
this.zooKeeper = ZookeeperClient.getInstance();
this.sessionTimeOut = ZookeeperClient.getSESSIONTIMEOUT();
}
/**
* 获取锁的方法
*
* @return
*/
public synchronized boolean lock() {
try {
//LOCKS/000001
lockID = zooKeeper.create(ROOT_LOCKS + “/”, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + “->” + “ 成功创建了 lock 节点[” + lockID + “, 开始竞争锁]”);
// 获取根节点下的所有子节点
List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);
// 排序,从小到大
TreeSet<String> sortedSet = new TreeSet<>();
for (String children : childrenNodes) {
sortedSet.add(ROOT_LOCKS + “/” + children);
}
// 获取到最小的节点
String first = sortedSet.first();
if (lockID.equals(first)) {
// 表示当前就是最小的节点
System.out.println(Thread.currentThread().getName() + “–> 成功获得锁,locak 节点为:【” + lockID + “]”);
return true;
}
SortedSet<String> lessThanLockId = sortedSet.headSet(lockID);
if (!lessThanLockId.isEmpty()) {
// 获取到比当前 LockId 这个节点更小的上一个节点
String prevLockId = lessThanLockId.last();
// 监控上一个节点
zooKeeper.exists(prevLockId, new LockWatcher(countDownLatch));
// 如果会话超时或者节点被删除(释放)了
countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName() + “ 成功获取锁:【” + lockID + “】”);
return true;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public synchronized boolean unLock() {
System.out.println(Thread.currentThread().getName() + “–> 开始释放锁 ”);
try {
zooKeeper.delete(lockID, -1);
System.out.println(“ 节点 ” + lockID + “ 被释放了 ”);
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(10);
Random random = new Random();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
DistributeLock lock = null;
try {
lock = new DistributeLock();
countDownLatch.countDown();
countDownLatch.await();
lock.lock();
Thread.sleep(random.nextInt(500));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock != null) {
lock.unLock();
}
}
}).start();
}
}
}
package zk.lock;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
public class LockWatcher implements Watcher {
private CountDownLatch countDownLatch;
public LockWatcher(CountDownLatch latch) {
this.countDownLatch = latch;
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
countDownLatch.countDown();
}
}
}
命名服务
master 选举
7*24 小时可用,99.999% 可用
master-slave 模式
slave 监听 master 节点,如果 master 节点挂掉,slave 自动接替 master, 心跳机制维持,出现网络异常,slave 认为 master 挂掉了,可能出现双主节点的情况,重复处理数据
使用 zookeeper 解决上述问题,往某一个节点注册 master 节点,只有一个能够注册成功,注册成功的为 master,如果失去联系,节点会被删除,不会出现脑裂问题
per 实现原理讲解
分布式队列
master 选举改成多线程 (多进程) 模型(master-slave)创建三个工程,while 去抢
分布式队列 activeMQ、kafka、….
先进先出队列
通过 getChildren 获取指定根节点下的所有子节点,子节点就是任务
确定自己节点在子节点中的顺序
如果自己不是最小的子节点,那么监控比自己小的上一个子节点,否则处于等待
接收 watcher 通知,重复流程
Barrier 模式
在一个节点下只能允许多少数据,只有子节点达到一定数量,才执行
curator 提供应用场景的封装
curator-reciples
master/leader 选举
分布式锁(读锁、写锁)
分布式队列
LeaderLatch
写一个 master
LeaderSelector
每一个应用都写一个临时有序节点,根据最小的节点来获得优先权
curator 提供应用场景的封装
curator-reciples
master/leader 选举
分布式锁(读锁、写锁)
分布式队列
…
LeaderLatch
写一个 master
LeaderSelector
每一个应用都写一个临时有序节点,根据最小的节点来获得优先权
zookeeper 集群角色
leader
leader 是 zookeeper 集群的核心。
事务请求的唯一调度者和处理者,保证集群事务处理的顺序性
集群内部各个服务器的调度者
follower
处理客户端非事务请求,以及转发事务请求给 leader 服务器
参与事务请求提议(proposal)的投票(客户端的一个事务请求,需要半数服务器投票通过以后才能通知 leader commit;leader 会发起一个提案,要求 follower 投票)
参与 leader 选举的投票
observer
观察 zookeeper 集群中最新状态的变化并将这些状态同步到 observer 服务器上。
增加 observer 不影响集群中事务处理能力,同时还能提升集群的非事务处理能力
zookeeper 的集群组成
zookeeper 一般是由 2n+ 1 台服务器组成
leader 选举
选举算法:
leaderElection
AuthFastLeaderElection
FastLeaderElection
QuorumPeer startLeaderElection
源码地址:https://github.com/apache/zoo…
需要的条件:jdk 1.7 以上、ant、idea
FastLeaderElection
serverid : 在配置 server 集群的时候,给定服务器的标识 id(myid)
zxid : 服务器在运行时产生的数据 ID,zxid 的值越大,表示数据越新
Epoch: 选举的轮数
server 的状态:Looking、Following、Observering、Leading
第一次初始化启动的时候:LOOKING
所有在集群中的 server 都会推荐自己为 leader,然后把(myid、zxid、epoch)作为广播信息,广播给集群中的其他 server, 然后等待其他服务器返回
每个服务器都会接收来自集群中的其他服务器的投票。集群中的每个服务器在接受到投票后,开始判断投票的有效性 a) 判断逻辑时钟(Epoch),如果 Epoch 大于自己当前的 Epoch,说明自己保存的 Epoch 是过期。更新 Epoch,同时 clear 其他服务器发送过来的选举数据。判断是否需要更新当前自己的选举情况
b) 如果 Epoch 小于目前的 Epoch,说明对方的 epoch 过期了,也就意味着对方服务器的选举轮数是过期的。这个时候,只需要讲自己的信息发送给对方
c) 如果 sid 等于当前 sid,根据规则来判断是否有资格获得 leader
接受到来自其他服务器的投票后,针对每一个投票,都需要将别人的投票和自己的投票进行对比,zxid,zxid 最大的服务器优先
统计投票
ZAB 协议
拜占庭问题
一组拜占庭将军分别各率领一支军队共同围困一座城市。为了简化问题,将各支军队的行动策略限定为进攻或撤离两种。因为部分军队进攻部分军队撤离可能会造成灾难性后果,因此各位将军必须通过投票来达成一致策略,即所有军队一起进攻或所有军队一起撤离。因为各位将军分处城市不同方向,他们只能通过信使互相联系。在投票过程中每位将军都将自己投票给进攻还是撤退的信息通过信使分别通知其他所有将军,这样一来每位将军根据自己的投票和其他所有将军送来的信息就可以知道共同的投票结果而决定行动策略。系统的问题在于,将军中可能出现叛徒,他们不仅可能向较为糟糕的策略投票,还可能选择性地发送投票信息。假设有 9 位将军投票,其中 1 名叛徒。8 名忠诚的将军中出现了 4 人投进攻,4 人投撤离的情况。这时候叛徒可能故意给 4 名投进攻的将领送信表示投票进攻,而给 4 名投撤离的将领送信表示投撤离。这样一来在 4 名投进攻的将领看来,投票结果是 5 人投进攻,从而发起进攻;而在 4 名投撤离的将军看来则是 5 人投撤离。这样各支军队的一致协同就遭到了破坏。
由于将军之间需要通过信使通讯,叛变将军可能通过伪造信件来以其他将军的身份发送假投票。而即使在保证所有将军忠诚的情况下,也不能排除信使被敌人截杀,甚至被敌人间谍替换等情况。因此很难通过保证人员可靠性及通讯可靠性来解决问题。
假始那些忠诚(或是没有出错)的将军仍然能通过多数决定来决定他们的战略,便称达到了拜占庭容错。在此,票都会有一个默认值,若消息(票)没有被收到,则使用此默认值来投票。
上述的故事映射到计算机系统里,将军便成了计算机,而信差就是通信系统。虽然上述的问题涉及了电子化的决策支持与信息安全,却没办法单纯的用密码学与数字签名来解决。因为电路错误仍可能影响整个加密过程,这不是密码学与数字签名算法在解决的问题。因此计算机就有可能将错误的结果提交去,亦可能导致错误的决策。
paxos 协议主要就是如何保证在分布式环网络环境下,各个服务器如何达成一致最终保证数据的一致性问题
ZAB 协议,基于 paxos 协议的一个改进。
zab 协议为分布式协调服务 zookeeper 专门设计的一种支持崩溃恢复的原子广播协议
zookeeper 并没有完全采用 paxos 算法,而是采用 zab Zookeeper atomic broadcast
zab 协议的原理
在 zookeeper 的主备模式下,通过 zab 协议来保证集群中各个副本数据的一致性
zookeeper 使用的是单一的主进程来接收并处理所有的事务请求,并采用 zab 协议,把数据的状态变更以事务请求的形式广播到其他的节点
zab 协议在主备模型架构中,保证了同一时刻只能有一个主进程来广播服务器的状态变更
所有的事务请求必须由全局唯一的服务器来协调处理,这个的服务器叫 leader,其他的叫 followerleader 节点主要负责把客户端的事务请求转化成一个事务提议(proposal),并分发给集群中的所有 follower 节点
再等待所有 follower 节点的反馈。一旦超过半数服务器进行了正确的反馈,那么 leader 就会 commit 这条消息
崩溃恢复
原子广播
zab 协议的工作原理
什么情况下 zab 协议会进入崩溃恢复模式(没有接受到的数据进行同步)
当服务器启动时
当 leader 服务器出现网络中断、崩溃或者重启的情况
集群中已经不存在过半的服务器与该 leader 保持正常通信
zab 协议进入崩溃恢复模式会做什么
当 leader 出现问题,zab 协议进入崩溃恢复模式,并且选举出新的 leader。当新的 leader 选举出来以后,如果集群中已经有过半机器完成了 leader 服务器的状态同(数据同步),退出崩溃恢复,进入消息广播模式
当新的机器加入到集群中的时候,如果已经存在 leader 服务器,那么新加入的服务器就会自觉进入数据恢复模式,找到 leader 进行数据同步
问题
假设一个事务在 leader 服务器被提交了,并且已经有过半的 follower 返回了 ack。在 leader 节点把 commit 消息发送给 follower 机器之前 leader 服务器挂了怎么办
zab 协议,一定需要保证已经被 leader 提交的事务也能够被所有 follower 提交
zab 协议需要保证,在崩溃恢复过程中跳过哪些已经被丢弃的事务
回顾
zookeeper 数据模型
临时节点(有序)、持久化节点(有序)
临时节点其他节点也能看到
zookeeper 是一个开源的分布式协调框架; 数据发布订阅、负载均衡、集群、master 选举。。。
原子性:要么同时成功、要么同时失败(分布式事务)
单一视图:无论客户端连接到哪个服务器,所看到的模型都是一样
可靠性:一旦服务器端提交了一个事务并且获得了服务器端返回成功的标识,那么这个事务所引起的服务器端的变更会一直保留
实时性:近实时
zookeeper 并不是用来存储数据的,通过监控数据状态的变化,达到基于数据的集群管理。分布式
集群配置
修改 zoo.cfgserver.id=ip:port:port 第一个 Port 数据同步通信、第二个 port:leader 选举(3181)
id=myid (myid 参与 leader 选举、在整个集群中表示唯一服务器的标识)
dataDir 目录下 创建一个 myid 的文件,内容:server.id 对应当前服务器的 id 号
如果增加 observer 需要在第一步中,server.id=ip:port:port:observer ; peerType=observer
会话
NOT_CONNECTED – > CONNECTING ->CONNECTED ->ClOSE
数据模型
数据模型是一个树形结构,最小的数据单元是 ZNODE
临时节点和持久化节点
临时有序节点
持久化有序节点
状态信息
Stat
cZxid = 0xb0000000f
ctime = Sun Aug 13 20:24:03 CST 2017
mZxid = 0xb0000000f
mtime = Sun Aug 13 20:24:03 CST 2017
pZxid = 0xb0000000f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15dda30f72f0000
dataLength = 2
numChildren = 0
zab 协议:如果客户端发了一个事务请求给到 leader,而 leader 发送给各个 follower 以后,并且收到了 ack,leader 已经 commit。在准备 ack 给各个 follower 节点 comit 的时候,leader 挂了,怎么处理的。
选举新的 leader(zxid 的最大值)
同步给其他的 folower
watcher
EventyType
None 客户端与服务器端成功建立会话
NodeCreated 节点创建
NodeDeleted 节点删除
NodeDataChanged 数据变更:数据内容
NodeChildrenChanged 子节点发生变更:子节点删除、新增的时候,才会触发
watcher 的特性一次性触发:事件被处理一次后,会被移除,如果需要永久监听,则需要反复注册
zkClient(永久监听的封装)
curator
java api 的话,zk.exists , zk.getData 创建一个 watcher 监听
zookeeper 序列化使用的是 Jute
Acl 权限的操作
保证存储在 zookeeper 上的数据安全性问题
schema(ip/Digest/world/super)授权对象(192.168.1.1/11 , root:root / world:anyone/ super)
数据存储
内存数据和磁盘数据
zookeeper 会定时把数据存储在磁盘上。
DataDir = 存储的是数据的快照
快照:存储某一个时刻全量的内存数据内容
DataLogDir 存储事务日志
zxid : 服务器运行时产生的日志 id
log.zxid
查看事务日志的命令
java -cp :/mic/data/program/zookeeper-3.4.10/lib/slf4j-api-1.6.1.jar:/mic/data/program/zookeeper-3.4.10/zookeeper-3.4.10.jar org.apache.zookeeper.server.LogFormatter log.200000001
zookeeper 有三种日志
zookeeper.out // 运行日志
快照 存储某一时刻的全量数据
事务日志 事务操作的日志记录