乐趣区

JavaJava-AIO使用

欢迎关注公众号: nullobject
文章首发在个人博客 https://www.nullobject.cn,公众号 nullobject 同步更新。

这篇文章主要介绍 Java AIO 网络编程。

1. AIO 是什么

本文所说的 AIO 特指 Java 环境下的 AIOAIOjavaIO 模型 的一种,作为 NIO 的改进和增强随 JDK1.7 版本更新被集成在 JDKnio包中,因此 AIO 也被称作是 NIO2.0。区别于传统的BIO(Blocking IO, 同步阻塞式模型,JDK1.4 之前就存在于 JDK 中,NIOJDK1.4 版本发布更新)的阻塞式读写,AIO提供了从建立连接到读、写的全异步操作。AIO可用于异步的 文件读写 网络通信。本文将介绍如何使用 AIO 实现一个简单的网络通信以及 AIO 的一些比较关键的 API。

2. 简单的使用

首先以 Server 端 为例,需要创建一个 AsynchronousServerSocketChannel 示例并绑定监听端口,接着开始监听客户端连接:

public class SimpleAIOServer {public static void main(String[] args) {
        try {
            final int port = 5555;
            // 首先打开一个 ServerSocket 通道并获取 AsynchronousServerSocketChannel 实例:AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
            // 绑定需要监听的端口到 serverSocketChannel:  
            serverSocketChannel.bind(new InetSocketAddress(port));
            // 实现一个 CompletionHandler 回调接口 handler,// 之后需要在 handler 的实现中处理连接请求和监听下一个连接、数据收发,以及通信异常。CompletionHandler<AsynchronousSocketChannel, Object> handler = new CompletionHandler<AsynchronousSocketChannel,
                    Object>() {
                @Override
                public void completed(final AsynchronousSocketChannel result, final Object attachment) {
                    // 继续监听下一个连接请求  
                    serverSocketChannel.accept(attachment, this);
                    try {System.out.println("接受了一个连接:" + result.getRemoteAddress()
                                                              .toString());
                        // 给客户端发送数据并等待发送完成
                        result.write(ByteBuffer.wrap("From Server:Hello i am server".getBytes()))
                              .get();
                        ByteBuffer readBuffer = ByteBuffer.allocate(128);
                        // 阻塞等待客户端接收数据
                        result.read(readBuffer)
                              .get();
                        System.out.println(new String(readBuffer.array()));

                    } catch (IOException | InterruptedException | ExecutionException e) {e.printStackTrace();
                    }
                }

                @Override
                public void failed(final Throwable exc, final Object attachment) {System.out.println("出错了:" + exc.getMessage());
                }
            };
            serverSocketChannel.accept(null, handler);
            // 由于 serverSocketChannel.accept(null, handler); 是一个异步方法,调用会直接返回,// 为了让子线程能够有时间处理监听客户端的连接会话,// 这里通过让主线程休眠一段时间 (当然实际开发一般不会这么做) 以确保应用程序不会立即退出。TimeUnit.MINUTES.sleep(Integer.MAX_VALUE);
        } catch (IOException | InterruptedException e) {e.printStackTrace();
        }
    }
}

其中 result 即表示当前接受的客户端的连接会话,与客户端的通信都需要通过该连接会话进行。

Client 端

public class SimpleAIOClient {public static void main(String[] args) {
        try {
            // 打开一个 SocketChannel 通道并获取 AsynchronousSocketChannel 实例
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            // 连接到服务器并处理连接结果
            client.connect(new InetSocketAddress("127.0.0.1", 5555), null, new CompletionHandler<Void, Void>() {
                @Override
                public void completed(final Void result, final Void attachment) {System.out.println("成功连接到服务器!");
                    try {
                        // 给服务器发送信息并等待发送完成
                        client.write(ByteBuffer.wrap("From client:Hello i am client".getBytes()))
                              .get();
                        ByteBuffer readBuffer = ByteBuffer.allocate(128);
                        // 阻塞等待接收服务端数据
                        client.read(readBuffer)
                              .get();
                        System.out.println(new String(readBuffer.array()));
                    } catch (InterruptedException | ExecutionException e) {e.printStackTrace();
                    }
                }

                @Override
                public void failed(final Throwable exc, final Void attachment) {exc.printStackTrace();
                }
            });
            TimeUnit.MINUTES.sleep(Integer.MAX_VALUE);
        } catch (IOException | InterruptedException e) {e.printStackTrace();
        }
    }
}

3. AIO 主要 API 详解

从第 2 节例子可以看到,实现一个最简单的 AIO socket 通信serverclient,主要需要这些相关的类和接口:

  • AsynchronousServerSocketChannel

    服务端 Socket 通道类,负责服务端 Socket 的创建和监听;

  • AsynchronousSocketChannel

    客户端 Socket 通道类,负责客户端消息读写;

  • CompletionHandler<A,V>

    消息处理回调接口,是一个负责消费异步 IO 操作结果的消息处理器;

  • ByteBuffer

    负责承载通信过程中需要读、写的消息。

此外,还有可选的用于异步通道资源共享的 AsynchronousChannelGroup 类,接下来将一一介绍这些类的主要接口及使用。

3.1.1 AsynchronousServerSocketChannel

AsynchronousServerSocketChannel是一个 流式监听套接字 的异步通道。

AsynchronousServerSocketChannel的使用需要经过三个步骤:创建 / 打开通道 绑定地址和端口 监听客户端连接请求

一、创建 / 打开通道 :简单地,可以通过调用AsynchronousServerSocketChannel 的静态方法 open() 来创建 AsynchronousServerSocketChannel 实例:

try {AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
} catch (IOException e) {e.printStackTrace();
}

当打开通道失败时,会抛出一个 IOException 异常。AsynchronousServerSocketChannel提供了设置通道分组 (AsynchronousChannelGroup) 的功能,以实现组内通道资源共享。可以调用 open(AsynchronousChannelGroup) 重载方法创建指定分组的通道:

try {ExecutorService pool = Executors.newCachedThreadPool();
  AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10);
  AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);
} catch (IOException e) {e.printStackTrace();
}

AsynchronousChannelGroup封装了处理由绑定到组的异步通道所触发的 I / O 操作完成所需的机制。每个 AsynchronousChannelGroup 关联了一个被用于 提交处理 I / O 事件 分发消费在组内通道上执行的异步操作结果的 completion-handlers的线程池。除了处理 I / O 事件,该线程池还有可能处理其他一些用于支持完成异步 I / O 操作的任务。从上面例子可以看到,通过指定 AsynchronousChannelGroup 的方式打开 AsynchronousServerSocketChannel,可以定制 server channel 执行的线程池。有关AsynchronousChannelGroup 的详细介绍可以查看官方文档注释。如果不指定 AsynchronousChannelGroup,则AsynchronousServerSocketChannel 会归类到一个默认的分组中。

二、绑定地址和端口:通过调用 AsynchronousServerSocketChannel.bind(SocketAddress) 方法来绑定监听地址和端口:

// 构建一个 InetSocketAddress 实例以指定监听的地址和端口,如果需要指定 ip,则调用 InetSocketAddress(ip,port)构造方法创建即可
serverSocketChannel.bind(new InetSocketAddress(port));

三、监听和接收客户端连接请求:

监听客户端连接请求,主要通过调用 AsynchronousServerSocketChannel.accept() 方法完成。accept()有两个重载方法:

public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>);
public abstract Future<AsynchronousSocketChannel> accept();

这两个重载方法的行为方式完全相同,事实上,AIO的很多异步 API 都封装了诸如此类的重载方法:提供 CompletionHandle 回调参数或者返回一个 Future<T> 类型变量。用过 Feture 接口的都知道,可以调用 Feture.get() 方法 阻塞 等待调用结果。以第一个重载方法为例,当接受一个新的客户端连接,或者 accept 操作发生异常时,会通过 CompletionHandler 将结果返回给用户处理:

serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
        AsynchronousServerSocketChannel>() {
          @Override
          public void completed(final AsynchronousSocketChannel result,
                                final AsynchronousServerSocketChannel attachment) {
            // 接收到新的客户端连接时回调
            // result 即和该客户端的连接会话
            // 此时可以通过 result 与客户端进行交互
          }

          @Override
          public void failed(final Throwable exc, final AsynchronousServerSocketChannel attachment) {// accept 失败时回调}
        });

需要注意的是,AsynchronousServerSocketChannel是线程安全的,但在任何时候 同一时间内只能允许有一个 accept 操作 。因此,必须得等待前一个accept 操作完成之后才能启动下一个accept

serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
        AsynchronousServerSocketChannel>() {
          @Override
          public void completed(final AsynchronousSocketChannel result,
                                final AsynchronousServerSocketChannel attachment) {
            // 接收到新的客户端连接,此时本次 accept 已经完成
            // 继续监听下一个客户端连接到来
            serverSocketChannel.accept(serverSocketChannel,this);
            // result 即和该客户端的连接会话
            // 此时可以通过 result 与客户端进行交互
          }
          ...
        });

此外 ,还可以通过以下方法获取和设置AsynchronousServerSocketChannelsocket选项:

// 设置 socket 选项
serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 获取 socket 选项设置
boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

其中 StandardSocketOptions 类封装了常用的 socket 设置选项。

获取本地地址:

InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();

3.1.2 AsynchronousSocketChannel

AsynchronousSocketChannel是一个 流式连接套接字 的异步通道。

AsynchronousSocketChannel表示服务端与客户端之间的连接通道。客户端可以通过调用 AsynchronousSocketChannel 静态方法 open() 创建,而服务端则通过调用 AsynchronousServerSocketChannel.accept() 方法后由 AIO 内部在合适的时候创建。下面 以客户端实现为例,介绍AsynchronousSocketChannel

一、创建 AsynchronousSocketChannel 并连接到服务端:需要通过 open() 创建和打开一个 AsynchronousSocketChannel 实例,再调用其 connect() 方法连接到服务端,接着才可以与服务端交互:

// 打开一个 socket 通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞等待连接成功
socketChannel.connect(new InetSocketAddress(ip,port)).get();
// 连接成功,接下来可以进行 read、write 操作

AsynchronousServerSocketChannelAsynchronousSocketChannel 也提供了 open(AsynchronousChannelGroup) 方法用于指定通道分组和定制线程池。socketChannel.connect()也提供了 CompletionHandler 回调和 Future 返回值两个重载方法,上面例子使用带 Future 返回值的重载,并调用 get() 方法阻塞等待连接建立完成。

二、发送消息:

可以构建一个 ByteBuffer 对象并调用 socketChannel.write(ByteBuffer) 方法异步发送消息,并通过 CompletionHandler 回调接收处理发送结果:

ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes());
socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {// 发送完成,result:总共写入的字节数}

  @Override
  public void failed(final Throwable exc, final Object attachment) {// 发送失败}
});

三、读取消息:

构建一个指定接收长度的 ByteBuffer 用于接收数据,调用 socketChannel.read() 方法读取消息并通过 CompletionHandler 处理读取结果:

ByteBuffer readBuffer = ByteBuffer.allocate(128);
socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {// 读取完成,result: 实际读取的字节数。如果通道中没有数据可读则 result=-1。}

  @Override
  public void failed(final Throwable exc, final Object attachment) {// 读取失败}
});

此外 AsynchronousSocketChannel 也封装了设置 / 获取 socket 选项的方法:

// 设置 socket 选项
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 获取 socket 选项设置
boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

3.1.3 CompletionHandler

CompletionHandler是一个用于 消费异步 I / O 操作结果 的处理器。

AIO 中定义的异步通道允许指定一个 CompletionHandler 处理器消费一个异步操作的结果。从上文中也可以看到,AIO 中大部分的异步 I / O 操作接口都封装了一个带 CompletionHandler 类型参数的重载方法,使用 CompletionHandler 可以很方便地处理 AIO 中的异步 I / O 操作结果。CompletionHandler是一个具有两个泛型类型参数的接口, 声明了两个接口方法:

public interface CompletionHandler<V,A> {void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

其中,泛型 V 表示 I / O 操作的结果类型,通过该类型参数消费 I / O 操作的结果;泛型 A 为附加到 I / O 操作中的对象类型,可以通过该类型参数将需要的变量传入到 CompletionHandler 实现中使用。因此,AIO 中大部分的异步 I / O 操作都有一个类似这样的重载方法:

<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);

例如,AsynchronousServerSocketChannel.accept()方法:

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);

AsynchronousSocketChannel.write()方法等:

public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler)

当 I / O 操作成功完成时,会回调到 completed 方法,failed方法则在 I / O 操作失败时被回调。需要注意的是:CompletionHandler 的实现中应当即使处理操作结果,以避免一直占用调用线程而不能分发其他的 CompletionHandler 处理器。

4 The End :)

退出移动版