关于java:Netty-源码分析系列二Netty-架构设计

3次阅读

共计 7259 个字符,预计需要花费 19 分钟才能阅读完成。

前言

上一篇文章,咱们对 Netty做了一个根本的概述,晓得什么是 Netty 以及 Netty 的简略利用。

Netty 源码剖析系列(一)Netty 概述

本篇文章咱们就来说说 Netty 的架构设计。学习一个框架之前,咱们首先要弄懂它的设计原理,而后再进行深层次的剖析。

接下来咱们从三个方面来剖析 Netty 的架构设计。

Selector 模型

Java NIO 是基于 Selector 模型来实现非阻塞的 I/O。Netty 底层是基于 Java NIO 实现的,因而也应用了 Selector 模型。

Selector 模型解决了传统的阻塞 I/O 编程一个客户端一个线程的问题。Selector 提供了一种机制,用于监督一个或多个 NIO 通道,并辨认何时能够应用一个或多个 NIO 通道进行数据传输。这样,一个线程就能够治理多个通道,从而治理多个网络连接。

Selector 提供了抉择执行曾经就绪的工作的能力。从底层来看,Selector 会轮询 Channel 是否曾经筹备好执行每个 I/O 操作。Selector 容许单线程解决多个 Channel。Selector 是一种多路复用的技术。

SelectableChannel

并不是所有的 Channel 都是能够被 Selector 复用的,只有抽象类 SelectableChannel的子类能力被 Selector 复用。

例如,FileChannel 就不能被选择器复用,因为 FileChannel 不是 SelectableChannel 的子类。

为了与 Selector 一起应用,SelectableChannel必须首先通过 register 办法来注册此类的实例。此办法返回一个新的 SelectionKey 对象,该对象示意 Channel 曾经在 Selector 进行了注册。向 Selector 注册后,Channel将放弃注册状态,直到登记为止。

一个 Channel 最多能够应用任何一个特定的 Selector 注册一次,然而雷同的 Channel 能够注册到多个 Selector 上。能够通过调用 isRegistered办法来确定是否向一个或多个 Selector 注册了 Channel。

SelectableChannel能够平安的供多个并发线程应用。

Channel 注册到 Selector

应用 SelectableChannelregister 办法,可将 Channel 注册到Selector。办法接口源码如下:

    public final SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException
    {return register(sel, ops, null);
    }
    
    public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;

其中各选项阐明如下:

  • sel:指定 Channel 要注册的 Selector
  • ops : 指定 Selector须要查问的通道的操作。

一个 Channel 在 Selector 注册其代表的是一个 SelectionKey 事件,SelectionKey的类型包含:

  • OP_READ:可读事件;值为:1<<0
  • OP_WRITE:可写事件;值为:1<<2
  • OP_CONNECT:客户端连贯服务端的事件 (tcp 连贯),个别为创立SocketChannel 客户端 channel;值为:1<<3
  • OP_ACCEPT:服务端接管客户端连贯的事件,个别为创立 ServerSocketChannel 服务端 channel;值为:1<<4

具体的注册代码如下:

 // 1. 创立通道管理器(Selector)
 Selector selector = Selector.open();
 
 // 2. 创立通道 ServerSocketChannel
 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
 
 // 3.channel 要注册到 Selector 上就必须是非阻塞的,所以 FileChannel 是不能够应用 Selector 的,因为 FileChannel 是阻塞的
 serverSocketChannel.configureBlocking(false);
 
 // 4. 第二个参数指定了咱们对 Channel 的什么类型的事件感兴趣
 SelectionKey key = serverSocketChannel.register(selector , SelectionKey.OP_READ);
 
 // 也能够应用或运算 | 来组合多个事件,例如
 SelectionKey key = serverSocketChannel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);

值得注意的是 :一个 Channel 仅仅能够被注册到一个 Selector 一次, 如果将 Channel 注册到 Selector 屡次, 那么其实就是相当于更新 SelectionKey interest set

SelectionKey

ChannelSelector 关系确定后之后,并且一旦 Channel 处于某种就绪状态,就能够被选择器查问到。这个工作再调用 Selectorselect 办法实现。select 办法的作用,就是对感兴趣的通道操作进行就绪状态的查问。

// 当注册事件达到时,办法返回,否则该办法会始终阻塞
selector.select();

SelectionKey 蕴含了 interest 汇合,代表了所抉择的感兴趣的事件汇合。能够通过 SelectionKey 读写 interest 汇合,例如:

// 返回以后感兴趣的事件列表
int interestSet = key.interestOps();

// 也可通过 interestSet 判断其中蕴含的事件
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;    

// 能够通过 interestOps(int ops)办法批改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);

能够看到,用 位与 操作 interest 汇合和给定的 SelectionKey 常量,能够确定某个确定的事件是否在 interest 汇合中。

SelectionKey 蕴含了 ready 汇合。ready 汇合是通道曾经准备就绪的操作的汇合。在一次抉择之后,会首先拜访这个 ready 汇合。能够这样拜访 ready 汇合:

int readySet = key.readyOps();

// 也可通过四个办法来别离判断不同事件是否就绪
key.isReadable();    // 读事件是否就绪
key.isWritable();    // 写事件是否就绪
key.isConnectable(); // 客户端连贯事件是否就绪
key.isAcceptable();  // 服务端连贯事件是否就绪

咱们能够通过 SelectionKey 来获取以后的 channelselector

// 返回以后事件关联的通道,可转换的选项包含:`ServerSocketChannel` 和 `SocketChannel`
Channel channel = key.channel();

// 返回以后事件所关联的 Selector 对象
Selector selector = key.selector();

能够将一个对象或者其余信息附着到 SelectionKey 上,这样就能不便地辨认某个特定的通道。

key.attach(theObject);
Object attachedObj = key.attachment();

还能够在用 register() 办法向 Selector 注册 Channel 的时候附加对象。

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

遍历 SelectionKey

一旦调用了 select 办法,并且返回值表明有一个或更多个通道就绪了,而后能够通过调用 selector selectedKey()办法,拜访 SelectionKey 汇合中的就绪通道,如下所示:

Set<SelectionKey> selectionKeys = selector.selectedKeys();

能够遍历这个已抉择的键汇合来拜访就绪的通道,代码如下:

// 获取监听事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 迭代解决
while (iterator.hasNext()) {
    // 获取事件
    SelectionKey key = iterator.next();
    // 移除事件,防止反复解决
    iterator.remove();
    // 可连贯
    if (key.isAcceptable()) {...} 
    // 可读
    if (key.isReadable()) {...}
    // 可写
    if(key.isWritable()){...}
}

事件驱动

Netty是一款异步的事件驱动的网络应用程序框架。在 Netty 中,事件是指对某些操作感兴趣的事。例如,在某个 Channel 注册了 OP_READ,阐明该 Channel 对读感兴趣,当 Channel 中有可读的数据时,它会失去一个事件的告诉。

Netty 事件驱动模型中包含以下外围组件。

Channel

Channel(管道)是 Java NIO 的一个根本形象,代表了一个连贯到如硬件设施、文件、网络 socket 等实体的凋谢连贯,或者是一个可能实现一种或多种不同的I/O 操作的程序。

回调

回调 就是一个办法,一个指向曾经被提供给另外一个办法的办法的援用。这使得后者能够在适当的时候调用前者,Netty 在外部应用了回调来处理事件;当一个回调被触发时,相干的事件能够被一个 ChannelHandler 接口解决。

例如:在上一篇文章中,Netty 开发的服务端的管道处理器代码中,当 Channel 中有可读的音讯时,NettyServerHandler的回调办法 channelRead 就会被调用。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {// 读取数据理论(这里咱们能够读取客户端发送的音讯)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("server ctx =" + ctx);
        Channel channel = ctx.channel();
        // 将 msg 转成一个 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送音讯是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());
    }


    // 解决异样, 个别是须要敞开通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();
    }
}

Future

Future 能够看作是一个异步操作的后果的占位符;它将在将来的某个时刻实现,并提供对其后果的拜访,Netty 提供了 ChannelFuture 用于在异步操作的时候应用,每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture(齐全是异步和事件驱动的)。

以下是一个 ChannelFutureListener应用的示例。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ChannelFuture future = ctx.channel().close();
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {//..}
        });
    }

事件及处理器

在 Netty 中事件依照出 / 入站数据流进行分类:

入站数据或相干状态更改触发的事件包含:

  • 连贯已被激活或者失活。
  • 数据读取。
  • 用户事件。
  • 谬误事件,

出站事件是将来将会登程的某个动作的操作后果:

  • 关上或者敞开到近程节点的连贯。
  • 将数据写或者冲刷到套接字。

每个事件都能够被分发给 ChannelHandler 类中的某个用户实现的办法。如下图展现了一个事件是如何被一个这样的 ChannelHandler 链所解决的。

ChannelHandler 为处理器提供了根本的形象,可了解为一种为了响应特定事件而被执行的回调。

责任链模式

责任链模式 (Chain of Responsibility Pattern) 是一种行为型设计模式,它为申请创立了一个解决对象的链。其链中每一个节点都看作是一个对象,每个节点解决的申请均不同,且外部主动保护一个下一节点对象。当一个申请从链式的首端收回时,会沿着链的门路顺次传递给每一个节点对象,直至有对象解决这个申请为止。

责任链模式的重点在这个 “ 链 ” 上,由一条链去解决类似的申请,在链中决定谁来解决这个申请,并返回相应的后果。在 Netty 中,定义了 ChannelPipeline 接口用于对责任链的形象。

责任链模式会定义一个形象处理器(Handler)角色,该角色对申请进行形象,并定义一个办法来设定和返回对下一个处理器的援用。在 Netty 中,定义了 ChannelHandler 接口承当该角色。

责任链模式的优缺点

长处:

  • 发送者不须要晓得本人发送的这个申请到底会被哪个对象解决掉,实现了发送者和接受者的解耦。
  • 简化了发送者对象的设计。
  • 能够动静的增加节点和删除节点。

毛病:

  • 所有的申请都从链的头部开始遍历,对性能有损耗。
  • 不不便调试。因为该模式采纳了相似递归的形式,调试的时候逻辑比较复杂。

应用场景:

  • 一个申请须要一系列的解决工作。
  • 业务流的解决,例如文件审批。
  • 对系统进行扩大补充。

ChannelPipeline

Netty 的 ChannelPipeline 设计,就采纳了责任链设计模式,底层采纳双向链表的数据结构,,将链上的各个处理器串联起来。

客户端每一个申请的到来,Netty 都认为,ChannelPipeline中的所有的处理器都有机会解决它,因而,对于入栈的申请,全副从头节点开始往后流传,始终流传到尾节点(来到尾节点的 msg 会被开释掉)。

入站事件 :通常指 IO 线程生成了入站数据(艰深了解:从 socket 底层本人往上冒上来的事件都是入站)。
比方 EventLoop 收到 selectorOP_READ事件,入站处理器调用 socketChannel.read(ByteBuffer) 承受到数据后,这将导致通道的 ChannelPipeline 中蕴含的下一个中的 channelRead 办法被调用。

出站事件 :通常指 IO 线程执行理论的输入操作(艰深了解:想被动往 socket 底层操作的事件的都是出站)。
比方 bind 办法用意时申请 server socket 绑定到给定的 SocketAddress,这将导致通道的ChannelPipeline 中蕴含的下一个出站处理器中的 bind 办法被调用。

将事件传递给下一个处理器

处理器必须调用 ChannelHandlerContext 中的事件流传办法,将事件传递给下一个处理器。

入站事件和出站事件的流传办法如下图所示:

以下示例阐明了事件流传通常是如何实现的:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("Connected!");
        ctx.fireChannelActive();}
}

public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {System.out.println("Closing...");
        ctx.close(promise);
    }
}

总结

正是因为 Netty 的分层架构设计十分正当,基于 Netty 的各种应用服务器和协定栈开发才可能如雨后春笋般失去疾速倒退。

结尾

我是一个正在被打击还在致力后退的码农。如果文章对你有帮忙,记得点赞、关注哟,谢谢!

正文完
 0