一、引言

❀ 家喻户晓

Netty 是一款基于 NIO 客户、服务器端的 Java 开源编程框架,提供异步的、事件驱动的网络应用程序框架和工具,用以疾速开发高性能、高可靠性的网络服务器和客户端程序。

❀ 艰深来讲

Netty 一个十分好用的解决 Socket 的 Jar 包,能够用它来开发服务器和客户端。

二、为什么要学习 Netty

Netty 作为一个优良的网络通信框架,许多开源我的项目都应用它来构建通信层。比方 Hadoop、Cassandra、Spark、Dubbo、gRPC、RocketMQ、Zookeeper甚至咱们罕用的 Spring 等等。

更重要的是,Netty 是开发高性能 Java 服务器的必学框架。

能够说作为一个 Java 工程师,要理解 Java 服务器的高阶常识,Netty 是一个必须要学习的货色。

三、Netty 的个性

1、设计
  • 为不同的传输类型(阻塞和非阻塞)提供对立的 API
  • 基于灵便且可扩大的事件模型,可将关注点明确拆散
  • 高度可定制的线程模型:单线程、一个或多个线程池
  • 牢靠的无连贯数据 Socket 反对(UDP)
2、易用
  • 欠缺的 JavaDoc ,用户指南和样例
  • 无需额定依赖,JDK 5 (Netty 3.x) 、JDK 6 (Netty 4.x)
3、性能
  • 更高的吞吐量,更低的提早
  • 更省资源
  • 缩小不必要的内存拷贝
4、平安
  • 残缺的 SSL/TLS 和 STARTTLS 的反对
5、社区
  • 沉闷的社区和泛滥的开源贡献者

四、初识 Netty

Talk is cheap, show me the code!
1、抛弃服务器

接下来从代码中感受一下 Netty,首先实现一个 discard(抛弃)服务器,即对收到的数据不做任何解决。

  • 实现 ChannelInBoundHandlerAdapter 首先咱们从 handler 的实现开始, Netty 应用 handler 来解决 I/O 事件。

    public class DiscardServerHandler extends ChannelInboundHandlerAdapter {   @Override  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {       // 抛弃收到的数据      ((ByteBuf) msg).release();  }  @Override  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {       cause.printStackTrace();      ctx.close();    }}
    • 1 行,DiscardServerHandler 继承自 ChannelInboundHandlerAdapter,这个类实现了 ChannelInboundHandler接口,ChannelInboundHandler 提供了许多事件处理的接口办法。
    • 4 行,当收到新的音讯时,就会调用 chanelRead() 办法。
    • 6 行,ByteBuf 是一个援用计数对象,这个对象必须显式地调用 release() 办法来开释。处理器的职责是开释所有传递到处理器的援用计数对象,上面是比拟常见的 chanelRead() 办法实现:

      @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {  try {      // Do something with msg  } finally {      ReferenceCountUtil.release(msg);  }}
    • 10 行,exceptionCaught() 办法是在处理事件时产生异样调用的办法。
  • 启动 Handler 实现 handler 后,咱们须要一个 main() 办法来启动它。

    public class DiscardServer {  private int port;  public DiscardServer(int port) {      this.port = port;  }  public void run() throws Exception {      // 接管进来的连贯      EventLoopGroup boss = new NioEventLoopGroup();      // 解决曾经接管的连贯      EventLoopGroup worker = new NioEventLoopGroup();      try {          ServerBootstrap bootstrap = new ServerBootstrap();          bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {              @Override              protected void initChannel(SocketChannel socketChannel) throws Exception {                  // 增加自定义的 handler                  socketChannel.pipeline().addLast(new DiscardServerHandler());              }          }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);          // 绑定端口,开始接管进来的连贯          ChannelFuture channelFuture = bootstrap.bind(port).sync();          // 敞开          channelFuture.channel().closeFuture().sync();      } finally {          boss.shutdownGracefully();          worker.shutdownGracefully();      }  }  public static void main(String[] args) throws Exception {      int port = 8080;      new DiscardServer(port).run();  }}
    • 11 行,EventLoopGroup 是用来解决 I/O 操作的多线程事件循环器,Netty 提供了许多不同的 EventLoopGroup 的实现用来解决不同的传输。在本例咱们实现了一个服务端利用,因而须要两个 EventLoopGroup 。第一个用来接管进来的连贯,常被称作 boss ;第二个用来解决曾经接管的连贯,成为 worker。一旦 boss 接管到一个新进来的连贯,就会把连贯的信息注册到 worker 下面。
    • 15 行,ServerBootstrap 是一个启动 NIO 服务的辅助启动类。
    • 16 行,指定 NioServerSocketChannel 用来阐明一个新的 Channel 如何接管进来的连贯。
    • 20 行, ChannelInitializer 用来帮忙使用者创立一个新的 channel ,同时能够应用 pipline 指定一些特定的处理器。
    • 22 行,通过这两个办法能够指定新配置的 channel 的一些参数配置。
  • 查看接管到的数据 如此,一个基于 Netty 的服务端程序就实现了,然而当初启动起来咱们看不到任何交互,所以咱们略微批改一下 DiscardServerHandler 类的 channelRead() 办法,能够查看到客户端发来的音讯。

    @Override  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      ByteBuf byteBuf = (ByteBuf) msg;      try {          while (byteBuf.isReadable()) {              System.out.print((char) byteBuf.readByte());              System.out.flush();          }      } finally {          ReferenceCountUtil.release(msg);      }  }
  • 测试 接下来咱们启动 DiscardServer ,应用 telnet 来测试一下。

    控制台接管到了命令行发来的音讯:

    • *
2、应答服务器

咱们曾经实现了服务器能够接管客户端发来的音讯,通常服务器会对客户端发来的申请作出回应,上面就通过 ECHO 协定来实现对客户端的音讯响应。

ECHO 协定即会把客户端发来的数据原样返回,所以也戏称“乒乓球”协定。

在上述代码的根底下面,咱们只需对 DiscardServerHandler 类的 channelRead() 办法稍加批改:

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ctx.write(msg);        ctx.flush();}
  • ChannelHandlerContext 对象提供了许多操作,使你可能触发各种各样的 I/O 事件和操作。这里咱们调用了 write(Object) 办法来逐字地把承受到的音讯写入。请留神不同于 DISCARD 的例子咱们并没有开释承受到的音讯,这是因为当写入的时候 Netty 曾经帮咱们开释了。
  • ctx.write(Object) 办法不会使音讯写入到通道上,他被缓冲在了外部,你须要调用 ctx.flush() 办法来把缓冲区中数据强行输入。或者能够用更简洁的 cxt.writeAndFlush(msg) 以达到同样的目标。

再次运行 telnet 命令,就会承受到你发送的信息。


3、工夫服务器

接下来咱们基于 TIME 协定,实现构建和发送一个音讯,而后在实现时敞开连贯。和之前的例子不同的是在不承受任何申请时会发送一个含 32 位的整数的音讯,并且一旦音讯发送就会立刻敞开连贯。

TIME 协定能够提供机器可读的日期工夫信息。

咱们会在连贯创立时发送工夫音讯,所以须要笼罩 channelActive() 办法:

public class TimeServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // 调配空间        final ByteBuf time = ctx.alloc().buffer(4);        // 获取 32 位工夫戳并写入        time.writeInt((int) (System.currentTimeMillis() / 1000L));        final ChannelFuture future = ctx.writeAndFlush(time);        // 增加监听器        future.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture channelFuture) throws Exception {                assert future == channelFuture;                // 敞开连贯                ctx.close();            }        });    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}
  • 4 行,channelActive() 办法将会在连贯被建设并且筹备进行通信时被调用。
  • 6 行,同 Java 的 NIO 相似,为了构建一个音讯,须要为缓冲区调配空间。因为要发送一个 32 位的工夫戳,所以至多 4 字节。
  • 8 行,音讯构建结束后,执行写入。回忆应用 Java NIO 的 Buffer 时,在读写操作之间,须要调用 buffer.flip( ) 办法设置指针地位。然而在在 Netty 中不须要这样操作,起因是 Netty 提供了两个指针,一个读指针和一个写指针,在读写时两者不相互影响。再也不必放心遗记调用 flip( ) 办法时数据为空或者数据谬误啦。
  • 11 行,在第 9 行执行完 ctx.writeAndFlush(time) 后会返回一个 ChannelFuture 对象,代表着还没有产生的一次 I/O 操作。这意味着任何一个申请操作都不会马上被执行,因为在 Netty 里所有的操作都是异步的。这样来看,咱们想实现音讯发送后敞开连贯,间接在后边调用 ctx.close( ) 可能不能立即敞开连贯。返回的 ChannelFuture 对象在操作实现后会告诉它的监听器,继续执行操作实现后的动作。
    • *
4、工夫客户端

对于工夫服务端不能间接用 telnet 的形式测试,因为不能靠人工把一个 32 位的二进制数据翻译成工夫,所以上面将实现一个工夫客户端。

与服务端的实现惟一不同的就是应用了不同的 Bootstrap 和 Channel 实现:

public class TimeClient {    private String host;    private int port;    public TimeClient(String host, int port) {        this.host = host;        this.port = port;    }    public void run() throws Exception{        EventLoopGroup worker = new NioEventLoopGroup();        try {            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {                @Override                protected void initChannel(NioSocketChannel ch) throws Exception {                    ch.pipeline().addLast(new TimeClientHandler());                }            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);            // 启动            ChannelFuture future = bootstrap.connect(host, port).sync();            // 期待连贯敞开            future.channel().closeFuture().sync();        } finally {            worker.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        TimeClient timeClient = new TimeClient("localhost", 8080);        timeClient.run();    }}
  • 13 行,比照 server 端只指定了一个 EventLoopGroup ,它即会作为 boss group 也会作为 worker group,只管客户端不须要应用到 boss group。
  • 15 行,Bootstrap 和 ServerBootstrap 相似,Bootstrap 面向于服务端的 channel ,比方客户端和无连贯传输模式的 channel。

再略微改变一下 handler :

public class TimeClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    // 在 TCP/IP 中,Netty 会把读到的数据放入 ByteBuf 中        ByteBuf byteBuf = (ByteBuf) msg;        try {            long time = byteBuf.readUnsignedInt() * 1000L;            System.out.println(new Date(time));            ctx.close();        }finally {            ReferenceCountUtil.release(msg);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}

别离启动 TimeServer 和 TimeClient ,控制台打印出了以后工夫:

然而,屡次运行后处理器有时候会因为抛出 IndexOutOfBoundsException 而回绝工作。带着这个问题,持续往下面看。

5、解决基于流的传输

比拟典型的基于流传输的 TCP/IP 协定,也就是说,应用层两个不同的数据包,在 TCP/IP 协定传输时,可能会组合或者拆分应用层协定的数据。因为两个数据包之间并无边界辨别,可能导致音讯的读取谬误。

很多材料也称上述这种景象为 TCP 粘包,而值得注意的是:

1、TCP 协定自身设计就是面向流的,提供牢靠传输。 2、正因为面向流,对于应用层的数据包而言,没有边界辨别。这就须要应用层被动解决不同数据包之间的组装。 3、产生粘包景象不是 TCP 的缺点,只是应用层没有被动做数据包的解决。

回到下面程序,这也就是上述异样产生的起因。一个 32 位整型是十分小的数据,它并不见得会被常常拆分到到不同的数据段内。然而,问题是它的确可能会被拆分到不同的数据段内。

比拟常见的两种解决方案就是基于长度或者基于终结符,持续以下面的 TIME 协定程序为根底,着手解决这个问题。因为只发送一个 32 位的整形工夫戳,咱们采纳基于数据长度的形式:

❀ 解决方案一

最简略的计划是结构一个外部的可积攒的缓冲,直到4个字节全副接管到了外部缓冲。批改一下 TimeClientHandler 的代码:

public class TimeClientHandler extends ChannelInboundHandlerAdapter {    private ByteBuf buf;    private static final int CAPACITY = 4;    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        buf = ctx.alloc().buffer(CAPACITY);    }    @Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {        buf.release();        buf = null;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf byteBuf = (ByteBuf) msg;        buf.writeBytes(byteBuf);        byteBuf.release();        // 数据大于或等于 4 字节        if (buf.readableBytes() >= CAPACITY) {            long time = buf.readUnsignedInt() * 1000L;            System.out.println(new Date(time));            ctx.close();        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}

其中笼罩了 handler 生命周期的两个办法:

  • 8 行,handlerAdded():当检测到新的连贯之后,调用ch.pipeline().addLast(new LifeCycleTestHandler())之后的回调,示意以后的channel中曾经胜利增加了一个逻辑处理器
  • 13 行,handlerRemoved():在连贯敞开后把这条连贯上的所有逻辑处理器全副移除掉。
❀ 解决方案二

只管上述计划曾经解决了 TIME 客户端的问题了,然而在处理器中减少了逻辑,咱们能够把解决音讯的局部抽取进去,成为一个独自的处理器,并且能够减少多个 ChannelHandler 到 ChannelPipline ,每个处理器各司其职,缩小模块的复杂度。

由此,拆分出一个 TimeDecoder 用于解决音讯:

public class TimeDecoder extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {        if (in.readableBytes() >= 4) {            out.add(in.readBytes(4));        }    }}
  • ByteToMessageDecoder 继承自 ChannelInboundHandlerAdapter ,每当有新数据接管的时候,ByteToMessageDecoder 都会调用 decode() 办法来解决外部的那个累积缓冲。
  • 如果在 decode() 办法里减少了一个对象到 out 对象里,这意味着解码器解码音讯胜利。ByteToMessageDecoder 将会抛弃在累积缓冲里曾经被读过的数据。

最初,批改 TimeClient 的代码,将 TimeDecoder 退出 ChannelPipline :

bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {                @Override                protected void initChannel(NioSocketChannel ch) throws Exception {                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());                }            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);

除此之外,Netty还提供了更多开箱即用的解码器使你能够更简略地实现更多的协定,帮忙你防止开发一个难以保护的处理器实现,感兴趣的小伙伴能够自行理解。

6、将音讯解码为自定义对象

上述的例子咱们始终在应用 ByteBuf 作为协定音讯的次要数据结构,然而理论应用中,须要传输的音讯更加简单,形象为对象来解决更加不便。持续以 TIME 客户端和服务器为根底,应用自定义的对象代替 ByteBuf 。

  • 定义保留工夫的对象 OurTime :

    public class OurTime {  private final long value;  public OurTime() {      this(System.currentTimeMillis() / 1000L);  }  public OurTime(long value) {      this.value = value;  }  public long value() {      return value;  }  @Override  public String toString() {      return new Date(value() * 1000L).toString();  }}
  • 批改 TimeDecoder 类,返回 OurTime 类:

    public class TimeDecoder extends ByteToMessageDecoder {  @Override  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {      if (in.readableBytes() >= 4) {          out.add(new OurTime(in.readUnsignedInt()));      }  }}
  • 批改后的 TimeClientHandler 类,解决新音讯更加简洁:

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {  @Override  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      OurTime ourTime = (OurTime) msg;      System.out.println(ourTime);      ctx.close();  }  @Override  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {      cause.printStackTrace();      ctx.close();  }}
    • *

而对于服务端来说,大同小异。

批改 TimeServerHandler 的代码:

@Overridepublic void channelActive(ChannelHandlerContext ctx) {    ChannelFuture f = ctx.writeAndFlush(new UnixTime());    f.addListener(ChannelFutureListener.CLOSE); }

当初,惟一短少的性能是一个编码器,是ChannelOutboundHandler的实现,用来将 OurTime 对象从新转化为一个 ByteBuf。这是比编写一个解码器简略得多,因为没有须要解决的数据包编码音讯时拆分和组装。

public class TimeEncoder extends ChannelOutboundHandlerAdapter {    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {        UnixTime m = (OurTime) msg;        ByteBuf encoded = ctx.alloc().buffer(4);        encoded.writeInt((int)m.value());        ctx.write(encoded, promise); // (1)    }}

在这几行代码里还有几个重要的事件。第一,通过 ChannelPromise,当编码后的数据被写到了通道上 Netty 能够通过这个对象标记是胜利还是失败。第二, 咱们不须要调用 cxt.flush()。因为处理器曾经独自拆散出了一个办法 void flush(ChannelHandlerContext cxt),如果像本人实现 flush() 办法内容能够自行笼罩这个办法。

进一步简化操作,你能够应用 MessageToByteEncode:

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {        @Override        protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {            out.writeInt((int)msg.value());        }    }

最初在 TimeServerHandler 之前把 TimeEncoder 插入到ChannelPipeline。

五、总结

置信读完这篇文章的从头至尾,小伙伴们对应用 Netty 编写一个客户端和服务端有了大略的理解。前面咱们将持续探索 Netty 的源码实现,并联合其波及的基础知识进行理解、深刻。

❤ 转载请注明本文地址或起源,谢谢合作 ❤