共计 6605 个字符,预计需要花费 17 分钟才能阅读完成。
引言
本文续写上一篇博客 dubbo 框架,这里次要简略演示 dubbo 如何实现动静负载平衡的。
模块构建
新建子模块 springboot_dubbo_load
该模块 pom
文件中引入 ZK 客户端依赖:
<dependencies>
<!--ZK 客户端工具 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<version>0.10</version>
</dependency>
</dependencies>
ZkServerSocket
Socket 服务端:
package com.baba.wlb.server;
import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Author wulongbo
* @Date 2021/1/5 9:56
* @Version 1.0
*/
// ServerSocket 服务端
public class ZkServerSocket implements Runnable {
private static int port = 18081;
private String parentService = "/service";
/**
* 服务器端:<br>
* 1. 服务器启动的时候,会将以后服务器信息注册到注册核心。首先先创立一个父节点为 service, 在父节点上面再创立一个子节点,* 每个子节点都寄存以后服务接口地址。* ## 子节点构造
* /service 长久节点
* ##/8080 value 39.102.56.91:18080 长期节点
* ##/8081 value 39.102.56.91:18081 长期节点
*
* @param args
*/
private ZkClient zkClient = new ZkClient("39.102.56.91:2181");
public static void main(String[] args) {ZkServerSocket server = new ZkServerSocket(port);
Thread thread = new Thread(server);
thread.start();}
public ZkServerSocket(int port) {this.port = port;}
public void regServer() {
// 1. 先创立父节点 server 为长久节点
if (!zkClient.exists(parentService)) {
// // 删除旧节点
// zkClient.delete(parentService);
// 2. 创立父节点
// 长久节点
zkClient.createPersistent(parentService);
}
String serverKey = parentService + "/server_" + port;
if (!zkClient.exists(serverKey)) {
// // 删除旧节点
// zkClient.delete(serverKey);
// 3. 创立子节点 value 为服务接口地址
// 长期节点
zkClient.createEphemeral(serverKey, "127.0.0.1:" + port);
}
}
@Override
public void run() {
ServerSocket serverSocket = null;
try {serverSocket = new ServerSocket(port);
System.out.println("Server start port:" + port);
regServer();
Socket socket = null;
while (true) {socket = serverSocket.accept();
new Thread(new ServerHandler(socket)).start();}
} catch (IOException e) {e.printStackTrace();
} finally {
try {if (serverSocket != null) {serverSocket.close();
}
} catch (Exception e2) {}}
}
}
ZkServerSocket2
Socket 服务端 用于模仿集群:
package com.baba.wlb.server;
import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Author wulongbo
* @Date 2021/1/5 9:56
* @Version 1.0
*/
// ServerSocket 服务端
public class ZkServerSocket2 implements Runnable {
private static int port = 18080;
private String parentService = "/service";
/**
* 服务器端:<br>
* 1. 服务器启动的时候,会将以后服务器信息注册到注册核心。首先先创立一个父节点为 service, 在父节点上面再创立一个子节点,* 每个子节点都寄存以后服务接口地址。* ## 子节点构造
* /service 长久节点
* ##/8080 value 39.102.56.91:18080 长期节点
* ##/8081 value 39.102.56.91:18081 长期节点
*
* @param args
*/
private ZkClient zkClient = new ZkClient("39.102.56.91:2181");
public static void main(String[] args) {ZkServerSocket2 server = new ZkServerSocket2(port);
Thread thread = new Thread(server);
thread.start();}
public ZkServerSocket2(int port) {this.port = port;}
public void regServer() {
// 1. 先创立父节点 server 为长久节点
if (!zkClient.exists(parentService)) {
// // 删除旧节点
// zkClient.delete(parentService);
// 2. 创立父节点
// 长久节点
zkClient.createPersistent(parentService);
}
String serverKey = parentService + "/server_" + port;
if (!zkClient.exists(serverKey)) {
// // 删除旧节点
// zkClient.delete(serverKey);
// 3. 创立子节点 value 为服务接口地址
// 长期节点
zkClient.createEphemeral(serverKey, "127.0.0.1:" + port);
}
}
@Override
public void run() {
ServerSocket serverSocket = null;
try {serverSocket = new ServerSocket(port);
System.out.println("Server start port:" + port);
regServer();
Socket socket = null;
while (true) {socket = serverSocket.accept();
new Thread(new ServerHandler(socket)).start();}
} catch (IOException e) {e.printStackTrace();
} finally {
try {if (serverSocket != null) {serverSocket.close();
}
} catch (Exception e2) {}}
}
}
ZkServerClient
Socket 客户端:
package com.baba.wlb.client;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
/**
* @Author wulongbo
* @Date 2021/1/5 10:24
* @Version 1.0
*/
public class ZkServerClient {
// 寄存服务列表信息
public static List<String> listServer = new ArrayList<String>();
// 客户端:读取 service 节点,获取上面的子节点 value 值,本地实现近程调用。private static String parentService = "/service";
private static ZkClient zkClient = new ZkClient("39.102.56.91:2181");
public static void main(String[] args) {initServer();
ZkServerClient client = new ZkServerClient();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String name;
try {name = bufferedReader.readLine();
if ("exit".equals(name)) {System.exit(0);
}
client.send(name);
} catch (IOException e) {e.printStackTrace();
}
}
}
private void send(String name) {String server = ZkServerClient.getServer();
String[] cfg = server.split(":");
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println(name);
while (true) {String resp = in.readLine();
if (resp == null)
break;
else if (resp.length() > 0) {System.out.println("Receive:" + resp);
break;
}
}
} catch (Exception e) {e.printStackTrace();
} finally {if (out != null) {out.close();
}
if (in != null) {
try {in.close();
} catch (IOException e1) {e1.printStackTrace();
}
}
if (socket != null) {
try {socket.close();
} catch (IOException e) {e.printStackTrace();
}
}
}
}
// 注册所有 server
private static void initServer() {// listServer.clear();
// listServer.add("39.102.56.91:18080");
// 从 zookeeper 上获取服务列表信息
List<String> children = zkClient.getChildren(parentService);
getChildData(zkClient, children);
// 应用 Zk 工夫告诉获取最新服务列表信息
zkClient.subscribeChildChanges(parentService, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {System.out.println("注册核心服务里列表信息发生变化...");
getChildData(zkClient, currentChildren);
}
});
}
public static void getChildData(ZkClient zkClient, List<String> children) {listServer.clear();
children.stream().forEach(
node -> {String serverAddress = zkClient.readData(parentService + "/" + node);
listServer.add(serverAddress);
}
);
System.out.println("服务接口地址:" + listServer.toString());
}
// 申请总数
private static int reqCount = 1;
// 获取以后 server 信息
public static String getServer() {// return listServer.get(0);
int index = reqCount % listServer.size();
String address = listServer.get(index);
System.out.println("客户端申请服务器端:" + address);
reqCount++;
return address;
}
}
启动 Socket 服务端
别离启动 ZkServerSocket
,ZkServerSocket2
咱们能够看到 zookeeper 上注册了一个长久节点 server
和两个长期节点 server_18080
、server_18081
,他们对应的 value 值别离是:127.0.0.1:18080
、127.0.0.1:18081
启动 Socket 客户端
- 能够发现 服务接口地址:[127.0.0.1:18080, 127.0.0.1:18081]
- 咱们在控制台输出内容:
这是第一次传输
,通过取模算法,负载到 18081 这台服务上
- 在端口为:18081 的 Socket 服务端能够收到音讯
- 咱们在控制台输出内容:
这是第二次传输
- 在端口为:18080 的 Socket 服务端能够收到音讯
- 当初强制停掉 18081 的服务,只剩下 18080 这台,
需期待约 15s
因为节点发生变化,通过事件告诉的形式给到订阅方,订阅方通过事件监听的形式来动静获取服务端节点。
- 咱们在控制台输出内容:
这是第三次传输
、这是第四次传输
,都只会转发给 18080
- 再次启动 18081,复活服务,又可实现动静负载。
总结
本文演示的是 dubbo 如何实现动静负载平衡的,只有咱们对注册核心zookeeper
有肯定的理解,便很容易明确!
正文完