乐趣区

关于netty:Netty应用入门及重要组件

1.Netty 简介

Netty 是由 JBOSS 提供的一个 java 开源框架。
Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以疾速开发高性能、高可靠性的网络服务器和客户端程序。


2. 为什么应用 Netty

尽管 JAVA NIO 框架提供了多路复用 IO 的反对,然而并没有提供下层“信息格式”的良好封装。例如前两者并没有提供针对 Protocol Buffer、JSON 这些信息格式的封装,然而 Netty 框架提供了这些数据格式封装(基于责任链模式的编
码和解码性能);

2、NIO 的类库和 API 相当简单,应用它来开发,须要十分熟练地把握 Selector、ByteBuffer、ServerSocketChannel、SocketChannel 等,须要很多额定的编程技能来辅助应用 NIO, 例如,因为 NIO 波及了 Reactor 线程模型,所以必须必须对多线程和网络编程十分相熟能力写出高质量的 NIO 程序

3、要编写一个牢靠的、易保护的、高性能的 NIO 服务器利用。除了框架自身要兼容实现各类操作系统的实现外。更重要的是它应该还要解决很多下层特有服务,例如:客户端的权限、还有下面提到的信息格式封装、简略的数据读取,断连重连,半包读写,心跳等等,这些 Netty 框架都提供了响应的反对。

4、JAVA NIO 框架存在一个 poll/epoll bug:Selector doesn’t block on Selector.select(timeout),不能 block 意味着 CPU 的使用率会变成 100%(这是底层 JNI 的问题,下层要解决这个异样实际上也好办)。当然这个 bug 只有在 Linux 内核上能力重现


3.Netty 重要组件

3.1 Channel 接口

在 Java 的网络编程中,其根本的结构是类 Socket。Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作。大大地升高了间接应用 Socket 类的复杂性。此外,Channel 也是领有许多预约义的、专门化实现的宽泛类层次结构的根。


channel 生命周期

  • ChannelUnregistered:Channel 曾经被创立,但还未注册 EventLoop
  • ChannelRegistered:Channel 曾经被注册到了 EventLoop
  • ChannelActive:Channel 处于活动状态(曾经连贯到它的近程节点)。它当初能够接管和发送数据了
  • ChannelInactive:Channel 没有连贯到近程节点

当这些状态产生扭转时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其能够随后对它们做出响应。


channel 重要办法

1.eventLoop:返回调配给 Channel 的 EventLoop

2.pipeline:返回调配给 Channel 的 ChannelPipeline

3.isActive:如果 Channel 是流动的,则返回 true。流动的意义可能依赖于底层的传输。
例如,一个 Socket 传输一旦连贯到了近程节点便是流动的,而一个 Datagram 传输一旦被关上便是流动的。

4.localAddress:返回本地的 SokcetAddress

5,remoteAddress:返回近程的 SocketAddress

6.write:将数据写到近程节点。这个数据将被传递给 ChannelPipeline,并且排队直到它被冲刷

7.flush:将之前已写的数据冲刷到底层传输,如一个 Socket

8.writeAndFlush:一个简便的办法,等同于调用 write()并接着调用 flush()


3.2 EventLoop 丶 EventLoopGroup

EventLoop 定义了 Netty 的外围形象,用于解决网络连接的生命周期中所产生的事件。EventLoop 充当任务调度丶线程治理丶线程调配的重要对象。

3.2.1 任务调度

查看 EventLoop 的继承关系可看出,EventLoop 具备任务调度,充当线程池的作用, 一个 EventLoop 将由一个永远都不会扭转的 Thread 驱动. 依据配置和可用外围的不同,可能会创立多个 EventLoop 实例用以优化资源的应用,且单个 EventLoop 可指派用于服务多个 Channel(解决多个网络连接)。

Netty 的 EventLoop 在继承了 ScheduledExecutorService, 可调度一个工作以便稍后(提早)执行或者周期性地执行。
比方,想要注册一个在客户端曾经连贯了 5 分钟之后触发的工作。一个常见的做法是,发送心跳音讯到近程节点,查看连贯是否还活着。如果没有响应,你便晓得能够敞开该 Channel 了。


3.2.2 线程治理

在外部,提交工作,如果(以后)调用线程正是撑持 EventLoop 的线程,那么所提交的代码块将会被(间接)执行。否则,EventLoop 将调度该工作以便稍后执行,并将它放入到外部队列中。当 EventLoop 下次解决它的事件时,它会执行队列中的那些工作 / 事件。


3.2.3 线程调配

服务于 Channel 的 I/O 和事件的 EventLoop 则蕴含在 EventLoopGroup 中。

IO 多路复用:在以后的线程模型中,它们可能会被多个 Channel 所共享。这使得能够通过尽可能大量的 Thread 来撑持大量的 Channel,而不是每个 Channel 调配一个 Thread。

调配 EventLoop:EventLoopGroup 负责为每个新创建的 Channel 调配一个 EventLoop。在以后实现中,应用程序循环(round-robin)的形式进行调配以获取一个平衡的散布,并且雷同的 EventLoop 可能会被调配给多个 Channel。

线程平安:一旦一个 Channel 被调配给一个 EventLoop,它将在它的整个生命周期中都应用这个 EventLoop(以及相关联的 Thread)。请牢记这一点,因为它能够使你从担心你的 ChannelHandler 实现中的线程平安和同步问题中解脱进去。

留神:须要留神,EventLoop 对 ThreadLocal 的应用的影响。因为一个 EventLoop 通常会用于撑持多个 Channel.

所以对于所有相关联的 Channel 来说若应用它来实现状态追踪则会有线程平安问题。然而在一些无状态的上下文中,它依然能够被用于在多个 Channel 之间共享一些重度的或者代价低廉的对象,甚至是事件。


3.3 ChannelFuture 接口

Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会立刻返回,所以咱们须要一种用于在将来的某个工夫点确定其后果的办法。因而 Netty 提供了 ChannelFuture 接口。

其 addListener()办法注册了一个 ChannelFutureListener,以便在某个操作实现时(无论是否胜利)失去告诉。


3.4 ChannelHandler 接口

3.4.1 ChannelHandler 次要接口

对于应用程序开发人员的角度来看,Netty 的次要组件是 ChannelHandler,它充当了所有解决入站和出站数据的利用程序逻辑的容器。ChannelHandler 的办法是由网络事件触发的。

对于 netty 来说, 次要流程就是音讯通信从进入入站处理器再到进来出站处理器, 即接管音讯再响应音讯。

入站处理器 :举例来说,ChannelInboundHandler 是一个常常实现的子接口。这种类型的 ChannelHandler 接支出站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所解决。

次要办法如下:

  • channelRegistered 当 Channel 曾经注册到它的 EventLoop 并且可能解决 I/O 时被调用
  • channelUnregistered 当 Channel 从它的 EventLoop 登记并且无奈解决任何 I/O 时被调用
  • channelActive 当 Channel 处于活动状态时被调用;Channel 曾经连贯 / 绑定并且曾经就绪
  • channelInactive 当 Channel 来到活动状态并且不再连贯它的近程节点时被调用
  • channelReadComplete 当 Channel 上的一个读操作实现时被调用
  • channelRead 当从 Channel 读取数据时被调用

出站处理器 :当你要给连贯的客户端发送响应时,也能够从 ChannelInboundHandler 间接冲刷数据而后输入到对端, 即调用 writeAndFlush 或 flush 时则会通过出站处理器(常实现channelOutbountHandler 子接口)

次要办法如下:

  • bind(ChannelHandlerContext,SocketAddress,ChannelPromise)当申请将 Channel 绑定到本地地址时被调用
  • connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)当申请将 Channel 连贯到近程节点时被调用
  • disconnect(ChannelHandlerContext,ChannelPromise)当申请将 Channel 从近程节点断开时被调用
  • close(ChannelHandlerContext,ChannelPromise) 当申请敞开 Channel 时被调用
  • deregister(ChannelHandlerContext,ChannelPromise)当申请将 Channel 从它的 EventLoop 登记时被调用
  • read(ChannelHandlerContext) 当申请从 Channel 读取更多的数据时被调用
  • flush(ChannelHandlerContext) 当申请通过 Channel 将入队数据冲刷到近程节点时被调用
  • write(ChannelHandlerContext,Object,ChannelPromise) 当申请通过 Channel 将数据写到近程节点时被调用。

ChannelHandler 的适配器有一些适配器类应用于自定义处理器,因为它们提供了定义在对应接口中的所有办法的默认实现。因为你有时会疏忽那些不感兴趣的事件,所以 Netty 提供了形象基类 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。


3.4.2 ChannelHandler 生命周期

接口 ChannelHandler 定义的生命周期操作,在 ChannelHandler 被增加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操作。这些办法中的每一个都承受一个 ChannelHandlerContext 参数。

  • handlerAdded: 当把 ChannelHandler 增加到 ChannelPipeline 中时被调用
  • handlerRemoved: 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
  • exceptionCaught: 当处理过程中在 ChannelPipeline 中有谬误产生时被调用

3.5 ChannelPipeline 接口

与 channel 绑定: 当 Channel 被创立时,它将会被主动地调配一个新的 ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能拆散其以后的。

ChannelHandler 容器: ChannelHandler 在 ChannelPipeline 里工作,执行程序是由它们被增加的程序来决定的, 它们是在应用程序的初始化或者疏导阶段被装置的。这些对象接管事件、执行它们所实现的解决逻辑,并将数据传递给链中的下一个 ChannelHandler。

入站静止:如果一个音讯或者任何其余的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,最终,数据将会达到 ChannelPipeline 的尾端,届时,所有解决就都完结了。

出站静止:数据的出站静止(即正在被写的数据)在概念上也是一样的。在这种状况下,数据将从 ChannelOutboundHandler 链的尾端开始流动,直到它达到链的头部为止。在这之后,出站数据将会达到网络传输层,这里显示为 Socket。通常状况下,这将触发一个写操作。

程序性:当两个类别的 ChannelHandler 都混合增加到同一个 ChannelPipeline, 尽管 ChannelInboundHandle 和 ChannelOutboundHandle 都扩大自 ChannelHandler,然而 Netty 能辨别 ChannelInboundHandler 实现和 ChannelOutboundHandler 实现,并确保数据只会在具备雷同定向类型的两个 ChannelHandler 之间传递。

次要办法如下:

  • addFirst、addBefore、addAfter、addLast 将一个 ChannelHandler 增加到 ChannelPipeline 中
  • remove 将一个 ChannelHandler 从 ChannelPipeline 中移除
  • replace 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler
  • get 通过类型或者名称返回 ChannelHandler
  • context 返回和 ChannelHandler 绑定的 ChannelHandlerContext
  • names 返回 ChannelPipeline 中所有 ChannelHandler 的名称

4. 第一个繁难 netty 程序

4.1 服务端

public class EchoServer  {

    private final int port;

    public EchoServer(int port) {this.port = port;}

    public static void main(String[] args) throws InterruptedException {
        int port = 9999;
        EchoServer echoServer = new EchoServer(port);
        System.out.println("服务器行将启动");
        echoServer.start();
        System.out.println("服务器敞开");
    }

    public void start() throws InterruptedException {final EchoServerHandler serverHandler = new EchoServerHandler();
        /* 线程组 */
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /* 服务端启动必须 */
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)/* 将线程组传入 */
                    .channel(NioServerSocketChannel.class)/* 指定应用 NIO 进行网络传输 */
                    .localAddress(new InetSocketAddress(port))/* 指定服务器监听端口 */
                    /* 服务端每接管到一个连贯申请,就会新启一个 socket 通信,也就是 channel,所以上面这段代码的作用就是为这个子 channel 减少 handle*/
                    .childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {
                            /* 增加到该子 channel 的 pipeline 的尾部 */
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            ChannelFuture f = b.bind().sync();/* 异步绑定到服务器,sync()会阻塞直到实现 */
            f.channel().closeFuture().sync();/* 阻塞直到服务器的 channel 敞开 */} finally {group.shutdownGracefully().sync();/* 优雅敞开线程组 */}

    }


}
@ChannelHandler.Sharable
/* 不加这个注解那么在减少到 childHandler 时就必须 new 进去 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /* 客户端读到数据当前,就会执行 */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;
        System.out.println("Server accept"+in.toString(CharsetUtil.UTF_8));
        ctx.write(in);

    }

    /*** 服务端读取实现网络数据后的解决 */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /*** 产生异样后的解决 */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {cause.printStackTrace();
        ctx.close();}
}

4.2 客户端

public class EchoClient {

    private final int port;
    private final String host;

    public EchoClient(int port, String host) {
        this.port = port;
        this.host = host;
    }

    public void start() throws InterruptedException {
        /* 线程组 */
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            /* 客户端启动必备 */
            Bootstrap b = new Bootstrap();
            b.group(group)/* 把线程组传入 */
                    /* 指定应用 NIO 进行网络传输 */
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,port))
                    .handler(new EchoClientHandle());
            /* 连贯到近程节点,阻塞直到连贯实现 */
            ChannelFuture f = b.connect().sync();
            /* 阻塞程序,直到 Channel 产生了敞开 */
            f.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}

    }

    public static void main(String[] args) throws InterruptedException {new EchoClient(9999,"127.0.0.1").start();}
}
public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {

    /* 客户端读到数据当前,就会执行 */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
            throws Exception {System.out.println("client acccept:"+msg.toString(CharsetUtil.UTF_8));
    }

    /* 连贯建设当前 */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty",CharsetUtil.UTF_8));
        //ctx.fireChannelActive();}

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {cause.printStackTrace();

        ctx.close();}
}

一个繁难的 netty 程序就实现了,实现了通信性能。

退出移动版