乐趣区

关于java:dubbo实现动态负载均衡

引言

本文续写上一篇博客 dubbo 框架,这里次要简略演示 dubbo 如何实现动静负载平衡的。

模块构建

新建子模块 springboot_dubbo_load

该模块 pom文件中引入 ZK 客户端依赖:

    <dependencies>
        <!--ZK 客户端工具 -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <!--            <exclusions>-->
            <!--                <exclusion>-->
            <!--                    <groupId>org.slf4j</groupId>-->
            <!--                    <artifactId>slf4j-log4j12</artifactId>-->
            <!--                </exclusion>-->
            <!--            </exclusions>-->
            <version>0.10</version>
        </dependency>
    </dependencies>

ZkServerSocket Socket 服务端:

package com.baba.wlb.server;

import org.I0Itec.zkclient.ZkClient;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @Author wulongbo
 * @Date 2021/1/5 9:56
 * @Version 1.0
 */

// ServerSocket 服务端
public class ZkServerSocket implements Runnable {

    private static int port = 18081;

    private String parentService = "/service";


    /**
     * 服务器端:<br>
     * 1. 服务器启动的时候,会将以后服务器信息注册到注册核心。首先先创立一个父节点为 service, 在父节点上面再创立一个子节点,* 每个子节点都寄存以后服务接口地址。* ## 子节点构造
     * /service 长久节点
     * ##/8080 value 39.102.56.91:18080 长期节点
     * ##/8081 value 39.102.56.91:18081 长期节点
     *
     * @param args
     */
    private ZkClient zkClient = new ZkClient("39.102.56.91:2181");

    public static void main(String[] args) {ZkServerSocket server = new ZkServerSocket(port);
        Thread thread = new Thread(server);
        thread.start();}

    public ZkServerSocket(int port) {this.port = port;}

    public void regServer() {
        // 1. 先创立父节点 server 为长久节点
        if (!zkClient.exists(parentService)) {
//            // 删除旧节点
//            zkClient.delete(parentService);
            // 2. 创立父节点
            // 长久节点
            zkClient.createPersistent(parentService);
        }


        String serverKey = parentService + "/server_" + port;
        if (!zkClient.exists(serverKey)) {
//            // 删除旧节点
//            zkClient.delete(serverKey);
            // 3. 创立子节点 value 为服务接口地址
            // 长期节点
            zkClient.createEphemeral(serverKey, "127.0.0.1:" + port);
        }

    }

    @Override
    public void run() {
        ServerSocket serverSocket = null;
        try {serverSocket = new ServerSocket(port);
            System.out.println("Server start port:" + port);
            regServer();
            Socket socket = null;
            while (true) {socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();}
        } catch (IOException e) {e.printStackTrace();
        } finally {
            try {if (serverSocket != null) {serverSocket.close();
                }
            } catch (Exception e2) {}}
    }
}

ZkServerSocket2 Socket 服务端 用于模仿集群:

package com.baba.wlb.server;

import org.I0Itec.zkclient.ZkClient;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @Author wulongbo
 * @Date 2021/1/5 9:56
 * @Version 1.0
 */

// ServerSocket 服务端
public class ZkServerSocket2 implements Runnable {

    private static int port = 18080;

    private String parentService = "/service";


    /**
     * 服务器端:<br>
     * 1. 服务器启动的时候,会将以后服务器信息注册到注册核心。首先先创立一个父节点为 service, 在父节点上面再创立一个子节点,* 每个子节点都寄存以后服务接口地址。* ## 子节点构造
     * /service 长久节点
     * ##/8080 value 39.102.56.91:18080 长期节点
     * ##/8081 value 39.102.56.91:18081 长期节点
     *
     * @param args
     */
    private ZkClient zkClient = new ZkClient("39.102.56.91:2181");

    public static void main(String[] args) {ZkServerSocket2 server = new ZkServerSocket2(port);
        Thread thread = new Thread(server);
        thread.start();}

    public ZkServerSocket2(int port) {this.port = port;}

    public void regServer() {
        // 1. 先创立父节点 server 为长久节点
        if (!zkClient.exists(parentService)) {
//            // 删除旧节点
//            zkClient.delete(parentService);
            // 2. 创立父节点
            // 长久节点
            zkClient.createPersistent(parentService);
        }


        String serverKey = parentService + "/server_" + port;
        if (!zkClient.exists(serverKey)) {
//            // 删除旧节点
//            zkClient.delete(serverKey);
            // 3. 创立子节点 value 为服务接口地址
            // 长期节点
            zkClient.createEphemeral(serverKey, "127.0.0.1:" + port);
        }

    }

    @Override
    public void run() {
        ServerSocket serverSocket = null;
        try {serverSocket = new ServerSocket(port);
            System.out.println("Server start port:" + port);
            regServer();
            Socket socket = null;
            while (true) {socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();}
        } catch (IOException e) {e.printStackTrace();
        } finally {
            try {if (serverSocket != null) {serverSocket.close();
                }
            } catch (Exception e2) {}}
    }
}

ZkServerClient Socket 客户端:

package com.baba.wlb.client;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author wulongbo
 * @Date 2021/1/5 10:24
 * @Version 1.0
 */
public class ZkServerClient {

    // 寄存服务列表信息
    public static List<String> listServer = new ArrayList<String>();

    // 客户端:读取 service 节点,获取上面的子节点 value 值,本地实现近程调用。private static String parentService = "/service";

    private static ZkClient zkClient = new ZkClient("39.102.56.91:2181");

    public static void main(String[] args) {initServer();
        ZkServerClient client = new ZkServerClient();
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String name;
            try {name = bufferedReader.readLine();
                if ("exit".equals(name)) {System.exit(0);
                }
                client.send(name);
            } catch (IOException e) {e.printStackTrace();
            }
        }
    }

    private void send(String name) {String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            out.println(name);
            while (true) {String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0) {System.out.println("Receive:" + resp);
                    break;
                }
            }
        } catch (Exception e) {e.printStackTrace();
        } finally {if (out != null) {out.close();
            }
            if (in != null) {
                try {in.close();
                } catch (IOException e1) {e1.printStackTrace();
                }

            }
            if (socket != null) {
                try {socket.close();
                } catch (IOException e) {e.printStackTrace();
                }
            }
        }
    }

    // 注册所有 server
    private static void initServer() {//        listServer.clear();
//        listServer.add("39.102.56.91:18080");

        // 从 zookeeper 上获取服务列表信息
        List<String> children = zkClient.getChildren(parentService);
        getChildData(zkClient, children);

        // 应用 Zk 工夫告诉获取最新服务列表信息
        zkClient.subscribeChildChanges(parentService, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {System.out.println("注册核心服务里列表信息发生变化...");
                getChildData(zkClient, currentChildren);
            }
        });
    }

    public static void getChildData(ZkClient zkClient, List<String> children) {listServer.clear();
        children.stream().forEach(
                node -> {String serverAddress = zkClient.readData(parentService + "/" + node);
                    listServer.add(serverAddress);
                }
        );
        System.out.println("服务接口地址:" + listServer.toString());
    }

    // 申请总数
    private static int reqCount = 1;

    // 获取以后 server 信息
    public static String getServer() {//        return listServer.get(0);
        int index = reqCount % listServer.size();
        String address = listServer.get(index);
        System.out.println("客户端申请服务器端:" + address);
        reqCount++;
        return address;
    }


}

启动 Socket 服务端

别离启动 ZkServerSocketZkServerSocket2
咱们能够看到 zookeeper 上注册了一个长久节点 server和两个长期节点 server_18080server_18081,他们对应的 value 值别离是:127.0.0.1:18080127.0.0.1:18081

启动 Socket 客户端

  • 能够发现 服务接口地址:[127.0.0.1:18080, 127.0.0.1:18081]

  • 咱们在控制台输出内容:这是第一次传输,通过取模算法,负载到 18081 这台服务上

  • 在端口为:18081 的 Socket 服务端能够收到音讯

  • 咱们在控制台输出内容:这是第二次传输

  • 在端口为:18080 的 Socket 服务端能够收到音讯

  • 当初强制停掉 18081 的服务,只剩下 18080 这台,需期待约 15s 因为节点发生变化,通过事件告诉的形式给到订阅方,订阅方通过事件监听的形式来动静获取服务端节点。

  • 咱们在控制台输出内容:这是第三次传输 这是第四次传输,都只会转发给 18080

  • 再次启动 18081,复活服务,又可实现动静负载。

总结

本文演示的是 dubbo 如何实现动静负载平衡的,只有咱们对注册核心zookeeper 有肯定的理解,便很容易明确!

退出移动版