乐趣区

zookeeper的一篇概述

之前在公司由于业务需要,对 zookeeper 进行了一些知识点的梳理进行分享,对一些刚刚接触 zookeeper 的小伙伴来说,或许可以借鉴一下

一、zookeeper 介绍

简介
Zookeeper 致力于提供一个 高性能 高可用 ,且具备 严格的顺序访问 控制能力的分布式协调服务。

设计目标

  • 简单的数据结构:共享的树形结构,类似文件系统,存储于内存;
  • 可以构建集群:避免单点故障,3- 5 台机器就可以组成集群,超过半数,正常工作就能对外提供服务;
  • 顺序访问:对于每个写请求,zk 会分配一个全局唯一的递增编号,利用 这个特性可以实现高级协调服务;
  • 高性能:基于内存操作,服务于非事务请求,适用于读操作为主的业务 场景。3 台 zk 集群能达到 13w QPS;

应用场景

  1. 数据发布订阅
  2. 负载均衡
  3. 命名服务
  4. Master 选举
  5. 集群管理
  6. 配置管理
  7. 分布式队列
  8. 分布式锁

二、zookeeper 特性

会话 (session): 客户端与服务端的一次会话连接,本质是 TCP 长连接,通过会话可以进行心跳检测和数据传输;

数据节点(znode)

  1. 持久节点(PERSISTENT)
  2. 持久顺序节点(PERSISTENT_SEQUENTIAL)
  3. 临时节点(EPHEMERAL)
  4. 临时顺序节点(EPHEMERAL_SEQUENTIAL)

对于持久节点和临时节点,同一个 znode 下,节点的名称是唯一的:[center red 20px]

Watcher 事件监听器:客户端可以在节点上注册监听器,当特定的事件发生后,zk 会通知到感兴趣的客户端。
EventType: NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChange

ACL:Zk 采用 ACL(access control lists)策略来控制权限
权限类型:create,read,write,delete,admin

三、zookeeper 常用命令

  1. 启动 ZK 服务: bin/zkServer.sh start
  2. 查看 ZK 服务状态:bin/zkServer.sh status
  3. 停止 ZK 服务: bin/zkServer.sh stop
  4. 重启 ZK 服务: bin/zkServer.sh restart
  5. 客户端连接:zkCli.sh -server 127.0.0.1:2181
  6. 显示目录:ls /
  7. 创建:create /zk “test”
  8. 获得值:get /zk
  9. 修改值:set /zk “test”
  10. 删除:delete /zk
  11. ACL:

    • getAcl / setAcl
    • addauth

四、zookeeper 的 java 客户端

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
</dependency>
<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
 </dependency>
public class App {public static void main(String[] args) throws Exception {
        String connectString = "211.159.174.226:2181";

        RetryPolicy retryPolicy = getRetryPolicy();
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, 5000, 5000, retryPolicy);
        client.start();

        // 增删改查
        client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-nodata");
        client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-data", "test-Curator-PERSISTENT-data".getBytes());
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-nodata");
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-data", "/test-Curator-EPHEMERAL-data".getBytes());

        for (int i = 0; i < 5; i++) {client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-Curator-PERSISTENT_SEQUENTIAL-nodata");
        }

        byte[] bytes = client.getData().forPath("/test-Curator-PERSISTENT-data");
        System.out.println("----------zk 节点数据:" + new String(bytes) + "------------");

        client.create().withMode(CreateMode.PERSISTENT).forPath("/test-listener", "test-listener".getBytes());
        final NodeCache nodeCache = new NodeCache(client, "/test-listener");
        nodeCache.start();
        NodeCacheListener listener = new NodeCacheListener() {

            @Override
            public void nodeChanged() throws Exception {System.out.println("node changed :" + nodeCache.getCurrentData());
            }
        };
        nodeCache.getListenable().addListener(listener);

        client.setData().forPath("/test-listener", "/test-listener-change".getBytes());

    }
    /**
     * RetryOneTime: 只重连一次.
     * RetryNTime: 指定重连的次数 N.
     * RetryUtilElapsed: 指定最大重连超时时间和重连时间间隔, 间歇性重连直到超时或者链接成功.
     * ExponentialBackoffRetry: 基于 "backoff" 方式重连, 和 RetryUtilElapsed 的区别是重连的时间间隔是动态的
     * BoundedExponentialBackoffRetry: 同 ExponentialBackoffRetry, 增加了最大重试次数的控制.
     */
    public static RetryPolicy getRetryPolicy() {return new ExponentialBackoffRetry(1000, 3);
    }
}

五、分布式锁

public class ZookeeperLock {

    private final String lockPath = "/distributed-lock";
    private String connectString;
    private RetryPolicy retry;
    private CuratorFramework client;
    private InterProcessLock interProcessMutex;


    public void init() throws Exception {
        connectString = "211.159.174.226:2181";
        retry = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
        client.start();

        // 共享可重入锁
        interProcessMutex = new InterProcessMutex(client,lockPath);
    }

    public void lock(){
        try {interProcessMutex.acquire();
        } catch (Exception e) {System.out.println("锁失败了, 真惨");
        }
    }

    public void unlock(){
        try {interProcessMutex.release();
        } catch (Exception e) {System.out.println("释放失败了,更惨");
        }
    }

    public static void main(String[] args) throws Exception {final ZookeeperLock zookeeperLock = new ZookeeperLock();
        zookeeperLock.init();

        Executor executor = Executors.newFixedThreadPool(5);
        for (int i = 0;i<50;i++) {executor.execute(new Runnable() {
                @Override
                public void run() {zookeeperLock.lock();
                    Long time = System.nanoTime();
                    System.out.println(time);
                    try {Thread.sleep(1000);
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                    System.out.println(time);
                    zookeeperLock.unlock();}
            });
        }

        while (true){}}
}

六、zab 协议

  1. ZAB 协议所定义的三种节点状态

    • Looking:选举状态。
    • Following:Follower 节点(从节点)所处的状态。
    • Leading:Leader 节点(主节点)所处状态。
  2. Zxid(64 位的数据结构)
    前 32 位:Leader 周期编号 myid
    低 32 位:事务的自增序列(单调递增的序列)只要客户端有请求,就 +1
    当产生新 Leader 的时候,就从这个 Leader 服务器上取出本地 log 中最大事务 zxid,从里面读出 epoch+1,作为一个新 epoch,并将低 32 位置 0(保证 id 绝对自增)。
  3. 崩溃恢复

    • 每个 server 都有一张选票 <myid,zxid>,选票投自己。
    • 搜集各个服务器的投票。
    • 比较投票,比较逻辑:优先比较 zxid,然后才比较 myid。
    • 改变服务器状态(崩溃恢复 =》数据同步,或者崩溃恢复 =》消息广播)

  1. 消息广播(类似 2P 提交):

    • Leader 接受请求后,讲这个请求赋予全局的唯一 64 位自增 Id(zxid)。
    • 将 zxid 作为议案发给所有 follower。
    • 所有的 follower 接受到议案后,想将议案写入硬盘后,马上回复 Leader 一个 ACK(OK)。
    • 当 Leader 接受到合法数量 Acks,Leader 给所有 follower 发送 commit 命令。
    • follower 执行 commit 命令。
    • PS:: 到了这个阶段,ZK 集群才正式对外提供服务,并且 Leader 可以进行消息广播,如果有新节点加入,还需要进行同步。

更多文章关注博客:https://www.zplxjj.com 和公众号

退出移动版