共计 8474 个字符,预计需要花费 22 分钟才能阅读完成。
1.Netty 简介
Netty 是由 JBOSS 提供的一个 java 开源框架。
Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以疾速开发高性能、高可靠性的网络服务器和客户端程序。
2. 为什么应用 Netty
尽管 JAVA NIO 框架提供了多路复用 IO 的反对,然而并没有提供下层“信息格式”的良好封装。例如前两者并没有提供针对 Protocol Buffer、JSON 这些信息格式的封装,然而 Netty 框架提供了这些数据格式封装(基于责任链模式的编
码和解码性能);
2、NIO 的类库和 API 相当简单,应用它来开发,须要十分熟练地把握 Selector、ByteBuffer、ServerSocketChannel、SocketChannel 等,须要很多额定的编程技能来辅助应用 NIO, 例如,因为 NIO 波及了 Reactor 线程模型,所以必须必须对多线程和网络编程十分相熟能力写出高质量的 NIO 程序
3、要编写一个牢靠的、易保护的、高性能的 NIO 服务器利用。除了框架自身要兼容实现各类操作系统的实现外。更重要的是它应该还要解决很多下层特有服务,例如:客户端的权限、还有下面提到的信息格式封装、简略的数据读取,断连重连,半包读写,心跳等等,这些 Netty 框架都提供了响应的反对。
4、JAVA NIO 框架存在一个 poll/epoll bug:Selector doesn’t block on Selector.select(timeout),不能 block 意味着 CPU 的使用率会变成 100%(这是底层 JNI 的问题,下层要解决这个异样实际上也好办)。当然这个 bug 只有在 Linux 内核上能力重现
3.Netty 重要组件
3.1 Channel 接口
在 Java 的网络编程中,其根本的结构是类 Socket。Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作。大大地升高了间接应用 Socket 类的复杂性。此外,Channel 也是领有许多预约义的、专门化实现的宽泛类层次结构的根。
channel 生命周期
- ChannelUnregistered:Channel 曾经被创立,但还未注册 EventLoop
- ChannelRegistered:Channel 曾经被注册到了 EventLoop
- ChannelActive:Channel 处于活动状态(曾经连贯到它的近程节点)。它当初能够接管和发送数据了
- ChannelInactive:Channel 没有连贯到近程节点
当这些状态产生扭转时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其能够随后对它们做出响应。
channel 重要办法
1.eventLoop:返回调配给 Channel 的 EventLoop
2.pipeline:返回调配给 Channel 的 ChannelPipeline
3.isActive:如果 Channel 是流动的,则返回 true。流动的意义可能依赖于底层的传输。
例如,一个 Socket 传输一旦连贯到了近程节点便是流动的,而一个 Datagram 传输一旦被关上便是流动的。
4.localAddress:返回本地的 SokcetAddress
5,remoteAddress:返回近程的 SocketAddress
6.write:将数据写到近程节点。这个数据将被传递给 ChannelPipeline,并且排队直到它被冲刷
7.flush:将之前已写的数据冲刷到底层传输,如一个 Socket
8.writeAndFlush:一个简便的办法,等同于调用 write()并接着调用 flush()
3.2 EventLoop 丶 EventLoopGroup
EventLoop 定义了 Netty 的外围形象,用于解决网络连接的生命周期中所产生的事件。EventLoop 充当任务调度丶线程治理丶线程调配的重要对象。
3.2.1 任务调度
查看 EventLoop 的继承关系可看出,EventLoop 具备任务调度,充当线程池的作用, 一个 EventLoop 将由一个永远都不会扭转的 Thread 驱动. 依据配置和可用外围的不同,可能会创立多个 EventLoop 实例用以优化资源的应用,且单个 EventLoop 可指派用于服务多个 Channel(解决多个网络连接)。
Netty 的 EventLoop 在继承了 ScheduledExecutorService, 可调度一个工作以便稍后(提早)执行或者周期性地执行。
比方,想要注册一个在客户端曾经连贯了 5 分钟之后触发的工作。一个常见的做法是,发送心跳音讯到近程节点,查看连贯是否还活着。如果没有响应,你便晓得能够敞开该 Channel 了。
3.2.2 线程治理
在外部,提交工作,如果(以后)调用线程正是撑持 EventLoop 的线程,那么所提交的代码块将会被(间接)执行。否则,EventLoop 将调度该工作以便稍后执行,并将它放入到外部队列中。当 EventLoop 下次解决它的事件时,它会执行队列中的那些工作 / 事件。
3.2.3 线程调配
服务于 Channel 的 I/O 和事件的 EventLoop 则蕴含在 EventLoopGroup 中。
IO 多路复用:在以后的线程模型中,它们可能会被多个 Channel 所共享。这使得能够通过尽可能大量的 Thread 来撑持大量的 Channel,而不是每个 Channel 调配一个 Thread。
调配 EventLoop:EventLoopGroup 负责为每个新创建的 Channel 调配一个 EventLoop。在以后实现中,应用程序循环(round-robin)的形式进行调配以获取一个平衡的散布,并且雷同的 EventLoop 可能会被调配给多个 Channel。
线程平安:一旦一个 Channel 被调配给一个 EventLoop,它将在它的整个生命周期中都应用这个 EventLoop(以及相关联的 Thread)。请牢记这一点,因为它能够使你从担心你的 ChannelHandler 实现中的线程平安和同步问题中解脱进去。
留神:须要留神,EventLoop 对 ThreadLocal 的应用的影响。因为一个 EventLoop 通常会用于撑持多个 Channel.
所以对于所有相关联的 Channel 来说若应用它来实现状态追踪则会有线程平安问题。然而在一些无状态的上下文中,它依然能够被用于在多个 Channel 之间共享一些重度的或者代价低廉的对象,甚至是事件。
3.3 ChannelFuture 接口
Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会立刻返回,所以咱们须要一种用于在将来的某个工夫点确定其后果的办法。因而 Netty 提供了 ChannelFuture 接口。
其 addListener()办法注册了一个 ChannelFutureListener,以便在某个操作实现时(无论是否胜利)失去告诉。
3.4 ChannelHandler 接口
3.4.1 ChannelHandler 次要接口
对于应用程序开发人员的角度来看,Netty 的次要组件是 ChannelHandler,它充当了所有解决入站和出站数据的利用程序逻辑的容器。ChannelHandler 的办法是由网络事件触发的。
对于 netty 来说, 次要流程就是音讯通信从进入入站处理器再到进来出站处理器, 即接管音讯再响应音讯。
入站处理器 :举例来说,ChannelInboundHandler 是一个常常实现的子接口。这种类型的 ChannelHandler 接支出站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所解决。
次要办法如下:
- channelRegistered 当 Channel 曾经注册到它的 EventLoop 并且可能解决 I/O 时被调用
- channelUnregistered 当 Channel 从它的 EventLoop 登记并且无奈解决任何 I/O 时被调用
- channelActive 当 Channel 处于活动状态时被调用;Channel 曾经连贯 / 绑定并且曾经就绪
- channelInactive 当 Channel 来到活动状态并且不再连贯它的近程节点时被调用
- channelReadComplete 当 Channel 上的一个读操作实现时被调用
- channelRead 当从 Channel 读取数据时被调用
出站处理器 :当你要给连贯的客户端发送响应时,也能够从 ChannelInboundHandler 间接冲刷数据而后输入到对端, 即调用 writeAndFlush 或 flush 时则会通过出站处理器(常实现channelOutbountHandler 子接口)
次要办法如下:
- bind(ChannelHandlerContext,SocketAddress,ChannelPromise)当申请将 Channel 绑定到本地地址时被调用
- connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)当申请将 Channel 连贯到近程节点时被调用
- disconnect(ChannelHandlerContext,ChannelPromise)当申请将 Channel 从近程节点断开时被调用
- close(ChannelHandlerContext,ChannelPromise) 当申请敞开 Channel 时被调用
- deregister(ChannelHandlerContext,ChannelPromise)当申请将 Channel 从它的 EventLoop 登记时被调用
- read(ChannelHandlerContext) 当申请从 Channel 读取更多的数据时被调用
- flush(ChannelHandlerContext) 当申请通过 Channel 将入队数据冲刷到近程节点时被调用
- write(ChannelHandlerContext,Object,ChannelPromise) 当申请通过 Channel 将数据写到近程节点时被调用。
ChannelHandler 的适配器有一些适配器类应用于自定义处理器,因为它们提供了定义在对应接口中的所有办法的默认实现。因为你有时会疏忽那些不感兴趣的事件,所以 Netty 提供了形象基类 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。
3.4.2 ChannelHandler 生命周期
接口 ChannelHandler 定义的生命周期操作,在 ChannelHandler 被增加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用这些操作。这些办法中的每一个都承受一个 ChannelHandlerContext 参数。
- handlerAdded: 当把 ChannelHandler 增加到 ChannelPipeline 中时被调用
- handlerRemoved: 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
- exceptionCaught: 当处理过程中在 ChannelPipeline 中有谬误产生时被调用
3.5 ChannelPipeline 接口
与 channel 绑定: 当 Channel 被创立时,它将会被主动地调配一个新的 ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能拆散其以后的。
ChannelHandler 容器: ChannelHandler 在 ChannelPipeline 里工作,执行程序是由它们被增加的程序来决定的, 它们是在应用程序的初始化或者疏导阶段被装置的。这些对象接管事件、执行它们所实现的解决逻辑,并将数据传递给链中的下一个 ChannelHandler。
入站静止:如果一个音讯或者任何其余的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,最终,数据将会达到 ChannelPipeline 的尾端,届时,所有解决就都完结了。
出站静止:数据的出站静止(即正在被写的数据)在概念上也是一样的。在这种状况下,数据将从 ChannelOutboundHandler 链的尾端开始流动,直到它达到链的头部为止。在这之后,出站数据将会达到网络传输层,这里显示为 Socket。通常状况下,这将触发一个写操作。
程序性:当两个类别的 ChannelHandler 都混合增加到同一个 ChannelPipeline, 尽管 ChannelInboundHandle 和 ChannelOutboundHandle 都扩大自 ChannelHandler,然而 Netty 能辨别 ChannelInboundHandler 实现和 ChannelOutboundHandler 实现,并确保数据只会在具备雷同定向类型的两个 ChannelHandler 之间传递。
次要办法如下:
- addFirst、addBefore、addAfter、addLast 将一个 ChannelHandler 增加到 ChannelPipeline 中
- remove 将一个 ChannelHandler 从 ChannelPipeline 中移除
- replace 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler
- get 通过类型或者名称返回 ChannelHandler
- context 返回和 ChannelHandler 绑定的 ChannelHandlerContext
- names 返回 ChannelPipeline 中所有 ChannelHandler 的名称
4. 第一个繁难 netty 程序
4.1 服务端
public class EchoServer {
private final int port;
public EchoServer(int port) {this.port = port;}
public static void main(String[] args) throws InterruptedException {
int port = 9999;
EchoServer echoServer = new EchoServer(port);
System.out.println("服务器行将启动");
echoServer.start();
System.out.println("服务器敞开");
}
public void start() throws InterruptedException {final EchoServerHandler serverHandler = new EchoServerHandler();
/* 线程组 */
EventLoopGroup group = new NioEventLoopGroup();
try {
/* 服务端启动必须 */
ServerBootstrap b = new ServerBootstrap();
b.group(group)/* 将线程组传入 */
.channel(NioServerSocketChannel.class)/* 指定应用 NIO 进行网络传输 */
.localAddress(new InetSocketAddress(port))/* 指定服务器监听端口 */
/* 服务端每接管到一个连贯申请,就会新启一个 socket 通信,也就是 channel,所以上面这段代码的作用就是为这个子 channel 减少 handle*/
.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {
/* 增加到该子 channel 的 pipeline 的尾部 */
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();/* 异步绑定到服务器,sync()会阻塞直到实现 */
f.channel().closeFuture().sync();/* 阻塞直到服务器的 channel 敞开 */} finally {group.shutdownGracefully().sync();/* 优雅敞开线程组 */}
}
}
@ChannelHandler.Sharable
/* 不加这个注解那么在减少到 childHandler 时就必须 new 进去 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/* 客户端读到数据当前,就会执行 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;
System.out.println("Server accept"+in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
/*** 服务端读取实现网络数据后的解决 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
/*** 产生异样后的解决 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {cause.printStackTrace();
ctx.close();}
}
4.2 客户端
public class EchoClient {
private final int port;
private final String host;
public EchoClient(int port, String host) {
this.port = port;
this.host = host;
}
public void start() throws InterruptedException {
/* 线程组 */
EventLoopGroup group = new NioEventLoopGroup();
try{
/* 客户端启动必备 */
Bootstrap b = new Bootstrap();
b.group(group)/* 把线程组传入 */
/* 指定应用 NIO 进行网络传输 */
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host,port))
.handler(new EchoClientHandle());
/* 连贯到近程节点,阻塞直到连贯实现 */
ChannelFuture f = b.connect().sync();
/* 阻塞程序,直到 Channel 产生了敞开 */
f.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}
}
public static void main(String[] args) throws InterruptedException {new EchoClient(9999,"127.0.0.1").start();}
}
public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
/* 客户端读到数据当前,就会执行 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {System.out.println("client acccept:"+msg.toString(CharsetUtil.UTF_8));
}
/* 连贯建设当前 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty",CharsetUtil.UTF_8));
//ctx.fireChannelActive();}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {cause.printStackTrace();
ctx.close();}
}
一个繁难的 netty 程序就实现了,实现了通信性能。