共计 5698 个字符,预计需要花费 15 分钟才能阅读完成。
通过后面几章的学习,咱们曾经 可能把握了 JDK NIO 的开发方式,咱们来总结一下 NIO 开发的流程:
- 创立一个服务端通道
ServerSocketChannel
- 创立一个选择器
Selector
- 将服务端通道注册到选择器上,并且关注咱们感兴趣的事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- 绑定服务管道的地址
serverSocketChannel.bind(new InetSocketAddress(8989));
- 开始进行事件抉择,抉择咱们 感兴趣 的事件做对应的操作!
具体的代码信息请参照第一章:多路复用模型章节,这里不做太多的赘述!
无关多路复用的概念,咱们也在第一章进行了剖析。多路复用模型可能最大限度的将一个线程的执行能力榨干,一条线程执行所有的数据,包含新连贯的接入、数据的读取、计算与回写,然而假如,咱们的数据计算及其迟缓,那么该工作的执行就势必影响下一个新链接的接入!
传统 NIO 单线程模型
如图,咱们能理解到,单线程状况下,读事件因为要做一些业务性操作(数据库连贯、图片、文件下载)等操作,导致线程阻塞再,读事件的解决上,此时单线程程序无奈进行下一次新链接的解决!咱们对该线程模型进行优化,select 事件处理封装为工作,提交到线程池!
NIO 多线程模型
下面的这种数据结构可能解决掉因为计算工作耗时过长,导致新链接接入阻塞的问题,咱们是否再次进行一次优化呢?
咱们是否创立多个事件选择器,每个事件选择器,负责不同的 Socket 连贯,就像上面这种:
NIO 多线程优化模型
这样咱们就能够每一个 Select 选择器负责多个客户端 Socket 连贯,主线程只须要将客户端新连贯抉择一个选择器注册到 select 选择器上就能够了!所以咱们的架构图,就变成了下图这样:
咱们在 select 选择器外部解决计算工作的时候,也能够将工作封装为 task,提交到线程池外面去,彻底将新连贯接入和读写事件处理分来到,互不影响!事实上,这也是 Netty 的核心思想之一,咱们能够依据下面的图例,本人简略写一个:
代码实现
构建一个事件执行器 对应上图的 select 选择器
/**
* Nio 事件处理器
*
* @author huangfu
* @date
*/
public class MyNioEventLoop implements Runnable {static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128);
private final Selector selector;
private final LinkedBlockingQueue<Runnable> linkedBlockingQueue;
public MyNioEventLoop(Selector selector) {
this.selector = selector;
linkedBlockingQueue = new LinkedBlockingQueue<>();}
public Selector getSelector() {return selector;}
public LinkedBlockingQueue<Runnable> getLinkedBlockingQueue() {return linkedBlockingQueue;}
// 疏忽 hashCode 和 eques
/**
* 工作处理器
*/
@Override
public void run() {while (!Thread.currentThread().isInterrupted()) {
try {
// 进行事件抉择 这里咱们只解决读事件
if (selector.select() > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 解决读事件
while (iterator.hasNext()) {SelectionKey next = iterator.next();
iterator.remove();
if (next.isReadable()) {SocketChannel channel = (SocketChannel) next.channel();
int read = channel.read(ALLOCATE);
if(read > 0) {System.out.printf("线程 %s【%s】发来消 - 息:",Thread.currentThread().getName(), channel.getRemoteAddress());
System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8));
}else if(read == -1) {System.out.println("连贯断开");
channel.close();}
ALLOCATE.clear();}
}
selectionKeys.clear();}else {
// 解决异步工作 进行注册
while (!linkedBlockingQueue.isEmpty()) {Runnable take = linkedBlockingQueue.take();
// 异步事件执行
take.run();}
}
} catch (IOException | InterruptedException e) {e.printStackTrace();
}
}
}
}
构建一个选择器组
/**
* 选择器组
*
* @author huangfu
* @date 2021 年 3 月 12 日 09:44:37
*/
public class SelectorGroup {private final List<MyNioEventLoop> SELECTOR_GROUP = new ArrayList<>(8);
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
private final static AtomicInteger IDX = new AtomicInteger();
/**
* 初始化选择器
* @param count 处理器数量
* @throws IOException 异样欣慰
*/
public SelectorGroup(int count) throws IOException {for (int i = 0; i < count; i++) {Selector open = Selector.open();
MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open);
SELECTOR_GROUP.add(myNioEventLoop);
}
}
public SelectorGroup() throws IOException {this(AVAILABLE_PROCESSORS << 1);
}
/**
* 轮询获取一个选择器
* @return 返回一个选择器
*/
public MyNioEventLoop next(){int andIncrement = IDX.getAndIncrement();
int length = SELECTOR_GROUP.size();
return SELECTOR_GROUP.get(Math.abs(andIncrement % length));
}
}
构建一个执行器记录器
/**
* @author huangfu
* @date
*/
public class ThreadContext {
/**
* 记录以后应用过的选择器
*/
public static final Set<MyNioEventLoop> RUN_SELECT = new HashSet<>();}
构建一个新连贯接入选择器
/**
* 连接器
*
* @author huangfu
* @date 2021 年 3 月 12 日 10:15:37
*/
public class Acceptor implements Runnable {
private final ServerSocketChannel serverSocketChannel;
private final SelectorGroup selectorGroup;
public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) {
this.serverSocketChannel = serverSocketChannel;
this.selectorGroup = selectorGroup;
}
@Override
public void run() {
try {SocketChannel socketChannel = serverSocketChannel.accept();
MyNioEventLoop next = selectorGroup.next();
// 向队列追加一个注册工作
next.getLinkedBlockingQueue().offer(() -> {
try {
// 客户端注册为非阻塞
socketChannel.configureBlocking(false);
// 注册到选择器 关注一个读事件
socketChannel.register(next.getSelector(), SelectionKey.OP_READ);
} catch (Exception e) {e.printStackTrace();
}
});
// 唤醒对应的工作,让其解决异步工作
next.getSelector().wakeup();
System.out.println("检测到连贯:" + socketChannel.getRemoteAddress());
// 当以后选择器曾经被应用过了 就不再应用了,间接注册就行了
if (ThreadContext.RUN_SELECT.add(next)) {
// 启动工作
new Thread(next).start();}
} catch (IOException e) {e.printStackTrace();
}
}
}
创立启动器
/**
* 反应器
*
* @author huangfu
* @date 2021 年 3 月 12 日 10:15:14
*/
public class Reactor implements Runnable {
private final Selector selector;
public Reactor(Selector selector) {this.selector = selector;}
@Override
public void run() {
try {System.out.println("服务启动胜利");
while (!Thread.currentThread().isInterrupted()) {
// d 期待连贯事件
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {SelectionKey next = iterator.next();
iterator.remove();
// 进行数据散发
dispatch(next);
}
}
} catch (IOException e) {e.printStackTrace();
}
}
/**
* 将新连贯散发到新连贯接入器
* @param next 所有事件主键
*/
private void dispatch(SelectionKey next) {Runnable attachment = (Runnable) next.attachment();
if(attachment!=null) {attachment.run();
}
}
}
启动测试
/**
* @author huangfu
* @date
*/
public class TestMain {public static void main(String[] args) throws IOException {
// 创立一个选择器组 传递选择器组的大小 决定应用多少选择器来实现
SelectorGroup selectorGroup = new SelectorGroup(2);
// 开启一个服务端管道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 开启一个服务端专用的选择器
Selector selector = Selector.open();
// 设置非阻塞
serverSocketChannel.configureBlocking(false);
// 创立一个连接器
Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup);
// 将服务端通道注册到服务端选择器上 这里会绑定一个新连贯接入器
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8989));
// 启动处理器
new Reactor(selector).run();}
}
总结
- 单线程下的 NIO 存在性能瓶颈,当某一计算过程迟缓的时候会阻塞住整个线程,导致影响其余事件的解决!
- 为了解决这一缺点,咱们提出了应用异步线程的形式去操作工作,将耗时较长的业务,封装为一个异步工作,提交到线程池执行!
- 为了使业务操作和新连贯接入齐全分来到,咱们做了另外一重优化,咱们封装了一个选择器组,轮询的形式获取选择器,每一个选择器都可能解决多个新连贯,socket 连贯 ->selector 选择器 = 多 -> 1,在每一个选择器外面又能够应用线程池来解决工作,进一步提高吞吐量!
正文完