手写RPC框架第二章netty通信

0次阅读

共计 15372 个字符,预计需要花费 39 分钟才能阅读完成。

案例介绍
在我们实现 rpc 框架的时候,需要选择 socket 的通信方式。而我们知道一般情况下 socket 通信类似与 qq 聊天,发过去消息,什么时候回复都可以。但是我们 rpc 框架通信,从感觉上类似 http 调用,需要在一定时间内返回,否则就会发生超时断开。

这里我们选择 netty 作为我们的 socket 框架,采用 future 方式进行通信。

Netty 是由 JBOSS 提供的一个 java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用 Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty 相当于简化和流线化了网络应用的编程开发过程,例如:基于 TCP 和 UDP 的 socket 服务开发。“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括 FTP、SMTP、HTTP 等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

环境准备
1、jdk 1.8.0
2、IntelliJ IDEA Community Edition 2018.3.1 x64

代码示例

itstack-demo-rpc-02
└── src
    └── main
    │    └── java
    │        └── org.itstack.demo.rpc.network
    │             ├── client
    │             │   ├── ClientSocket.java
    │             │   └── MyClientHandler.java  
    │             ├── codec
    │             │   ├── RpcDecoder.java
    │             │   └── RpcEncoder.java  
    │             ├── future
    │             │   ├── SyncWrite.java     
    │             │   ├── SyncWriteFuture.java    
    │             │   ├── SyncWriteMap.java    
    │             │   └── WriteFuture.java    
    │             ├── msg
    │             │   ├── Request.java
    │             │   └── Response.java 
    │             ├── server
    │             │   ├── MyServerHandler.java
    │             │   └── ServerSocket.java     
    │             └── util
    │                 └── SerializationUtil.java     
    └── test
         └── java
             └── org.itstack.demo.test
                 ├── client
                 │   └── StartClient.java
                 └── server
                     └── StartServer.java                 

ClientSocket.java

package org.itstack.demo.rpc.network.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ClientSocket implements Runnable {

    private ChannelFuture future;

    @Override
    public void run() {EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new RpcDecoder(Response.class),
                            new RpcEncoder(Request.class),
                            new MyClientHandler());
                }
            });
            ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
            this.future = f;
            f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {workerGroup.shutdownGracefully();
        }
    }

    public ChannelFuture getFuture() {return future;}

    public void setFuture(ChannelFuture future) {this.future = future;}
}

MyClientHandler.java

package org.itstack.demo.rpc.network.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.itstack.demo.rpc.network.future.SyncWriteFuture;
import org.itstack.demo.rpc.network.future.SyncWriteMap;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {Response msg = (Response) obj;
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        if (future != null) {future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();
        ctx.close();}

}

RpcDecoder.java

package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

import java.util.List;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {this.genericClass = genericClass;}

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }

}

RpcEncoder.java

package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {this.genericClass = genericClass;}

    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out)  {if (genericClass.isInstance(in)) {byte[] data = SerializationUtil.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }

}

SyncWrite.java

package org.itstack.demo.rpc.network.future;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWrite {public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {if (channel == null) {throw new NullPointerException("channel");
        }
        if (request == null) {throw new NullPointerException("request");
        }
        if (timeout <= 0) {throw new IllegalArgumentException("timeout <= 0");
        }

        String requestId = UUID.randomUUID().toString();
        request.setRequestId(requestId);

        WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());
        SyncWriteMap.syncKey.put(request.getRequestId(), future);

        Response response = doWriteAndSync(channel, request, timeout, future);

        SyncWriteMap.syncKey.remove(request.getRequestId());
        return response;
    }

    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {writeFuture.setWriteResult(future.isSuccess());
                writeFuture.setCause(future.cause());
                // 失败移除
                if (!writeFuture.isWriteSuccess()) {SyncWriteMap.syncKey.remove(writeFuture.requestId());
                }
            }
        });

        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (response == null) {if (writeFuture.isTimeout()) {throw new TimeoutException();
            } else {
                // write exception
                throw new Exception(writeFuture.cause());
            }
        }
        return response;
    }

}

SyncWriteFuture.java

package org.itstack.demo.rpc.network.future;


import org.itstack.demo.rpc.network.msg.Response;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWriteFuture implements WriteFuture<Response> {private CountDownLatch latch = new CountDownLatch(1);
    private final long begin = System.currentTimeMillis();
    private long timeout;
    private Response response;
    private final String requestId;
    private boolean writeResult;
    private Throwable cause;
    private boolean isTimeout = false;

    public SyncWriteFuture(String requestId) {this.requestId = requestId;}

    public SyncWriteFuture(String requestId, long timeout) {
        this.requestId = requestId;
        this.timeout = timeout;
        writeResult = true;
        isTimeout = false;
    }


    public Throwable cause() {return cause;}

    public void setCause(Throwable cause) {this.cause = cause;}

    public boolean isWriteSuccess() {return writeResult;}

    public void setWriteResult(boolean result) {this.writeResult = result;}

    public String requestId() {return requestId;}

    public Response response() {return response;}

    public void setResponse(Response response) {
        this.response = response;
        latch.countDown();}

    public boolean cancel(boolean mayInterruptIfRunning) {return true;}

    public boolean isCancelled() {return false;}

    public boolean isDone() {return false;}

    public Response get() throws InterruptedException, ExecutionException {latch.wait();
        return response;
    }

    public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {if (latch.await(timeout, unit)) {return response;}
        return null;
    }

    public boolean isTimeout() {if (isTimeout) {return isTimeout;}
        return System.currentTimeMillis() - begin > timeout;}
}

SyncWriteMap.java

package org.itstack.demo.rpc.network.future;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SyncWriteMap {public static Map<String, WriteFuture> syncKey = new ConcurrentHashMap<String, WriteFuture>();

}

WriteFuture.java

package org.itstack.demo.rpc.network.future;

import org.itstack.demo.rpc.network.msg.Response;

import java.util.concurrent.Future;

public interface WriteFuture<T> extends Future<T> {Throwable cause();

    void setCause(Throwable cause);

    boolean isWriteSuccess();

    void setWriteResult(boolean result);

    String requestId();

    T response();

    void setResponse(Response response);

    boolean isTimeout();}

Request.java

package org.itstack.demo.rpc.network.msg;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Request {

    private String requestId;
    private Object result;

    public String getRequestId() {return requestId;}

    public void setRequestId(String requestId) {this.requestId = requestId;}

    public Object getResult() {return result;}

    public void setResult(Object result) {this.result = result;}

}

Response.java

package org.itstack.demo.rpc.network.msg;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Response {

    private String requestId;
    private String param;

    public String getRequestId() {return requestId;}

    public void setRequestId(String requestId) {this.requestId = requestId;}

    public String getParam() {return param;}

    public void setParam(String param) {this.param = param;}

}

MyServerHandler.java

package org.itstack.demo.rpc.network.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj){Request msg = (Request) obj;
        // 反馈
        Response request = new Response();
        request.setRequestId(msg.getRequestId());
        request.setParam(msg.getResult() + "请求成功,反馈结果请接受处理。");
        ctx.writeAndFlush(request);
        // 释放
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();
    }

}

ServerSocket.java

package org.itstack.demo.rpc.network.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ServerSocket implements Runnable {

    private ChannelFuture f;

    @Override
    public void run() {EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch){ch.pipeline().addLast(new RpcDecoder(Request.class),
                                    new RpcEncoder(Response.class),
                                    new MyServerHandler());
                        }
                    });

            ChannelFuture f = null;
            f = b.bind(7397).sync();
            f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();}

    }

}

SerializationUtil.java

package org.itstack.demo.rpc.network.util;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by fuzhengwei1 on 2016/10/20.
 */
public class SerializationUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap();

    private static Objenesis objenesis = new ObjenesisStd();

    private SerializationUtil() {}

    /**
     * 序列化(对象 -> 字节数组)
     *
     * @param obj 对象
     * @return 字节数组
     */
    public static <T> byte[] serialize(T obj) {Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);
        } finally {buffer.clear();
        }
    }

    /**
     * 反序列化(字节数组 -> 对象)
     *
     * @param data
     * @param cls
     * @param <T>
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);
        }
    }

    private static <T> Schema<T> getSchema(Class<T> cls) {Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {schema = RuntimeSchema.createFrom(cls);
            cachedSchema.put(cls, schema);
        }
        return schema;
    }

}

StartClient.java

package org.itstack.demo.test.client;

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.itstack.demo.rpc.network.client.ClientSocket;
import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {ClientSocket client = new ClientSocket();
        new Thread(client).start();

        while (true) {
            try {
                // 获取 future,线程有等待处理时间
                if (null == future) {future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                // 构建发送参数
                Request request = new Request();
                request.setResult("查询用户信息");
                SyncWrite s = new SyncWrite();
                Response response = s.writeAndSync(future.channel(), request, 1000);
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

}

StartServer.java

package org.itstack.demo.test.server;

import org.itstack.demo.rpc.network.server.ServerSocket;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartServer {public static void main(String[] args) {System.out.println("启动服务端开始");
        new Thread(new ServerSocket()).start();
        System.out.println("启动服务端完成");
    }

}

测试结果

启动 StartServer

启动服务端开始
启动服务端完成
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

启动 StartClient

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"3380f061-2501-49b5-998b-21b5956fe60a"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"81c51815-4d92-482c-bd05-e4b6dfa4d3b6"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"86e38bb1-eccc-4d45-b976-c3b67999e3ab"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7f72002c-3b38-43d9-8452-db8797298899"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"d566a7d4-4b0d-426b-8c09-c535ccf8eb09"}

...

关注{bugstack 虫洞栈}公众号获取源码,回复 <rpc 源码 >

正文完
 0