关于java:ZooKeeper入门看这篇就够了

36次阅读

共计 7255 个字符,预计需要花费 19 分钟才能阅读完成。

思维导图

前言

在很多时候,咱们都能够在各种框架利用中看到 ZooKeeper 的身影,比方 Kafka 中间件,Dubbo 框架,Hadoop 等等。为什么到处都看到 ZooKeeper?

一、什么是 ZooKeeper

ZooKeeper 是一个分布式服务协调框架 ,提供了分布式数据一致性的解决方案,基于 ZooKeeper 的 数据结构,Watcher,选举机制 等特点,能够 实现数据的公布 / 订阅,软负载平衡,命名服务,对立配置管理,分布式锁,集群治理 等等。

二、为什么应用 ZooKeeper

ZooKeeper 能保障:

  • 更新申请程序进行。来自同一个 client 的更新申请按其发送程序顺次执行
  • 数据更新原子性。一次数据更新要么胜利,要么失败
  • 全局惟一数据视图。client 无论连贯到哪个 server,数据视图都是统一的
  • 实时性。在肯定工夫范畴内,client 读到的数据是最新的

三、数据结构

ZooKeeper 的数据结构和 Unix 文件系统很相似,总体上能够看做是一棵树,每一个节点称之为一个 ZNode,每一个 ZNode默认能存储 1M 的数据 。每一个 ZNode 可 通过惟一的门路标识。如下图所示:

创立 ZNode 时,能够指定以下四种类型,包含:

  • PERSISTENT,持久性 ZNode。创立后,即便客户端与服务端断开连接也不会删除,只有客户端被动删除才会隐没。
  • PERSISTENT_SEQUENTIAL,持久性程序编号 ZNode。和持久性节点一样不会因为断开连接后而删除,并且 ZNode 的编号会主动减少。
  • EPHEMERAL,临时性 ZNode。客户端与服务端断开连接,该 ZNode 会被删除。
  • EPEMERAL_SEQUENTIAL,临时性程序编号 ZNode。和临时性节点一样,断开连接会被删除,并且 ZNode 的编号会主动减少。

四、监听告诉机制

Watcher 是基于 观察者模式 实现的一种机制。如果咱们须要实现当某个 ZNode 节点发生变化时收到告诉,就能够应用 Watcher 监听器。

客户端通过设置监督点(watcher)向 ZooKeeper 注册须要接管告诉的 znode,在 znode 发生变化时 ZooKeeper 就会向客户端发送音讯

这种告诉机制是一次性的。一旦 watcher 被触发,ZooKeeper 就会从相应的存储中删除。如果须要一直监听 ZNode 的变动,能够在收到告诉后再设置新的 watcher 注册到 ZooKeeper。

监督点的类型有很多,如 监控 ZNode 数据变动、监控 ZNode 子节点变动、监控 ZNode 创立或删除

五、选举机制

ZooKeeper 是一个高可用的利用框架,因为 ZooKeeper 是反对集群的。ZooKeeper 在集群状态下,配置文件是不会指定 Master 和 Slave,而是在 ZooKeeper 服务器初始化时就在外部进行选举,产生一台做为 Leader,多台做为 Follower,并且恪守半数可用准则。

因为恪守半数可用准则,所以 5 台服务器和 6 台服务器,实际上最大容许宕机数量都是 3 台,所以为了节约老本,集群的服务器数量个别设置为奇数

如果在运行时,如果长时间无奈和 Leader 放弃连贯的话,则会再次进行选举,产生新的 Leader,以保障服务的可用

六、初の体验

首先在官网下载 ZooKeeper,我这里用的是 3.3.6 版本。

而后解压,复制一下 /conf 目录下的 zoo_sample.cfg 文件,重命名为 zoo.cfg。

批改 zoo.cfg 中 dataDir 的值,并创立对应的目录:

最初到 /bin 目录下启动,我用的是 window 零碎,所以启动 zkServer.cmd,双击即可:

启动胜利的话就能够看到这个对话框:

可视化界面的话,我举荐应用ZooInspector,操作比拟简便:

6.1 应用 java 连贯 ZooKeeper

首先引入 Maven 依赖:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

接着咱们写一个 Main 办法,进行操作:

    // 连贯地址及端口号
    private static final String SERVER_HOST = "127.0.0.1:2181";

    // 会话超时工夫
    private static final int SESSION_TIME_OUT = 2000;

    public static void main(String[] args) throws Exception {
        // 参数一:服务端地址及端口号
        // 参数二:超时工夫
        // 参数三:监听器
        ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 获取事件的状态
                Event.KeeperState state = watchedEvent.getState();
                // 判断是否是连贯事件
                if (Event.KeeperState.SyncConnected == state) {Event.EventType type = watchedEvent.getType();
                    if (Event.EventType.None == type) {System.out.println("zk 客户端已连贯...");
                    }
                }
            }
        });
        zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("新增 ZNode 胜利");
        zooKeeper.close();}

创立一个持久性 ZNode,门路是 /java,值为 ”Hello World”:

七、API 概述

7.1 创立

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)

参数解释:

  • path ZNode 门路
  • data ZNode 存储的数据
  • acl ACL 权限管制
  • createMode ZNode 类型

ACL 权限管制,有三个是 ZooKeeper 定义的罕用权限,在 ZooDefs.Ids 类中:

/**
 * This is a completely open ACL.
 * 齐全凋谢的 ACL,任何连贯的客户端都能够操作该属性 znode
 */
public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));

/**
 * This ACL gives the creators authentication id's all permissions.
 * 只有创建者才有 ACL 权限
 */
public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));

/**
 * This ACL gives the world the ability to read.
 * 只能读取 ACL
 */
public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));

createMode 就是后面讲过的四种 ZNode 类型:

public enum CreateMode {
    /**
     * 持久性 ZNode
     */
    PERSISTENT (0, false, false),
    /**
     * 持久性主动减少顺序号 ZNode
     */
    PERSISTENT_SEQUENTIAL (2, false, true),
    /**
     * 临时性 ZNode
     */
    EPHEMERAL (1, true, false),
    /**
     * 临时性主动减少顺序号 ZNode
     */
    EPHEMERAL_SEQUENTIAL (3, true, true);
}

7.2 查问

// 同步获取节点数据
public byte[] getData(String path, boolean watch, Stat stat){...}

// 异步获取节点数据
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx){...}

同步 getData()办法中的 stat 参数是用于接管返回的节点形容信息:

public byte[] getData(final String path, Watcher watcher, Stat stat){
    // 省略...
    GetDataResponse response = new GetDataResponse();
    // 发送申请到 ZooKeeper 服务器,获取到 response
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (stat != null) {
        // 把 response 的 Stat 赋值到传入的 stat 中
        DataTree.copyStat(response.getStat(), stat);
    }
}

应用同步 getData()获取数据:

    // 数据的形容信息,包含版本号,ACL 权限,子节点信息等等
    Stat stat = new Stat();
    // 返回后果是 byte[]数据,getData()办法底层会把形容信息复制到 stat 对象中
    byte[] bytes = zooKeeper.getData("/java", false, stat);
    // 打印后果
    System.out.println("ZNode 的数据 data:" + new String(bytes));//Hello World
    System.out.println("获取到 dataVersion 版本号:" + stat.getVersion());// 默认数据版本号是 0 

7.3 更新

public Stat setData(final String path, byte data[], int version){...}

值得注意的是第三个参数 version,应用 CAS 机制,这是为了避免多个客户端同时更新节点数据,所以须要在更新时传入版本号,每次更新都会使版本号 +1,如果服务端接管到版本号,比照发现不统一的话,则会抛出异样。

所以,在更新前须要先查问获取到版本号,否则你不晓得以后版本号是多少,就没法更新:

    // 获取节点形容信息
    Stat stat = new Stat();
    zooKeeper.getData("/java", false, stat);
    System.out.println("更新 ZNode 数据...");
    // 更新操作,传入门路,更新值,版本号三个参数, 返回后果是新的形容信息
    Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());
    System.out.println("更新后的版本号为:" + setData.getVersion());// 更新后的版本号为:1

更新后,版本号减少了:

如果传入的版本参数是 ”-1″,就是通知 zookeeper 服务器,客户端须要基于数据的最新版本进行更新操作。然而 - 1 并不是一个非法的版本号,而是一个标识符。

7.4 删除

public void delete(final String path, int version){...}
  • path 删除节点的门路
  • version 版本号

这里也须要传入版本号,调用 getData()办法即可获取到版本号,很简略:

Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
// 删除 ZNode
zooKeeper.delete("/java", stat.getVersion());

7.5 watcher 机制

在下面第三点提到,ZooKeeper 是能够应用告诉监听机制,当 ZNode 发生变化会收到告诉音讯,进行解决。基于 watcher 机制,ZooKeeper 能玩出很多花色。怎么应用?

ZooKeeper 的告诉监听机制,总的来说能够分为三个过程:

①客户端注册 Watcher
②服务器解决 Watcher
③客户端回调 Watcher 客户端。

注册 watcher 有 4 种办法,new ZooKeeper()、getData()、exists()、getChildren()。上面演示一下应用 exists()办法注册 watcher:

首先 须要实现 Watcher 接口,新建一个监听器:

public class MyWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        // 获取事件类型
        Event.EventType eventType = event.getType();
        // 告诉状态
        Event.KeeperState eventState = event.getState();
        // 节点门路
        String eventPath = event.getPath();
        System.out.println("监听到的事件类型:" + eventType.name());
        System.out.println("监听到的告诉状态:" + eventState.name());
        System.out.println("监听到的 ZNode 门路:" + eventPath);
    }
}

而后调用 exists()办法,注册监听器:

zooKeeper.exists("/java", new MyWatcher());
// 对 ZNode 进行更新数据的操作,触发监听器
zooKeeper.setData("/java", "fly".getBytes(), -1);

而后在控制台就能够看到打印的信息:

这里咱们能够看到Event.EventType 对象就是事件类型,咱们能够对事件类型进行判断,再配合Event.KeeperState 告诉状态,做相干的业务解决,事件类型有哪些?

关上 EventType、KeeperState 的源码查看:

// 事件类型是一个枚举
public enum EventType {None (-1),// 无
    NodeCreated (1),//Watcher 监听的数据节点被创立
    NodeDeleted (2),//Watcher 监听的数据节点被删除
    NodeDataChanged (3),//Watcher 监听的数据节点内容产生变更
    NodeChildrenChanged (4);//Watcher 监听的数据节点的子节点列表产生变更
}

// 告诉状态也是一个枚举
public enum KeeperState {Unknown (-1),// 属性过期
    Disconnected (0),// 客户端与服务端断开连接
    NoSyncConnected (1),// 属性过期
    SyncConnected (3),// 客户端与服务端失常连贯
    AuthFailed (4),// 身份认证失败
    ConnectedReadOnly (5),// 返回这个状态给客户端,客户端只能解决读申请
    SaslAuthenticated(6),// 服务器采纳 SASL 做校验时
    Expired (-112);// 会话 session 生效
}

7.5.1 watcher 的个性

  • 一次性。一旦 watcher 被触发,ZK 都会从相应的存储中移除。
    zooKeeper.exists("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {System.out.println("我是 exists()办法的监听器");
        }
    });
    // 对 ZNode 进行更新数据的操作,触发监听器
    zooKeeper.setData("/java", "fly".getBytes(), -1);
    // 希图第二次触发监听器
    zooKeeper.setData("/java", "spring".getBytes(), -1);

  • 串行执行。客户端 Watcher 回调的过程是一个串行同步的过程,这是为了保障程序。
    zooKeeper.exists("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {System.out.println("我是 exists()办法的监听器");
        }
    });
    Stat stat = new Stat();
    zooKeeper.getData("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {System.out.println("我是 getData()办法的监听器");
        }
    }, stat);
    // 对 ZNode 进行更新数据的操作,触发监听器
    zooKeeper.setData("/java", "fly".getBytes(), stat.getVersion());

打印后果,阐明先调用 exists()办法的监听器,再调用 getData()办法的监听器。因为 exists()办法先注册了。

  • 轻量级。WatchedEvent 是 ZK 整个 Watcher 告诉机制的最小告诉单元。WatchedEvent 蕴含三局部:告诉状态,事件类型,节点门路。Watcher 告诉仅仅通知客户端产生了什么事件,而不会阐明事件的具体内容。

正文完
 0