共计 8458 个字符,预计需要花费 22 分钟才能阅读完成。
1. ZooKeeper
开源的分布式的协调服务, 是 Google 的 Chubby 一个开源的实现, 它是一个为分布式应用提供一致性服务的软件
2. ZooKeeper 提供的功能
- 配置维护
- 域名服务
- 分布式锁
- 组服务
3. ZooKeeper 的特点
-
简单
- ZooKeeper 的核心是一个精简的 文件系统 , 它支持一些简单的操作和一些抽象操作
-
丰富
- ZooKeeper 的操作是很丰富的,可实现一些协调数据结构和协议。例如,分布式队列、分布式锁和一组同级别节点中的“领导者选举”
-
高可靠
- ZooKeeper 支持集群模式,可以很容易的解决单点故障问题
-
松耦合交互
- 不同进程间的交互不需要了解彼此,甚至可以不必同时存在,某进程在 ZooKeeper 中留下消息后,该进程结束后其它进程还可以读这条消息
-
资源库
- ZooKeeper 实现了一个关于通用协调模式的开源共享存储库,能使开发者免于编写这类通用协议
4. ZooKeeper 的角色
-
leader(领导者)
- 负责进行投票的发起和决议,更新系统状态
-
learner (学习者)
- 包括跟随者 (follower) 和观察者(observer),follower 用于接
受客户端请求并向客户端返回结果,在选举过程中参与投票。Observer 可以接受客户端连接,
将写请求转发给 leader,但 observer 不参与投票过程,只同步 leader 的状态,observer 的
目的是为了扩展系统,提高读取速度
- 包括跟随者 (follower) 和观察者(observer),follower 用于接
-
客户端(client)
- 请求发起方
5. ZooKeeper 数据模型
- 层次化的目录结构,命名符合常规文件系统规范
- 每个节点在 zookeeper 中叫做znode, 并且其有一个唯一的路径标识
- 节点 znode 可以包含数据和子节点,但是 EPHEMERAL 类型的节点不能有子节点
- znode 中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本
- 客户端应用可以在节点上设置监视器
- 节点不支持部分读写,而是一次性完整读写
6. ZooKeeper 的节点
-
zookeeper 中的节点包含下面两个类型的节点
- 临时节点(ephemeral)
- 持久节点(persistent)
znode 的类型在创建时确定并且之后不能再修改
znode 默认不指定类型是持久节点
-
ephemeral 类型的节点
- 在节点客户端会话结束时, 会将 zookeeper 中的节点删除
- 不能有子节点
- persistent 节点不依赖与客户端会话,只有当客户端明确要删除该 persistent 节点时才会被删除
- ZooKeeper 的客户端和服务器通信采用长连接方式 , 每个客户端和服务器通过心跳来保持连接,这个连接状态称之为 session , 如果 znode 是临时节点,这个 seesion 失效,znode 也就删除了
7. ZooKeeper 中的选举机制
-
服务器的 ID
- 分别 1,2,3
- 编号越大在选择算法中的 权重 越大
-
数据的 ID
- 服务器中存放的最大数据 ID.
- 值越大说明数据越新,在选举算法中数据越新权重越大
-
编辑时钟
- 投票的次数
- 同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断
-
选举状态
- LOOKING,竞选状态。
- FOLLOWING,随从状态,同步 leader 状态,参与投票。
- OBSERVING,观察状态, 同步 leader 状态,不参与投票。
- LEADING,领导者状态。
-
选举信息的内容
在投票完成后,需要将投票信息发送给集群中的所有服务器,它包含如下内容。
- 服务器 ID
- 数据 ID
- 逻辑时钟
- 选举状态
-
选举机制的结果
- zk 启动之后通过选举机制, 来选举出来一个 leader
8. ZooKeeper 集群搭建
安装 zookeeper 集群要求大于 1 的奇数台机器
8.1 准备安装包
zookeeper-3.4.6.tar.gz
8.2 解压
tar -zxvf zookeeper-3.4.6.tar.gz -C /opt/
8.3 重命名
mv zookeeper-3.4.6/ zookeeper
8.4 配置环境变量
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=PATH:ZOOKEEPER_HOME/bin
8.5 配置 zookeeper
cp zoo_sample.cfg zoo.cfg
dataDir=/opt/zookeeper/data
8.6 myid 文件
在 zookeeper 的各个机器中分别创建 myid 文件(/opt/zookeeper/data), 内容分别为
1 2 3
8.7 配置 zoo.cfg
server.1=uplooking03:2888:3888
server.2=uplooking04:2888:3888
server.3=uplooking05:2888:3888
8.8 查看 zookeeper 集群的时间
保证 zookeeper 集群中的时间不能有超过 20 秒的误差
ntpdate -u ntp.api.bz 根据时间同步服务器同步时间,- u 是绕过防火请
8.9 启动 zookeeper 服务
zkServer.sh start
9. ZooKeeper
9.1 zookeeper 的 Shell 操作
- create znodename data : 创建 znode
- get znodename : 查看 znode
- set znodename data : 修改 znode 的数据
- rmr znodename : 删除 znode 节点
- quit : 退出会话
- delete : 删除节点
- setquota -b 长度配额 : 设置节点长度配额
- setquota -n 数量配额 : 设置节点数量配额
- listquoat path : 列出配额
-
delquota path : 删除配额
- zookeeper 中的配额管理超出配额的大小之后依然可以进行操作
-
create -e znode : 创建临时节点
- 临时节点不能创建子节点
- 临时节点可以有数据
- 临时节点会话结束时会自动删除
9.2 zookeeper 原生的 API 操作
package com.uplooking.bigdata.zookeeper;
import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ZookeeperTest {
private ZooKeeper zooKeeper;
@Before
public void init() throws Exception {
String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
zooKeeper = new ZooKeeper(connStr, 3000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {System.out.println("watch..." + watchedEvent.getType());
}
});
}
/**
* 创建节点
*/
@Test
public void testCreateZNode() throws Exception {
String path = "/test01";
zooKeeper.exists(path, true);
String ret = zooKeeper.create(path, "HELLO2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(ret);
}
@Test
public void testSetZnode() throws KeeperException, InterruptedException {zooKeeper.setData("/test02", "uplooking02".getBytes(), 1);
}
@Test
public void testGetZnode() throws KeeperException, InterruptedException {byte[] data = zooKeeper.getData("/test02", true, null);
System.out.println(new String(data, 0, data.length));
}
@Test
public void testDeleteZnode() throws KeeperException, InterruptedException {zooKeeper.delete("/test02", -1);
}
@After
public void destory() throws Exception {zooKeeper.close();
}
}
-
实现注册监听的方法
- exists
- getData
- getChild
-
事件的类型
- NodeCreated
- NodeDeleted
- NodeDataChanged
- …….
10 ZooKeeper 客户端神器 Curator
- Netflix 公司开源的一套 Zookeeper 客户端框架
- 封装了原生的 zookeeper 的 API 操作
- apache 的顶级项目
- 基于 Fluent 的编程风格(链式编程)
- Curator 有 2.x.x 和 3.x.x 两个系列的版本,支持不同版本的 Zookeeper。其中 Curator 2.x.x 兼容 Zookeeper 的 3.4.x 和 3.5.x。而 Curator 3.x.x 只兼容 Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch 删除等新特性
- 推荐使用 curator2.x 的版本
11 Curator 中的组件
名称 | 描述 |
---|---|
Recipes | Zookeeper 典型应用场景的实现,这些实现是基于 Curator Framework。 |
Framework | Zookeeper API 的高层封装,大大简化 Zookeeper 客户端编程,添加了例如 Zookeeper 连接管理、重试机制等。 |
Utilities | 为 Zookeeper 提供的各种实用程序。 |
Client | Zookeeper client 的封装,用于取代原生的 Zookeeper 客户端(ZooKeeper 类),提供一些非常有用的客户端特性。 |
Errors | Curator 如何处理错误,连接问题,可恢复的例外等。 |
12 依赖 POM
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
package com.uplooking.bigdata.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
private CuratorFramework zkClient;
@Before
public void init() {zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000));
zkClient.start();}
/**
* 创建空节点(其实不是空节点, 是给节点默认设置了 ip 地址)
*
* @throws Exception
*/
@Test
public void createZnode() throws Exception {zkClient.create().forPath("/test03");
}
@Test
public void createZnode1() throws Exception {zkClient.create().forPath("/test02", "h".getBytes());
}
@Test
public void deleteZnode() throws Exception {zkClient.delete().deletingChildrenIfNeeded().forPath("/test02");
}
@Test
public void setZnode() throws Exception {zkClient.setData().forPath("/test03");
}
@Test
public void getZnode() throws Exception {byte[] bytes = zkClient.getData().forPath("/test03");
System.out.println(new String(bytes, 0, bytes.length));
}
}
10 ZooKeeper 客户端神器 Curator
10.1 Curator 简介
- Netflix 公司开源的一套 Zookeeper 客户端框架
- 封装了原生的 zookeeper 的 API 操作
- apache 的顶级项目
- 基于 Fluent 的编程风格(链式编程)
- Curator 有 2.x.x 和 3.x.x 两个系列的版本,支持不同版本的 Zookeeper。其中 Curator 2.x.x 兼容 Zookeeper 的 3.4.x 和 3.5.x。而 Curator 3.x.x 只兼容 Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch 删除等新特性
- 推荐使用 curator2.x 的版本
10.2 Curator 中的组件
名称 | 描述 |
---|---|
Recipes | Zookeeper 典型应用场景的实现,这些实现是基于 Curator Framework。 |
Framework | Zookeeper API 的高层封装,大大简化 Zookeeper 客户端编程,添加了例如 Zookeeper 连接管理、重试机制等。 |
Utilities | 为 Zookeeper 提供的各种实用程序。 |
Client | Zookeeper client 的封装,用于取代原生的 Zookeeper 客户端(ZooKeeper 类),提供一些非常有用的客户端特性。 |
Errors | Curator 如何处理错误,连接问题,可恢复的例外等。 |
10.3 Curator 的基本 API 操作
/*Curator 的基本的节点操作 */
package com.uplooking.bigdata.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
private CuratorFramework zkClient;
@Before
public void init() {zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000));
zkClient.start();}
/**
* 创建空节点(其实不是空节点, 是给节点默认设置了 ip 地址)
*
* @throws Exception
*/
@Test
public void createZnode() throws Exception {zkClient.create().forPath("/test03");
}
@Test
public void createZnode1() throws Exception {zkClient.create().forPath("/test02", "h".getBytes());
}
@Test
public void deleteZnode() throws Exception {zkClient.delete().deletingChildrenIfNeeded().forPath("/test02");
}
@Test
public void setZnode() throws Exception {zkClient.setData().forPath("/test03");
}
@Test
public void getZnode() throws Exception {byte[] bytes = zkClient.getData().forPath("/test03");
System.out.println(new String(bytes, 0, bytes.length));
}
}
10.4 Curator 的监听器
-
PathChildrenCache(监听一级子节点的改变)
- 监听的节点如果不存在则会自动创建一个
- 监听的节点如果途中被删除了, 那么监听器则无效了
-
NodeCache(当前节点的的改变)
- 节点的改变(节点的添加也属于节点的改变)
- 节点的删除
- TreeCache(当前节点以及后代节点的改变)
10.4.1 NodeCache 监听器
String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy);
zkClient.start();
String path = "/test01";
// 创建监听监听器
NodeCache nodeCache = new NodeCache(zkClient, path);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {if (nodeCache.getCurrentData() == null) {System.out.println("删除了节点...."+path);
} else {System.out.println("节点改变.." + "路径为:" + nodeCache.getCurrentData().getPath() + "数据为:" + new String(nodeCache.getCurrentData().getData()));
}
}
});
Thread.sleep(Integer.MAX_VALUE);
zkClient.close();
10.4.2 PathChildrenCache 监听器
String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy);
zkClient.start();
String path = "/test01";
// 创建监听监听器
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true);
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("一级子节点改变.."+event.getData()+ event.getType());
}
});
Thread.sleep(Integer.MAX_VALUE);
zkClient.close();