乐趣区

大数据系列6ZooKeeper

1. ZooKeeper

开源的分布式的协调服务, 是 Google 的 Chubby 一个开源的实现, 它是一个为分布式应用提供一致性服务的软件

2. ZooKeeper 提供的功能

  • 配置维护
  • 域名服务
  • 分布式锁
  • 组服务

3. ZooKeeper 的特点

  • 简单

    • ZooKeeper 的核心是一个精简的 文件系统 , 它支持一些简单的操作和一些抽象操作
  • 丰富

    • ZooKeeper 的操作是很丰富的,可实现一些协调数据结构和协议。例如,分布式队列、分布式锁和一组同级别节点中的“领导者选举”
  • 高可靠

    • ZooKeeper 支持集群模式,可以很容易的解决单点故障问题
  • 松耦合交互

    • 不同进程间的交互不需要了解彼此,甚至可以不必同时存在,某进程在 ZooKeeper 中留下消息后,该进程结束后其它进程还可以读这条消息
  • 资源库

    • ZooKeeper 实现了一个关于通用协调模式的开源共享存储库,能使开发者免于编写这类通用协议

4. ZooKeeper 的角色

  • leader(领导者)

    • 负责进行投票的发起和决议,更新系统状态
  • learner (学习者)

    • 包括跟随者 (follower) 和观察者(observer),follower 用于接

      受客户端请求并向客户端返回结果,在选举过程中参与投票。Observer 可以接受客户端连接,

      将写请求转发给 leader,但 observer 不参与投票过程,只同步 leader 的状态,observer 的

      目的是为了扩展系统,提高读取速度

  • 客户端(client)

    • 请求发起方

5. ZooKeeper 数据模型

  • 层次化的目录结构,命名符合常规文件系统规范
  • 每个节点在 zookeeper 中叫做znode, 并且其有一个唯一的路径标识
  • 节点 znode 可以包含数据和子节点,但是 EPHEMERAL 类型的节点不能有子节点
  • znode 中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本
  • 客户端应用可以在节点上设置监视器
  • 节点不支持部分读写,而是一次性完整读写

6. ZooKeeper 的节点

  • zookeeper 中的节点包含下面两个类型的节点

    • 临时节点(ephemeral)
    • 持久节点(persistent)

znode 的类型在创建时确定并且之后不能再修改

znode 默认不指定类型是持久节点

  • ephemeral 类型的节点

    • 在节点客户端会话结束时, 会将 zookeeper 中的节点删除
    • 不能有子节点
  • persistent 节点不依赖与客户端会话,只有当客户端明确要删除该 persistent 节点时才会被删除
  • ZooKeeper 的客户端和服务器通信采用长连接方式 , 每个客户端和服务器通过心跳来保持连接,这个连接状态称之为 session , 如果 znode 是临时节点,这个 seesion 失效,znode 也就删除了

7. ZooKeeper 中的选举机制

  • 服务器的 ID

    • 分别 1,2,3
    • 编号越大在选择算法中的 权重 越大
  • 数据的 ID

    • 服务器中存放的最大数据 ID.
    • 值越大说明数据越新,在选举算法中数据越新权重越大
  • 编辑时钟

    • 投票的次数
    • 同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断
  • 选举状态

    • LOOKING,竞选状态。
    • FOLLOWING,随从状态,同步 leader 状态,参与投票。
    • OBSERVING,观察状态, 同步 leader 状态,不参与投票。
    • LEADING,领导者状态。
  • 选举信息的内容

    ​ 在投票完成后,需要将投票信息发送给集群中的所有服务器,它包含如下内容。

    • 服务器 ID
    • 数据 ID
    • 逻辑时钟
    • 选举状态
  • 选举机制的结果

    • zk 启动之后通过选举机制, 来选举出来一个 leader

8. ZooKeeper 集群搭建

安装 zookeeper 集群要求大于 1 的奇数台机器

8.1 准备安装包

zookeeper-3.4.6.tar.gz

8.2 解压

tar -zxvf zookeeper-3.4.6.tar.gz -C /opt/

8.3 重命名

mv zookeeper-3.4.6/ zookeeper

8.4 配置环境变量

export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=PATH:ZOOKEEPER_HOME/bin

8.5 配置 zookeeper

cp zoo_sample.cfg zoo.cfg

dataDir=/opt/zookeeper/data 

8.6 myid 文件

在 zookeeper 的各个机器中分别创建 myid 文件(/opt/zookeeper/data), 内容分别为

1 2 3

8.7 配置 zoo.cfg

server.1=uplooking03:2888:3888
server.2=uplooking04:2888:3888
server.3=uplooking05:2888:3888

8.8 查看 zookeeper 集群的时间

保证 zookeeper 集群中的时间不能有超过 20 秒的误差

ntpdate -u ntp.api.bz 根据时间同步服务器同步时间,- u 是绕过防火请

8.9 启动 zookeeper 服务

zkServer.sh start

9. ZooKeeper

9.1 zookeeper 的 Shell 操作

  • create znodename data : 创建 znode
  • get znodename : 查看 znode
  • set znodename data : 修改 znode 的数据
  • rmr znodename : 删除 znode 节点
  • quit : 退出会话
  • delete : 删除节点
  • setquota -b 长度配额 : 设置节点长度配额
  • setquota -n 数量配额 : 设置节点数量配额
  • listquoat path : 列出配额
  • delquota path : 删除配额

    • zookeeper 中的配额管理超出配额的大小之后依然可以进行操作
  • create -e znode : 创建临时节点

    • 临时节点不能创建子节点
    • 临时节点可以有数据
    • 临时节点会话结束时会自动删除

9.2 zookeeper 原生的 API 操作

package com.uplooking.bigdata.zookeeper;

import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ZookeeperTest {
    private ZooKeeper zooKeeper;

    @Before
    public void init() throws Exception {
        String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
        zooKeeper = new ZooKeeper(connStr, 3000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {System.out.println("watch..." + watchedEvent.getType());
            }
        });
    }

    /**
     * 创建节点
     */
    @Test
    public void testCreateZNode() throws Exception {
        String path = "/test01";
        zooKeeper.exists(path, true);
        String ret = zooKeeper.create(path, "HELLO2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(ret);
    }

    @Test
    public void testSetZnode() throws KeeperException, InterruptedException {zooKeeper.setData("/test02", "uplooking02".getBytes(), 1);
    }


    @Test
    public void testGetZnode() throws KeeperException, InterruptedException {byte[] data = zooKeeper.getData("/test02", true, null);
        System.out.println(new String(data, 0, data.length));
    }

    @Test
    public void testDeleteZnode() throws KeeperException, InterruptedException {zooKeeper.delete("/test02", -1);
    }

    @After
    public void destory() throws Exception {zooKeeper.close();
    }
}
  • 实现注册监听的方法

    • exists
    • getData
    • getChild
  • 事件的类型

    • NodeCreated
    • NodeDeleted
    • NodeDataChanged
    • …….

10 ZooKeeper 客户端神器 Curator

  • Netflix 公司开源的一套 Zookeeper 客户端框架
  • 封装了原生的 zookeeper 的 API 操作
  • apache 的顶级项目
  • 基于 Fluent 的编程风格(链式编程)
  • Curator 有 2.x.x 和 3.x.x 两个系列的版本,支持不同版本的 Zookeeper。其中 Curator 2.x.x 兼容 Zookeeper 的 3.4.x 和 3.5.x。而 Curator 3.x.x 只兼容 Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch 删除等新特性
  • 推荐使用 curator2.x 的版本

11 Curator 中的组件

名称 描述
Recipes Zookeeper 典型应用场景的实现,这些实现是基于 Curator Framework。
Framework Zookeeper API 的高层封装,大大简化 Zookeeper 客户端编程,添加了例如 Zookeeper 连接管理、重试机制等。
Utilities 为 Zookeeper 提供的各种实用程序。
Client Zookeeper client 的封装,用于取代原生的 Zookeeper 客户端(ZooKeeper 类),提供一些非常有用的客户端特性。
Errors Curator 如何处理错误,连接问题,可恢复的例外等。

12 依赖 POM

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

package com.uplooking.bigdata.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
    public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
    private CuratorFramework zkClient;

    @Before
    public void init() {zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000));
        zkClient.start();}

    /**
     * 创建空节点(其实不是空节点, 是给节点默认设置了 ip 地址)
     *
     * @throws Exception
     */
    @Test
    public void createZnode() throws Exception {zkClient.create().forPath("/test03");
    }

    @Test
    public void createZnode1() throws Exception {zkClient.create().forPath("/test02", "h".getBytes());
    }


    @Test
    public void deleteZnode() throws Exception {zkClient.delete().deletingChildrenIfNeeded().forPath("/test02");
    }

    @Test
    public void setZnode() throws Exception {zkClient.setData().forPath("/test03");
    }

    @Test
    public void getZnode() throws Exception {byte[] bytes = zkClient.getData().forPath("/test03");
        System.out.println(new String(bytes, 0, bytes.length));
    }
}

10 ZooKeeper 客户端神器 Curator

10.1 Curator 简介

  • Netflix 公司开源的一套 Zookeeper 客户端框架
  • 封装了原生的 zookeeper 的 API 操作
  • apache 的顶级项目
  • 基于 Fluent 的编程风格(链式编程)
  • Curator 有 2.x.x 和 3.x.x 两个系列的版本,支持不同版本的 Zookeeper。其中 Curator 2.x.x 兼容 Zookeeper 的 3.4.x 和 3.5.x。而 Curator 3.x.x 只兼容 Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch 删除等新特性
  • 推荐使用 curator2.x 的版本

10.2 Curator 中的组件

名称 描述
Recipes Zookeeper 典型应用场景的实现,这些实现是基于 Curator Framework。
Framework Zookeeper API 的高层封装,大大简化 Zookeeper 客户端编程,添加了例如 Zookeeper 连接管理、重试机制等。
Utilities 为 Zookeeper 提供的各种实用程序。
Client Zookeeper client 的封装,用于取代原生的 Zookeeper 客户端(ZooKeeper 类),提供一些非常有用的客户端特性。
Errors Curator 如何处理错误,连接问题,可恢复的例外等。

10.3 Curator 的基本 API 操作

/*Curator 的基本的节点操作 */
package com.uplooking.bigdata.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
    public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
    private CuratorFramework zkClient;

    @Before
    public void init() {zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000));
        zkClient.start();}

    /**
     * 创建空节点(其实不是空节点, 是给节点默认设置了 ip 地址)
     *
     * @throws Exception
     */
    @Test
    public void createZnode() throws Exception {zkClient.create().forPath("/test03");
    }

    @Test
    public void createZnode1() throws Exception {zkClient.create().forPath("/test02", "h".getBytes());
    }


    @Test
    public void deleteZnode() throws Exception {zkClient.delete().deletingChildrenIfNeeded().forPath("/test02");
    }

    @Test
    public void setZnode() throws Exception {zkClient.setData().forPath("/test03");
    }

    @Test
    public void getZnode() throws Exception {byte[] bytes = zkClient.getData().forPath("/test03");
        System.out.println(new String(bytes, 0, bytes.length));
    }
}

10.4 Curator 的监听器

  • PathChildrenCache(监听一级子节点的改变)

    • 监听的节点如果不存在则会自动创建一个
    • 监听的节点如果途中被删除了, 那么监听器则无效了
  • NodeCache(当前节点的的改变)

    • 节点的改变(节点的添加也属于节点的改变)
    • 节点的删除
  • TreeCache(当前节点以及后代节点的改变)

10.4.1 NodeCache 监听器

String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy);
zkClient.start();
String path = "/test01";
// 创建监听监听器
NodeCache nodeCache = new NodeCache(zkClient, path);
nodeCache.start();

nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {if (nodeCache.getCurrentData() == null) {System.out.println("删除了节点...."+path);
        } else {System.out.println("节点改变.." + "路径为:" + nodeCache.getCurrentData().getPath() + "数据为:" + new String(nodeCache.getCurrentData().getData()));
        }
    }
});
Thread.sleep(Integer.MAX_VALUE);
zkClient.close();

10.4.2 PathChildrenCache 监听器

String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy);
zkClient.start();
String path = "/test01";
// 创建监听监听器
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true);
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("一级子节点改变.."+event.getData()+ event.getType());
    }
});
Thread.sleep(Integer.MAX_VALUE);
zkClient.close();
退出移动版