引言
本文续写上一篇博客 dubbo框架,这里次要简略演示dubbo如何实现动静负载平衡的。
模块构建
新建子模块 springboot_dubbo_load
该模块 pom
文件中引入ZK客户端依赖:
<dependencies> <!--ZK客户端工具--> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <!-- <exclusions>--> <!-- <exclusion>--> <!-- <groupId>org.slf4j</groupId>--> <!-- <artifactId>slf4j-log4j12</artifactId>--> <!-- </exclusion>--> <!-- </exclusions>--> <version>0.10</version> </dependency> </dependencies>
ZkServerSocket
Socket服务端:
package com.baba.wlb.server;import org.I0Itec.zkclient.ZkClient;import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;/** * @Author wulongbo * @Date 2021/1/5 9:56 * @Version 1.0 */// ServerSocket服务端public class ZkServerSocket implements Runnable { private static int port = 18081; private String parentService = "/service"; /** * 服务器端:<br> * 1.服务器启动的时候,会将以后服务器信息注册到注册核心。首先先创立一个父节点为service,在父节点上面再创立一个子节点, * 每个子节点都寄存以后服务接口地址。 * ## 子节点构造 * /service 长久节点 * ##/8080 value 39.102.56.91:18080 长期节点 * ##/8081 value 39.102.56.91:18081 长期节点 * * @param args */ private ZkClient zkClient = new ZkClient("39.102.56.91:2181"); public static void main(String[] args) { ZkServerSocket server = new ZkServerSocket(port); Thread thread = new Thread(server); thread.start(); } public ZkServerSocket(int port) { this.port = port; } public void regServer() { // 1.先创立父节点server 为长久节点 if (!zkClient.exists(parentService)) {// // 删除旧节点// zkClient.delete(parentService); // 2.创立父节点 // 长久节点 zkClient.createPersistent(parentService); } String serverKey = parentService + "/server_" + port; if (!zkClient.exists(serverKey)) {// // 删除旧节点// zkClient.delete(serverKey); // 3.创立子节点value为服务接口地址 // 长期节点 zkClient.createEphemeral(serverKey, "127.0.0.1:" + port); } } @Override public void run() { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); System.out.println("Server start port:" + port); regServer(); Socket socket = null; while (true) { socket = serverSocket.accept(); new Thread(new ServerHandler(socket)).start(); } } catch (IOException e) { e.printStackTrace(); } finally { try { if (serverSocket != null) { serverSocket.close(); } } catch (Exception e2) { } } }}
ZkServerSocket2
Socket服务端 用于模仿集群:
package com.baba.wlb.server;import org.I0Itec.zkclient.ZkClient;import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;/** * @Author wulongbo * @Date 2021/1/5 9:56 * @Version 1.0 */// ServerSocket服务端public class ZkServerSocket2 implements Runnable { private static int port = 18080; private String parentService = "/service"; /** * 服务器端:<br> * 1.服务器启动的时候,会将以后服务器信息注册到注册核心。首先先创立一个父节点为service,在父节点上面再创立一个子节点, * 每个子节点都寄存以后服务接口地址。 * ## 子节点构造 * /service 长久节点 * ##/8080 value 39.102.56.91:18080 长期节点 * ##/8081 value 39.102.56.91:18081 长期节点 * * @param args */ private ZkClient zkClient = new ZkClient("39.102.56.91:2181"); public static void main(String[] args) { ZkServerSocket2 server = new ZkServerSocket2(port); Thread thread = new Thread(server); thread.start(); } public ZkServerSocket2(int port) { this.port = port; } public void regServer() { // 1.先创立父节点server 为长久节点 if (!zkClient.exists(parentService)) {// // 删除旧节点// zkClient.delete(parentService); // 2.创立父节点 // 长久节点 zkClient.createPersistent(parentService); } String serverKey = parentService + "/server_" + port; if (!zkClient.exists(serverKey)) {// // 删除旧节点// zkClient.delete(serverKey); // 3.创立子节点value为服务接口地址 // 长期节点 zkClient.createEphemeral(serverKey, "127.0.0.1:" + port); } } @Override public void run() { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); System.out.println("Server start port:" + port); regServer(); Socket socket = null; while (true) { socket = serverSocket.accept(); new Thread(new ServerHandler(socket)).start(); } } catch (IOException e) { e.printStackTrace(); } finally { try { if (serverSocket != null) { serverSocket.close(); } } catch (Exception e2) { } } }}
ZkServerClient
Socket客户端 :
package com.baba.wlb.client;import org.I0Itec.zkclient.IZkChildListener;import org.I0Itec.zkclient.ZkClient;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;import java.util.ArrayList;import java.util.List;/** * @Author wulongbo * @Date 2021/1/5 10:24 * @Version 1.0 */public class ZkServerClient { // 寄存服务列表信息 public static List<String> listServer = new ArrayList<String>(); // 客户端:读取service节点,获取上面的子节点value值,本地实现近程调用。 private static String parentService = "/service"; private static ZkClient zkClient = new ZkClient("39.102.56.91:2181"); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while (true) { String name; try { name = bufferedReader.readLine(); if ("exit".equals(name)) { System.exit(0); } client.send(name); } catch (IOException e) { e.printStackTrace(); } } } private void send(String name) { String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(cfg[0], Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println(name); while (true) { String resp = in.readLine(); if (resp == null) break; else if (resp.length() > 0) { System.out.println("Receive:" + resp); break; } } } catch (Exception e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } if (in != null) { try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } // 注册所有server private static void initServer() {// listServer.clear();// listServer.add("39.102.56.91:18080"); // 从zookeeper上获取服务列表信息 List<String> children = zkClient.getChildren(parentService); getChildData(zkClient, children); // 应用Zk工夫告诉获取最新服务列表信息 zkClient.subscribeChildChanges(parentService, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception { System.out.println("注册核心服务里列表信息发生变化..."); getChildData(zkClient, currentChildren); } }); } public static void getChildData(ZkClient zkClient, List<String> children) { listServer.clear(); children.stream().forEach( node -> { String serverAddress = zkClient.readData(parentService + "/" + node); listServer.add(serverAddress); } ); System.out.println("服务接口地址:" + listServer.toString()); } // 申请总数 private static int reqCount = 1; // 获取以后server信息 public static String getServer() {// return listServer.get(0); int index = reqCount % listServer.size(); String address = listServer.get(index); System.out.println("客户端申请服务器端:" + address); reqCount++; return address; }}
启动Socket服务端
别离启动 ZkServerSocket
,ZkServerSocket2
咱们能够看到zookeeper上注册了一个长久节点 server
和两个长期节点 server_18080
、server_18081
,他们对应的value值别离是:127.0.0.1:18080
、127.0.0.1:18081
启动Socket客户端
- 能够发现 服务接口地址:[127.0.0.1:18080, 127.0.0.1:18081]
- 咱们在控制台输出内容:
这是第一次传输
,通过取模算法,负载到18081这台服务上
- 在端口为:18081 的Socket服务端能够收到音讯
- 咱们在控制台输出内容:
这是第二次传输
- 在端口为:18080 的Socket服务端能够收到音讯
- 当初强制停掉 18081 的服务,只剩下 18080 这台,
需期待约15s
因为节点发生变化,通过事件告诉的形式给到订阅方,订阅方通过事件监听的形式来动静获取服务端节点。
- 咱们在控制台输出内容:
这是第三次传输
、这是第四次传输
,都只会转发给 18080
- 再次启动18081,复活服务,又可实现动静负载。
总结
本文演示的是dubbo如何实现动静负载平衡的,只有咱们对注册核心zookeeper
有肯定的理解,便很容易明确!