关于java:Netty中8大组件详解

5次阅读

共计 27851 个字符,预计需要花费 70 分钟才能阅读完成。

Netty 概述

1、什么是 Netty

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty 是一个异步的、基于事件驱动的网络应用框架,用于疾速开发可保护、高性能的网络服务器和客户端

留神:netty 的异步还是基于多路复用的,并没有实现真正意义上的异步 IO

2、Netty 的劣势

如果应用传统 NIO,其工作量大,bug 多

  • 须要本人构建协定
  • 解决 TCP 传输问题,如粘包、半包
  • 因为 bug 的存在,epoll 空轮询导致 CPU 100%

Netty 对 API 进行加强,使之更易用,如

  • FastThreadLocal => ThreadLocal
  • ByteBuf => ByteBuffer

3、入门案例

1、服务器端代码

public class HelloServer {public static void main(String[] args) {
        // 1、启动器,负责拆卸 netty 组件,启动服务器
        new ServerBootstrap()
                // 2、创立 NioEventLoopGroup,能够简略了解为 线程池 + Selector
                .group(new NioEventLoopGroup())
                // 3、抉择服务器的 ServerSocketChannel 实现
                .channel(NioServerSocketChannel.class)
                // 4、child 负责解决读写,该办法决定了 child 执行哪些操作
                // ChannelInitializer 处理器(仅执行一次)// 它的作用是待客户端 SocketChannel 建设连贯后,执行 initChannel 以便增加更多的处理器
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 5、SocketChannel 的处理器,应用 StringDecoder 解码,ByteBuf=>String
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // 6、SocketChannel 的业务解决,应用上一个处理器的处理结果
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println(s);
                            }
                        });
                    }
                    // 7、ServerSocketChannel 绑定 8080 端口
                }).bind(8080);
    }
}

2、客户端代码

public class HelloClient {public static void main(String[] args) throws InterruptedException {new Bootstrap()
                .group(new NioEventLoopGroup())
                // 抉择客户 Socket 实现类,NioSocketChannel 示意基于 NIO 的客户端实现
                .channel(NioSocketChannel.class)
                // ChannelInitializer 处理器(仅执行一次)// 它的作用是待客户端 SocketChannel 建设连贯后,执行 initChannel 以便增加更多的处理器
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        // 音讯会通过通道 handler 解决,这里是将 String => ByteBuf 编码收回
                        channel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 指定要连贯的服务器和端口
                .connect(new InetSocketAddress("localhost", 8080))
                // Netty 中很多办法都是异步的,如 connect
                // 这时须要应用 sync 办法期待 connect 建设连贯结束
                .sync()
                // 获取 channel 对象,它即为通道形象,能够进行数据读写操作
                .channel()
                // 写入音讯并清空缓冲区
                .writeAndFlush("hello world");
    }
}

3、运行流程

左:客户端 右:服务器端

组件解释

  • channel 能够了解为数据的通道
  • msg 了解为流动的数据,最开始输出是 ByteBuf,但通过 pipeline 中的各个 handler 加工,会变成其它类型对象,最初输入又变成 ByteBuf
  • handler 能够了解为数据的解决工序

    • 工序有多道,合在一起就是 pipeline(传递路径),pipeline 负责公布事件(读、读取实现…)流传给每个 handler,handler 对本人感兴趣的事件进行解决(重写了相应事件处理办法)

      • pipeline 中有多个 handler,解决时会顺次调用其中的 handler
    • handler 分 Inbound 和 Outbound 两类

      • Inbound 入站
    • Outbound 出站
  • eventLoop 能够了解为解决数据的工人

    • eventLoop 能够治理多个 channel 的 io 操作,并且一旦 eventLoop 负责了某个 channel,就 会将其与 channel 进行绑定,当前该 channel 中的 io 操作都由该 eventLoop 负责
    • eventLoop 既能够执行 io 操作,也能够进行工作解决,每个 eventLoop 有本人的工作队列,队列里能够堆放多个 channel 的待处理工作,工作分为一般工作、定时工作
    • eventLoop 依照 pipeline 程序,顺次依照 handler 的布局(代码)解决数据,能够为每个 handler 指定不同的 eventLoop

1、EventLoop

事件循环对象 EventLoop

EventLoop 实质是一个 单线程执行器(同时保护了一个 Selector),外面有 run 办法解决一个或多个 Channel 上源源不断的 io 事件

它的继承关系如下

  • 继承自 j.u.c.ScheduledExecutorService 因而蕴含了线程池中所有的办法
  • 继承自 netty 本人的 OrderedEventExecutor

    • 提供了 boolean inEventLoop (Thread thread) 办法判断一个线程是否属于此 EventLoop
    • 提供了 EventLoopGroup parent () 办法来看看本人属于哪个 EventLoopGroup

事件循环组 EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 个别会调用 EventLoopGroup 的 register 办法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来解决(保障了 io 事件处理时的线程平安)

  • 继承自 netty 本人的 EventExecutorGroup

    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 办法获取汇合中下一个 EventLoop

1.1 解决一般与定时工作

public class TestEventLoop {public static void main(String[] args) {
        // 创立领有两个 EventLoop 的 NioEventLoopGroup,对应两个线程
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 通过 next 办法能够取得下一个 EventLoop
        System.out.println(group.next());
        System.out.println(group.next());

        // 通过 EventLoop 执行一般工作
        group.next().execute(()->{System.out.println(Thread.currentThread().getName() + "hello");
        });

        // 通过 EventLoop 执行定时工作
        group.next().scheduleAtFixedRate(()->{System.out.println(Thread.currentThread().getName() + "hello2");
        }, 0, 1, TimeUnit.SECONDS);
        
        // 优雅地敞开
        group.shutdownGracefully();}
}

输入后果如下

io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2

敞开 EventLoopGroup

优雅敞开 shutdownGracefully 办法。该办法会首先切换 EventLoopGroup 到敞开状态从而回绝新的工作的退出,而后在工作队列的工作都解决实现后,进行线程的运行。从而确保整体利用是在失常有序的状态下退出的

1.2 解决 IO 工作

服务器代码
public class MyServer {public static void main(String[] args) {new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));

                            }
                        });
                    }
                })
                .bind(8080);
    }
}
客户端代码
public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
        // 此处打断点调试,调用 channel.writeAndFlush(...);
        System.in.read();}
}

1.3 分工

Bootstrap 的 group () 办法 能够传入两个 EventLoopGroup 参数,别离负责解决不同的事件

public class MyServer {public static void main(String[] args) {new ServerBootstrap()
                // 两个 Group,别离为 Boss 负责 Accept 事件,Worker 负责读写事件
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
            
                ...
    }
}

多个客户端别离发送 hello 后果

nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4

能够看出,一个 EventLoop 能够 负责多个 Channel,且 EventLoop 一旦与 Channel 绑定,则 始终负责 解决该 Channel 中的事件

减少自定义 EventLoopGroup

当有的 工作须要较长的工夫解决时,能够应用非 NioEventLoopGroup,防止同一个 NioEventLoop 中的其余 Channel 在较长的工夫内都无奈失去解决

   public class MyServer {public static void main(String[] args) {
        // 减少自定义的非 NioEventLoopGroup
        EventLoopGroup group = new DefaultEventLoopGroup();
        
        new ServerBootstrap()
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 减少两个 handler,第一个应用 NioEventLoopGroup 解决,第二个应用自定义 EventLoopGroup 解决
                        socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                // 调用下一个 handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // 该 handler 绑定自定义的 Group
                        .addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

启动四个客户端发送数据

nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4

能够看出,客户端与服务器之间的事件,被 nioEventLoopGroup 和 defaultEventLoopGroup 别离解决

切换的实现

不同的 EventLoopGroup 切换的实现原理如下

由下面的图能够看出,当 handler 中绑定的 Group 不同时,须要切换 Group 来执行不同的工作

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 取得下一个 EventLoop, excutor 即为 EventLoopGroup
    EventExecutor executor = next.executor();
    
    // 如果下一个 EventLoop 在以后的 EventLoopGroup 中
    if (executor.inEventLoop()) {
        // 应用以后 EventLoopGroup 中的 EventLoop 来解决工作
        next.invokeChannelRead(m);
    } else {
        // 否则让另一个 EventLoopGroup 中的 EventLoop 来创立工作并执行
        executor.execute(new Runnable() {public void run() {next.invokeChannelRead(m);
            }
        });
    }
}
  • 如果两个 handler 绑定的是 同一个 EventLoopGroup,那么就间接调用
  • 否则,把要调用的代码封装为一个工作对象,由下一个 handler 的 EventLoopGroup 来调用

2、Channel

Channel 的罕用办法

  • close () 能够用来敞开 Channel
  • closeFuture () 用来解决 Channel 的敞开

    • sync 办法作用是同步期待 Channel 敞开
    • 而 addListener 办法是异步期待 Channel 敞开
  • pipeline () 办法用于增加处理器
  • write () 办法将数据写入

    • 因为缓冲机制,数据被写入到 Channel 中当前,不会立刻被发送
    • 只有当缓冲满了或者调用了 flush () 办法后,才会将数据通过 Channel 发送进来
  • writeAndFlush () 办法将数据写入并 立刻发送(刷出)

2.1 ChannelFuture

连贯问题

拆分客户端代码

public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该办法为异步非阻塞办法,主线程调用后不会被阻塞,真正去执行连贯操作的是 NIO 线程
                // NIO 线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
        
        // 该办法用于期待连贯真正建设
        channelFuture.sync();
        
        // 获取客户端 - 服务器之间的 Channel 对象
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
        System.in.read();}
}

如果咱们去掉 channelFuture.sync() 办法,会服务器无奈收到 hello world

这是因为建设连贯 (connect) 的过程是 异步非阻塞 的,若不通过 sync() 办法阻塞主线程,期待连贯真正建设,这时通过 channelFuture.channel () 拿到的 Channel 对象,并不是真正与服务器建设好连贯的 Channel,也就没法将信息正确的传输给服务器端

所以须要通过 channelFuture.sync() 办法,阻塞主线程,同步处理结果 ,期待连贯真正建设好当前,再去取得 Channel 传递数据。应用该办法,获取 Channel 和发送数据的线程 都是主线程

上面还有一种办法,用于 异步 获取建设连贯后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行 connect 操作的线程)

addListener 办法

通过这种办法能够在 NIO 线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作

public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该办法为异步非阻塞办法,主线程调用后不会被阻塞,真正去执行连贯操作的是 NIO 线程
                // NIO 线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
        
        // 当 connect 办法执行结束后,也就是连贯真正建设后
        // 会在 NIO 线程中调用 operationComplete 办法
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {Channel channel = channelFuture.channel();
                channel.writeAndFlush("hello world");
            }
        });
        System.in.read();}
}
解决敞开
public class ReadClient {public static void main(String[] args) throws InterruptedException {
        // 创立 EventLoopGroup,应用结束后敞开
        NioEventLoopGroup group = new NioEventLoopGroup();
        
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();

        Channel channel = channelFuture.channel();
        Scanner scanner = new Scanner(System.in);

        // 创立一个线程用于输出并向服务器发送
        new Thread(()->{while (true) {String msg = scanner.next();
                if ("q".equals(msg)) {
                    // 敞开操作是异步的,在 NIO 线程中执行
                    channel.close();
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "inputThread").start();

        // 取得 closeFuture 对象
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close...");
        
        // 同步期待 NIO 线程执行完 close 操作
        closeFuture.sync();
        
        // 敞开之后执行一些操作,能够保障执行的操作肯定是在 channel 敞开当前执行的
        System.out.println("敞开之后执行一些额定操作...");
        
        // 敞开 EventLoopGroup
        group.shutdownGracefully();}
}

敞开 channel

当咱们要敞开 channel 时,能够调用 channel.close () 办法进行敞开。然而该办法也是一个 异步办法 。真正的敞开操作并不是在调用该办法的线程中执行的,而是 在 NIO 线程中执行真正的敞开操作

如果咱们想在 channel 真正敞开当前,执行一些额定的操作,能够抉择以下两种办法来实现

  • 通过 channel.closeFuture () 办法取得对应的 ChannelFuture 对象,而后调用 sync () 办法阻塞执行操作的线程,期待 channel 真正敞开后,再执行其余操作
// 取得 closeFuture 对象
ChannelFuture closeFuture = channel.closeFuture();

// 同步期待 NIO 线程执行完 close 操作
closeFuture.sync();
  • 调用 closeFuture.addListener 办法,增加 close 的后续操作
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        // 期待 channel 敞开后才执行的操作
        System.out.println("敞开之后执行一些额定操作...");
        // 敞开 EventLoopGroup
        group.shutdownGracefully();}
});

3、Future 与 Promise

3.1 概念

netty 中的 Future 与 jdk 中的 Future 同名,然而是两个接口

netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩大

  • jdk Future 只能同步期待工作完结(或胜利、或失败)能力失去后果
  • netty Future 能够同步期待工作完结失去后果,也能够异步形式失去后果,但都是要等工作完结
  • netty Promise 不仅有 netty Future 的性能,而且脱离了工作独立存在,只作为两个线程间传递后果的容器

### 3.2 JDK Future

public class JdkFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {return new Thread(r, "JdkFuture");
            }
        };
        // 创立线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);

        // 取得 Future 对象
        Future<Integer> future = executor.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {TimeUnit.SECONDS.sleep(1);
                return 50;
            }
        });

        // 通过阻塞的形式,取得运行后果
        System.out.println(future.get());
    }
}

### 3.3 Netty Future

public class NettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();

        // 取得 EventLoop 对象
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {return 50;}
        });

        // 主线程中获取后果
        System.out.println(Thread.currentThread().getName() + "获取后果");
        System.out.println("getNow" + future.getNow());
        System.out.println("get" + future.get());

        // NIO 线程中异步获取后果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {System.out.println(Thread.currentThread().getName() + "获取后果");
                System.out.println("getNow" + future.getNow());
            }
        });
    }
}

运行后果

main 获取后果
getNow null
get 50
nioEventLoopGroup-2-1 获取后果
getNow 50

Netty 中的 Future 对象,能够通过 EventLoop 的 sumbit () 办法失去

  • 能够通过 Future 对象的 get 办法,阻塞地获取返回后果
  • 也能够通过 getNow 办法,获取后果,若还没有后果,则返回 null,该办法是非阻塞的
  • 还能够通过 future.addListener 办法,在 Callable 办法执行的线程中,异步获取返回后果

3.4 Netty Promise

Promise 相当于一个容器,能够用于寄存各个线程中的后果,而后让其余线程去获取该后果

public class NettyPromise {public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创立 EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        // 创立 Promise 对象,用于寄存后果
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
            try {TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            // 自定义线程向 Promise 中寄存后果
            promise.setSuccess(50);
        }).start();

        // 主线程从 Promise 中获取后果
        System.out.println(Thread.currentThread().getName() + " " + promise.get());
    }
}

4、Handler 与 Pipeline

4.1 Pipeline

public class PipeLineServer {public static void main(String[] args) {new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 在 socketChannel 的 pipeline 中增加 handler
                        // pipeline 中 handler 是带有 head 与 tail 节点的双向链表,的理论构造为
                         // head <-> handler1 <-> ... <-> handler4 <->tail
                        // Inbound 次要解决入站操作,个别为读操作,产生入站操作时会触发 Inbound 办法
                        // 入站时,handler 是从 head 向后调用的
                        socketChannel.pipeline().addLast("handler1" ,new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + "Inbound handler 1");
                                // 父类该办法外部会调用 fireChannelRead
                                // 将数据传递给下一个 handler
                                super.channelRead(ctx, msg);
                            }
                        });
                        socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + "Inbound handler 2");
                                // 执行 write 操作,使得 Outbound 的办法可能失去调用
          socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
                                super.channelRead(ctx, msg);
                            }
                        });
                        // Outbound 次要解决出站操作,个别为写操作,产生出站操作时会触发 Outbound 办法
                        // 出站时,handler 的调用是从 tail 向前调用的
                        socketChannel.pipeline().addLast("handler3" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + "Outbound handler 1");
                                super.write(ctx, msg, promise);
                            }
                        });
                        socketChannel.pipeline().addLast("handler4" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + "Outbound handler 2");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

运行后果如下

nioEventLoopGroup-2-2 Inbound handler 1
nioEventLoopGroup-2-2 Inbound handler 2
nioEventLoopGroup-2-2 Outbound handler 2
nioEventLoopGroup-2-2 Outbound handler 1

通过 channel.pipeline ().addLast (name, handler) 增加 handler 时,记得给 handler 取名字。这样能够调用 pipeline 的 addAfter、addBefore 等办法更灵便地向 pipeline 中增加 handler

handler 须要放入通道的 pipeline 中,能力依据放入程序来应用 handler

  • pipeline 是构造是一个带有 head 与 tail 指针的双向链表,其中的节点为 handler

    • 要通过 ctx.fireChannelRead (msg) 等办法,将以后 handler 的处理结果传递给下一个 handler
  • 当有 入站(Inbound)操作时,会从 head 开始向后 调用 handler,直到 handler 不是解决 Inbound 操作为止
  • 当有 出站(Outbound)操作时,会从 tail 开始向前 调用 handler,直到 handler 不是解决 Outbound 操作为止

具体构造如下

调用程序如下

4.2 OutboundHandler

socketChannel.writeAndFlush()

当 handler 中调用该办法进行写操作时,会触发 Outbound 操作,此时是从 tail 向前寻找 OutboundHandler

ctx.writeAndFlush()

当 handler 中调用该办法进行写操作时,会触发 Outbound 操作,此时是从以后 handler 向前寻找 OutboundHandler

4.3 EmbeddedChannel

EmbeddedChannel 能够用于测试各个 handler,通过其构造函数按程序传入须要测试 handler,而后调用对应的 Inbound 和 Outbound 办法即可

public class TestEmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };

        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };

        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };

        // 用于测试 Handler 的 Channel
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        
        // 执行 Inbound 操作 
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        // 执行 Outbound 操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

5、ByteBuf

调试工具办法

private static void log(ByteBuf buffer) {int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
        .append("read index:").append(buffer.readerIndex())
        .append("write index:").append(buffer.writerIndex())
        .append("capacity:").append(buffer.capacity())
        .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

该办法能够帮忙咱们更为具体地查看 ByteBuf 中的内容

5.1 创立

public class ByteBufStudy {public static void main(String[] args) {
        // 创立 ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        ByteBufUtil.log(buffer);

        // 向 buffer 中写入数据
        StringBuilder sb = new StringBuilder();
        for(int i = 0; i < 20; i++) {sb.append("a");
        }
        buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));

        // 查看写入后果
        ByteBufUtil.log(buffer);
    }
}

运行后果

read index:0 write index:0 capacity:16

read index:0 write index:20 capacity:64
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61                                     |aaaa            |
+--------+-------------------------------------------------+----------------+
  • ByteBuf 通过 ByteBufAllocator 抉择 allocator 并调用对应的 buffer () 办法来创立的 ,默认应用 间接内存 作为 ByteBuf,容量为 256 个字节,能够指定初始容量的大小
  • 当 ByteBuf 的容量无奈包容所有数据时,ByteBuf 会进行扩容操作
  • 如果在 handler 中创立 ByteBuf,倡议应用 ChannelHandlerContext ctx.alloc ().buffer () 来创立

5.2 间接内存与堆内存

通过该办法创立的 ByteBuf,应用的是 基于间接内存 的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

能够应用上面的代码来创立池化 基于堆 的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

也能够应用上面的代码来创立池化 基于间接内存 的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
  • 间接内存创立和销毁的代价低廉,但读写性能高(少一次内存复制),适宜配合池化性能一起用
  • 间接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的治理,但也要留神及时被动开释

验证

public class ByteBufStudy {public static void main(String[] args) {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        System.out.println(buffer.getClass());

        buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
        System.out.println(buffer.getClass());

        buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
        System.out.println(buffer.getClass());
    }
}
// 应用池化的间接内存
class io.netty.buffer.PooledUnsafeDirectByteBuf
    
// 应用池化的堆内存    
class io.netty.buffer.PooledUnsafeHeapByteBuf
    
// 应用池化的间接内存    
class io.netty.buffer.PooledUnsafeDirectByteBuf

5.3 池化与非池化

池化的最大意义在于能够 重用 ByteBuf,长处有

  • 没有池化,则每次都得创立新的 ByteBuf 实例,这个操作对间接内存代价低廉,就算是堆内存,也会减少 GC 压力
  • 有了池化,则能够重用池中 ByteBuf 实例,并且采纳了与 jemalloc 相似的内存调配算法晋升调配效率
  • 高并发时,池化性能更节约内存,缩小内存溢出的可能

池化性能是否开启,能够通过上面的零碎环境变量来设置

-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 当前,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化性能还不成熟,默认是非池化实现

5.4 组成

ByteBuf 次要有以下几个组成部分

  • 最大容量与以后容量

    • 在结构 ByteBuf 时,可传入两个参数,别离代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为 Integer.MAX_VALUE
    • 当 ByteBuf 容量无奈包容所有数据时,会进行扩容操作,若超出 最大容量,会抛出 java.lang.IndexOutOfBoundsException 异样
  • 读写操作不同于 ByteBuffer 只用 position 进行管制,ByteBuf 别离由读指针和写指针两个指针管制。进行读写操作时,无需进行模式的切换

    • 读指针前的局部被称为废除局部,是曾经读过的内容
  • 读指针与写指针之间的空间称为可读局部

    • 写指针与以后容量之间的空间称为可写局部

5.5 写入

罕用办法如下

留神

  • 这些办法的未指明返回值的,其返回值都是 ByteBuf,意味着能够链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,应用 writeInt (int value)

应用办法

public class ByteBufStudy {public static void main(String[] args) {
        // 创立 ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        ByteBufUtil.log(buffer);

        // 向 buffer 中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        ByteBufUtil.log(buffer);

        buffer.writeInt(5);
        ByteBufUtil.log(buffer);

        buffer.writeIntLE(6);
        ByteBufUtil.log(buffer);

        buffer.writeLong(7);
        ByteBufUtil.log(buffer);
    }
}

运行后果

read index:0 write index:0 capacity:16

read index:0 write index:4 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

read index:0 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00             |............    |
+--------+-------------------------------------------------+----------------+

read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+

还有一类办法是 set 结尾 的一系列办法,也 能够写入数据,但不会扭转写指针地位

5.6 扩容

当 ByteBuf 中的容量无奈包容写入的数据时,会进行扩容操作

buffer.writeLong(7);
ByteBufUtil.log(buffer);
// 扩容前
read index:0 write index:12 capacity:16
...

// 扩容后
read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+
扩容规定
  • 如何写入后数据大小未超过 512 字节,则抉择下一个 16 的整数倍进行扩容

    • 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
  • 如果写入后数据大小超过 512 字节,则抉择下一个 2^n
  • 例如写入后大小为 513 字节,则扩容后 capacity 是 210=1024 字节(29=512 曾经不够了)
  • 扩容 不能超过 maxCapacity,否则会抛出 java.lang.IndexOutOfBoundsException 异样
Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)

5.7 读取

读取次要是通过一系列 read 办法进行读取,读取时会依据读取数据的字节数挪动读指针

如果须要 反复读取,须要调用 buffer.markReaderIndex() 对读指针进行标记,并通过 buffer.resetReaderIndex() 将读指针复原到 mark 标记的地位

public class ByteBufStudy {public static void main(String[] args) {
        // 创立 ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向 buffer 中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);

        // 读取 4 个字节
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        ByteBufUtil.log(buffer);

        // 通过 mark 与 reset 实现反复读取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        ByteBufUtil.log(buffer);

        // 复原到 mark 标记处
        buffer.resetReaderIndex();
        ByteBufUtil.log(buffer);
    }
}
read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16

read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+

还有以 get 结尾的一系列办法,这些 办法不会扭转读指针的地位

5.8 开释

因为 Netty 中有堆外内存(间接内存)的 ByteBuf 实现,堆外内存最好是手动来开释,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 应用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 应用的就是间接内存了,须要非凡的办法来回收内存
  • PooledByteBuf 和它的子类应用了池化机制,须要更简单的规定来回收内存

Netty 这里采纳了援用计数法来管制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 办法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 办法计数加 1,示意调用者没用完之前,其它 handler 即便调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即便 ByteBuf 对象还在,其各个办法均无奈失常应用
开释规定

因为 pipeline 的存在,个别须要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已实现了它的使命,那么便无须再传递)

根本规定是,谁是最初使用者,谁负责 release

  • 终点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 办法中首次创立 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead (byteBuf))
  • 入站 ByteBuf 解决准则

    • 对原始 ByteBuf 不做解决,调用 ctx.fireChannelRead (msg) 向后传递,这时毋庸 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead (msg) 向后传递,那么也必须 release
    • 留神各种异样,如果 ByteBuf 没有胜利传递到下一个 ChannelHandler,必须 release
    • 假如音讯始终向后传,那么 TailContext 会负责开释未解决音讯(原始的 ByteBuf)
  • 出站 ByteBuf 解决准则

    • 出站音讯最终都会转为 ByteBuf 输入,始终向前传,由 HeadContext flush 后 release
  • 异样解决准则

    • 有时候不分明 ByteBuf 被援用了多少次,但又必须彻底开释,能够循环调用 release 直到返回 true
while (!buffer.release()) {}

当 ByteBuf 被传到了 pipeline 的 head 与 tail 时,ByteBuf 会被其中的办法彻底开释,但前提是 ByteBuf 被传递到了 head 与 tail 中

TailConext 中开释 ByteBuf 的源码

protected void onUnhandledInboundMessage(Object msg) {
    try {logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        // 具体的开释办法
        ReferenceCountUtil.release(msg);
    }
}

判断传过来的是否为 ByteBuf,是的话才须要开释

public static boolean release(Object msg) {return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;}

5.9 切片

ByteBuf 切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有产生内存复制,还是应用原始 ByteBuf 的内存,切片后的 ByteBuf 保护独立的 read,write 指针

失去分片后的 buffer 后,要调用其 retain 办法,使其外部的援用计数加一。防止原 ByteBuf 开释,导致切片 buffer 无奈应用批改原 ByteBuf 中的值,也会影响切片后失去的 ByteBuf

public class TestSlice {public static void main(String[] args) {
        // 创立 ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向 buffer 中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

        // 将 buffer 分成两局部
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);

        // 须要让分片的 buffer 援用计数加一
        // 防止原 Buffer 开释导致分片 buffer 无奈应用
        slice1.retain();
        slice2.retain();
        
        ByteBufUtil.log(slice1);
        ByteBufUtil.log(slice2);

        // 更改原始 buffer 中的值
        System.out.println("=========== 批改原 buffer 中的值 ===========");
        buffer.setByte(0,5);

        System.out.println("=========== 打印 slice1===========");
        ByteBufUtil.log(slice1);
    }
}

运行后果

read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+
=========== 批改原 buffer 中的值 ===========
=========== 打印 slice1===========
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+

5.10 劣势

  • 池化思维 – 能够重用池中 ByteBuf 实例,更节约内存,缩小内存溢出的可能
  • 读写指针拆散,不须要像 ByteBuffer 一样切换读写模式
  • 能够主动扩容
  • 反对链式调用,应用更晦涩
  • 很多中央体现零拷贝,例如
  • slice、duplicate、CompositeByteBuf

本文由 传智教育博学谷 教研团队公布。

如果本文对您有帮忙,欢送 关注 点赞 ;如果您有任何倡议也可 留言评论 私信,您的反对是我保持创作的能源。

转载请注明出处!

正文完
 0