案例介绍
在我们实现 rpc 框架的时候,需要选择 socket 的通信方式。我们知道,一般情况下 socket 通信类似于 qq 聊天:消息发过去之后,对方什么时候回复都可以。但是 rpc 框架的通信从使用感受上更类似 http 调用,需要在一定时间内返回结果,否则就会发生超时断开。
这里我们选择 netty 作为我们的 socket 框架,采用 future 方式进行通信。
Netty 是由 JBOSS 提供的一个 java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于 NIO 的客户端、服务器端编程框架,使用 Netty 可以确保你快速、简单地开发出一个网络应用,例如实现了某种协议的客户端、服务端应用。Netty 相当于简化和流线化了网络应用的编程开发过程,例如:基于 TCP 和 UDP 的 socket 服务开发。“快速”和“简单”并不意味着会产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括 FTP、SMTP、HTTP 等各种二进制、文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功地找到了一种方式,在保证易于开发的同时还保证了其应用的性能、稳定性和伸缩性。
环境准备
1、jdk 1.8.0
2、IntelliJ IDEA Community Edition 2018.3.1 x64
代码示例
itstack-demo-rpc-02
└── src
└── main
│ └── java
│ └── org.itstack.demo.rpc.network
│ ├── client
│ │ ├── ClientSocket.java
│ │ └── MyClientHandler.java
│ ├── codec
│ │ ├── RpcDecoder.java
│ │ └── RpcEncoder.java
│ ├── future
│ │ ├── SyncWrite.java
│ │ ├── SyncWriteFuture.java
│ │ ├── SyncWriteMap.java
│ │ └── WriteFuture.java
│ ├── msg
│ │ ├── Request.java
│ │ └── Response.java
│ ├── server
│ │ ├── MyServerHandler.java
│ │ └── ServerSocket.java
│ └── util
│ └── SerializationUtil.java
└── test
└── java
└── org.itstack.demo.test
├── client
│ └── StartClient.java
└── server
└── StartServer.java
ClientSocket.java
package org.itstack.demo.rpc.network.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
/**
 * Netty client that connects to the RPC server and exposes the resulting {@link ChannelFuture}.
 * <p>
 * {@link #run()} blocks until the channel is closed, so an instance is meant to be run on its
 * own thread while other threads poll {@link #getFuture()} until it returns a non-null value.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ClientSocket implements Runnable {

    /** Host used by the no-argument constructor. */
    private static final String DEFAULT_HOST = "127.0.0.1";
    /** Port used by the no-argument constructor. */
    private static final int DEFAULT_PORT = 7397;

    private final String host;
    private final int port;

    // volatile: assigned on the thread executing run(), read from other threads via getFuture()
    private volatile ChannelFuture future;

    /** Creates a client that connects to 127.0.0.1:7397. */
    public ClientSocket() {
        this(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Creates a client for an arbitrary server address.
     *
     * @param host server host name or IP address
     * @param port server TCP port
     */
    public ClientSocket(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server, publishes the connect future, then blocks until the channel closes.
     * The event loop group is always shut down on exit.
     */
    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // inbound: bytes -> Response; outbound: Request -> bytes
                    ch.pipeline().addLast(
                            new RpcDecoder(Response.class),
                            new RpcEncoder(Request.class),
                            new MyClientHandler());
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            this.future = f;
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // restore the interrupt flag so the owner of this thread can observe the interruption
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * @return the connect future, or {@code null} while the connection has not been established
     */
    public ChannelFuture getFuture() {
        return future;
    }

    public void setFuture(ChannelFuture future) {
        this.future = future;
    }
}
MyClientHandler.java
package org.itstack.demo.rpc.network.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.itstack.demo.rpc.network.future.SyncWriteFuture;
import org.itstack.demo.rpc.network.future.SyncWriteMap;
import org.itstack.demo.rpc.network.msg.Response;
/**
 * Client-side inbound handler: matches each decoded {@link Response} to the pending future
 * registered in {@link SyncWriteMap} under the same request id and completes it.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Hands the received response to the future waiting for it; responses whose request id has
     * no registered future are dropped.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response reply = (Response) obj;
        String id = reply.getRequestId();
        SyncWriteFuture pending = (SyncWriteFuture) SyncWriteMap.syncKey.get(id);
        if (pending == null) {
            // nobody is registered for this request id
            return;
        }
        pending.setResponse(reply);
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
RpcDecoder.java
package org.itstack.demo.rpc.network.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;
import java.util.List;
/**
 * Decodes length-prefixed frames (4-byte length followed by that many payload bytes) into
 * objects of the configured class using {@link SerializationUtil}.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcDecoder extends ByteToMessageDecoder {

    /** Target type every frame payload is deserialized into. */
    private final Class<?> genericClass;

    /**
     * @param genericClass class the payload bytes are deserialized into
     */
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Emits one object per complete frame. Returns without consuming anything while the frame
     * is still incomplete so that decoding is retried once more bytes arrive.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // need the whole 4-byte length prefix first
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // A negative length can only come from a corrupted or foreign stream; allocating
            // new byte[dataLength] would throw. Discard what is buffered and drop the connection.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // payload not fully received yet: rewind to the start of the length prefix
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }
}
RpcEncoder.java
package org.itstack.demo.rpc.network.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;
/**
 * Encodes objects of the configured class as length-prefixed frames: a 4-byte length followed
 * by the bytes produced by {@link SerializationUtil#serialize(Object)}.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    /** Only instances of this class are written to the wire. */
    private final Class<?> genericClass;

    /**
     * @param genericClass class of the messages this encoder serializes
     */
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Writes {@code in} as one frame when it is an instance of the configured class;
     * any other message leaves {@code out} untouched.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
SyncWrite.java
package org.itstack.demo.rpc.network.future;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SyncWrite {public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {if (channel == null) {throw new NullPointerException("channel");
}
if (request == null) {throw new NullPointerException("request");
}
if (timeout <= 0) {throw new IllegalArgumentException("timeout <= 0");
}
String requestId = UUID.randomUUID().toString();
request.setRequestId(requestId);
WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());
SyncWriteMap.syncKey.put(request.getRequestId(), future);
Response response = doWriteAndSync(channel, request, timeout, future);
SyncWriteMap.syncKey.remove(request.getRequestId());
return response;
}
private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {writeFuture.setWriteResult(future.isSuccess());
writeFuture.setCause(future.cause());
// 失败移除
if (!writeFuture.isWriteSuccess()) {SyncWriteMap.syncKey.remove(writeFuture.requestId());
}
}
});
Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
if (response == null) {if (writeFuture.isTimeout()) {throw new TimeoutException();
} else {
// write exception
throw new Exception(writeFuture.cause());
}
}
return response;
}
}
SyncWriteFuture.java
package org.itstack.demo.rpc.network.future;
import org.itstack.demo.rpc.network.msg.Response;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SyncWriteFuture implements WriteFuture<Response> {private CountDownLatch latch = new CountDownLatch(1);
private final long begin = System.currentTimeMillis();
private long timeout;
private Response response;
private final String requestId;
private boolean writeResult;
private Throwable cause;
private boolean isTimeout = false;
public SyncWriteFuture(String requestId) {this.requestId = requestId;}
public SyncWriteFuture(String requestId, long timeout) {
this.requestId = requestId;
this.timeout = timeout;
writeResult = true;
isTimeout = false;
}
public Throwable cause() {return cause;}
public void setCause(Throwable cause) {this.cause = cause;}
public boolean isWriteSuccess() {return writeResult;}
public void setWriteResult(boolean result) {this.writeResult = result;}
public String requestId() {return requestId;}
public Response response() {return response;}
public void setResponse(Response response) {
this.response = response;
latch.countDown();}
public boolean cancel(boolean mayInterruptIfRunning) {return true;}
public boolean isCancelled() {return false;}
public boolean isDone() {return false;}
public Response get() throws InterruptedException, ExecutionException {latch.wait();
return response;
}
public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {if (latch.await(timeout, unit)) {return response;}
return null;
}
public boolean isTimeout() {if (isTimeout) {return isTimeout;}
return System.currentTimeMillis() - begin > timeout;}
}
SyncWriteMap.java
package org.itstack.demo.rpc.network.future;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SyncWriteMap {public static Map<String, WriteFuture> syncKey = new ConcurrentHashMap<String, WriteFuture>();
}
WriteFuture.java
package org.itstack.demo.rpc.network.future;
import org.itstack.demo.rpc.network.msg.Response;
import java.util.concurrent.Future;
/**
 * A {@link Future} for one outstanding request that additionally carries the outcome of the
 * underlying write and the request id used to correlate the response.
 * <p>
 * {@code cause()} (declared on the interface line below) returns the value stored via
 * {@link #setCause(Throwable)}.
 */
public interface WriteFuture<T> extends Future<T> {Throwable cause();
/** Stores the cause reported for the write; SyncWrite passes the channel future's cause. */
void setCause(Throwable cause);
/** Returns the write outcome previously stored via {@link #setWriteResult(boolean)}. */
boolean isWriteSuccess();
/** Stores whether the write succeeded; SyncWrite passes the channel future's isSuccess(). */
void setWriteResult(boolean result);
/** Returns the request id; SyncWrite uses it as the key in SyncWriteMap. */
String requestId();
/** Returns the response held by this future. */
T response();
/** Supplies the response; MyClientHandler calls this when a matching response is read. */
void setResponse(Response response);
/** Tells whether the wait is considered timed out; SyncWrite checks it when get returns null. */
boolean isTimeout();}
Request.java
package org.itstack.demo.rpc.network.msg;
/**
 * Message sent from client to server: a correlation id plus an arbitrary payload object.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Request {

    /** Correlation id echoed back in the response. */
    private String requestId;

    /** Payload carried by the request. */
    private Object result;

    /** @return the correlation id, or {@code null} if none has been set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @param requestId correlation id to attach to this request */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @return the payload, or {@code null} if none has been set */
    public Object getResult() {
        return this.result;
    }

    /** @param result payload to carry */
    public void setResult(Object result) {
        this.result = result;
    }
}
Response.java
package org.itstack.demo.rpc.network.msg;
/**
 * Message sent from server to client: the correlation id of the request plus a text payload.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Response {

    /** Correlation id copied from the request being answered. */
    private String requestId;

    /** Text payload of the response. */
    private String param;

    /** @return the correlation id, or {@code null} if none has been set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @param requestId correlation id of the request being answered */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @return the text payload, or {@code null} if none has been set */
    public String getParam() {
        return this.param;
    }

    /** @param param text payload to carry */
    public void setParam(String param) {
        this.param = param;
    }
}
MyServerHandler.java
package org.itstack.demo.rpc.network.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
/**
 * Server-side inbound handler: answers every decoded {@link Request} with a {@link Response}
 * that carries the same request id.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /** Builds the reply for the incoming request, writes it back, then releases the message. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        Request incoming = (Request) obj;

        // reply: same id, payload text derived from the request's payload
        Response reply = new Response();
        reply.setRequestId(incoming.getRequestId());
        String text = incoming.getResult() + "请求成功,反馈结果请接受处理。";
        reply.setParam(text);
        ctx.writeAndFlush(reply);

        // release the inbound message
        ReferenceCountUtil.release(incoming);
    }

    /** Flushes anything still pending once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}
ServerSocket.java
package org.itstack.demo.rpc.network.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
/**
 * Netty server for the RPC demo: binds a port and installs the decoder, encoder and
 * {@link MyServerHandler} on every accepted connection.
 * <p>
 * {@link #run()} blocks until the server channel is closed, so an instance is meant to be run
 * on its own thread.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ServerSocket implements Runnable {

    /** Port used by the no-argument constructor. */
    private static final int DEFAULT_PORT = 7397;

    private final int port;

    // volatile: assigned on the thread executing run(), readable from other threads
    private volatile ChannelFuture f;

    /** Creates a server listening on port 7397. */
    public ServerSocket() {
        this(DEFAULT_PORT);
    }

    /**
     * @param port TCP port to bind
     */
    public ServerSocket(int port) {
        this.port = port;
    }

    /**
     * Binds the port, then blocks until the server channel closes. Both event loop groups are
     * always shut down on exit.
     */
    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            // inbound: bytes -> Request; outbound: Response -> bytes
                            ch.pipeline().addLast(
                                    new RpcDecoder(Request.class),
                                    new RpcEncoder(Response.class),
                                    new MyServerHandler());
                        }
                    });
            // assign the field itself; a same-named local would shadow it and leave it null
            this.f = b.bind(port).sync();
            this.f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // restore the interrupt flag so the owner of this thread can observe the interruption
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * @return the bind future, or {@code null} while the server has not been bound yet
     */
    public ChannelFuture getFuture() {
        return f;
    }
}
SerializationUtil.java
package org.itstack.demo.rpc.network.util;
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Protostuff-based serialization helper with a per-class schema cache.
 * <p>
 * Created by fuzhengwei1 on 2016/10/20.
 */
public class SerializationUtil {

    /** Schemas already created, keyed by the class they describe. */
    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    /** Instantiates target classes during deserialization. */
    private static final Objenesis objenesis = new ObjenesisStd();

    private SerializationUtil() {
    }

    /**
     * Serializes an object to a byte array.
     *
     * @param obj object to serialize
     * @return serialized bytes
     * @throws IllegalStateException if serialization fails (wraps the original exception)
     */
    public static <T> byte[] serialize(T obj) {
        @SuppressWarnings("unchecked") // obj is a T, so its runtime class is a Class<? extends T>
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserializes a byte array into a new instance of the given class.
     *
     * @param data serialized bytes
     * @param cls  class to instantiate and populate
     * @param <T>  type of the resulting object
     * @return the populated instance
     * @throws IllegalStateException if deserialization fails (wraps the original exception)
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Returns the cached schema for {@code cls}, creating and caching it on first use. When two
     * threads create a schema for the same class at once, both end up using the one that was
     * stored first.
     */
    @SuppressWarnings("unchecked") // the cache only ever maps Class<T> to a Schema<T>
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            Schema<T> created = RuntimeSchema.createFrom(cls);
            Schema<?> existing = cachedSchema.putIfAbsent(cls, created);
            schema = existing != null ? (Schema<T>) existing : created;
        }
        return schema;
    }
}
StartClient.java
package org.itstack.demo.test.client;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.itstack.demo.rpc.network.client.ClientSocket;
import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
/**
 * Demo client: starts a {@link ClientSocket} on a background thread and then sends one request
 * per second, printing each response.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartClient {

    private static ChannelFuture future;

    /**
     * Runs the request loop until the thread is interrupted. Failed calls (for example a
     * timeout) are printed and the loop carries on.
     */
    public static void main(String[] args) {
        ClientSocket client = new ClientSocket();
        new Thread(client).start();
        // one SyncWrite serves every call; there is no need to allocate one per iteration
        SyncWrite syncWrite = new SyncWrite();
        while (true) {
            try {
                // the connection is established on another thread: poll until the future is published
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                // build the request payload
                Request request = new Request();
                request.setResult("查询用户信息");
                Response response = syncWrite.writeAndSync(future.channel(), request, 1000);
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // interruption is a request to stop: restore the flag and leave the loop
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
StartServer.java
package org.itstack.demo.test.server;
import org.itstack.demo.rpc.network.server.ServerSocket;
/**
 * Demo entry point that launches the RPC server on a background thread.
 * <p>
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartServer {

    /** Prints a start marker, starts the server thread, then prints a completion marker. */
    public static void main(String[] args) {
        System.out.println("启动服务端开始");
        Thread serverThread = new Thread(new ServerSocket());
        serverThread.start();
        System.out.println("启动服务端完成");
    }
}
测试结果
启动 StartServer
启动服务端开始
启动服务端完成
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
启动 StartClient
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"3380f061-2501-49b5-998b-21b5956fe60a"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"81c51815-4d92-482c-bd05-e4b6dfa4d3b6"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"86e38bb1-eccc-4d45-b976-c3b67999e3ab"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7f72002c-3b38-43d9-8452-db8797298899"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"d566a7d4-4b0d-426b-8c09-c535ccf8eb09"}
...
关注{bugstack 虫洞栈}公众号获取源码,回复 <rpc 源码 >