1.前言

之前本人写了一些对于Zookeeper的基础知识,Zookeeper作为一种协调分布式应用高性能的调度服务,理论的利用场景也十分的宽泛,这里次要通过几个例子来具体的阐明Zookeeper在特定场景下的应用形式(上面的这些性能预计consul和etcd也能实现,当前学到了再说吧)。

2.具体利用

2.1.一致性配置管理

咱们在开发的时候,有时候须要获取一些公共的配置,比方数据库连贯信息等,并且偶尔可能须要更新配置。如果咱们的服务器有N多台的话,那批改起来会特地的麻烦,并且还须要重新启动。这里Zookeeper就能够很不便的实现相似的性能。

2.1.1.思路

将公共的配置寄存在Zookeeper的节点中

应用程序能够连贯到Zookeeper中并对Zookeeper中配置节点进行读取或者批改(对于写操作能够进行权限验证设置),上面是具体的流程图:

2.1.2.事例

数据库配置信息一致性的保护

配置类:

public class CommonConfig implements Serializable{ // 数据库连贯配置 private String dbUrl; private String username; private String password; private String driverClass; public CommonConfig() {} public CommonConfig(String dbUrl, String username, String password, String driverClass) {  super();  this.dbUrl = dbUrl;  this.username = username;  this.password = password;  this.driverClass = driverClass; } public String getDbUrl() {  return dbUrl; } public void setDbUrl(String dbUrl) {  this.dbUrl = dbUrl; } public String getUsername() {  return username; } public void setUsername(String username) {  this.username = username; } public String getPassword() {  return password; } public void setPassword(String password) {  this.password = password; } public String getDriverClass() {  return driverClass; } public void setDriverClass(String driverClass) {  this.driverClass = driverClass; } @Override public String toString() {  return "CommonConfig:{dbUrl:" + this.dbUrl +    ", username:" + this.username +    ", password:" + this.password +    ", driverClass:" + this.driverClass + "}"; }}

配置管理核心

  • 获取本地配置信息
  • 批改配置,并同步

同步配置信息到Zookeeper服务器

public class ZkConfigMng { private String nodePath = "/commConfig"; private CommonConfig commonConfig; private ZkClient zkClient; public CommonConfig initConfig(CommonConfig commonConfig) {  if(commonConfig == null) {   this.commonConfig = new CommonConfig("jdbc:mysql://127.0.0.1:3306/mydata?useUnicode=true&characterEncoding=utf-8",     "root", "root", "com.mysql.jdbc.Driver");  } else {   this.commonConfig = commonConfig;  }  return this.commonConfig; } /**  * 更新配置  *  * @param commonConfig  * @return  */ public CommonConfig update(CommonConfig commonConfig) {  if(commonConfig != null) {   this.commonConfig = commonConfig;  }  syncConfigToZookeeper();  return this.commonConfig; } public void syncConfigToZookeeper() {  if(zkClient == null) {   zkClient = new ZkClient("127.0.0.1:2181");  }  if(!zkClient.exists(nodePath)) {   zkClient.createPersistent(nodePath);  }  zkClient.writeData(nodePath, commonConfig); }}

以上是提供者,上面咱们须要一个客户端获取这些配置

public class ZkConfigClient implements Runnable { private String nodePath = "/commConfig"; private CommonConfig commonConfig; @Override public void run() {  ZkClient zkClient = new ZkClient(new ZkConnection("127.0.0.1:2181", 5000));  while (!zkClient.exists(nodePath)) {   System.out.println("配置节点不存在!");   try {    TimeUnit.SECONDS.sleep(1);   } catch (InterruptedException e) {    e.printStackTrace();   }  }  // 获取节点  commonConfig = (CommonConfig)zkClient.readData(nodePath);  System.out.println(commonConfig.toString());  zkClient.subscribeDataChanges(nodePath, new IZkDataListener() {   @Override   public void handleDataDeleted(String dataPath) throws Exception {    if(dataPath.equals(nodePath)) {     System.out.println("节点:" + dataPath + "被删除了!");    }   }   @Override   public void handleDataChange(String dataPath, Object data) throws Exception {    if(dataPath.equals(nodePath)) {     System.out.println("节点:" + dataPath + ", 数据:" + data + " - 更新");     commonConfig = (CommonConfig) data;    }   }  }); }}

上面启动Main函数

配置管理服务启动

public static void main(String[] args) throws InterruptedException {  SpringApplication.run(ZookeeperApiDemoApplication.class, args);  ZkConfigMng zkConfigMng = new ZkConfigMng();  zkConfigMng.initConfig(null);  zkConfigMng.syncConfigToZookeeper();  TimeUnit.SECONDS.sleep(10);  // 批改值  zkConfigMng.update(new CommonConfig("jdbc:mysql://192.168.1.122:3306/mydata?useUnicode=true&characterEncoding=utf-8",    "root", "wxh", "com.mysql.jdbc.Driver")); }}

客户端启动:

public static void main(String[] args) throws InterruptedException {  SpringApplication.run(ZookeeperApiDemoApplication.class, args);  ExecutorService executorService = Executors.newFixedThreadPool(3);  // 模仿多个客户端获取配置  executorService.submit(new ZkConfigClient());  executorService.submit(new ZkConfigClient());  executorService.submit(new ZkConfigClient()); }}

2.2.分布式锁

在咱们日常的开发中,如果是单个过程中对共享资源的拜访,咱们只须要用synchronized或者lock就能实现互斥操作。然而对于跨过程、跨主机、跨网络的共享资源仿佛就无能为力了。

另外,分布式系列面试题和答案全副整顿好了,微信搜寻Java技术栈,在后盾发送:面试,能够在线浏览。

2.1.1.思路
  • 首先zookeeper中咱们能够创立一个/distributed_lock长久化节点
  • 而后再在/distributed_lock节点下创立本人的长期程序节点,比方:/distributed_lock/task_00000000008
  • 获取所有的/distributed_lock下的所有子节点,并排序
  • 判读本人创立的节点是否最小值(第一位)
  • 如果是,则获取失去锁,执行本人的业务逻辑,最初删除这个长期节点。
  • 如果不是最小值,则须要监听本人创立节点前一位节点的数据变动,并阻塞。
  • 以后一位节点被删除时,咱们须要通过递归来判断本人创立的节点是否在是最小的,如果是则执行5);如果不是则执行6)(就是递归循环的判断)

上面是具体的流程图:

2.1.3.事例
public class DistributedLock { // 常亮 static class Constant {  private static final int SESSION_TIMEOUT = 10000;  private static final String CONNECTION_STRING = "127.0.0.1:2181";  private static final String LOCK_NODE = "/distributed_lock";  private static final String CHILDREN_NODE = "/task_"; } private ZkClient zkClient; public DistributedLock() {  // 连贯到Zookeeper  zkClient = new ZkClient(new ZkConnection(Constant.CONNECTION_STRING));  if(!zkClient.exists(Constant.LOCK_NODE)) {   zkClient.create(Constant.LOCK_NODE, "分布式锁节点", CreateMode.PERSISTENT);  } } public String getLock() {  try {   // 1。在Zookeeper指定节点下创立长期程序节点   String lockName = zkClient.createEphemeralSequential(Constant.LOCK_NODE + Constant.CHILDREN_NODE, "");   // 尝试获取锁   acquireLock(lockName);   return lockName;  } catch(Exception e) {   e.printStackTrace();  }  return null; } /**  * 获取锁  * @throws InterruptedException  */ public Boolean acquireLock(String lockName) throws InterruptedException {  // 2.获取lock节点下的所有子节点  List<String> childrenList = zkClient.getChildren(Constant.LOCK_NODE);  // 3.对子节点进行排序,获取最小值  Collections.sort(childrenList, new Comparator<String>() {   @Override   public int compare(String o1, String o2) {    return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);   }  });  // 4.判断以后创立的节点是否在第一位  int lockPostion = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);  if(lockPostion < 0) {   // 不存在该节点   throw new ZkNodeExistsException("不存在的节点:" + lockName);  } else if (lockPostion == 0) {   // 获取到锁   System.out.println("获取到锁:" + lockName);   return true;  } else if (lockPostion > 0) {   // 未获取到锁,阻塞   System.out.println("...... 未获取到锁,阻塞期待 。。。。。。");   // 5.如果未获取失去锁,监听以后创立的节点前一位的节点   final CountDownLatch latch = new CountDownLatch(1);   IZkDataListener listener = new IZkDataListener() {    @Override    public void handleDataDeleted(String dataPath) throws Exception {     // 6.前一个节点被删除,当不保障轮到本人     System.out.println("。。。。。。前一个节点被删除  。。。。。。");     acquireLock(lockName);     latch.countDown();    }    @Override    public void handleDataChange(String dataPath, Object data) throws Exception {     // 不必理睬    }   };   try {    zkClient.subscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);    latch.await();   } finally {    zkClient.unsubscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);   }  }  return false; } /**  * 开释锁(删除节点)  *  * @param lockName  */ public void releaseLock(String lockName) {  zkClient.delete(lockName); } public void closeZkClient() {  zkClient.close(); }}@SpringBootApplicationpublic class ZookeeperDemoApplication { public static void main(String[] args) throws InterruptedException {  SpringApplication.run(ZookeeperDemoApplication.class, args);  DistributedLock lock = new DistributedLock();  String lockName = lock.getLock();  /**   * 执行咱们的业务逻辑   */  if(lockName != null) {   lock.releaseLock(lockName);  }  lock.closeZkClient(); }}

2.3.分布式队列

在日常应用中,特地是像生产者消费者模式中,常常会应用BlockingQueue来充当缓冲区的角色。然而在分布式系统中这种形式就不能应用BlockingQueue来实现了,然而Zookeeper能够实现。

2.1.1.思路
  • 首先利用Zookeeper中长期程序节点的特点
  • 当生产者创立节点生产时,须要判断父节点下长期程序子节点的个数,如果达到了下限,则阻塞期待;如果没有达到,就创立节点。
  • 当消费者获取节点时,如果父节点中不存在长期程序子节点,则阻塞期待;如果有子节点,则获取执行本人的业务,执行结束后删除该节点即可。
  • 获取时获取最小值,保障FIFO个性。
2.1.2.事例

这个是一个消费者对一个生产者,如果是多个消费者对多个生产者,对代码须要调整。

public interface AppConstant { static String ZK_CONNECT_STR = "127.0.0.1:2181"; static String NODE_PATH = "/mailbox"; static String CHILD_NODE_PATH = "/mail_"; static int MAILBOX_SIZE = 10;}public class MailConsumer implements Runnable, AppConstant{ private ZkClient zkClient; private Lock lock; private Condition condition; public MailConsumer() {  lock = new ReentrantLock();  condition = lock.newCondition();  zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));  System.out.println("sucess connected to zookeeper server!");  // 不存在就创立mailbox节点  if(!zkClient.exists(NODE_PATH)) {   zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);  } } @Override public void run() {  IZkChildListener listener = new IZkChildListener() {   @Override   public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {    System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());    // 还是要判断邮箱是否为空    if(currentChilds.size() > 0) {     // 唤醒期待的线程     try {      lock.lock();      condition.signal();     } catch (Exception e) {      e.printStackTrace();     } finally {      lock.unlock();     }    }   }  };  // 监督子节点的扭转,不必放用while循环中,监听一次就行了,不须要反复绑定  zkClient.subscribeChildChanges(NODE_PATH, listener);  try {   //循环随机发送邮件模仿真是状况   while(true) {    // 判断是否能够发送邮件    checkMailReceive();    // 承受邮件    List<String> mailList = zkClient.getChildren(NODE_PATH);    // 如果mailsize==0,也没有关系;能够间接循环获取就行了    if(mailList.size() > 0) {     Collections.sort(mailList, new Comparator<String>() {      @Override      public int compare(String o1, String o2) {       return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);      }     });     // 模仿邮件解决(0-1S)     TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));     zkClient.delete(NODE_PATH + "/" + mailList.get(0));     System.out.println("mail has been received:" + NODE_PATH + "/" + mailList.get(0));    }   }  }catch (Exception e) {   e.printStackTrace();  } finally {   zkClient.unsubscribeChildChanges(NODE_PATH, listener);  } } private void checkMailReceive() {  try {   lock.lock();   // 判断邮箱是为空   List<String> mailList = zkClient.getChildren(NODE_PATH);   System.out.println("mailbox size: " + mailList.size());   if(mailList.size() == 0) {    // 邮箱为空,阻塞消费者,直到邮箱有邮件    System.out.println("mailbox is empty, please wait 。。。");    condition.await();    // checkMailReceive();   }  } catch (Exception e) {   e.printStackTrace();  } finally {   lock.unlock();  } }}public class MailProducer implements Runnable, AppConstant{ private ZkClient zkClient; private Lock lock; private Condition condition; /**  * 初始化状态  */ public MailProducer() {  lock = new ReentrantLock();  condition = lock.newCondition();  zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));  System.out.println("sucess connected to zookeeper server!");  // 不存在就创立mailbox节点  if(!zkClient.exists(NODE_PATH)) {   zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);  } } @Override public void run() {  IZkChildListener listener = new IZkChildListener() {   @Override   public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {    System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());    // 还是要判断邮箱是否已满    if(currentChilds.size() < MAILBOX_SIZE) {     // 唤醒期待的线程     try {      lock.lock();      condition.signal();     } catch (Exception e) {      e.printStackTrace();     } finally {      lock.unlock();     }    }   }  };  // 监督子节点的扭转,不必放用while循环中,监听一次就行了,不须要反复绑定  zkClient.subscribeChildChanges(NODE_PATH, listener);  try {   //循环随机发送邮件模仿真是状况   while(true) {    // 判断是否能够发送邮件    checkMailSend();    // 发送邮件    String cretePath = zkClient.createEphemeralSequential(NODE_PATH + CHILD_NODE_PATH, "your mail");    System.out.println("your mail has been send:" + cretePath);    // 模仿随机距离的发送邮件(0-10S)    TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));   }  }catch (Exception e) {   e.printStackTrace();  } finally {   zkClient.unsubscribeChildChanges(NODE_PATH, listener);  } } private void checkMailSend() {  try {   lock.lock();   // 判断邮箱是否已满   List<String> mailList = zkClient.getChildren(NODE_PATH);   System.out.println("mailbox size: " + mailList.size());   if(mailList.size() >= MAILBOX_SIZE) {    // 邮箱已满,阻塞生产者,直到邮箱有空间    System.out.println("mailbox is full, please wait 。。。");    condition.await();    checkMailSend();   }  } catch (Exception e) {   e.printStackTrace();  } finally {   lock.unlock();  } }}

2.4.平衡负载

首先咱们须要简略的了解分布式和集群,艰深点说:分布式就是将一个零碎拆分到多个独立运行的利用中(有可能在同一台主机也有可能在不同的主机上),集群就是将单个独立的利用复制多分放在不同的主机上来加重服务器的压力。

而Zookeeper不仅仅能够作为分布式集群的服务注册调度核心(例如dubbo),也能够实现集群的负载平衡。

2.4.1.思路

首先咱们要了解,如果是一个集群,那么他就会有多台主机。所以,他在Zookeeper中信息的存在应该是如下所示:

如上的构造,当服务调用方调用服务时,就能够依据特定的平衡负载算法来实现对服务的调用(调用前须要监听/service/serviceXXX节点,以更新列表数据)

2.4.2.事例
/** * 服务提供者 * * @author Administrator * */public class ServiceProvider { // 动态常量 static String ZK_CONNECT_STR = "127.0.0.1:2181"; static String NODE_PATH = "/service"; static String SERIVCE_NAME = "/myService"; private ZkClient zkClient; public ServiceProvider() {  zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));  System.out.println("sucess connected to zookeeper server!");  // 不存在就创立NODE_PATH节点  if(!zkClient.exists(NODE_PATH)) {   zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);  } } public void registryService(String localIp, Object obj) {  if(!zkClient.exists(NODE_PATH + SERIVCE_NAME)) {   zkClient.create(NODE_PATH + SERIVCE_NAME, "provider services list", CreateMode.PERSISTENT);  }  // 对本人的服务进行注册  zkClient.createEphemeral(NODE_PATH + SERIVCE_NAME + "/" + localIp, obj);  System.out.println("注册胜利![" + localIp + "]"); }}/** * 消费者,通过某种平衡负载算法抉择某一个提供者 * * @author Administrator * */public class ServiceConsumer { // 动态常量 static String ZK_CONNECT_STR = "127.0.0.1:2181"; static String NODE_PATH = "/service"; static String SERIVCE_NAME = "/myService"; private List<String> serviceList = new ArrayList<String>(); private ZkClient zkClient; public ServiceConsumer() {  zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));  System.out.println("sucess connected to zookeeper server!");  // 不存在就创立NODE_PATH节点  if(!zkClient.exists(NODE_PATH)) {   zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);  } } /**  * 订阅服务  */ public void subscribeSerivce() {  serviceList = zkClient.getChildren(NODE_PATH + SERIVCE_NAME);  zkClient.subscribeChildChanges(NODE_PATH + SERIVCE_NAME, new IZkChildListener() {   @Override   public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {    serviceList = currentChilds;   }  }); } /**  * 模仿调用服务  */ public void consume() {  //负载平衡算法获取某台机器调用服务  int index = new Random().nextInt(serviceList.size());  System.out.println("调用[" + NODE_PATH + SERIVCE_NAME + "]服务:" + serviceList.get(index)); }}

3.总结

Zookeeper是一个性能十分弱小的利用,除了下面几种利用外,还有命名服务、分布式协调告诉等也是罕用的场景。

原文链接:https://blog.csdn.net/u013468...

版权申明:本文为CSDN博主「永远_不会懂」的原创文章,遵循CC 4.0 BY-SA版权协定,转载请附上原文出处链接及本申明。

近期热文举荐:

1.1,000+ 道 Java面试题及答案整顿(2021最新版)

2.别在再满屏的 if/ else 了,试试策略模式,真香!!

3.卧槽!Java 中的 xx ≠ null 是什么新语法?

4.Spring Boot 2.5 重磅公布,光明模式太炸了!

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!