本文视频教程

https://www.bilibili.com/vide...

关注公众号 学习更多精彩课程

应用三大外围语言Java、Python、Golang玩转zookeeper

应用Java原生api操作zookeeper

创立一个maven我的项目

创立一个maven我的项目

增加我的项目依赖

<dependencies>    <dependency>        <groupId>org.apache.zookeeper</groupId>        <artifactId>zookeeper</artifactId>        <version>3.4.8</version>    </dependency></dependencies>
https://mvnrepository.com/

实现代码

package com.duoke360;import org.apache.zookeeper.*;import java.io.IOException;import java.util.concurrent.CountDownLatch;public class Test {    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {        String connStr = "192.168.18.128:2181";        CountDownLatch countDown = new CountDownLatch(1);        Watcher watcher= event -> {            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {                System.err.println("eventType:"+event.getType());                if(event.getType()== Watcher.Event.EventType.None){                    countDown.countDown();                }else if(event.getType()== Watcher.Event.EventType.NodeCreated){                    System.out.println("listen:节点创立");                }else if(event.getType()== Watcher.Event.EventType.NodeChildrenChanged){                    System.out.println("listen:子节点批改");                }            }        };        ZooKeeper zookeeper = new ZooKeeper(connStr, 5000,watcher);        countDown.await();        //注册监听,每次都要从新注册,否则监听不到        // 先创立一个根节点root        zookeeper.exists("/root/ghz", watcher);        // 创立节点        String result = zookeeper.create("/root/ghz", "老郭".getBytes(),                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        System.out.println(result);        Thread.sleep(10);        // 获取节点        byte[] bs = zookeeper.getData("/root/ghz", true, null);        result = new String(bs);        System.out.println("创立节点后的数据是:" + result);        // 批改节点        zookeeper.setData("/root/ghz", "多课网-老郭".getBytes(), -1);        Thread.sleep(10);        bs = zookeeper.getData("/root/ghz", true, null);        result = new String(bs);        System.out.println("批改节点后的数据是:" + result);        // 删除节点        zookeeper.delete("/root/ghz", -1);        System.out.println("节点删除胜利");    }}

运行后果

eventType:NoneeventType:NodeCreatedlisten:节点创立/root/ghz创立节点后的数据是:老郭eventType:NodeDataChanged批改节点后的数据是:多课网-老郭节点删除胜利eventType:NodeDeleted

应用java zkclient库操作zookeeper.md

创立一个maven我的项目

创立一个maven我的项目

增加依赖

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient --><dependency>    <groupId>com.101tec</groupId>    <artifactId>zkclient</artifactId>    <version>0.10</version></dependency>

实现代码

package com.duoke360;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.apache.zookeeper.*;import java.util.List;public class Test {    public static void main(String[] args) throws InterruptedException {        String connStr = "192.168.18.128:2181";        ZkClient zk = new ZkClient(connStr);        // 注册【数据】事件        zk.subscribeDataChanges("/root/ghz", new IZkDataListener() {            @Override            public void handleDataDeleted(String arg0) throws Exception {                System.err.println("数据删除:" + arg0);            }            @Override            public void handleDataChange(String arg0, Object arg1) throws Exception {                System.err.println("数据批改:" + arg0 + "------" + arg1);            }        });        zk.subscribeChildChanges("/root", (arg0, arg1) -> {            System.err.println("子节点发生变化:" + arg0);            arg1.forEach(f -> {                System.out.println("content:" + f);            });        });        List<String> list = zk.getChildren("/");        list.forEach(e -> {            System.out.println(e);        });        String res = zk.create("/root/ghz", "多课网-老郭", CreateMode.PERSISTENT);        System.out.println("创立节点/root/ghz胜利:" + res);        zk.writeData("/root/ghz", "多课网-zookeeper");        System.out.println("批改节点/root/ghz数据胜利");        res = zk.readData("/root/ghz");        System.out.println("节点数据:" + res);        Thread.sleep(1000);        zk.delete("/root/ghz");        System.out.println("删除节点/root/ghz胜利");        Thread.sleep(1000);        System.out.println("------------------------------------------------");        for (int i = 0; i < 10; i++) {            zk.create("/root/ghz", "多课网-老郭", CreateMode.PERSISTENT);            Thread.sleep(1000);            zk.delete("/root/ghz");            Thread.sleep(1000);        }    }}

运行后果

node20000000002zookeepernode10000000001root创立节点/root/ghz胜利:/root/ghz批改节点/root/ghz数据胜利节点数据:多课网-zookeepercontent:ghz数据批改:/root/ghz------多课网-zookeeper子节点发生变化:/root数据批改:/root/ghz------多课网-zookeeper删除节点/root/ghz胜利数据删除:/root/ghz子节点发生变化:/root------------------------------------------------数据批改:/root/ghz------多课网-老郭子节点发生变化:/rootcontent:ghz数据删除:/root/ghz子节点发生变化:/root数据批改:/root/ghz------多课网-老郭子节点发生变化:/rootcontent:ghz数据删除:/root/ghz子节点发生变化:/root数据批改:/root/ghz------多课网-老郭子节点发生变化:/root

应用Java curator库操作zookeeper

创立一个maven我的项目

创立一个maven我的项目

增加依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --><dependency>    <groupId>org.apache.curator</groupId>    <artifactId>curator-recipes</artifactId>    <version>4.2.0</version></dependency>

实现代码

package com.duoke360;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.cache.PathChildrenCache;import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.data.Stat;public class Test {    public static void main(String[] args) throws Exception {        String connStr = "192.168.18.128:2181";        CuratorFramework cur= CuratorFrameworkFactory.builder()                .connectString(connStr)                .connectionTimeoutMs(5000)                .retryPolicy(new ExponentialBackoffRetry(1000,3))                .build();        cur.start();//连贯        //创立监听        PathChildrenCache cache=new PathChildrenCache(cur,"/root",true);        cache.start();        cache.rebuild();        cache.getListenable().addListener(new PathChildrenCacheListener() {            @Override            public void childEvent(CuratorFramework framwork, PathChildrenCacheEvent event) throws Exception {                System.err.println("节点发生变化:"+event.getType());            }        });        Stat stat=cur.checkExists().forPath("/root/ghz");        if(stat!=null){            System.out.println("/root/ghz 节点存在,间接删除");            cur.delete().forPath("/root/ghz");        }        System.in.read();        System.out.println("筹备创立 /root/ghz");        cur.create().withMode(CreateMode.PERSISTENT)                .forPath("/root/ghz", "多课网-老郭".getBytes());        System.out.println("节点 /root/ghz 创立胜利");        Thread.sleep(1000);        byte[] bs=cur.getData().forPath("/root/ghz");        System.out.println("数据:"+new String(bs));        Thread.sleep(1000);        cur.delete().forPath("/root/ghz");        Thread.sleep(1000);    }}

运行后果

test筹备创立 /root/ghz节点 /root/ghz 创立胜利节点发生变化:CHILD_ADDED数据:多课网-老郭节点发生变化:CHILD_REMOVED

应用Python操作zookeeper

装置

pip install kazoo

连贯 ZooKeeper

可通过 KazooClient 类间接连贯 ZooKeeper ,反对多个 host ,端口默认 2181。

import jsonfrom kazoo.client import KazooClientzk = KazooClient(hosts='192.168.18.128:2181')zk.start()

创立节点

先看下 create() 办法定义

def create(self, path, value=b"", acl=None, ephemeral=False,               sequence=False, makepath=False):        :param path: Path of node.        :param value: Initial bytes value of node.        :param acl: :class:`~kazoo.security.ACL` list.        :param ephemeral: Boolean indicating whether node is ephemeral                          (tied to this session).        :param sequence: Boolean indicating whether path is suffixed                         with a unique index.        :param makepath: Whether the path should be created if it                         doesn't exist.

咱们来解释下这些参数:

  • path: 节点门路
  • value: 节点对应的值,留神值的类型是 bytes
  • ephemeral: 若为 True 则创立一个长期节点,session 中断后主动删除该节点。默认 False
  • sequence: 若为 True 则在你创立节点名前面减少10位数字(例如:你创立一个 testplatform/test 节点,理论创立的是 testplatform/test0000000003,这串数字是程序递增的)。默认 False
  • makepath: 若为 False 父节点不存在时抛 NoNodeError。若为 True 父节点不存在则创立父节点。默认 False

实例

from kazoo.client import KazooClientzk = KazooClient(hosts='192.168.18.128:2181')zk.start()# 创立节点:makepath 设置为 True ,父节点不存在则创立,其余参数不填均为默认zk.create('/root/ghz',b'',makepath=True)# 操作完后,别忘了敞开zk连贯zk.stop()

查看节点

KazooClient 类用提供 get_children()get() 办法获取 子节点 和 节点对应的值

from kazoo.client import KazooClientzk = KazooClient(hosts='192.168.18.128:2181')zk.start()# 获取某个节点下所有子节点node = zk.get_children('/root')# 获取某个节点对应的值value = zk.get('/root/ghz')# 操作完后,别忘了敞开zk连贯zk.stop()print(node,value)

更改节点

更改上文创立的 node 值,应用 set() 办法

from kazoo.client import KazooClientzk = KazooClient(hosts='192.168.18.128:2181')zk.start()# 更改节点对应的valuezk.set('/root/ghz', b'duoke-ghz')# 获取某个节点对应的值value = zk.get('/root/ghz')zk.stop()print(value)

删除节点

删除上文创立的节点,应用 delete() 办法

from kazoo.client import KazooClientzk = KazooClient(hosts='192.168.18.128:2181')zk.start()# 删除节点对应的valuezk.delete('/root/ghz',recursive=False)zk.stop()
参数 recursive:若为 False,当须要删除的节点存在子节点,会抛异样 NotEmptyError 。若为True,则删除 此节点 以及 删除该节点的所有子节点

watches 事件

zookeeper 所有读操作都有设置 watch 选项(get_children() 、get() 和 exists())。watch 是一个触发器,当检测到 zookeeper 有子节点变动 或者 节点value产生变动时触发。上面以 get() 办法为例。

from kazoo.client import KazooClientzk = KazooClient(hosts='192.168.18.128:2181')zk.start()def test(event):    print('触发事件')if __name__ == "__main__":    # 节点必须存在,否则保留    zk.get('/root/ghz', watch=test)    print("第一次获取value")    zk.set('/root/ghz', b'duoke-ghz')    zk.get('/root/ghz', watch=test)    zk.set('/root/ghz', b'duoke-ghz2')    print("第二次获取value")    zk.stop()

运行后果

第一次获取value触发事件第二次获取value

应用golang操作zookeeper

下载包

go get github.com/samuel/go-zookeeper

连贯到Server

package mainimport (    "fmt"    "time"    "github.com/samuel/go-zookeeper/zk")func conn() *zk.Conn {    var hosts = []string{"192.168.18.128:2181"}    conn, _, err := zk.Connect(hosts, time.Second*5)    defer conn.Close()    if err != nil {        fmt.Println(err)        return nil    } else {        fmt.Println("连贯胜利!")        return conn    }}func main() {    conn()}

创立节点

func create() {    var conn *zk.Conn = conn()    defer conn.Close()    var path = "/home"    var data = []byte("多课网")    var flags int32 = 0    //flags有4种取值:    //0:永恒,除非手动删除    //zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除    //zk.FlagSequence  = 2:会主动在节点前面增加序号    //3:Ephemeral和Sequence,即,短暂且主动增加序号    var acls = zk.WorldACL(zk.PermAll) //管制拜访权限模式    p, err_create := conn.Create(path, data, flags, acls)    if err_create != nil {        fmt.Println(err_create)        return    }    fmt.Println("create:", p)}

批改节点

func set() {    var conn *zk.Conn = conn()    defer conn.Close()    var path = "/home"    var data = []byte("多课网-老郭")    conn.Set(path, data, -1)    b, _, _ := conn.Get(path)    fmt.Println("数据:" + string(b))}

删除节点

func del() {    var conn *zk.Conn = conn()    defer conn.Close()    var path = "/home"    err := conn.Delete(path, -1)    if err != nil {        fmt.Println("删除失败!")    } else {        fmt.Println("删除胜利!")    }}

watch

func callback(event zk.Event) {    fmt.Println("*******************")    fmt.Println("path:", event.Path)    fmt.Println("type:", event.Type.String())    fmt.Println("state:", event.State.String())    fmt.Println("*******************")}func watch() {    var hosts = []string{"192.168.18.128:2181"}    option := zk.WithEventCallback(callback)    conn, _, err := zk.Connect(hosts, time.Second*5, option)    defer conn.Close()    if err != nil {        fmt.Println(err)        return    }    var path = "/home"    _, _, _, err = conn.ExistsW(path)    if err != nil {        fmt.Println(err)        return    }    // 创立    create(conn)    time.Sleep(time.Second * 2)    _, _, _, err = conn.ExistsW(path)    if err != nil {        fmt.Println(err)        return    }    // 删除    del(conn)}

关注公众号 学习更多精彩课程