netty提供了对立的API进行传输数据,这个相比于JDK的形式不便很多。比方上面是一个不必netty而应用原生的阻塞IO进行传输的例子。
public class PlainOioServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port);
try {
for(;;) {
final Socket clientSocket = socket.accept();
System.out.println(
"Accepted connection from " + clientSocket);
new Thread(new Runnable() {
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
out.write("Hi!rn".getBytes(
Charset.forName("UTF-8")));
out.flush();
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
clientSocket.close();
} catch (IOException ex) {
// ignore on close
}
}
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
代码很好了解,为每一个新来的连贯创立一个线程解决。这种形式有个比拟大的问题是,客户端连接数受限于服务器所能接受的线程数。为了改良这个问题咱们能够应用异步模式来重写这段代码,然而你会发现,简直所有的代码都要重写。原生的OIO和NIO的API简直齐全不能复用。不信你看看上面这段NIO的代码,
public class PlainNioServer {
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!rn".getBytes());
for (;;){
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
//handle exception
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server =
(ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate());
System.out.println(
"Accepted connection from " + client);
}
if (key.isWritable()) {
SocketChannel client =
(SocketChannel) key.channel();
ByteBuffer buffer =
(ByteBuffer) key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) {
break;
}
}
client.close();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
// ignore on close
}
}
}
}
}
}
这个代码不做过多解释了,毕竟咱们的重点是netty不是JDK NIO。
用netty实现一个OIO的程序是上面这样的姿态:
public class NettyOioServer {
public void server(int port)
throws Exception {
final ByteBuf buf =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!rn", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(
ChannelHandlerContext ctx)
throws Exception {
ctx.writeAndFlush(buf.duplicate())
.addListener(
ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
而后如果要改成异步非阻塞的模式,只须要把OioEventLoopGroup
改成NioEventLoopGroup
,把OioServerSocketChannel
改成NioServerSocketChannel
,简略到令人发指。
上面是Channel
的类关系图,
从这幅图看出ChannelConfig
和ChannelPipeline
都属于Channel
,在代码中体现为类的成员。ChannelPipeline
其实后面咱们也讲过了,它实现了责任链模式,把ChannelHandler
一个个串起来。通过后者咱们能够领有包含但不限于如下的性能:
- 数据的格局转换
- 异样告诉
- active或者inactive告诉
- EventLoop注册或者登记事件告诉
- 用户自定义事件告诉
上面列举了一些Channle
自身提供的重要办法。
办法名
解释
eventLoop()
返回调配到channel上的eventloop
pipeline()
返回调配到channel上的channelpipeline
isActive()
返回到channel是否连贯到一个近程服务
localAddress()
返回本地绑定的socketAddress
remoteAddress()
返回近程绑定的socketAddress
write()
写入数据到近程(客户端或者服务端),数据会通过channelpipeline
有些办法咱们曾经在后面的示例中见过了。来看下write()
办法的应用示例:
Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
ChannelFuture cf = channel.writeAndFlush(buf);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
System.out.println("Write successful");
} else {
System.err.println("Write error");
future.cause().printStackTrace();
}
}
});
简略解释下,
buf里是要写的数据,而后调用write办法写入数据,返回一个写入的future后果。后面曾经说过这个future了,咱们给future增加一个监听器,以便写入胜利后能够通过回调失去告诉。
另外write这个办法也是线程平安的,上面是一个用多线程操作write办法的示例,
final Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
final ByteBuf buf = Unpooled.copiedBuffer("your data",
CharsetUtil.UTF_8);
Runnable writer = new Runnable() {
@Override
public void run() {
channel.write(buf.duplicate());
}
};
Executor executor = Executors.newCachedThreadPool();
// write in one thread
executor.execute(writer);
// write in another thread
executor.execute(writer);
//...
}
netty保障write办法线程平安的原理,是将用户线程的操作封装成Task放入音讯队列中,底层由同一个I/O线程负责执行,这样就实现了部分无锁化。
这部分要解释分明须要深刻到源码底层,因为本篇系列是netty in action的笔记系列就不多说了。前面可能思考写一个源码解析系列在深刻这一块。
反对的四种传输方式
NIO
这是netty最常见的应用场景。当channel状态变更时用户能够收到告诉,有以下几个状态:
- 新的channel被accept
- channel连贯胜利
- channel收到数据
- channel发送数据
如上图所示,netty外部其实也是封装了JDK的NIO,应用selector来治理IO状态的变更。在后面的章节里咱们其实给过JDK NIO的代码示例,这里就不贴出来了。
netty NIO模型里有一个不得不说的个性叫zero-file-copy
,很多中央翻译成零拷贝。这种个性能够让咱们间接在文件系统和网卡传输数据,防止了数据从内核空间到用户空间的拷贝。
OIO
OIO是在netty里是一种折中的存在,阻塞的形式只管利用场景很少,然而不代表不存在。比方通过jdbc调用数据库,如果是异步的计划是不太适合的。
netty的OIO模型底层也是调用JDK,后面的笔记咱们也给过示例。这种模型就是用一个线程解决监听(accetp),而后为每个胜利的连贯创立一个解决线程。这样做的目标是避免对于某个连贯的解决阻塞影响其它连贯,毕竟I/O操作是很容易引起阻塞的。
既然是阻塞的模型,netty的封装能做的工作也无限。netty只是给socket上加了SO_TIMEOUT
,这样如果一个操作在超时工夫内没有实现,就会抛出SocketTimeoutException
,netty会捕捉这个异样,而后持续前面的流程。而后就是下一个EventLoop执行,周而复始。这种解决计划弊病在于抛出异样的开销,因为异样会占用堆栈。
这个图就是对下面的概括,调配一个线程给socket,socket连贯服务器而后读数据,读数据可能阻塞也可能胜利。如果是前者捕捉异样后再次重试。
Local In VM transport
netty蕴含对本地传输的反对,这个传输实现应用雷同的API用于虚拟机之间的通信,传输是齐全异步的。
每个Channel应用惟一的SocketAddress,客户端通过应用SocketAddress进行连贯,在服务器会被注册为长期运行,一旦通道敞开,它会主动登记,客户端无奈再应用它。
应用本地传输服务器的行为与其余的传输实现简直是雷同的,须要留神的一个重点是只能在本地的服务器和客户端上应用它们。
Embedded transport
Embedded transport能够让你更容易的在不同的ChannelHandler之间的交互,更多的时候它像是一个工具类。个别用于测试的场景。它自带了一个具体的Channel实现,EmbeddedChannel
。比方上面是一个应用示例:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException("frameLength must be positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= frameLength) {
ByteBuf buf = in.readBytes(frameLength);
out.add(buf);
}
}
}
@Test
public void testFramesDecoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
Assert.assertTrue(channel.writeInbound(input.retain()));
Assert.assertTrue(channel.finish());
ByteBuf read = channel.readInbound();
Assert.assertEquals(buf.readSlice(3), read);
read.release();
read = channel.readInbound();
Assert.assertEquals(buf.readSlice(3), read);
read.release();
read = channel.readInbound();
Assert.assertEquals(buf.readSlice(3), read);
read.release();
Assert.assertNull(channel.readInbound());
buf.release();
}
用到的几个办法解释下,
- writeInbound 将入站音讯写到EmbeddedChannel中。如果能够通过readInbound办法从EmbeddedChannel中读取数据,则返回true
- readInbound 从EmbeddedChannel中读取入站音讯。任何返回货色都通过整个ChannelPipeline。如果没有任何可供读取的,则返回null
- finish 将EmbeddedChannel标记为实现,如果有可读取的入站或出站数据,则返回true。这个办法还将会调用EmbeddedChannel上的close办法
更多的应用细节能够去网上理解下。
发表回复