关于netty:Netty应用入门及重要组件

52次阅读

共计 8474 个字符,预计需要花费 22 分钟才能阅读完成。

1.Netty 简介

Netty 是由 JBOSS 提供的一个 java 开源框架。
Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以疾速开发高性能、高可靠性的网络服务器和客户端程序。


2. 为什么应用 Netty

尽管 JAVA NIO 框架提供了多路复用 IO 的反对,然而并没有提供下层“信息格式”的良好封装。例如前两者并没有提供针对 Protocol Buffer、JSON 这些信息格式的封装,然而 Netty 框架提供了这些数据格式封装(基于责任链模式的编
码和解码性能);

2、NIO 的类库和 API 相当简单,应用它来开发,须要十分熟练地把握 Selector、ByteBuffer、ServerSocketChannel、SocketChannel 等,须要很多额定的编程技能来辅助应用 NIO, 例如,因为 NIO 波及了 Reactor 线程模型,所以必须必须对多线程和网络编程十分相熟能力写出高质量的 NIO 程序

3、要编写一个牢靠的、易保护的、高性能的 NIO 服务器利用。除了框架自身要兼容实现各类操作系统的实现外。更重要的是它应该还要解决很多下层特有服务,例如:客户端的权限、还有下面提到的信息格式封装、简略的数据读取,断连重连,半包读写,心跳等等,这些 Netty 框架都提供了响应的反对。

4、JAVA NIO 框架存在一个 poll/epoll bug:Selector doesn’t block on Selector.select(timeout),不能 block 意味着 CPU 的使用率会变成 100%(这是底层 JNI 的问题,下层要解决这个异样实际上也好办)。当然这个 bug 只有在 Linux 内核上能力重现


3.Netty 重要组件

3.1 Channel 接口

在 Java 的网络编程中,其根本的结构是类 Socket。Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作。大大地升高了间接应用 Socket 类的复杂性。此外,Channel 也是领有许多预约义的、专门化实现的宽泛类层次结构的根。


channel 生命周期

  • ChannelUnregistered:Channel 曾经被创立,但还未注册 EventLoop
  • ChannelRegistered:Channel 曾经被注册到了 EventLoop
  • ChannelActive:Channel 处于活动状态(曾经连贯到它的近程节点)。它当初能够接管和发送数据了
  • ChannelInactive:Channel 没有连贯到近程节点

当这些状态产生扭转时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其能够随后对它们做出响应。


channel 重要办法

1.eventLoop:返回调配给 Channel 的 EventLoop

2.pipeline:返回调配给 Channel 的 ChannelPipeline

3.isActive:如果 Channel 是流动的,则返回 true。流动的意义可能依赖于底层的传输。
例如,一个 Socket 传输一旦连贯到了近程节点便是流动的,而一个 Datagram 传输一旦被关上便是流动的。

4.localAddress:返回本地的 SokcetAddress

5,remoteAddress:返回近程的 SocketAddress

6.write:将数据写到近程节点。这个数据将被传递给 ChannelPipeline,并且排队直到它被冲刷

7.flush:将之前已写的数据冲刷到底层传输,如一个 Socket

8.writeAndFlush:一个简便的办法,等同于调用 write()并接着调用 flush()


3.2 EventLoop 丶 EventLoopGroup

EventLoop 定义了 Netty 的外围形象,用于解决网络连接的生命周期中所产生的事件。EventLoop 充当任务调度丶线程治理丶线程调配的重要对象。

3.2.1 任务调度

查看 EventLoop 的继承关系可看出,EventLoop 具备任务调度,充当线程池的作用, 一个 EventLoop 将由一个永远都不会扭转的 Thread 驱动. 依据配置和可用外围的不同,可能会创立多个 EventLoop 实例用以优化资源的应用,且单个 EventLoop 可指派用于服务多个 Channel(解决多个网络连接)。

Netty 的 EventLoop 在继承了 ScheduledExecutorService, 可调度一个工作以便稍后(提早)执行或者周期性地执行。
比方,想要注册一个在客户端曾经连贯了 5 分钟之后触发的工作。一个常见的做法是,发送心跳音讯到近程节点,查看连贯是否还活着。如果没有响应,你便晓得能够敞开该 Channel 了。


3.2.2 线程治理

在外部,提交工作,如果(以后)调用线程正是撑持 EventLoop 的线程,那么所提交的代码块将会被(间接)执行。否则,EventLoop 将调度该工作以便稍后执行,并将它放入到外部队列中。当 EventLoop 下次解决它的事件时,它会执行队列中的那些工作 / 事件。


3.2.3 线程调配

服务于 Channel 的 I/O 和事件的 EventLoop 则蕴含在 EventLoopGroup 中。

IO 多路复用:在以后的线程模型中,它们可能会被多个 Channel 所共享。这使得能够通过尽可能大量的 Thread 来撑持大量的 Channel,而不是每个 Channel 调配一个 Thread。

调配 EventLoop:EventLoopGroup 负责为每个新创建的 Channel 调配一个 EventLoop。在以后实现中,应用程序循环(round-robin)的形式进行调配以获取一个平衡的散布,并且雷同的 EventLoop 可能会被调配给多个 Channel。

线程平安:一旦一个 Channel 被调配给一个 EventLoop,它将在它的整个生命周期中都应用这个 EventLoop(以及相关联的 Thread)。请牢记这一点,因为它能够使你从担心你的 ChannelHandler 实现中的线程平安和同步问题中解脱进去。

留神:须要留神,EventLoop 对 ThreadLocal 的应用的影响。因为一个 EventLoop 通常会用于撑持多个 Channel.

所以对于所有相关联的 Channel 来说若应用它来实现状态追踪则会有线程平安问题。然而在一些无状态的上下文中,它依然能够被用于在多个 Channel 之间共享一些重度的或者代价低廉的对象,甚至是事件。


3.3 ChannelFuture 接口

Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会立刻返回,所以咱们须要一种用于在将来的某个工夫点确定其后果的办法。因而 Netty 提供了 ChannelFuture 接口。

其 addListener()办法注册了一个 ChannelFutureListener,以便在某个操作实现时(无论是否胜利)失去告诉。


3.4 ChannelHandler 接口

3.4.1 ChannelHandler 次要接口

对于应用程序开发人员的角度来看,Netty 的次要组件是 ChannelHandler,它充当了所有解决入站和出站数据的利用程序逻辑的容器。ChannelHandler 的办法是由网络事件触发的。

对于 netty 来说, 次要流程就是音讯通信从进入入站处理器再到进来出站处理器, 即接管音讯再响应音讯。

入站处理器 :举例来说,ChannelInboundHandler 是一个常常实现的子接口。这种类型的 ChannelHandler 接支出站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所解决。

次要办法如下:

  • channelRegistered 当 Channel 曾经注册到它的 EventLoop 并且可能解决 I/O 时被调用
  • channelUnregistered 当 Channel 从它的 EventLoop 登记并且无奈解决任何 I/O 时被调用
  • channelActive 当 Channel 处于活动状态时被调用;Channel 曾经连贯 / 绑定并且曾经就绪
  • channelInactive 当 Channel 来到活动状态并且不再连贯它的近程节点时被调用
  • channelReadComplete 当 Channel 上的一个读操作实现时被调用
  • channelRead 当从 Channel 读取数据时被调用

出站处理器 :当你要给连贯的客户端发送响应时,也能够从 ChannelInboundHandler 间接冲刷数据而后输入到对端, 即调用 writeAndFlush 或 flush 时则会通过出站处理器(常实现channelOutbountHandler 子接口)

次要办法如下:

  • bind(ChannelHandlerContext,SocketAddress,ChannelPromise)当申请将 Channel 绑定到本地地址时被调用
  • connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)当申请将 Channel 连贯到近程节点时被调用
  • disconnect(ChannelHandlerContext,ChannelPromise)当申请将 Channel 从近程节点断开时被调用
  • close(ChannelHandlerContext,ChannelPromise) 当申请敞开 Channel 时被调用
  • deregister(ChannelHandlerContext,ChannelPromise)当申请将 Channel 从它的 EventLoop 登记时被调用
  • read(ChannelHandlerContext) 当申请从 Channel 读取更多的数据时被调用
  • flush(ChannelHandlerContext) 当申请通过 Channel 将入队数据冲刷到近程节点时被调用
  • write(ChannelHandlerContext,Object,ChannelPromise) 当申请通过 Channel 将数据写到近程节点时被调用。

ChannelHandler 的适配器有一些适配器类应用于自定义处理器,因为它们提供了定义在对应接口中的所有办法的默认实现。因为你有时会疏忽那些不感兴趣的事件,所以 Netty 提供了形象基类 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。


3.4.2 ChannelHandler 生命周期

接口 ChannelHandler 定义的生命周期操作,在 ChannelHandler 被增加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操作。这些办法中的每一个都承受一个 ChannelHandlerContext 参数。

  • handlerAdded: 当把 ChannelHandler 增加到 ChannelPipeline 中时被调用
  • handlerRemoved: 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
  • exceptionCaught: 当处理过程中在 ChannelPipeline 中有谬误产生时被调用

3.5 ChannelPipeline 接口

与 channel 绑定: 当 Channel 被创立时,它将会被主动地调配一个新的 ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能拆散其以后的。

ChannelHandler 容器: ChannelHandler 在 ChannelPipeline 里工作,执行程序是由它们被增加的程序来决定的, 它们是在应用程序的初始化或者疏导阶段被装置的。这些对象接管事件、执行它们所实现的解决逻辑,并将数据传递给链中的下一个 ChannelHandler。

入站静止:如果一个音讯或者任何其余的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,最终,数据将会达到 ChannelPipeline 的尾端,届时,所有解决就都完结了。

出站静止:数据的出站静止(即正在被写的数据)在概念上也是一样的。在这种状况下,数据将从 ChannelOutboundHandler 链的尾端开始流动,直到它达到链的头部为止。在这之后,出站数据将会达到网络传输层,这里显示为 Socket。通常状况下,这将触发一个写操作。

程序性:当两个类别的 ChannelHandler 都混合增加到同一个 ChannelPipeline, 尽管 ChannelInboundHandle 和 ChannelOutboundHandle 都扩大自 ChannelHandler,然而 Netty 能辨别 ChannelInboundHandler 实现和 ChannelOutboundHandler 实现,并确保数据只会在具备雷同定向类型的两个 ChannelHandler 之间传递。

次要办法如下:

  • addFirst、addBefore、addAfter、addLast 将一个 ChannelHandler 增加到 ChannelPipeline 中
  • remove 将一个 ChannelHandler 从 ChannelPipeline 中移除
  • replace 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler
  • get 通过类型或者名称返回 ChannelHandler
  • context 返回和 ChannelHandler 绑定的 ChannelHandlerContext
  • names 返回 ChannelPipeline 中所有 ChannelHandler 的名称

4. 第一个繁难 netty 程序

4.1 服务端

public class EchoServer  {

    private final int port;

    public EchoServer(int port) {this.port = port;}

    public static void main(String[] args) throws InterruptedException {
        int port = 9999;
        EchoServer echoServer = new EchoServer(port);
        System.out.println("服务器行将启动");
        echoServer.start();
        System.out.println("服务器敞开");
    }

    public void start() throws InterruptedException {final EchoServerHandler serverHandler = new EchoServerHandler();
        /* 线程组 */
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /* 服务端启动必须 */
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)/* 将线程组传入 */
                    .channel(NioServerSocketChannel.class)/* 指定应用 NIO 进行网络传输 */
                    .localAddress(new InetSocketAddress(port))/* 指定服务器监听端口 */
                    /* 服务端每接管到一个连贯申请,就会新启一个 socket 通信,也就是 channel,所以上面这段代码的作用就是为这个子 channel 减少 handle*/
                    .childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {
                            /* 增加到该子 channel 的 pipeline 的尾部 */
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            ChannelFuture f = b.bind().sync();/* 异步绑定到服务器,sync()会阻塞直到实现 */
            f.channel().closeFuture().sync();/* 阻塞直到服务器的 channel 敞开 */} finally {group.shutdownGracefully().sync();/* 优雅敞开线程组 */}

    }


}
@ChannelHandler.Sharable
/* 不加这个注解那么在减少到 childHandler 时就必须 new 进去 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /* 客户端读到数据当前,就会执行 */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;
        System.out.println("Server accept"+in.toString(CharsetUtil.UTF_8));
        ctx.write(in);

    }

    /*** 服务端读取实现网络数据后的解决 */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /*** 产生异样后的解决 */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {cause.printStackTrace();
        ctx.close();}
}

4.2 客户端

public class EchoClient {

    private final int port;
    private final String host;

    public EchoClient(int port, String host) {
        this.port = port;
        this.host = host;
    }

    public void start() throws InterruptedException {
        /* 线程组 */
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            /* 客户端启动必备 */
            Bootstrap b = new Bootstrap();
            b.group(group)/* 把线程组传入 */
                    /* 指定应用 NIO 进行网络传输 */
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,port))
                    .handler(new EchoClientHandle());
            /* 连贯到近程节点,阻塞直到连贯实现 */
            ChannelFuture f = b.connect().sync();
            /* 阻塞程序,直到 Channel 产生了敞开 */
            f.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}

    }

    public static void main(String[] args) throws InterruptedException {new EchoClient(9999,"127.0.0.1").start();}
}
public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {

    /* 客户端读到数据当前,就会执行 */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
            throws Exception {System.out.println("client acccept:"+msg.toString(CharsetUtil.UTF_8));
    }

    /* 连贯建设当前 */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty",CharsetUtil.UTF_8));
        //ctx.fireChannelActive();}

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {cause.printStackTrace();

        ctx.close();}
}

一个繁难的 netty 程序就实现了,实现了通信性能。

正文完
 0