1.Netty简介
Netty是由JBOSS提供的一个java开源框架。
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以疾速开发高性能、高可靠性的网络服务器和客户端程序。
2.为什么应用Netty
尽管 JAVA NIO 框架提供了多路复用 IO 的反对,然而并没有提供下层“信息格式”的良好封装。例如前两者并没有提供针对 Protocol Buffer、JSON 这些信息格式的封装,然而 Netty 框架提供了这些数据格式封装(基于责任链模式的编
码和解码性能);
2、NIO 的类库和 API 相当简单,应用它来开发,须要十分熟练地把握 Selector、ByteBuffer、ServerSocketChannel、SocketChannel 等,须要很多额定的编程技能来辅助应用 NIO,例如,因为 NIO 波及了 Reactor 线程模型,所以必须必须对多线程和网络编程十分相熟能力写出高质量的 NIO 程序
3、要编写一个牢靠的、易保护的、高性能的 NIO 服务器利用。除了框架自身要兼容实现各类操作系统的实现外。更重要的是它应该还要解决很多下层特有服务,例如:客户端的权限、还有下面提到的信息格式封装、简略的数据读取,断连重连,半包读写,心跳等等,这些 Netty 框架都提供了响应的反对。
4、JAVA NIO 框架存在一个 poll/epoll bug:Selector doesn’t block on Selector.select(timeout),不能 block 意味着 CPU 的使用率会变成 100%(这是底层 JNI 的问题,下层要解决这个异样实际上也好办)。当然这个 bug 只有在 Linux内核上能力重现
3.Netty重要组件
3.1 Channel接口
在Java 的网络编程中,其根本的结构是类 Socket。Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作。大大地升高了间接应用 Socket类的复杂性。此外,Channel也是领有许多预约义的、专门化实现的宽泛类层次结构的根。
channel生命周期
- ChannelUnregistered :Channel 曾经被创立,但还未注册EventLoop
- ChannelRegistered :Channel 曾经被注册到了 EventLoop
- ChannelActive :Channel 处于活动状态(曾经连贯到它的近程节点)。它当初能够接管和发送数据了
- ChannelInactive :Channel 没有连贯到近程节点
当这些状态产生扭转时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline中的 ChannelHandler,其能够随后对它们做出响应。
channel重要办法
1.eventLoop: 返回调配给 Channel 的 EventLoop
2.pipeline: 返回调配给 Channel 的 ChannelPipeline
3.isActive: 如果 Channel 是流动的,则返回 true。流动的意义可能依赖于底层的传输。
例如,一个 Socket 传输一旦连贯到了近程节点便是流动的,而一个 Datagram 传输一旦被关上便是流动的。
4.localAddress: 返回本地的 SokcetAddress
5,remoteAddress: 返回近程的 SocketAddress
6.write: 将数据写到近程节点。这个数据将被传递给 ChannelPipeline,并且排队直到它被冲刷
7.flush: 将之前已写的数据冲刷到底层传输,如一个 Socket
8.writeAndFlush: 一个简便的办法,等同于调用 write()并接着调用 flush()
3.2 EventLoop丶EventLoopGroup
EventLoop 定义了 Netty 的外围形象,用于解决网络连接的生命周期中所产生的事件。EventLoop充当任务调度丶线程治理丶线程调配的重要对象。
3.2.1任务调度
查看EventLoop的继承关系可看出,EventLoop具备任务调度,充当线程池的作用,一个 EventLoop 将由一个永远都不会扭转的 Thread 驱动.依据配置和可用外围的不同,可能会创立多个 EventLoop 实例用以优化资源的应用,且单个 EventLoop可指派用于服务多个 Channel(解决多个网络连接)。
Netty 的 EventLoop 在继承了ScheduledExecutorService,可调度一个工作以便稍后(提早)执行或者周期性地执行。
比方,想要注册一个在客户端曾经连贯了 5 分钟之后触发的工作。一个常见的做法是,发送心跳音讯到近程节点,查看连贯是否还活着。如果没有响应,你便晓得能够敞开该 Channel了。
3.2.2线程治理
在外部,提交工作,如果(以后)调用线程正是撑持 EventLoop 的线程,那么所提交的代码块将会被(间接)执行。否则,EventLoop 将调度该工作以便稍后执行,并将它放入到外部队列中。当 EventLoop 下次解决它的事件时,它会执行队列中的那些工作/事件。
3.2.3线程调配
服务于 Channel 的 I/O 和事件的 EventLoop 则蕴含在 EventLoopGroup 中。
IO多路复用:在以后的线程模型中,它们可能会被多个 Channel 所共享。这使得能够通过尽可能大量的 Thread 来撑持大量的 Channel,而不是每个 Channel 调配一个 Thread。
调配EventLoop:EventLoopGroup 负责为每个新创建的 Channel 调配一个 EventLoop。在以后实现中,应用程序循环(round-robin)的形式进行调配以获取一个平衡的散布,并且雷同的 EventLoop 可能会被调配给多个 Channel。
线程平安:一旦一个 Channel 被调配给一个 EventLoop,它将在它的整个生命周期中都应用这个EventLoop(以及相关联的 Thread)。请牢记这一点,因为它能够使你从担心你的ChannelHandler 实现中的线程平安和同步问题中解脱进去。
留神:须要留神,EventLoop对 ThreadLocal的应用的影响。因为一个 EventLoop通常会用于撑持多个 Channel.
所以对于所有相关联的 Channel 来说若应用它来实现状态追踪则会有线程平安问题。然而在一些无状态的上下文中,它依然能够被用于在多个 Channel 之间共享一些重度的或者代价低廉的对象,甚至是事件。
3.3 ChannelFuture 接口
Netty中所有的 I/O 操作都是异步的。因为一个操作可能不会立刻返回,所以咱们须要一种用于在将来的某个工夫点确定其后果的办法。因而Netty 提供了ChannelFuture 接口。
其 addListener()办法注册了一个 ChannelFutureListener,以便在某个操作实现时(无论是否胜利)失去告诉。
3.4 ChannelHandler 接口
3.4.1 ChannelHandler次要接口
对于应用程序开发人员的角度来看,Netty 的次要组件是 ChannelHandler,它充当了所有解决入站和出站数据的利用程序逻辑的容器。ChannelHandler 的办法是由网络事件触发的。
对于netty来说,次要流程就是音讯通信从进入入站处理器再到进来出站处理器,即接管音讯再响应音讯。
入站处理器:举例来说,ChannelInboundHandler是一个常常实现的子接口。这种类型的ChannelHandler接支出站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所解决。
次要办法如下:
- channelRegistered 当 Channel 曾经注册到它的 EventLoop 并且可能解决 I/O 时被调用
- channelUnregistered 当 Channel 从它的 EventLoop 登记并且无奈解决任何 I/O 时被调用
- channelActive 当 Channel 处于活动状态时被调用;Channel 曾经连贯/绑定并且曾经就绪
- channelInactive 当 Channel 来到活动状态并且不再连贯它的近程节点时被调用
- channelReadComplete 当 Channel 上的一个读操作实现时被调用
- channelRead 当从 Channel 读取数据时被调用
出站处理器:当你要给连贯的客户端发送响应时,也能够从 ChannelInboundHandler 间接冲刷数据而后输入到对端,即调用writeAndFlush或flush时则会通过出站处理器(常实现channelOutbountHandler子接口)
次要办法如下:
- bind(ChannelHandlerContext,SocketAddress,ChannelPromise)当申请将 Channel 绑定到本地地址时被调用
- connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)当申请将 Channel 连贯到近程节点时被调用
- disconnect(ChannelHandlerContext,ChannelPromise)当申请将 Channel 从近程节点断开时被调用
- close(ChannelHandlerContext,ChannelPromise) 当申请敞开 Channel 时被调用
- deregister(ChannelHandlerContext,ChannelPromise)当申请将 Channel 从它的 EventLoop 登记时被调用
- read(ChannelHandlerContext) 当申请从 Channel 读取更多的数据时被调用
- flush(ChannelHandlerContext) 当申请通过 Channel 将入队数据冲刷到近程节点时被调用
- write(ChannelHandlerContext,Object,ChannelPromise) 当申请通过 Channel 将数据写到近程节点时被调用。
ChannelHandler 的适配器有一些适配器类应用于自定义处理器,因为它们提供了定义在对应接口中的所有办法的默认实现。因为你有时会疏忽那些不感兴趣的事件,所以 Netty 提供了形象基类 ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter。
3.4.2 ChannelHandler生命周期
接口 ChannelHandler 定义的生命周期操作,在ChannelHandler被增加到ChannelPipeline中或者被从 ChannelPipeline 中移除时会调用这些操作。这些办法中的每一个都承受一个ChannelHandlerContext 参数。
- handlerAdded: 当把 ChannelHandler 增加到 ChannelPipeline 中时被调用
- handlerRemoved: 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
- exceptionCaught: 当处理过程中在 ChannelPipeline 中有谬误产生时被调用
3.5 ChannelPipeline 接口
与channel绑定:当 Channel 被创立时,它将会被主动地调配一个新的 ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能拆散其以后的。
ChannelHandler容器: ChannelHandler在ChannelPipeline里工作,执行程序是由它们被增加的程序来决定的,它们是在应用程序的初始化或者疏导阶段被装置的。这些对象接管事件、执行它们所实现的解决逻辑,并将数据传递给链中的下一个 ChannelHandler。
入站静止:如果一个音讯或者任何其余的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,最终,数据将会达到 ChannelPipeline 的尾端,届时,所有解决就都完结了。
出站静止:数据的出站静止(即正在被写的数据)在概念上也是一样的。在这种状况下,数据将从ChannelOutboundHandler 链的尾端开始流动,直到它达到链的头部为止。在这之后,出站数据将会达到网络传输层,这里显示为 Socket。通常状况下,这将触发一个写操作。
程序性:当两个类别的ChannelHandler都混合增加到同一个ChannelPipeline,尽管 ChannelInboundHandle 和 ChannelOutboundHandle 都扩大自 ChannelHandler,然而Netty 能辨别 ChannelInboundHandler 实现和 ChannelOutboundHandler 实现,并确保数据只会在具备雷同定向类型的两个 ChannelHandler 之间传递。
次要办法如下:
- addFirst、addBefore、addAfter、addLast将一个 ChannelHandler 增加到 ChannelPipeline 中
- remove 将一个 ChannelHandler 从 ChannelPipeline 中移除
- replace 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler
- get 通过类型或者名称返回 ChannelHandler
- context 返回和 ChannelHandler 绑定的 ChannelHandlerContext
- names 返回 ChannelPipeline 中所有 ChannelHandler 的名称
4.第一个繁难netty程序
4.1 服务端
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws InterruptedException { int port = 9999; EchoServer echoServer = new EchoServer(port); System.out.println("服务器行将启动"); echoServer.start(); System.out.println("服务器敞开"); } public void start() throws InterruptedException { final EchoServerHandler serverHandler = new EchoServerHandler(); /*线程组*/ EventLoopGroup group = new NioEventLoopGroup(); try { /*服务端启动必须*/ ServerBootstrap b = new ServerBootstrap(); b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定应用NIO进行网络传输*/ .localAddress(new InetSocketAddress(port))/*指定服务器监听端口*/ /*服务端每接管到一个连贯申请,就会新启一个socket通信,也就是channel, 所以上面这段代码的作用就是为这个子channel减少handle*/ .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { /*增加到该子channel的pipeline的尾部*/ ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到实现*/ f.channel().closeFuture().sync();/*阻塞直到服务器的channel敞开*/ } finally { group.shutdownGracefully().sync();/*优雅敞开线程组*/ } }}
@ChannelHandler.Sharable/*不加这个注解那么在减少到childHandler时就必须new进去*/public class EchoServerHandler extends ChannelInboundHandlerAdapter { /*客户端读到数据当前,就会执行*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; System.out.println("Server accept"+in.toString(CharsetUtil.UTF_8)); ctx.write(in); } /*** 服务端读取实现网络数据后的解决*/ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } /*** 产生异样后的解决*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
4.2 客户端
public class EchoClient { private final int port; private final String host; public EchoClient(int port, String host) { this.port = port; this.host = host; } public void start() throws InterruptedException { /*线程组*/ EventLoopGroup group = new NioEventLoopGroup(); try{ /*客户端启动必备*/ Bootstrap b = new Bootstrap(); b.group(group)/*把线程组传入*/ /*指定应用NIO进行网络传输*/ .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host,port)) .handler(new EchoClientHandle()); /*连贯到近程节点,阻塞直到连贯实现*/ ChannelFuture f = b.connect().sync(); /*阻塞程序,直到Channel产生了敞开*/ f.channel().closeFuture().sync(); }finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { new EchoClient(9999,"127.0.0.1").start(); }}
public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> { /*客户端读到数据当前,就会执行*/ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client acccept:"+msg.toString(CharsetUtil.UTF_8)); } /*连贯建设当前*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer( "Hello Netty",CharsetUtil.UTF_8)); //ctx.fireChannelActive(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
一个繁难的netty程序就实现了,实现了通信性能。