关于java:基于-Socket-的-RPC-实现

51次阅读

共计 8990 个字符,预计需要花费 23 分钟才能阅读完成。

根底版本

定义一个 User 类。

import java.io.Serializable;

public class User implements Serializable {
    private static final long serialVersionUID = 1L;
    int id;
    String name;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {return id;}

    public void setId(int id) {this.id = id;}

    public String getName() {return name;}

    public void setName(String name) {this.name = name;}

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

定义一个接口,返回 User 对象。

public interface IUserService {User findUserById(int id);
}

实现该接口,返回 User 对象。

public class IUserServiceImpl implements IUserService {
    @Override
    public User findUserById(int id) {return new User(id, "Alice");
    }
}

定义一个服务端,建设 Socket 连贯,依据传入的 ID 值返回 User 对象信息。

import com.zebro.IUserService;
import com.zebro.IUserServiceImpl;
import com.zebro.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    private static boolean running = true;
    public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(8888);
        // 循环监听
        while(running){Socket client = server.accept();
            process(client);
            client.close();}
        server.close();}
    
    public static void process(Socket socket) throws Exception {DataInputStream dis = new DataInputStream(socket.getInputStream());
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
        
        // 读取客户端传入的 ID
        int id = dis.readInt();
        IUserService service = new IUserServiceImpl();
        User user = service.findUserById(id);
        dos.writeInt(user.getId());
        dos.writeUTF(user.getName());
        dos.flush();}
}

编写一个客户端,用于发送 ID 和接管返回的 User 对象信息。

import java.io.*;
import java.net.Socket;

public class Client {public static void main(String[] args) throws Exception {Socket socket = new Socket("127.0.0.1", 8888);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        
        // 发送给服务端
        dos.writeInt(123);
        socket.getOutputStream().write(baos.toByteArray());
        socket.getOutputStream().flush();
        
        // 接管服务端返回的后果
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        int id = dis.readInt();
        String name = dis.readUTF();
        
        // 组装
        User user = new User(id,name);
        System.out.println(user);
        
        dos.close();
        socket.close();}
}

这时候客户端不须要晓得服务端的具体方法名也能获得数据。

优化版本 1

简化客户端的调用形式,引入客户端存根 stub。

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class Stub {public User findUserById(int id) throws IOException {Socket socket = new Socket("127.0.0.1", 8888);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        
        // 发送给服务端
        dos.writeInt(id);
        socket.getOutputStream().write(baos.toByteArray());
        socket.getOutputStream().flush();
        
        // 接管服务端返回的后果
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        int idtmp = dis.readInt();
        if(idtmp != id) System.out.println("error");
        String name = dis.readUTF();
        User user = new User(id,name);
        
        return user;
    }
}
import java.io.IOException;

public class Client {public static void main(String[] args) throws IOException {Stub stub = new Stub();
        System.out.println(stub.findUserById(123));
    }
}

这时候客户端不须要晓得服务端的具体方法名也能获得数据。

优化版本 2

上述版本中,如果服务端办法较多,客户端存根须要提供大量的办法和返回值类型封装,引入动静代理优化相干逻辑。

import com.zebro.User;
import com.zebro.IUserService;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {public static IUserService getStub(){InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {Socket socket = new Socket("127.0.0.1", 8888);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(baos);

                dos.writeInt((Int)args);

                // 发送给服务端
                socket.getOutputStream().write(baos.toByteArray());
                socket.getOutputStream().flush();
                
                // 接管服务端返回的后果
                DataInputStream dis = new DataInputStream(socket.getInputStream());
                int id = dis.readInt();
                String name = dis.readUTF();
                Object user = new User(id,name);

                return user;
            }
        };

        // 通过动静代理,实例化一个代理对象
        Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService) o;
    }
}
import com.zebro.IUserService;

public class Client {public static void main(String[] args) {IUserService stub = Stub.getStub();
        System.out.println(stub.findUserById(123));
    }
}

这时候客户端通过 IUserService 接口,能够晓得服务端的具体方法名,也能获得数据。

优化版本 3

上述版本中,客户端无论调用什么办法,服务端均调用 findUserById 解决逻辑并返回 User 对象,批改为动静办法优化相干逻辑。

import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {static IUserService getStub(){InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {Socket socket = new Socket("127.0.0.1", 8888);
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

                // 反对动静办法名
                oos.writeUTF(method.getName());
                oos.writeObject(method.getParameterTypes());
                oos.writeObject(args);
                oos.flush();

                // 接管服务端返回的后果,object 读入
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                User user = (User)ois.readObject();
                
                oos.close();
                socket.close();
                return user;
            }
        };

        Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService) o;
    }
}
import com.zebro.IUserService;
import com.zebro.IUserServiceImpl;
import com.zebro.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    private static boolean running = true;
    public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(8088);
        while(running){Socket client = server.accept();
            process(client);
            client.close();}
        server.close();}
    
    public static void process(Socket socket) throws Exception {ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

        // 服务端反对动静办法和参数的调用
        String methodName = ois.readUTF();
        Class[] parameterTypes = (Class[]) ois.readObject();
        Object[] parameters = (Object[]) ois.readObject();
        
        // 服务类型临时还是写死的,不够灵便
        IUserService service = new IUserServiceImpl();
        Method method = service.getClass().getMethod(methodName, parameterTypes);
        User user = (User)method.invoke(service, parameters);
        oos.writeObject(user);
        oos.flush();}
}

优化版本 4

上述版本中,客户端和服务端都只反对 IUserService 的办法调用,并且返回 User 对象,批改为反对任意接口办法的调用优化相干逻辑。

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {static Object getStub(Class c){InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {Socket socket = new Socket("127.0.0.1", 8888);
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

                // 服务类型
                oos.writeUTF(c.getName());
                oos.writeUTF(method.getName());
                oos.writeObject(method.getParameterTypes());
                oos.writeObject(args);
                oos.flush();

                // 接管服务端返回的后果,object 读入
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                Object obj = ois.readObject();
                
                // 改为返回通用对象
                return obj;
            }
        };
        
        // 这里要写成通用的 c,而不是固定的接口
        Object o = Proxy.newProxyInstance(c.getClassLoader(), new Class[]{c}, h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return o;
    }
}
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;

public class Server {
    private static boolean running = true;
    private static HashMap<String,Class> registerTable = new HashMap<>();
    
    static{registerTable.put(IUserService.class.getName(),IUserServiceImpl.class);
        registerTable.put(IProductService.class.getName(), IProductServiceImpl.class);
    }
    
    public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(8888);
        while(running){Socket client = server.accept();
            process(client);
            client.close();}
        server.close();}
    
    public static void process(Socket socket) throws Exception {ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

        // 为了适应客户端通用化而做的改变
        String clazzName = ois.readUTF();
        String methodName = ois.readUTF();
        Class[] parameterTypes = (Class[]) ois.readObject();
        Object[] parameters = (Object[]) ois.readObject();

        // 从注册表中查到服务类,如果应用 spring 甚至还能够间接依据配置注入 bean 而后依据 bean 查找。Object service = registerTable.get(clazzName).newInstance();
        Method method = service.getClass().getMethod(methodName, parameterTypes);
        Object o = method.invoke(service, parameters);
        oos.writeObject(o);
        oos.flush();}
}
import com.zebro.IProductService;
import com.zebro.IUserService;

public class Client {public static void main(String[] args) {IUserService userService = (IUserService) Stub.getStub(IUserService.class);
        IProductService productService = (IProductService)Stub.getStub(IProductService.class);
        
        System.out.println(userService.findUserById(123));
        System.out.println(productService.findProductByName("Bob"));
    }
}

正文完
 0