乐趣区

zookeeper-快速入门

zookeeper

zookeeper 是什么

Apache ZooKeeper 是 Apache 软件基金会的一个软件项目,他为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。ZooKeeper 曾经是 Hadoop 的一个子项目,但现在是一个独立的顶级项目。

ZooKeeper 的架构通过冗余服务实现高可用性。因此,如果第一次无应答,客户端就可以询问另一台 ZooKeeper 主机。ZooKeeper 节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。

使用 ZooKeeper 的公司包括 Rackspace、雅虎和 eBay,以及类似于像 Solr 这样的开源企业级搜索系统。

zookeeper 提供了什么

  • 文件系统:zookeeper 维护一个类似文件系统的数据结构,每个子目录项如 NameService 都被称作为 znode,和文件系统一样,自由增加及删除,唯一不同其可存储数据。Znode 分为四种类型

    • PERSISTENT- 持久化目录节点。(客户端与 zookeeper 断开连接后,该节点依旧存在)。
    • PERSISTENT_SEQUENTIAL- 持久化顺序编号目录节点。(客户端与 zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行顺序编号)
    • EPHEMERAL- 临时目录节点(客户端与 zookeeper 断开连接后,该节点被删除)
    • EPHEMERAL_SEQUENTIAL- 临时顺序编号目录节点。(客户端与 zookeeper 断开连接后,该节点被删除,只是 Zookeeper 给该节点名称进行顺序编号)
  • 通知机制:客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper 会通知客户端。

zookeeper 能为我们做什么?

  • 命名服务:在 zookeeper 的文件系统里创建一个目录,即有唯一的 path。在我们使用 tborg 无法确定上游程序的部署机器时即可与下游程序约定好 path,通过 path 即能互相探索发现。
  • 配置管理:把应用配置放置 zookeeper 上去, 保存在 Zookeeper 的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中就好。
  • 集群管理:节点(机器)增删及 Master 选取。节点增删:所有机器约定在父目录 GroupMembers 下创建临时目录节点,然后监听父目录节点的子节点变化消息。一旦有机器挂掉,该机器与 zookeeper 的连接断开,其所创建的临时目录节点被删除,所有其他机器都收到通知:某个兄弟目录被删除,于是,所有人都知道:它上船了。新机器加入 也是类似,所有机器收到通知:新兄弟目录加入,highcount 又有了。Master 选取:所有机器创建临时顺序编号目录节点,每次选取编号最小的机器作为 master 就好。
  • 分布式锁:基于 zookeeper 一致性文件系统, 实现锁服务。锁服务分为保存独占及时序控制两类。保存独占:将 zookeeper 上的一个 znode 看作是一把锁,通过 createznode 的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。用完删除自己创建的 distribute_lock 节点就释放锁。时序控制:基于 /distribute_lock 锁,所有客户端在它下面创建临时顺序编号目录节点,和选 master 一样,编号最小的获得锁,用完删除,依次方便。
  • 队列管理:分同步队列,FIFO 队列(入队与出队),同步队列:当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。在约定目录下创建临时目录节点,监听节点数目是否是我们要求的数目。FIFO 队列:和分布式锁服务中的控制时序场景基本原理一致,入列有编号,出列按编号。
  • 分布式与数据复制:Zookeeper 作为一个集群提供一致的数据服务,必然在所有机器间做数据复制。数据复制好处:(1)容错:一个节点出错,不致于让整个系统停止工作,别的节点可以接管它的工作。(2)提高系统的扩展能力:把负载分布到多个节点上,或者增加节点来提高系统的负载能力;(3)性能提升:让客户端本地访问就近节点, 提高用户访问速度。

zookeeper 基本概念

角色简介

Zookeeper 角色分为三类,领导者:负责进行投票的发起和决议, 更新系统状态。跟随者:Follower 用于接收客户请求并向客户端返回结果,在选中过程中参与投票。观察者:Observer 可以接收客户端连接,将写请求转发给 leader 节点。但不参加投票过程, 只同步 leader 状态。Observer 目的在于扩展系统,提高读取速度。

设计目的

  • 一致性:client 不论连接到哪个 Server,展示给它都是同一个视图,这是 zookeeper 最重要的性能。
  • 可靠性:具有简单、健壮、良好的性能,如果消息 m 被到一台服务器接受,那么它将被所有的服务器接受。
  • 实时性:Zookeeper 保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。但由于网络延时等原因,Zookeeper 不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用 sync()接口。
  • 等待无关(wait-free):慢的或者失效的 client 不得干预快速的 client 的请求,使得每个 client 都能有效的等待。
  • 原子性:更新只能成功或者失败,没有中间状态。
  • 顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息 a 在消息 b 前发布,则在所有 Server 上消息 a 都将在消息 b 前被发布;偏序是指如果一个消息 b 在消息 a 后被同一个发送者发布,a 必将排在 b 前面。

选主流程

当 leader 崩溃或者 leader 失去大多数的 follower,这时候 zk 进入恢复模式,恢复模式需要重新选举出一个新的 leader,让所有的 Server 都恢复到一个正确的状态。Zk 的选举算法有两种:一种是基于 basic paxos 实现的,另外一种是基于 fast paxos 算法实现的。系统默认的选举算法为 fast paxos。先介绍 basic paxos 流程:

选举线程由当前 Server 发起选举的线程担任,其主要功能是对投票结果进行统计,并选出推荐的 Server;
选举线程首先向所有 Server 发起一次询问 (包括自己);
选举线程收到回复后,验证是否是自己发起的询问 (验证 zxid 是否一致),然后获取对方的 id(myid),并存储到当前询问对象列表中,最后获取对方提议的 leader 相关信息(id,zxid),并将这些信息存储到当次选举的投票记录表中;
收到所有 Server 回复以后,就计算出 zxid 最大的那个 Server,并将这个 Server 相关信息设置成下一次要投票的 Server;
线程将当前 zxid 最大的 Server 设置为当前 Server 要推荐的 Leader,如果此时获胜的 Server 获得 n /2 + 1 的 Server 票数,设置当前推荐的 leader 为获胜的 Server,将根据获胜的 Server 相关信息设置自己的状态,否则,继续这个过程,直到 leader 被选举出来。
通过流程分析我们可以得出:要使 Leader 获得多数 Server 的支持,则 Server 总数必须是奇数 2n+1,且存活的 Server 的数目不得少于 n +1.

每个 Server 启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的 server 还会从磁盘快照中恢复数据和会话信息,zk 会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

fast paxos 流程是在选举过程中,某 Server 首先向所有 Server 提议自己要成为 leader,当其它 Server 收到提议以后,解决 epoch 和 zxid 的冲突,并接受对方的提议,然后向对方发送接受提议完成的消息,重复这个流程,最后一定能选举出 Leader。

zookeeper 的安装使用

wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz

tar zxvf zookeeper-3.4.8.tar.gz -C /usr/local/

cd $ZOOKEEPER_HOME

cp conf/zoo_sample.cfg conf/zoo.cfg

# 集群需要在 zoo.cfg 配置

server.1=192.168.1.148:2888:3888
server.2=192.168.1.149:2888:3888
server.3=192.168.1.150:2888:3888


# 在 zookeeper 的临时目录创建 myid
mkdir -p /tmp/zookeeper

# 分别在不同节点创建 myid 文件里面的数字对应节点的编号比如 server.1 就对应 1,server.2 就对应 2
echo 1 > /tmp/zookeeper/myid

# 最后分别启动集群上的节点
$ZOOKEEPER_HOME/bin/zkServer.sh start

# 查看 zookeeper 的状态
$ZOOKEEPER_HOME/bin/zkServer.sh status

# 停止 zookeeper 服务
$ZOOKEEPER_HOME/bin/zkServer.sh stop 

zookeeper 命令行操作

启动 zookeeper 服务后到 bin 目录启动 zookeeper 的客户端 $ZOOKEEPER_HOME/bin/zkCli.sh

# 输入 help

[zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
        stat path [watch]
        set path data [version]
        ls path [watch]
        delquota [-n|-b] path
        ls2 path [watch]
        setAcl path acl
        setquota -n|-b val path
        history 
        redo cmdno
        printwatches on|off
        delete path [version]
        sync path
        listquota path
        rmr path
        get path [watch]
        create [-s] [-e] path data acl
        addauth scheme auth
        quit 
        getAcl path
        close 
        connect host:port

创建节点

[zk: localhost:2181(CONNECTED) 14] create /test test-data
Created /test

查看节点

[zk: localhost:2181(CONNECTED) 1] ls /
[abc, zookeeper, eclipse]

获取节点

[zk: localhost:2181(CONNECTED) 10] get /test
test-update
cZxid = 0x38
ctime = Sat Dec 22 10:11:46 CST 2018
mZxid = 0x39
mtime = Sat Dec 22 10:12:05 CST 2018
pZxid = 0x38
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0

修改节点

[zk: localhost:2181(CONNECTED) 11] set /test test-update
cZxid = 0x38
ctime = Sat Dec 22 10:11:46 CST 2018
mZxid = 0x3a
mtime = Sat Dec 22 10:15:04 CST 2018
pZxid = 0x38
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0

删除节点

[zk: localhost:2181(CONNECTED) 13] delete /test

zookeeper java 客户端操作



/**
 * @author leone
 * @since 2018-06-16
 **/
public class ZkClient {private final static Logger logger = LoggerFactory.getLogger(ZkClient.class);

    private final static String ZK_URL = "xxx.xxx.xxx.xxx:2181";

    private final static int TIME_OUT = 5000;

    private static ZooKeeper zkClient = null;


    @Before
    public void init() throws Exception {zkClient = new ZooKeeper(ZK_URL, TIME_OUT, (WatchedEvent event) -> {
            // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)logger.info(event.getType() + "---" + event.getPath());
            try {zkClient.getChildren("/", true);
            } catch (Exception e) {e.printStackTrace();
            }
        });
    }

    /**
     * 设置值
     *
     * @throws Exception
     */
    @Test
    public void testSetData() throws Exception {zkClient.setData("/eclipse", "world".getBytes(), -1);
        byte[] data = zkClient.getData("/eclipse", false, null);
        System.out.println(new String(data));
    }

    /**
     * 创建节点
     *
     * @throws Exception
     */
    @Test
    public void testCreate() throws Exception {
        // 参数 1:要创建的节点的路径 参数 2:节点数据 参数 3:节点的权限 参数 4:节点的类型
        zkClient.create("/eclipse/aaa", "aaaData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }


    /**
     * 测试某节点是否存在
     *
     * @throws Exception
     */
    @Test
    public void testExists() throws Exception {Stat stat = zkClient.exists("/eclipse", false);
        System.out.println(stat == null ? "not exist" : "exist");
    }

    /**
     * 获取子节点
     *
     * @throws Exception
     */
    @Test
    public void testGetChild() throws Exception {List<String> children = zkClient.getChildren("/", true);
        for (String child : children) {System.out.println(child);
        }
    }

    /**
     * 删除节点
     *
     * @throws Exception
     */
    @Test
    public void testDelete() throws Exception {
        // 参数 2:指定要删除的版本,- 1 表示删除所有版本
        zkClient.delete("/abc", -1);
    }


    /**
     * 获取节点的数据
     *
     * @throws Exception
     */
    @Test
    public void testGetDate() throws Exception {byte[] data = zkClient.getData("/eclipse", false, null);
        System.out.println(new String(data));
    }

}
退出移动版