共计 12445 个字符,预计需要花费 32 分钟才能阅读完成。
IO 模型
IO 申请的两个阶段(Linux)
- IO 调用阶段: 用户过程向内核发动零碎调用
-
IO 执行阶段: 此时用户进行期待 IO 申请解决实现返回, 此阶段分为两步
- 期待数据就绪, 并写入内核缓冲区
-
数据从内核缓冲区 到 用户态缓冲区
- 内核态:运行操作系统程序,操作硬件
- 用户态:运行用户程序
Linux 五种 IO 模型
1. 同步阻塞 IO(BIO)
内核只能同时解决一个申请, 分两个阶段(即上述的IO 执行阶段):
- 零碎调用
- 数据从内核缓冲区读取到用户缓冲区
这个两个操作都是阻塞的所以只有等这两个操作都实现后能力解决其余 IO
2. 同步非阻塞 IO(NIO)
过程的申请不会始终期待而是有专门的线程来轮询这些 IO 过程是否存有数据, 然而轮询过程中会存在着零碎调用导致的高低问切换, 如果申请过多会存在重大的零碎性能耗费
3.IO 多路复用
多路是指 多个数据通道 , 复用指的是 一个或多个固定的线程 来解决每一 Socket
连贯, select
poll
epoll
都是 IO 多路复用的实现, 线程一次能够 select
多个数据通道的数据状态, 解决了 NIO
性能耗费过重的问题
- 文件描述符 fd
文件描述符(File descriptor)模式上是一个非负整数, 是一个索引值, 指向内核为每一个过程所保护的该过程所关上文件的记录表.
– select
这个函数会监督 3 类文件描述符, 别离是writefds
,readfds
,exceptfds
调用 select 函数时会阻塞, 直到 select 有以上 3 中描述符文件就绪或者超时, 一旦某个描述符就绪了, 会告诉程序进行相干的读写操作, 因为 select poll epoll 都是同步 IO, 所以它们都须要在事件就绪后本人负责读写. 也就是 select 会阻塞监听相干事件, 直到解决完读写操作或者超时后才会解除阻塞.select 单个过程可能监听的文件数量是无限的,linux 个别默认是1024
int select(int n,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);
– poll
int poll (struct pollfd *fds, unsigned int nfds, int timeout);
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events to watch */
short revents; /* returned events witnessed */
};
poll 应用一个 pollfd
的构造体来传导须要监听的事件和要产生的事件, 另外poll 监听的文件描述符个数是没有限度的
– epoll
不须要轮询,工夫复杂度为 O(1)
epoll_create 创立一个白板 寄存 fd_events
epoll_ctl 用于向内核注册新的描述符或者是扭转某个文件描述符的状态。已注册的描述符在内核中会被保护在一棵红黑树上
epoll_wait 通过 回调函数 内核会将 I/O 筹备好的描述符 退出到一个链表 中治理,过程调用 epoll_wait()
便能够失去事件实现的描述符
两种触发模式:
LT: 程度触发
当 epoll_wait()
检测到描述符事件达到时,将此事件告诉过程,过程能够不立刻解决该事件,下次调用 epoll_wait()
会再次告诉过程。是默认的一种模式,并且同时反对 Blocking
和 No-Blocking
。
ET: 边缘触发
和 LT 模式不同的是,告诉之后过程必须立刻处理事件。
下次再调用 epoll_wait()
时不会再失去事件达到的告诉。 很大水平上缩小了 epoll 事件被反复触发的次数,因而效率要比 LT 模式高。只反对 No-Blocking
,以防止因为一个文件句柄的阻塞读 / 阻塞写操作把解决多个文件描述符的工作饿死。
4. 信号驱动模型
信号驱动模型并不罕用, 是一种半异步 IO. 当数据准备就绪后, 内核会发送一个 SIGIO 音讯给利用过程, 过程而后开始读写音讯.
5. 异步 IO
零碎调用会被立刻返回后果, 而后读取写音讯由异步实现.
BIO
BIO – Block-IO 阻塞同步的通信形式
BIO 的问题:
阻塞 \ 同步,BIO 很依赖于网络, 网速不好阻塞工夫会很长; 每次申请都由程序执行并返回, 这是同步的缺点
BIO 的工作流程:
- 服务端启动
- 阻塞期待客户端连贯
- 客户端连贯
- 监听客户端内容
- 客户端断开
- 回到第一步
BioServer
public class BioServer {public static void main(String[] args) {
try {
// 服务端绑定端口
ServerSocket server = new ServerSocket(9000);
while (true) {
// 创立一个 Socket 接管连贯 - 当没有时阻塞
Socket socket = server.accept();
// 获取输出流
InputStream inputStream = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String message;
while (null != (message = reader.readLine())) {System.out.println(message);
}
inputStream.close();
socket.close();}
} catch (IOException e) {e.printStackTrace();
}
}
}
BioClient
public class BioClient {public static void main(String[] args) {
try {
// 创立 socket
Socket socket = new Socket("localhost",9000);
// 获取 Socket 输入流
OutputStream outputStream = socket.getOutputStream();
// 输入流
outputStream.write("hello socket".getBytes());
// 敞开
outputStream.close();
socket.close();} catch (IOException e) {e.printStackTrace();
}
}
}
多线程解决 BIO 阻塞问 题
- 解决了的问题:多个线程解决当一个客户端迟迟不退出时, 其余线程仍然能够解决其它客户端发送过去的申请. 防止了一个申请阻塞导致其余客户端申请始终期待的问题
- 依然存在问题:退出服务端给定固定线程数是 10, 有 10 个客户端创立了连贯 然而没有一个人发送音讯 那么 10 个线程将全副阻塞, 或者有些客户端迟迟没有操作会造成不必要的资源占用.
多线程 BioServer 代码
public class BioServer {private static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
ServerSocket serverSocket;
try {serverSocket = new ServerSocket(9000);
while (true){//new Thread(new BioHandler(serverSocket.accept()){}).start();
executorService.execute(new BioHandler(serverSocket.accept()));
}
} catch (IOException e) {e.printStackTrace();
}
}
}
public class BioHandler implements Runnable {
private Socket socket;
public BioHandler(Socket socket) {this.socket = socket;}
public void run() {
try {InputStream input = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
String m;
while (null != (m = reader.readLine())){System.out.println(m);
}
input.close();
socket.close();} catch (IOException e) {e.printStackTrace();
}
}
}
NIO
java 1.4 版本引入, 给予缓冲区面 \ 向通道的 io 操作
bio | nio |
---|---|
面向流 | 面向缓冲区(buffer) |
阻塞 io | 非阻塞 io |
同步 | 同步 |
无 | Selector(选择器) |
缓冲区(Buffer)
缓冲区介绍
缓冲区是一个特定数据类型的容器, 有 java.nio 包定义, 所有的缓冲区都是 Buffer 抽象类的子类
Buffer 次要用于和 NIO 通道进行通信, 数据从通道读入到缓冲区, 再从缓冲区读取到通道
Buffer 就像是一个数据能够保留多个类型雷同的数据
子类
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
根本属性
1.容量 (capacity): 示意缓冲区的最大容量 一旦创立不能批改
2. 限度 (limit): 第一个不可读的索引, 即位于 limit 前面的数据不可读
3. 地位 (position): 下一个要读取或写入数据的索引
4.flip: 将此时的 position 设为 limit,position 置为 0 , 个别是从 inputChannel 将数据读入到 buffer 而后将 buffer flip 后 为了从 buffer 中读取数据 outputChannel
5.标记 (mark) 和复原 (reset): 标记是一个索引, 通过 Buffer.mark()指定一个特定的地位, 应用 reset 办法能够复原到这个地位
public class BufferSample {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("capacity:" + buffer.capacity());
System.out.println("limit:" + buffer.limit(10));
System.out.println("position:" + buffer.position());
/**
* 后果:
* capacity:1024
* limit:java.nio.HeapByteBuffer[pos=0 lim=10 cap=1024]
* position:0
*/
System.out.println("==============================");
String str = "hello";
buffer.put(str.getBytes());
System.out.println("position:" + buffer.position());
/**
* 后果:
* position:5
*/
System.out.println("==============================");
System.out.println("pos 和 limit 之间元素的个数:" + buffer.remaining());
buffer.mark();
buffer.put("oo".getBytes());
System.out.println("reset 前 position:" + buffer.position());
buffer.reset();
System.out.println("reset 后 position:" + buffer.position());
/**
* 后果:
* pos 和 limit 之间元素的个数:5
* reset 前 position:7
* reset 后 position:5
*/
System.out.println("==============================");
buffer.rewind();
System.out.println("position:" + buffer.position());
/**
* 后果:
* position:0
*/
System.out.println("==============================");
byte[] dst = new byte[3];
buffer.get(dst);
System.out.println(new String(dst));
System.out.println("position:" + buffer.position());
/**
* 后果:
* hel
* position:3
*/
System.out.println("==============================");
// 将此时的 position 转为 limit, 并将 position 置为 0 - 个别 flip 当前就是开始读取缓冲区类
buffer.flip();
System.out.println("capacity:" + buffer.capacity());
System.out.println("limit:" + buffer.limit());
System.out.println("position:" + buffer.position());
byte[] b = new byte[buffer.limit()];
buffer.get(b,0,2);
System.out.println(new String(b,0,2));
/**
* 后果:
* capacity:1024
* limit:3
* position:0
* he
*/
}
}
间接 / 非间接缓冲区
- 间接缓冲区: 程序间接操作物理映射文件
- 非间接缓冲区:jvm – 操作系统 – 物理内存
通道(Channel)
Channel:相似于流, 然而 Channel 不能间接拜访数据, 只能与缓冲区进行交互
通道主体实现类
1.FileChannel
: 用于读取 写入 映射和操作文件的通道
2.DataGramChannel
: 通过 UDP 读取网络中的数据通道
3.SocketChannel
: 通过 Tcp 读写通道的数据
4.ServerSocketChannel
: 能够监听新进入的 Tcp 连贯, 对每一个新连贯创立一个 SocketChannel
提供 getChannel()办法的类
1.FileInputStream
2.FileOutputStream
3.RandomAccessFile
4.Socket
5.ServerSocket
6.DataGramSocket
通道间接传输
1.transferFrom()
2.transferTo()
public class ChannelSimple {
/**
* 利用通道实现文件复制(非间接缓冲区)
*/
public static void FileNoDirectBufferTest(){
try {
// 创立输入输出流
FileInputStream inputStream = new FileInputStream("../test.txt");
FileOutputStream outputStream = new FileOutputStream("../test2.txt");
// 依据流获取通道
FileChannel inputChannel = inputStream.getChannel();
FileChannel outputChannel = outputStream.getChannel();
// 创立缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 从通道读取数据到缓冲区
while (-1 != inputChannel.read(byteBuffer)){
//limit - position,position - 0
byteBuffer.flip();
// 将缓冲区中的数据写出
outputChannel.write(byteBuffer);
byteBuffer.clear();}
outputChannel.close();
inputChannel.close();
outputStream.close();
inputStream.close();} catch (IOException e) {e.printStackTrace();
}
}
/**
* 利用间接缓冲区实现文件复制(内存映射文件)
* @throws IOException
*/
public static void FileMpDirectBufferTest() throws IOException{
// 创立通道
FileChannel inputChannel = FileChannel.open(Paths.get("../test.txt"), StandardOpenOption.READ);
FileChannel outputChannel = FileChannel.open(Paths.get("../test2.txt"),StandardOpenOption.CREATE,StandardOpenOption.WRITE,StandardOpenOption.READ);
// 内存映射文件
MappedByteBuffer inputBuffer = inputChannel.map(FileChannel.MapMode.READ_ONLY,0,inputChannel.size());
MappedByteBuffer outputBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE,0,inputChannel.size());
// 间接对缓冲区进行数据读写操作
byte [] dst = new byte[inputBuffer.limit()];
inputBuffer.get(dst);
outputBuffer.put(dst);
outputChannel.close();
inputChannel.close();}
/**
* 利用间接缓冲区复制
* @throws IOException
*/
public static void FileDirectBufferTest() throws IOException {
// 创立通道
FileChannel inputChannel = FileChannel.open(Paths.get("../test.txt"), StandardOpenOption.READ);
FileChannel outputChannel = FileChannel.open(Paths.get("../test2.txt"),StandardOpenOption.CREATE,StandardOpenOption.WRITE,StandardOpenOption.READ);
//inputChannel.transferTo(0,inputChannel.size(),outputChannel);
// 等同 下面的正文
outputChannel.transferFrom(inputChannel,0,inputChannel.size());
outputChannel.close();
inputChannel.close();}
}
扩散读取和汇集写入
- 扩散读取 (Scatter): 将一个
Channel
中的数据扩散贮存到多个Buffer
中 - 汇集写入 (Gather): 将多个
Buffer
中的数据写入同一个Channel
中
public class ScatterAndGather {public static void main(String[] args) {
try {
// 创立输入输出流
FileInputStream inputStream = new FileInputStream("../test.txt");
FileOutputStream outputStream = new FileOutputStream("../test2.txt");
// 依据流获取通道
FileChannel inputChannel = inputStream.getChannel();
FileChannel outputChannel = outputStream.getChannel();
// 创立缓冲区
ByteBuffer byteBuffer1 = ByteBuffer.allocate((int)inputChannel.size()/2);
ByteBuffer byteBuffer2 = ByteBuffer.allocate((int)inputChannel.size()/2);
ByteBuffer[] byteBuffers = new ByteBuffer[]{byteBuffer1,byteBuffer2};
// 从通道读取数据到缓冲区 - 扩散写入
while (-1 != inputChannel.read(byteBuffers)){for (ByteBuffer buffer:byteBuffers){
//limit - position,position - 0
buffer.flip();}
// 汇集写出
for (ByteBuffer buffer:byteBuffers) {
// 将缓冲区中的数据写出
outputChannel.write(buffer);
buffer.clear();}
}
outputChannel.close();
inputChannel.close();
outputStream.close();
inputStream.close();} catch (IOException e) {e.printStackTrace();
}
}
}
选择器(Selector)
Selector
个别被称为选择器, 也被称为多路复用器. 用于查看一个或多个通道是否处于可读 \ 可写, 如此能够实现一个线程治理多个Channel
应用 Selector 带来的益处
应用更少的线程来解决Channel
, 能够避免上下文切换带来的性能耗费
能够多路复用的Channel
能够被抉择 (多路复用) 的Channel
都继承自SelectableChannel
SelectableChannel
||
AbstractSelectableChannel
|| || ||
DataGramChannel SocketChannel ServerSocketChannel
所以 FileChannel
不适应与Selector
, 即不能切换为非阻塞模式
Selector 应用根本步骤
1. 创立Selector: Selector selector = Selector.open();
2. 设置为非阻塞为:
`channel.configureBlocking(false);`
3. 注册 Channel
到Selector
:
/**
* 参数 -1: 要注册到的多路复用器
* 参数 -2: 是一个 "interest 汇合", 即要监听事件的汇合(有以下四种)
* OP_CONNECT 连贯
* OP_ACEEPT 接管
* OP_READ 读
* OP_WRITE 写
*/
SelectionKey key = channel.register(selector,SelectionKey.OP_READ);
如果要监听多种事件如下:
SelectionKey key = channel.register(selector,SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
4. 而后就 连贯就绪 | 接管就绪 | 读就绪 | 写就绪
Selector 次要办法
办法 | 形容 |
---|---|
Set<SelectKey> keys() |
返回所有 SelectionKey 汇合, 代表 注册在这个 Selector 上 的Channel |
Set<SelectKey> selectedKeys() |
返回已抉择了的(即有 io 操作的)SelectionKey |
int select() |
监控所有注册了的 Channel , 如果有须要 io 的操作时会将对应的selectKey 退出到 selectedKeys 汇合中, 返回的则是被抉择 (有 io 操作的)Channel 数量, 这个操作时阻 塞的即只有被抉择的 Channel 数量 >= 1 才 返回 |
int select(timeout) |
有超时时长, 始终没有 io 操作的 Channel 呈现, 达到 timeout 呈现的工夫后将主动返回 |
int selectNow() |
无阻塞 立刻返回 |
Selector wakeUp() |
使正在 select() 立刻返回 |
void close() |
敞开 |
SelectionKey 次要办法
SelectionKey
示意 Channel
和Selector
之间的关系,Channel
向 Selector
注册就会产生一个SelectionKey
办法 | 形容 |
---|---|
int interestOps() |
感兴趣事件的汇合 boolean isInterested = interestSet & SelectionKey.OP_CONNECT … |
int readyOps() |
获取通道筹备好就绪的操作 |
SelectableChannel channel() |
获取注册通道 |
Selector selector() |
获取选择器 |
boolean isConnectable() |
检测 Channel 中是否有连贯事件就绪 |
boolean isAcceptable() |
检测 Channel 中是否有接管事件就绪 |
boolean isReadaable() |
检测 Channel 中是否有读事件就绪 |
boolean isWriteable() |
检测 Channel 中是否有写事件就绪 |
Object attach() |
将一个对象附着到 SelectionKey 上, 次要是一些用于标识的信息 |
Object attachment() |
获取注册信息 也能够在 Channel 注册的时候附着信息 SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject); |
void cancel() |
申请勾销此键的通道到其选择器的注册 |
NioServer
public class NioServer {public static void main(String[] args) throws IOException {
Integer flag = 0;
// 创立服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(9021));
// 创立选择器
Selector selector = Selector.open();
// 注册 接管
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 有一个事件时就操作
while (selector.select() > 0) {
// 获取事件汇合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();
// 如果是接管就绪
if (selectionKey.isAcceptable()) {
// 获取客户端连贯
SocketChannel socketChannel = serverSocketChannel.accept();
// 切换成非阻塞
socketChannel.configureBlocking(false);
// 注册在多路复用器上 读
socketChannel.register(selector, SelectionKey.OP_READ);
// 读事件
} else if (selectionKey.isReadable()) {
// 获取客户端连贯
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 设置缓存
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = 0;
while (-1 != (len = socketChannel.read(byteBuffer))) {
flag = 0;
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(),0,len));
byteBuffer.clear();}
flag++;
// 判断此时是否有 io 事件, 陷入空轮询 - 间断空轮询 100 次
// 申请勾销此键的通道在其选择器的注册, 也就是 selector.select(); 的数量 -1
if(flag == 100){selectionKey.cancel();
socketChannel.close();}
}
}
iterator.remove();}
}
}
NioClient
package com.yuan.nio.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NioClient {public static void main(String[] args) {
try {SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost",9021));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("hello".getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
socketChannel.close();} catch (IOException e) {e.printStackTrace();
}
}
}