共计 20874 个字符,预计需要花费 53 分钟才能阅读完成。
前言
因为本人最近在学习 zookeeper 分布式相干的常识,发现在其代码实现上存在较多难以想分明的点,尤其是响应式编程的操作,为此在这里记录残缺的代码书写流程(每一步的思维),这里是第一篇 zookeeper 分布式注册配置核心的实现代码过程,前面还会有第二篇对于 zookeeper 分布式锁的简略实现过程。
分布式注册配置核心
zookeeper 因为领有 watcher 机制,使得其领有公布订阅的性能,而公布与订阅模型,即所谓的配置核心,顾名思义就是发布者将数据公布到 ZK 节点上,供订阅者动静获取数据,实现配置信息的集中式治理和动静更新。利用在启动的时候会被动来获取一次配置,同时,在节点上注册一个 Watcher,这样一来,当前每次配置有更新的时候,都会实时告诉到订阅的客户端,素来达到获取最新配置信息的目标。
代码实现流程
首选交代试验环境,本人的 zookeeper 的版本是 3.5.8 的版本,代码工具为 IDEA,创立了一个 MAVEN 我的项目,仅仅增加了如下依赖。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.8</version>
</dependency>
因为客户端须要与 zookeeper 建设连贯,获取数据,增加监控等等一系列的事件,所以这里封装一个 Utils 工具类供咱们应用。
而后对于 zookeeper 连贯客户端的地址的前面能够紧跟一个 path,作为在根目录下的工作目录。该目录就是作为所有操作的根目录,这里应用 /test、
同时因为 zookeeper 基于 watch 机制实现公布订阅,咱们所有的 watcher 都采纳自定义的形式实现,首先是对连贯胜利的时候的 DefaultWatcher。
DefaultWatcher 代码如下:
package org.qzx.config;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:05 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class DefaultWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {System.out.println(event.toString());
}
}
Utils 工具类:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:02 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class Utils {
// zookeeper 对象
private static ZooKeeper zooKeeper;
// 连贯地址
private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
private static DefaultWatcher defaultWatcher = new DefaultWatcher();
public static ZooKeeper getZooKeeper() throws Exception{zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
return zooKeeper;
}
}
因为 zookeeper 采纳的是异步调用,所以这里须要应用一把锁锁住主线程,在连贯胜利后主动解锁,主线程再往下进行。这里应用 CountDownLatch 实现锁,在主线程创立,传递到 DafaultWatcher 的回掉函数中。
DefaultWatcher 代码批改如下:
package org.qzx.config;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:05 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class DefaultWatcher implements Watcher {
private CountDownLatch latch;
public void setLatch(CountDownLatch latch) {this.latch = latch;}
@Override
public void process(WatchedEvent event) {switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
latch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
System.out.println(event.toString());
}
}
Utils 批改代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:02 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class Utils {
// zookeeper 对象
private static ZooKeeper zooKeeper;
// 连贯地址
private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
private static DefaultWatcher defaultWatcher = new DefaultWatcher();
// 锁
private static CountDownLatch latch = new CountDownLatch(1);
public static ZooKeeper getZooKeeper() throws Exception{zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
defaultWatcher.setLatch(latch);
latch.await();
return zooKeeper;
}
}
接下来就是编写配置类 TestConfig, 首先是在操作之前进行连贯,操作后得敞开,别离对应 conn 和 close 办法,而后就是配置办法 getConfig, 因为并不分明 zookeeper 客户端是否肯定含有自定义的工作目录,所以个别偏向于应用 exists 办法来进行测试。又因为 exists 办法中有 1 个 watcher 和一个回调函数,在回调函数中返回存在的话又得调用 getData 办法获取数据,在 getData 办法中又存在一个 watcher 和回调函数,这样会造成代码深度太大不易浏览,所以这里也自定义一个工具类,封装好所有的 watcher 和回调函数。该类的名称就叫 MyWatcherAndCallBack.
MyWatcherAndCallBack 的大体框架如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) { }
// Watcher
@Override
public void process(WatchedEvent event) { }
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {}}
对应的 TestConfig 类代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
@Before
public void conn() throws Exception {zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {zooKeeper.close();
}
@Test
public void getConf(){
// 这里的 /AppConf 在 zookeeper 中实际上是 /test/AppConf,
zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123");
}
}
这个时候就得思考在胜利判断在工作目录下存在 AppConf 的时候须要做的事件,其实也很简略,就是获取以后节点的数据就行了。
MyWatcherAndCallBack 批改后的代码如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
public void setZooKeeper(ZooKeeper zooKeeper) {this.zooKeeper = zooKeeper;}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) { }
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {}}
TestConfig 批改后的代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
@Before
public void conn() throws Exception {zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {zooKeeper.close();
}
@Test
public void getConf(){watcherAndCallBack.setZooKeeper(zooKeeper);
// 这里的 /AppConf 在 zookeeper 中实际上是 /test/AppConf,
zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123");
}
}
当初,咱们再来思考另外一个问题,当咱们取数据的时候,zookeeper 实际上是应用的异步调用模型,这里不会期待数据取回而是间接继续执行主线程的工作,那么在数据取回的时候要如何让主线程晓得呢?所以在这里咱们得筹备一个承受数据的对象,该类叫 MyConf, 对应的代码如下:
package org.qzx.config;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 2:00 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyConf {
private String confData;
public String getConfData() {return confData;}
public void setConfData(String confData) {this.confData = confData;}
}
因为须要让主线程承受数据,得在 TestConfig 类中聚合该对象,并且在 getData 的回调函数中须要为 MyConf 设置数据,所以在 MyWatcherAndCallBack 中也得聚合该对象。
TestConfig 类批改代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {zooKeeper.close();
}
@Test
public void getConf(){watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
// 这里的 /AppConf 在 zookeeper 中实际上是 /test/AppConf,
zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123");
}
}
MyWatcherAndCallBack 代码批改如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
public void setMyConf(MyConf myConf) {this.myConf = myConf;}
public void setZooKeeper(ZooKeeper zooKeeper) {this.zooKeeper = zooKeeper;}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) { }
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {if(stat!=null){myConf.setConfData(new String(data));
}
}
}
这样在数据取回来后,在 TestConfig 中就能够看见该数据了。这里存在着一个问题,在 exists 执行的时候不会期待数据的获取而会始终执行上来,然而对于判断时候,如果有该节点并且获取数据应该是一个原子性的操作,在这里咱们将这两步封装成一部操作实现。咱们能够在 MyWatcherAndCallBack 类中增加一个办法用来期待该操作的执行,从而获取数据后果,该办法就叫 aWait(). 咱们这里将 exists 办法挪动到 aWait 办法中,同时应用 CountDownLatch 阻塞该操作,直到获取数据胜利为止解锁。这里应用一个 CountDownLatch 实现了对于判断节点存在和获取数据的封装,如果在 TestConfig 中对 exists 办法进行加锁,那就还得将这把锁传递到 MyWatcherAndCallBack 中在 getData 回调完结能力解锁,这种实现形式显然在语义上没有将其挪动到 aWait 办法中的更好。
MyWatcherAndCallBack 批改后的代码如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
private CountDownLatch latch = new CountDownLatch(1);
public void setMyConf(MyConf myConf) {this.myConf = myConf;}
public void setZooKeeper(ZooKeeper zooKeeper) {this.zooKeeper = zooKeeper;}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) { }
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {if(stat!=null){myConf.setConfData(new String(data));
latch.countDown();}
}
public void aWait() throws InterruptedException {
// 这里的 /AppConf 在 zookeeper 中实际上是 /test/AppConf,
zooKeeper.exists("/AppConf",this,this,"123");
latch.await();}
}
TestConfig 批改代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {zooKeeper.close();
}
@Test
public void getConf() throws InterruptedException {watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
watcherAndCallBack.aWait();}
}
当初咱们对于判断节点存在和胜利获取节点数据的这条门路就编写结束了,接下来思考节点被批改的状况,首先是当节点不存在的时候,exists 的回调不会执行,在节点被创立的时候, 注册在 exists 的 watcher 会被执行,那么咱们只须要调用数据即可,其次是节点中的数据被批改,咱们须要从新取得新的节点数据并且设置到 confData 中,再就是节点被删除,咱们须要将 confData 的数据置为空。为了察看数据的变动,这里在 TestConfig 中循环打印设置的数据。
TestConfig 代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {zooKeeper.close();
}
@Test
public void getConf() throws InterruptedException {watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
watcherAndCallBack.aWait();
while (true){System.out.println(myConf.getConfData());
TimeUnit.SECONDS.sleep(2);
}
}
}
MyWatcherAndCallBack 代码如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
private final CountDownLatch latch = new CountDownLatch(1);
public void setMyConf(MyConf myConf) {this.myConf = myConf;}
public void setZooKeeper(ZooKeeper zooKeeper) {this.zooKeeper = zooKeeper;}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {switch (event.getType()) {
case None:
break;
case NodeCreated:
// 节点创立须要获取数据
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeDeleted:
// 节点删除须要清空数据
myConf.setConfData("");
break;
case NodeDataChanged:
// 数据批改
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
}
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {if(stat!=null){myConf.setConfData(new String(data));
latch.countDown();}
}
public void aWait() throws InterruptedException {
// 这里的 /AppConf 在 zookeeper 中实际上是 /test/AppConf,
zooKeeper.exists("/AppConf",this,this,"123");
latch.await();}
}
接下来就是测试程序是否正确了,首先启动 4 台 zookeeper,而后在根目录下创立 test 工作目录.
而后开始启动程序,而后在 zookeeper 客户端手动创立 AppConf 节点,并且设置数据 olddata。
能够看到程序输入 olddata.
当初再批改该节点数据为 newdata.
而后能够看到程序输入 newdata.
在测试的时候发现如果删除了节点会一直的输入空字符串,这个比拟占用 IO 和资源,批改为阻塞期待数据不空。同时在输入的时候如果数据为空打印一句数据为空的提醒, 这里对于 MyWatcherAndCallBack 中节点删除的代码须要留神的是,咱们是通过调用 aWait 办法来实现的阻塞,因为这样会在节点数据存在时候主动解锁,进而输入节点数据,然而因为 CountDownLatch 曾经被减过了,所以这里须要将 latch 从新赋值。
MyWatcherAndCallBack 代码批改如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
private CountDownLatch latch = new CountDownLatch(1);
public void setMyConf(MyConf myConf) {this.myConf = myConf;}
public void setZooKeeper(ZooKeeper zooKeeper) {this.zooKeeper = zooKeeper;}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {switch (event.getType()) {
case None:
break;
case NodeCreated:
// 节点创立须要获取数据
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeDeleted:
// 节点删除须要清空数据并且期待数据达到
myConf.setConfData("");
latch = new CountDownLatch(1);
break;
case NodeDataChanged:
// 数据批改
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
}
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {if(stat!=null){myConf.setConfData(new String(data));
latch.countDown();}
}
public void aWait() throws InterruptedException {
// 这里的 /AppConf 在 zookeeper 中实际上是 /test/AppConf,
zooKeeper.exists("/AppConf",this,this,"123");
latch.await();}
}
TestConfig 的代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {zooKeeper.close();
}
@Test
public void getConf() throws InterruptedException {watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
watcherAndCallBack.aWait();
while (true){if(myConf.getConfData().equals("")){System.out.println("数据为空");
watcherAndCallBack.aWait();// 期待数据达到}
System.out.println(myConf.getConfData());
TimeUnit.SECONDS.sleep(2);
}
}
}
接下来从新测试节点被删除的状况.
删除节点后会发发现程序输入数据为空的提醒后就阻塞住了。
当初从新创立节点:
会发现又从新取得了节点的数据。
到此对于 zookeeper 的配置注册的代码就编写结束。
这里对于 zookeeper 的配置注册做一个小小的总结,配置注册实质上是在对立治理服务器共享的节点,其配置信息全副写在了那 1M 的数据中,在一个服务器批改了该节点后,其余的服务器会通过 zookeeper 的 watcher 机制承受到该音讯,也就胜利看到节点的实时变动实现更新配置的操作,这样就实现了分布式服务的协调性能。
zookeeper 的配置核心代码整顿如下
Utils 类:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:02 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class Utils {
// zookeeper 对象
private static ZooKeeper zooKeeper;
// 连贯地址
private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
private static DefaultWatcher defaultWatcher = new DefaultWatcher();
// 锁
private static CountDownLatch latch = new CountDownLatch(1);
public static ZooKeeper getZooKeeper() throws Exception{zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
defaultWatcher.setLatch(latch);
latch.await();
return zooKeeper;
}
}
DefaultWatcher 类:
package org.qzx.config;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:05 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class DefaultWatcher implements Watcher {
private CountDownLatch latch;
public void setLatch(CountDownLatch latch) {this.latch = latch;}
@Override
public void process(WatchedEvent event) {switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
latch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
System.out.println(event.toString());
}
}
MyWatcherAndCallBack 类:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
private CountDownLatch latch = new CountDownLatch(1);
public void setMyConf(MyConf myConf) {this.myConf = myConf;}
public void setZooKeeper(ZooKeeper zooKeeper) {this.zooKeeper = zooKeeper;}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {switch (event.getType()) {
case None:
break;
case NodeCreated:
// 节点创立须要获取数据
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeDeleted:
// 节点删除须要清空数据并且期待数据达到
myConf.setConfData("");
latch = new CountDownLatch(1);
break;
case NodeDataChanged:
// 数据批改
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
}
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {if(stat!=null){myConf.setConfData(new String(data));
latch.countDown();}
}
public void aWait() throws InterruptedException {
// 这里的 /AppConf 在 zookeeper 中实际上是 /test/AppConf,
zooKeeper.exists("/AppConf",this,this,"123");
latch.await();}
}
MyConf 类:
package org.qzx.config;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 2:00 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyConf {
private String confData;
public String getConfData() {return confData;}
public void setConfData(String confData) {this.confData = confData;}
}
TestConfig 类:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {zooKeeper.close();
}
@Test
public void getConf() throws InterruptedException {watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
watcherAndCallBack.aWait();
while (true){if(myConf.getConfData().equals("")){System.out.println("数据为空");
watcherAndCallBack.aWait();// 期待数据达到}
System.out.println(myConf.getConfData());
TimeUnit.SECONDS.sleep(2);
}
}
}
写在最初
可能有人本文篇幅较冗余 (尤其是代码局部) 而且过于简略,然而自己只是想残缺的记录响应式编程的思考过程和残缺的代码书写流程,能够供本人温习和为小白提供一个入门 zookeeper 响应式编程的小 demo。