关于golang:使用三大核心语言JavaPythonGolang玩转zookeeper

38次阅读

共计 9765 个字符,预计需要花费 25 分钟才能阅读完成。

本文视频教程

https://www.bilibili.com/vide…

关注公众号 学习更多精彩课程

应用三大外围语言 Java、Python、Golang 玩转 zookeeper

应用 Java 原生 api 操作 zookeeper

创立一个 maven 我的项目

创立一个 maven 我的项目

增加我的项目依赖

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.8</version>
    </dependency>
</dependencies>

https://mvnrepository.com/

实现代码

package com.duoke360;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class Test {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String connStr = "192.168.18.128:2181";
        CountDownLatch countDown = new CountDownLatch(1);

        Watcher watcher= event -> {if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {System.err.println("eventType:"+event.getType());
                if(event.getType()== Watcher.Event.EventType.None){countDown.countDown();
                }else if(event.getType()== Watcher.Event.EventType.NodeCreated){System.out.println("listen: 节点创立");
                }else if(event.getType()== Watcher.Event.EventType.NodeChildrenChanged){System.out.println("listen: 子节点批改");
                }
            }
        };

        ZooKeeper zookeeper = new ZooKeeper(connStr, 5000,watcher);
        countDown.await();

        // 注册监听, 每次都要从新注册,否则监听不到
        // 先创立一个根节点 root
        zookeeper.exists("/root/ghz", watcher);

        // 创立节点
        String result = zookeeper.create("/root/ghz", "老郭".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(result);

        Thread.sleep(10);

        // 获取节点
        byte[] bs = zookeeper.getData("/root/ghz", true, null);
        result = new String(bs);
        System.out.println("创立节点后的数据是:" + result);

        // 批改节点
        zookeeper.setData("/root/ghz", "多课网 - 老郭".getBytes(), -1);

        Thread.sleep(10);

        bs = zookeeper.getData("/root/ghz", true, null);
        result = new String(bs);


        System.out.println("批改节点后的数据是:" + result);
        // 删除节点
        zookeeper.delete("/root/ghz", -1);
        System.out.println("节点删除胜利");
    }
}

运行后果

eventType:None
eventType:NodeCreated
listen: 节点创立
/root/ghz
创立节点后的数据是: 老郭
eventType:NodeDataChanged
批改节点后的数据是: 多课网 - 老郭
节点删除胜利
eventType:NodeDeleted

应用 java zkclient 库操作 zookeeper.md

创立一个 maven 我的项目

创立一个 maven 我的项目

增加依赖

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

实现代码

package com.duoke360;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.*;

import java.util.List;

public class Test {public static void main(String[] args) throws InterruptedException {
        String connStr = "192.168.18.128:2181";
        ZkClient zk = new ZkClient(connStr);

        // 注册【数据】事件
        zk.subscribeDataChanges("/root/ghz", new IZkDataListener() {

            @Override
            public void handleDataDeleted(String arg0) throws Exception {System.err.println("数据删除:" + arg0);

            }

            @Override
            public void handleDataChange(String arg0, Object arg1) throws Exception {System.err.println("数据批改:" + arg0 + "------" + arg1);

            }
        });

        zk.subscribeChildChanges("/root", (arg0, arg1) -> {System.err.println("子节点发生变化:" + arg0);
            arg1.forEach(f -> {System.out.println("content:" + f);
            });
        });

        List<String> list = zk.getChildren("/");
        list.forEach(e -> {System.out.println(e);
        });

        String res = zk.create("/root/ghz", "多课网 - 老郭", CreateMode.PERSISTENT);
        System.out.println("创立节点 /root/ghz 胜利:" + res);

        zk.writeData("/root/ghz", "多课网 -zookeeper");
        System.out.println("批改节点 /root/ghz 数据胜利");

        res = zk.readData("/root/ghz");
        System.out.println("节点数据:" + res);

        Thread.sleep(1000);

        zk.delete("/root/ghz");
        System.out.println("删除节点 /root/ghz 胜利");

        Thread.sleep(1000);

        System.out.println("------------------------------------------------");

        for (int i = 0; i < 10; i++) {zk.create("/root/ghz", "多课网 - 老郭", CreateMode.PERSISTENT);
            Thread.sleep(1000);
            zk.delete("/root/ghz");
            Thread.sleep(1000);
        }
    }
}

运行后果

node20000000002
zookeeper
node10000000001
root
创立节点 /root/ghz 胜利:/root/ghz
批改节点 /root/ghz 数据胜利
节点数据: 多课网 -zookeeper
content:ghz
数据批改:/root/ghz------ 多课网 -zookeeper
子节点发生变化:/root
数据批改:/root/ghz------ 多课网 -zookeeper
删除节点 /root/ghz 胜利
数据删除:/root/ghz
子节点发生变化:/root
------------------------------------------------
数据批改:/root/ghz------ 多课网 - 老郭
子节点发生变化:/root
content:ghz
数据删除:/root/ghz
子节点发生变化:/root
数据批改:/root/ghz------ 多课网 - 老郭
子节点发生变化:/root
content:ghz
数据删除:/root/ghz
子节点发生变化:/root
数据批改:/root/ghz------ 多课网 - 老郭
子节点发生变化:/root

应用 Java curator 库操作 zookeeper

创立一个 maven 我的项目

创立一个 maven 我的项目

增加依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

实现代码

package com.duoke360;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Test {public static void main(String[] args) throws Exception {
        String connStr = "192.168.18.128:2181";
        CuratorFramework cur= CuratorFrameworkFactory.builder()
                .connectString(connStr)
                .connectionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .build();
        cur.start();// 连贯

        // 创立监听
        PathChildrenCache cache=new PathChildrenCache(cur,"/root",true);
        cache.start();
        cache.rebuild();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework framwork, PathChildrenCacheEvent event) throws Exception {System.err.println("节点发生变化:"+event.getType());
            }
        });

        Stat stat=cur.checkExists().forPath("/root/ghz");
        if(stat!=null){System.out.println("/root/ghz 节点存在,间接删除");
            cur.delete().forPath("/root/ghz");
        }

        System.in.read();

        System.out.println("筹备创立 /root/ghz");
        cur.create().withMode(CreateMode.PERSISTENT)
                .forPath("/root/ghz", "多课网 - 老郭".getBytes());
        System.out.println("节点 /root/ghz 创立胜利");

        Thread.sleep(1000);

        byte[] bs=cur.getData().forPath("/root/ghz");
        System.out.println("数据:"+new String(bs));

        Thread.sleep(1000);

        cur.delete().forPath("/root/ghz");

        Thread.sleep(1000);
    }
}

运行后果

test
筹备创立 /root/ghz
节点 /root/ghz 创立胜利
节点发生变化:CHILD_ADDED
数据: 多课网 - 老郭
节点发生变化:CHILD_REMOVED

应用 Python 操作 zookeeper

装置

pip install kazoo

连贯 ZooKeeper

可通过 KazooClient 类间接连贯 ZooKeeper,反对多个 host,端口默认 2181。

import json
from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()

创立节点

先看下 create() 办法定义

def create(self, path, value=b"", acl=None, ephemeral=False,
               sequence=False, makepath=False):
        :param path: Path of node.
        :param value: Initial bytes value of node.
        :param acl: :class:`~kazoo.security.ACL` list.
        :param ephemeral: Boolean indicating whether node is ephemeral
                          (tied to this session).
        :param sequence: Boolean indicating whether path is suffixed
                         with a unique index.
        :param makepath: Whether the path should be created if it
                         doesn't exist.

咱们来解释下这些参数:

  • path:节点门路
  • value:节点对应的值,留神值的类型是 bytes
  • ephemeral:若为 True 则创立一个长期节点,session 中断后主动删除该节点。默认 False
  • sequence: 若为 True 则在你创立节点名前面减少 10 位数字(例如:你创立一个 testplatform/test 节点,理论创立的是 testplatform/test0000000003,这串数字是程序递增的)。默认 False
  • makepath:若为 False 父节点不存在时抛 NoNodeError。若为 True 父节点不存在则创立父节点。默认 False

实例

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()
# 创立节点:makepath 设置为 True,父节点不存在则创立,其余参数不填均为默认
zk.create('/root/ghz',b'',makepath=True)
# 操作完后,别忘了敞开 zk 连贯
zk.stop()

查看节点

KazooClient 类用提供 get_children()get() 办法获取 子节点 和 节点对应的值

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()
# 获取某个节点下所有子节点
node = zk.get_children('/root')
# 获取某个节点对应的值
value = zk.get('/root/ghz')
# 操作完后,别忘了敞开 zk 连贯
zk.stop()
print(node,value)

更改节点

更改上文创立的 node 值,应用 set() 办法

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()
# 更改节点对应的 value
zk.set('/root/ghz', b'duoke-ghz')
# 获取某个节点对应的值
value = zk.get('/root/ghz')
zk.stop()
print(value)

删除节点

删除上文创立的节点,应用 delete() 办法

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()
# 删除节点对应的 value
zk.delete('/root/ghz',recursive=False)
zk.stop()

参数 recursive:若为 False,当须要删除的节点存在子节点,会抛异样 NotEmptyError。若为 True,则删除 此节点 以及 删除该节点的所有子节点

watches 事件

zookeeper 所有读操作都有设置 watch 选项(get_children()、get() 和 exists())。watch 是一个触发器,当检测到 zookeeper 有子节点变动 或者 节点 value 产生变动时触发。上面以 get() 办法为例。

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()


def test(event):
    print('触发事件')


if __name__ == "__main__":
    # 节点必须存在,否则保留
    zk.get('/root/ghz', watch=test)
    print("第一次获取 value")
    zk.set('/root/ghz', b'duoke-ghz')
    zk.get('/root/ghz', watch=test)
    zk.set('/root/ghz', b'duoke-ghz2')
    print("第二次获取 value")

    zk.stop()

运行后果

 第一次获取 value
触发事件
第二次获取 value

应用 golang 操作 zookeeper

下载包

go get github.com/samuel/go-zookeeper

连贯到 Server

package main

import (
    "fmt"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

func conn() *zk.Conn {var hosts = []string{"192.168.18.128:2181"}
    conn, _, err := zk.Connect(hosts, time.Second*5)
    defer conn.Close()
    if err != nil {fmt.Println(err)
        return nil
    } else {fmt.Println("连贯胜利!")
        return conn
    }
}

func main() {conn()
}

创立节点

func create() {var conn *zk.Conn = conn()
    defer conn.Close()
    var path = "/home"
    var data = []byte("多课网")
    var flags int32 = 0
    //flags 有 4 种取值://0: 永恒,除非手动删除
    //zk.FlagEphemeral = 1: 短暂,session 断开则改节点也被删除
    //zk.FlagSequence  = 2: 会主动在节点前面增加序号
    //3:Ephemeral 和 Sequence,即,短暂且主动增加序号
    var acls = zk.WorldACL(zk.PermAll) // 管制拜访权限模式

    p, err_create := conn.Create(path, data, flags, acls)
    if err_create != nil {fmt.Println(err_create)
        return
    }
    fmt.Println("create:", p)
}

批改节点

func set() {var conn *zk.Conn = conn()
    defer conn.Close()
    var path = "/home"
    var data = []byte("多课网 - 老郭")
    conn.Set(path, data, -1)
    b, _, _ := conn.Get(path)
    fmt.Println("数据:" + string(b))
}

删除节点

func del() {var conn *zk.Conn = conn()
    defer conn.Close()
    var path = "/home"
    err := conn.Delete(path, -1)
    if err != nil {fmt.Println("删除失败!")
    } else {fmt.Println("删除胜利!")
    }
}

watch

func callback(event zk.Event) {fmt.Println("*******************")
    fmt.Println("path:", event.Path)
    fmt.Println("type:", event.Type.String())
    fmt.Println("state:", event.State.String())
    fmt.Println("*******************")
}

func watch() {var hosts = []string{"192.168.18.128:2181"}
    option := zk.WithEventCallback(callback)

    conn, _, err := zk.Connect(hosts, time.Second*5, option)
    defer conn.Close()
    if err != nil {fmt.Println(err)
        return
    }
    var path = "/home"
    _, _, _, err = conn.ExistsW(path)
    if err != nil {fmt.Println(err)
        return
    }

    // 创立
    create(conn)

    time.Sleep(time.Second * 2)

    _, _, _, err = conn.ExistsW(path)
    if err != nil {fmt.Println(err)
        return
    }
    // 删除
    del(conn)

}

关注公众号 学习更多精彩课程

正文完
 0