Prerequisites
Maven
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
</dependencies>
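Creating a session
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class MyZookeeper implements Watcher {
    // Released once the client reports SyncConnected.
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws IOException, InterruptedException {
        // The constructor returns immediately; the connection is established asynchronously.
        ZooKeeper zooKeeper = new ZooKeeper("192.168.1.102:2181", 5000, new MyZookeeper());
        System.out.println(zooKeeper.getState()); // usually CONNECTING at this point
        latch.await();
        System.out.println("zookeeper session established");
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (KeeperState.SyncConnected == event.getState()) {
            latch.countDown();
        }
    }
}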
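The latch in MyZookeeper waits forever if the server is unreachable. One possible refinement is to bound the wait. The sketch below uses only the JDK: the `ConnectionGate` class and its method names are hypothetical, and a plain thread stands in for the ZooKeeper event thread that would call `process`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a latch with a bounded wait, standing in for the
// static latch in MyZookeeper. No ZooKeeper classes are used here.
class ConnectionGate {
    private final CountDownLatch connected = new CountDownLatch(1);

    // Would be called from Watcher.process() when SyncConnected arrives.
    void markConnected() {
        connected.countDown();
    }

    // Returns true if markConnected() was called within the timeout.
    boolean awaitConnected(long timeoutMillis) {
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ConnectionGate gate = new ConnectionGate();
        // Simulates the event thread delivering SyncConnected a little later.
        new Thread(gate::markConnected).start();
        if (gate.awaitConnected(5000)) {
            System.out.println("zookeeper session established");
        } else {
            System.out.println("timed out waiting for SyncConnected");
        }
    }
}
```

In the real class, `process` would call `markConnected()` where it currently calls `latch.countDown()`, and `main` could close the client and exit when `awaitConnected` returns false.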
Creating a node
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException
zooKeeper.create("/zk-1", "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Deleting a node
public void delete(final String path, int version) throws InterruptedException, KeeperException
zooKeeper.delete("/zk-book", -1);
Notes:
- version: the version here is the node's dataVersion. Passing -1 matches any version.
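To make the version check concrete, here is a toy in-memory model of a single node. It is not the ZooKeeper client: `ToyNode` and its methods are made up for illustration. The only behaviour it mirrors is that -1 matches any version, while any other value must equal the current dataVersion. The real client reports a mismatch by throwing `KeeperException.BadVersionException` rather than returning a flag.

```java
// Toy model of one znode's data and dataVersion; not the ZooKeeper client.
class ToyNode {
    private byte[] data;
    private int dataVersion = 0;
    private boolean deleted = false;

    ToyNode(byte[] data) {
        this.data = data;
    }

    // -1 matches any version; otherwise the expected version must be current.
    private boolean versionMatches(int expectedVersion) {
        return expectedVersion == -1 || expectedVersion == dataVersion;
    }

    // Returns the new dataVersion, or -1 if the update was rejected.
    int setData(byte[] newData, int expectedVersion) {
        if (deleted || !versionMatches(expectedVersion)) {
            return -1;
        }
        data = newData;
        return ++dataVersion;
    }

    // Returns true if the node was deleted, false if the delete was rejected.
    boolean delete(int expectedVersion) {
        if (deleted || !versionMatches(expectedVersion)) {
            return false;
        }
        deleted = true;
        return true;
    }

    public static void main(String[] args) {
        ToyNode node = new ToyNode("hello".getBytes());
        node.setData("world".getBytes(), 0);   // dataVersion becomes 1
        System.out.println(node.delete(0));    // stale version, prints false
        System.out.println(node.delete(-1));   // -1 matches any version, prints true
    }
}
```

Passing a concrete version turns delete into a conditional operation: it succeeds only if nobody has modified the node since that version was read.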
public void delete(final String path, int version, VoidCallback cb, Object ctx)
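zooKeeper.delete("/zk-book", -1, (int rc, String path, Object context) -> {
    // rc is the result code (0 means OK); context is the object passed as the last argument.
    System.out.printf("rc = %d\npath = %s\ncontext = %s\n", rc, path, context);
}, "I am context data");
Thread.sleep(3000); // keep the process alive until the callback has run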
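Notes:
- Deletes the node asynchronously.
- Object ctx: context data that is passed through to the callback.
- VoidCallback cb: the callback invoked with the result.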
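`Thread.sleep(3000)` only guesses how long the callback will take. A common alternative is to have the callback release a latch that the caller waits on. The sketch below shows that pattern with the JDK alone: `VoidCb` is a hypothetical stand-in with the same shape as `VoidCallback`, and `fakeDelete` merely simulates an asynchronous call that reports rc = 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncWaitSketch {
    // Hypothetical stand-in with the same shape as AsyncCallback.VoidCallback.
    interface VoidCb {
        void processResult(int rc, String path, Object ctx);
    }

    // Fake asynchronous delete: reports rc = 0 from another thread.
    static void fakeDelete(String path, VoidCb cb, Object ctx) {
        new Thread(() -> cb.processResult(0, path, ctx)).start();
    }

    // Issues the fake delete and blocks until the callback has run.
    // Returns the rc, or Integer.MIN_VALUE if no callback arrived in time.
    static int deleteAndWait(String path, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger result = new AtomicInteger(Integer.MIN_VALUE);
        fakeDelete(path, (rc, p, ctx) -> {
            result.set(rc);
            done.countDown(); // wake the waiting caller
        }, "I am context data");
        try {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println("rc = " + deleteAndWait("/zk-book", 3000));
    }
}
```

With the real client, the same latch and result holder could be captured by the lambda passed to `zooKeeper.delete`.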
Getting child nodes
public List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException
List<String> children = zooKeeper.getChildren("/zk-book", new MyZookeeper());
children.forEach(System.out::println);
Notes:
- Returns only the direct children of the node and cannot recurse (the zk client command ls -R /zk-book lists them recursively).
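A recursive listing can be built on top of a non-recursive lookup. The following is a sketch, not part of the ZooKeeper API: `ZnodeTreeWalker` and `listRecursive` are hypothetical names, and the lookup is passed in as a function so the example runs against an in-memory map instead of a server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class ZnodeTreeWalker {
    // Depth-first listing of every descendant path below root.
    // childrenOf returns child names (not full paths), as getChildren does.
    static List<String> listRecursive(String root, Function<String, List<String>> childrenOf) {
        List<String> result = new ArrayList<>();
        collect(root, childrenOf, result);
        return result;
    }

    private static void collect(String path, Function<String, List<String>> childrenOf,
                                List<String> out) {
        for (String child : childrenOf.apply(path)) {
            // Avoid a double slash when the parent is the root node.
            String childPath = path.equals("/") ? "/" + child : path + "/" + child;
            out.add(childPath);
            collect(childPath, childrenOf, out);
        }
    }

    public static void main(String[] args) {
        // In-memory stand-in for a ZooKeeper tree (made-up data).
        Map<String, List<String>> tree = new HashMap<>();
        tree.put("/zk-book", Arrays.asList("zb1", "zb2"));
        tree.put("/zk-book/zb1", Arrays.asList("leaf"));
        Function<String, List<String>> lookup =
            p -> tree.getOrDefault(p, Collections.<String>emptyList());
        listRecursive("/zk-book", lookup).forEach(System.out::println);
    }
}
```

With a real client, `childrenOf` could wrap `zooKeeper.getChildren(path, false)` and convert its checked exceptions into unchecked ones.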
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
zooKeeper.getChildren("/zk-book", new MyZookeeper(),
    (int rc, String path, Object ctx, List<String> children, Stat stat) ->
        System.out.printf("rc = %d\tpath = %s\tctx = %s\tchildren = %s\tstat = %s\n",
            rc, path, ctx, String.join(",", children), stat.toString()),
    "I am callback data");
// Creating a child changes the child list and fires the watcher registered above.
zooKeeper.create("/zk-book/zb3", "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(3000); // keep the process alive until the callback and watcher have run
Notes:
- Fetches the child list asynchronously.
- Once a watcher is registered, any change to the children of /zk-book triggers a NodeChildrenChanged event.
Reading data
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
byte[] data = zooKeeper.getData("/zk-book", new MyZookeeper(), null);
System.out.println(new String(data, 0, data.length));
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
zooKeeper.getData("/zk-book", new MyZookeeper(), (int rc, String path, Object ctx, byte data[], Stat stat) -> {}, null);
Thread.sleep(3000);
Notes:
- Reads the data asynchronously.
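Both getData variants hand back raw bytes, and the snippets in these notes convert them with the platform default charset (`"hello".getBytes()`, `new String(data, 0, data.length)`). If writer and reader run with different defaults, the text may not round-trip. A minimal sketch that pins the charset follows. The helper names `encode` and `decode` are hypothetical, and UTF-8 is an assumption; any charset agreed on by both sides would do.

```java
import java.nio.charset.StandardCharsets;

class ZnodePayload {
    // Encodes text for create/setData with an explicit charset.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes bytes as returned by getData; null (a node without data) yields "".
    static String decode(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = encode("hello");
        System.out.println(decode(stored)); // prints hello
    }
}
```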
Updating data
public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException
Stat stat1 = zooKeeper.setData("/zk-book", "hello".getBytes(), -1);
public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
zooKeeper.setData("/zk-book", "hello".getBytes(), -1, (int rc, String path, Object ctx, Stat stat) ->{}, null);
Thread.sleep(3000);
Notes:
- Updates the data asynchronously.
Checking whether a node exists
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
Stat exists = zooKeeper.exists("/zk-book", new MyZookeeper());
Notes:
- Watcher watcher is notified of three kinds of events:
  - the node is created
  - the node is deleted
  - the node's data is updated
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
zooKeeper.exists("/zk-book", new MyZookeeper(), (int rc, String path, Object ctx, Stat stat) -> {}, null);
Notes:
- Checks for existence asynchronously.