前言
因为本人最近在学习zookeeper分布式相干的常识,发现在其代码实现上存在较多难以想分明的点,尤其是响应式编程的操作,为此在这里记录残缺的代码书写流程(每一步的思维),这里是第一篇zookeeper分布式注册配置核心的实现代码过程,前面还会有第二篇对于zookeeper分布式锁的简略实现过程。
分布式注册配置核心
zookeeper因为领有watcher机制,使得其领有公布订阅的性能,而公布与订阅模型,即所谓的配置核心,顾名思义就是发布者将数据公布到 ZK节点上,供订阅者动静获取数据,实现配置信息的集中式治理和动静更新。 利用在启动的时候会被动来获取一次配置,同时,在节点上注册一个 Watcher,这样一来,当前每次配置有更新的时候,都会实时告诉到订阅的客户端,素来达到获取最新配置信息的目标。
代码实现流程
首选交代试验环境,本人的zookeeper的版本是3.5.8的版本,代码工具为IDEA,创立了一个MAVEN我的项目,仅仅增加了如下依赖。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.8</version>
</dependency>
因为客户端须要与zookeeper建设连贯,获取数据,增加监控等等一系列的事件,所以这里封装一个Utils工具类供咱们应用。
而后对于zookeeper连贯客户端的地址的前面能够紧跟一个path,作为在根目录下的工作目录。该目录就是作为所有操作的根目录,这里应用/test、
同时因为zookeeper基于watch机制实现公布订阅,咱们所有的watcher都采纳自定义的形式实现,首先是对连贯胜利的时候的DefaultWatcher。
DefaultWatcher代码如下:
package org.qzx.config;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:05 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class DefaultWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println(event.toString());
}
}
Utils工具类:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:02 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class Utils {
// zookeeper对象
private static ZooKeeper zooKeeper;
// 连贯地址
private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
private static DefaultWatcher defaultWatcher = new DefaultWatcher();
public static ZooKeeper getZooKeeper() throws Exception{
zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
return zooKeeper;
}
}
因为zookeeper采纳的是异步调用,所以这里须要应用一把锁锁住主线程,在连贯胜利后主动解锁,主线程再往下进行。这里应用CountDownLatch实现锁,在主线程创立,传递到DafaultWatcher的回掉函数中。
DefaultWatcher代码批改如下:
package org.qzx.config;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:05 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class DefaultWatcher implements Watcher {
private CountDownLatch latch;
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
latch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
System.out.println(event.toString());
}
}
Utils批改代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:02 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class Utils {
// zookeeper对象
private static ZooKeeper zooKeeper;
// 连贯地址
private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
private static DefaultWatcher defaultWatcher = new DefaultWatcher();
// 锁
private static CountDownLatch latch = new CountDownLatch(1);
public static ZooKeeper getZooKeeper() throws Exception{
zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
defaultWatcher.setLatch(latch);
latch.await();
return zooKeeper;
}
}
接下来就是编写配置类TestConfig,首先是在操作之前进行连贯,操作后得敞开,别离对应conn和close办法,而后就是配置办法getConfig,因为并不分明zookeeper客户端是否肯定含有自定义的工作目录,所以个别偏向于应用exists办法来进行测试。又因为exists办法中有1个watcher和一个回调函数,在回调函数中返回存在的话又得调用getData办法获取数据,在getData办法中又存在一个watcher和回调函数,这样会造成代码深度太大不易浏览,所以这里也自定义一个工具类,封装好所有的watcher和回调函数。该类的名称就叫MyWatcherAndCallBack.
MyWatcherAndCallBack的大体框架如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
// Watcher
@Override
public void process(WatchedEvent event) {
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
}
}
对应的TestConfig类代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
@Before
public void conn() throws Exception {
zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
}
@Test
public void getConf(){
// 这里的/AppConf在zookeeper中实际上是/test/AppConf,
zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123");
}
}
这个时候就得思考在胜利判断在工作目录下存在AppConf的时候须要做的事件,其实也很简略,就是获取以后节点的数据就行了。
MyWatcherAndCallBack批改后的代码如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
}
}
TestConfig批改后的代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
@Before
public void conn() throws Exception {
zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
}
@Test
public void getConf(){
watcherAndCallBack.setZooKeeper(zooKeeper);
// 这里的/AppConf在zookeeper中实际上是/test/AppConf,
zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123");
}
}
当初,咱们再来思考另外一个问题,当咱们取数据的时候,zookeeper实际上是应用的异步调用模型,这里不会期待数据取回而是间接继续执行主线程的工作,那么在数据取回的时候要如何让主线程晓得呢?所以在这里咱们得筹备一个承受数据的对象,该类叫MyConf,对应的代码如下:
package org.qzx.config;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 2:00 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyConf {
private String confData;
public String getConfData() {
return confData;
}
public void setConfData(String confData) {
this.confData = confData;
}
}
因为须要让主线程承受数据,得在TestConfig类中聚合该对象,并且在getData的回调函数中须要为MyConf设置数据,所以在MyWatcherAndCallBack中也得聚合该对象。
TestConfig类批改代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {
zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
}
@Test
public void getConf(){
watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
// 这里的/AppConf在zookeeper中实际上是/test/AppConf,
zooKeeper.exists("/AppConf",watcherAndCallBack,watcherAndCallBack,"123");
}
}
MyWatcherAndCallBack代码批改如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
public void setMyConf(MyConf myConf) {
this.myConf = myConf;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if(stat!=null){
myConf.setConfData(new String(data));
}
}
}
这样在数据取回来后,在TestConfig中就能够看见该数据了。这里存在着一个问题,在exists执行的时候不会期待数据的获取而会始终执行上来,然而对于判断时候,如果有该节点并且获取数据应该是一个原子性的操作,在这里咱们将这两步封装成一部操作实现。咱们能够在MyWatcherAndCallBack类中增加一个办法用来期待该操作的执行,从而获取数据后果,该办法就叫aWait().咱们这里将exists办法挪动到aWait办法中,同时应用CountDownLatch阻塞该操作,直到获取数据胜利为止解锁。这里应用一个CountDownLatch实现了对于判断节点存在和获取数据的封装,如果在TestConfig中对exists办法进行加锁,那就还得将这把锁传递到MyWatcherAndCallBack中在getData回调完结能力解锁,这种实现形式显然在语义上没有将其挪动到aWait办法中的更好。
MyWatcherAndCallBack批改后的代码如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
private CountDownLatch latch = new CountDownLatch(1);
public void setMyConf(MyConf myConf) {
this.myConf = myConf;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if(stat!=null){
myConf.setConfData(new String(data));
latch.countDown();
}
}
public void aWait() throws InterruptedException {
// 这里的/AppConf在zookeeper中实际上是/test/AppConf,
zooKeeper.exists("/AppConf",this,this,"123");
latch.await();
}
}
TestConfig批改代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {
zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
}
@Test
public void getConf() throws InterruptedException {
watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
watcherAndCallBack.aWait();
}
}
当初咱们对于判断节点存在和胜利获取节点数据的这条门路就编写结束了,接下来思考节点被批改的状况,首先是当节点不存在的时候,exists的回调不会执行,在节点被创立的时候,注册在exists的watcher会被执行,那么咱们只须要调用数据即可,其次是节点中的数据被批改,咱们须要从新取得新的节点数据并且设置到confData中,再就是节点被删除,咱们须要将confData的数据置为空。为了察看数据的变动,这里在TestConfig中循环打印设置的数据。
TestConfig代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {
zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
}
@Test
public void getConf() throws InterruptedException {
watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
watcherAndCallBack.aWait();
while (true){
System.out.println(myConf.getConfData());
TimeUnit.SECONDS.sleep(2);
}
}
}
MyWatcherAndCallBack代码如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
private final CountDownLatch latch = new CountDownLatch(1);
public void setMyConf(MyConf myConf) {
this.myConf = myConf;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
// 节点创立须要获取数据
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeDeleted:
// 节点删除须要清空数据
myConf.setConfData("");
break;
case NodeDataChanged:
// 数据批改
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
}
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if(stat!=null){
myConf.setConfData(new String(data));
latch.countDown();
}
}
public void aWait() throws InterruptedException {
// 这里的/AppConf在zookeeper中实际上是/test/AppConf,
zooKeeper.exists("/AppConf",this,this,"123");
latch.await();
}
}
接下来就是测试程序是否正确了,首先启动4台zookeeper,而后在根目录下创立test工作目录.
而后开始启动程序,而后在zookeeper客户端手动创立AppConf节点,并且设置数据olddata。
能够看到程序输入olddata.
当初再批改该节点数据为newdata.
而后能够看到程序输入newdata.
在测试的时候发现如果删除了节点会一直的输入空字符串,这个比拟占用IO和资源,批改为阻塞期待数据不空。同时在输入的时候如果数据为空打印一句数据为空的提醒,这里对于MyWatcherAndCallBack中节点删除的代码须要留神的是,咱们是通过调用aWait办法来实现的阻塞,因为这样会在节点数据存在时候主动解锁,进而输入节点数据,然而因为CountDownLatch曾经被减过了,所以这里须要将latch从新赋值。
MyWatcherAndCallBack代码批改如下:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
private CountDownLatch latch = new CountDownLatch(1);
public void setMyConf(MyConf myConf) {
this.myConf = myConf;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
// 节点创立须要获取数据
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeDeleted:
// 节点删除须要清空数据并且期待数据达到
myConf.setConfData("");
latch = new CountDownLatch(1);
break;
case NodeDataChanged:
// 数据批改
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
}
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if(stat!=null){
myConf.setConfData(new String(data));
latch.countDown();
}
}
public void aWait() throws InterruptedException {
// 这里的/AppConf在zookeeper中实际上是/test/AppConf,
zooKeeper.exists("/AppConf",this,this,"123");
latch.await();
}
}
TestConfig的代码如下:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {
zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
}
@Test
public void getConf() throws InterruptedException {
watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
watcherAndCallBack.aWait();
while (true){
if(myConf.getConfData().equals("")){
System.out.println("数据为空");
watcherAndCallBack.aWait();// 期待数据达到
}
System.out.println(myConf.getConfData());
TimeUnit.SECONDS.sleep(2);
}
}
}
接下来从新测试节点被删除的状况.
删除节点后会发发现程序输入数据为空的提醒后就阻塞住了。
当初从新创立节点:
会发现又从新取得了节点的数据。
到此对于zookeeper的配置注册的代码就编写结束。
这里对于zookeeper的配置注册做一个小小的总结,配置注册实质上是在对立治理服务器共享的节点,其配置信息全副写在了那1M的数据中,在一个服务器批改了该节点后,其余的服务器会通过zookeeper的watcher机制承受到该音讯,也就胜利看到节点的实时变动实现更新配置的操作,这样就实现了分布式服务的协调性能。
zookeeper的配置核心代码整顿如下
Utils类:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:02 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class Utils {
// zookeeper对象
private static ZooKeeper zooKeeper;
// 连贯地址
private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
private static DefaultWatcher defaultWatcher = new DefaultWatcher();
// 锁
private static CountDownLatch latch = new CountDownLatch(1);
public static ZooKeeper getZooKeeper() throws Exception{
zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
defaultWatcher.setLatch(latch);
latch.await();
return zooKeeper;
}
}
DefaultWatcher类:
package org.qzx.config;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:05 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class DefaultWatcher implements Watcher {
private CountDownLatch latch;
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
latch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
System.out.println(event.toString());
}
}
MyWatcherAndCallBack类:
package org.qzx.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:40 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zooKeeper;
private MyConf myConf;
private CountDownLatch latch = new CountDownLatch(1);
public void setMyConf(MyConf myConf) {
this.myConf = myConf;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
// StatCallback
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat!=null){
// 节点存在获取数据
zooKeeper.getData("/AppConf",this,this,"aaa");
}
}
// Watcher
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
// 节点创立须要获取数据
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeDeleted:
// 节点删除须要清空数据并且期待数据达到
myConf.setConfData("");
latch = new CountDownLatch(1);
break;
case NodeDataChanged:
// 数据批改
zooKeeper.getData("/AppConf",this,this,"bbb");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
}
}
// DataCallback
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if(stat!=null){
myConf.setConfData(new String(data));
latch.countDown();
}
}
public void aWait() throws InterruptedException {
// 这里的/AppConf在zookeeper中实际上是/test/AppConf,
zooKeeper.exists("/AppConf",this,this,"123");
latch.await();
}
}
MyConf类:
package org.qzx.config;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 2:00 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class MyConf {
private String confData;
public String getConfData() {
return confData;
}
public void setConfData(String confData) {
this.confData = confData;
}
}
TestConfig类:
package org.qzx.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
/**
* @Auther: qzx
* @Date: 2020/10/29 - 10 - 29 - 1:29 下午
* @Description: org.qzx.config
* @version: 1.0
*/
public class TestConfig {
private ZooKeeper zooKeeper;
private final MyWatcherAndCallBack watcherAndCallBack = new MyWatcherAndCallBack();
// 承受数据
MyConf myConf = new MyConf();
@Before
public void conn() throws Exception {
zooKeeper = Utils.getZooKeeper();
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
}
@Test
public void getConf() throws InterruptedException {
watcherAndCallBack.setZooKeeper(zooKeeper);
watcherAndCallBack.setMyConf(myConf);
watcherAndCallBack.aWait();
while (true){
if(myConf.getConfData().equals("")){
System.out.println("数据为空");
watcherAndCallBack.aWait();// 期待数据达到
}
System.out.println(myConf.getConfData());
TimeUnit.SECONDS.sleep(2);
}
}
}
写在最初
可能有人本文篇幅较冗余(尤其是代码局部)而且过于简略,然而自己只是想残缺的记录响应式编程的思考过程和残缺的代码书写流程,能够供本人温习和为小白提供一个入门zookeeper响应式编程的小demo。
发表回复