关于netty:netty-in-action学习笔记第四章

43次阅读

共计 7048 个字符,预计需要花费 18 分钟才能阅读完成。

netty 提供了对立的 API 进行传输数据,这个相比于 JDK 的形式不便很多。比方上面是一个不必 netty 而应用原生的阻塞 IO 进行传输的例子。

public class PlainOioServer {public void serve(int port) throws IOException {final ServerSocket socket = new ServerSocket(port);
        try {for(;;) {final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from" + clientSocket);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        OutputStream out;
                        try {out = clientSocket.getOutputStream();
                            out.write("Hi!rn".getBytes(Charset.forName("UTF-8")));
                            out.flush();
                            clientSocket.close();} catch (IOException e) {e.printStackTrace();
                        } finally {
                            try {clientSocket.close();
                            } catch (IOException ex) {// ignore on close}
                        }
                    }
                }).start();}
        } catch (IOException e) {e.printStackTrace();
        }
    }
} 

代码很好了解,为每一个新来的连贯创立一个线程解决。这种形式有个比拟大的问题是,客户端连接数受限于服务器所能接受的线程数。为了改良这个问题咱们能够应用异步模式来重写这段代码,然而你会发现,简直所有的代码都要重写。原生的 OIO 和 NIO 的 API 简直齐全不能复用。不信你看看上面这段 NIO 的代码,

public class PlainNioServer {public void serve(int port) throws IOException {ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!rn".getBytes());
        for (;;){
            try {selector.select();
            } catch (IOException ex) {ex.printStackTrace();
                //handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {SelectionKey key = iterator.next();
                iterator.remove();
                try {if (key.isAcceptable()) {
                        ServerSocketChannel server =
                                (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());
                        System.out.println("Accepted connection from" + client);
                    }
                    if (key.isWritable()) {
                        SocketChannel client =
                                (SocketChannel) key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {if (client.write(buffer) == 0) {break;}
                        }
                        client.close();}
                } catch (IOException ex) {key.cancel();
                    try {key.channel().close();} catch (IOException cex) {// ignore on close}
                }
            }
        }
    }
} 

这个代码不做过多解释了,毕竟咱们的重点是 netty 不是 JDK NIO。

用 netty 实现一个 OIO 的程序是上面这样的姿态:

public class NettyOioServer {public void server(int port)
            throws Exception {
        final ByteBuf buf =
                Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!rn", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelActive(ChannelHandlerContext ctx)
                                                throws Exception {ctx.writeAndFlush(buf.duplicate())
                                                    .addListener(ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}
    }
} 

而后如果要改成异步非阻塞的模式,只须要把 OioEventLoopGroup 改成 NioEventLoopGroup,把OioServerSocketChannel 改成NioServerSocketChannel,简略到令人发指。

上面是 Channel 的类关系图,

从这幅图看出 ChannelConfigChannelPipeline都属于 Channel,在代码中体现为类的成员。ChannelPipeline 其实后面咱们也讲过了,它实现了责任链模式,把 ChannelHandler 一个个串起来。通过后者咱们能够领有包含但不限于如下的性能:

  • 数据的格局转换
  • 异样告诉
  • active 或者 inactive 告诉
  • EventLoop 注册或者登记事件告诉
  • 用户自定义事件告诉

上面列举了一些 Channle 自身提供的重要办法。

办法名

解释

eventLoop()

返回调配到 channel 上的 eventloop

pipeline()

返回调配到 channel 上的 channelpipeline

isActive()

返回到 channel 是否连贯到一个近程服务

localAddress()

返回本地绑定的 socketAddress

remoteAddress()

返回近程绑定的 socketAddress

write()

写入数据到近程(客户端或者服务端),数据会通过 channelpipeline

有些办法咱们曾经在后面的示例中见过了。来看下 write() 办法的应用示例:

Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
        ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
        ChannelFuture cf = channel.writeAndFlush(buf);
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {if (future.isSuccess()) {System.out.println("Write successful");
                } else {System.err.println("Write error");
                    future.cause().printStackTrace();
                }
            }
        }); 

简略解释下,

buf 里是要写的数据,而后调用 write 办法写入数据,返回一个写入的 future 后果。后面曾经说过这个 future 了,咱们给 future 增加一个监听器,以便写入胜利后能够通过回调失去告诉。

另外 write 这个办法也是线程平安的,上面是一个用多线程操作 write 办法的示例,

final Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
        final ByteBuf buf = Unpooled.copiedBuffer("your data",
                CharsetUtil.UTF_8);
        Runnable writer = new Runnable() {
            @Override
            public void run() {channel.write(buf.duplicate());
            }
        };
        Executor executor = Executors.newCachedThreadPool();

        // write in one thread
        executor.execute(writer);

        // write in another thread
        executor.execute(writer);
        //...
    } 

netty 保障 write 办法线程平安的原理,是将用户线程的操作封装成 Task 放入音讯队列中,底层由同一个 I / O 线程负责执行,这样就实现了部分无锁化。

这部分要解释分明须要深刻到源码底层,因为本篇系列是 netty in action 的笔记系列就不多说了。前面可能思考写一个源码解析系列在深刻这一块。

反对的四种传输方式

NIO

这是 netty 最常见的应用场景。当 channel 状态变更时用户能够收到告诉,有以下几个状态:

  • 新的 channel 被 accept
  • channel 连贯胜利
  • channel 收到数据
  • channel 发送数据

如上图所示,netty 外部其实也是封装了 JDK 的 NIO,应用 selector 来治理 IO 状态的变更。在后面的章节里咱们其实给过 JDK NIO 的代码示例,这里就不贴出来了。

netty NIO 模型里有一个不得不说的个性叫zero-file-copy,很多中央翻译成零拷贝。这种个性能够让咱们间接在文件系统和网卡传输数据,防止了数据从内核空间到用户空间的拷贝。

OIO

OIO 是在 netty 里是一种折中的存在,阻塞的形式只管利用场景很少,然而不代表不存在。比方通过 jdbc 调用数据库,如果是异步的计划是不太适合的。

netty 的 OIO 模型底层也是调用 JDK,后面的笔记咱们也给过示例。这种模型就是用一个线程解决监听(accetp),而后为每个胜利的连贯创立一个解决线程。这样做的目标是避免对于某个连贯的解决阻塞影响其它连贯,毕竟 I / O 操作是很容易引起阻塞的。

既然是阻塞的模型,netty 的封装能做的工作也无限。netty 只是给 socket 上加了SO_TIMEOUT,这样如果一个操作在超时工夫内没有实现,就会抛出SocketTimeoutException,netty 会捕捉这个异样,而后持续前面的流程。而后就是下一个 EventLoop 执行,周而复始。这种解决计划弊病在于抛出异样的开销,因为异样会占用堆栈。

这个图就是对下面的概括,调配一个线程给 socket,socket 连贯服务器而后读数据,读数据可能阻塞也可能胜利。如果是前者捕捉异样后再次重试。

Local In VM transport

netty 蕴含对本地传输的反对,这个传输实现应用雷同的 API 用于虚拟机之间的通信, 传输是齐全异步的。

每个 Channel 应用惟一的 SocketAddress,客户端通过应用 SocketAddress 进行连贯,在服务器会被注册为长期运行,一旦通道敞开,它会主动登记,客户端无奈再应用它。

应用本地传输服务器的行为与其余的传输实现简直是雷同的,须要留神的一个重点是只能在本地的服务器和客户端上应用它们。

Embedded transport

Embedded transport 能够让你更容易的在不同的 ChannelHandler 之间的交互,更多的时候它像是一个工具类。个别用于测试的场景。它自带了一个具体的 Channel 实现,EmbeddedChannel。比方上面是一个应用示例:

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {if (frameLength <= 0) {throw new IllegalArgumentException("frameLength must be positive integer:" + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {while (in.readableBytes() >= frameLength) {ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }
} 
@Test
    public void testFramesDecoded() {ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
        Assert.assertTrue(channel.writeInbound(input.retain()));
        Assert.assertTrue(channel.finish());

        ByteBuf read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        Assert.assertNull(channel.readInbound());
        buf.release();} 

用到的几个办法解释下,

  • writeInbound 将入站音讯写到 EmbeddedChannel 中。如果能够通过 readInbound 办法从 EmbeddedChannel 中读取数据,则返回 true
  • readInbound 从 EmbeddedChannel 中读取入站音讯。任何返回货色都通过整个 ChannelPipeline。如果没有任何可供读取的,则返回 null
  • finish 将 EmbeddedChannel 标记为实现,如果有可读取的入站或出站数据,则返回 true。这个办法还将会调用 EmbeddedChannel 上的 close 办法

更多的应用细节能够去网上理解下。

正文完
 0