关于nio:Java-NIO-基础四-选择器

4次阅读

共计 5753 个字符,预计需要花费 15 分钟才能阅读完成。

从最根底的层面上来看,选择器提供了问询通道是否就绪操作 I / O 的能力,选择器能够监控注册在下面的多个通道,通道注册时会返回选择键(记录通道与选择器之间的关联关系),选择器管理者这些注册的键、和就绪状态键的汇合

SelectableChannel
所有继承 SelectableChannel 的通道都能够在选择器中注册,FileChannel 没有继承这个类,所以无奈应用选择器

选择键(SelectionKey)选择键是选择器的重点内容,选择器就绪的通道通过返回选择键汇合来告诉

public abstract class SelectionKey {
    public static final int OP_READ
    public static final int OP_WRITE
    public static final int OP_CONNECT
    public static final int OP_ACCEPT
    public abstract SelectableChannel channel();
    public abstract Selector selector();
    public abstract void cancel();
    public abstract boolean isValid();
    public abstract int interestOps();
    public abstract void interestOps(int ops);
    public abstract int readyOps();
    public final boolean isReadable()
    public final boolean isWritable()
    public final boolean isConnectable()
    public final boolean isAcceptable()
    public final Object attach(Object ob)
    public final Object attachment()}

选择键保护了通道和选择器之间的关联,能够通过选择键获取 Channel 或 Selector,键对象示意一种非凡的关联关系,当这种关系须要终止时,能够调用 cancel()办法勾销,调用这个办法时,不会立刻被勾销,而是将这个键放到被勾销的汇合里,当 Selector 下次调用 select()办法时会真正被清理掉。当通道敞开时,选择键会主动被勾销,当选择器敞开时,所有键都会被清理掉。

一个选择器键蕴含有两个筹备好的操作汇合,包含感兴趣的事件汇合 instrest 和就绪的操作汇合 ready,通过掩码保留

感兴趣的事件汇合 interestOps()
通常一个键的 instrest 注册时就曾经确认,然而能够在注册后通过 interestOps(newOps)传入一个新的 ops 来扭转这个值

channel.register(this.selector, SelectionKey.OP_READ);

下面的代码注册的键 interest 蕴含 read 事件,能够在对通道 IO 异步解决时,扭转这个 ops 来长期勾销对 read 事件的关注,以避免反复解决未解决完的通道

就绪的操作汇合 readyOps()
通过这个办法返回就绪的操作,isReadable(),isWritable(),isConnectable(),和 isAcceptable()用来判断这些操作是否就绪,进行下一步的解决

上面列举两个示例来示范选择器的应用
单选择器单线程

public abstract class AbstractNioServer {
    protected final static String CHARSET = "utf-8";
 protected String ip;
 protected Integer port;
 protected Selector selector;
 public AbstractNioServer(String ip, Integer port) {
        this.ip = ip;
 this.port = port;
 }
    /**
 * 客户端连贯申请
 *
 * @param key
 */
 protected abstract void accept(SelectionKey key) throws IOException;
 /**
 * 读取数据
 *
 * @param key
 */
 protected abstract void read(SelectionKey key) throws IOException;
 /**
 * 初始化服务器
 *
 * @throws IOException
 */ public void init() throws IOException {
        // 设置服务器地址端口
 SocketAddress address = new InetSocketAddress(this.ip, this.port);
 // 创立服务端通道
 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
 // 绑定服务器地址
 serverSocketChannel.bind(address);
 // 设置为非阻塞模式
 serverSocketChannel.configureBlocking(false);
 // 创立一个选择器
 this.selector = Selector.open();
 // 将服务器通道注册到选择器中,ServerSocketChannel 只反对 accept 事件注册,validOps 返回 16
 serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
 }
    public void start() throws IOException {this.init();
 while (true) {int count = this.selector.select();
 if (count == 0) {
                // 没有就绪的选择键
 continue;
 }
            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
 while (iterator.hasNext()) {SelectionKey key = iterator.next();
 if (!key.isValid()) {continue;}
                if (key.isAcceptable()) {
                    // 连贯申请
 accept(key);
 } else if (key.isReadable()) {
                    // 批改键的感兴趣事件,避免被 select 反复调用,解决完事件后及时复原
 key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
 // 读取音讯
 read(key);
 }
                iterator.remove();}
        }
    }
    /**
 * 复原键的感兴趣事件
 * @param key
 */
 protected void resumeInterOpsRead(SelectionKey key) {
        // 还原 key 的感兴趣事件
 key.interestOps(key.interestOps() | SelectionKey.OP_READ);
 // 唤醒 selector 的 select 事件
 key.selector().wakeup();
 }
}
public class SingleNioServer extends AbstractNioServer {public static void main(String[] args) {SingleNioServer server = new SingleNioServer("127.0.0.1", 1008);
 try {server.start();
 } catch (IOException e) {e.printStackTrace();
 }
    }
    public SingleNioServer(String ip, Integer port) {super(ip, port);
 }
    @Override
 protected void accept(SelectionKey key) throws IOException {ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
 //SocketChannel 反对 read、write、connect 事件注册,validOps 返回 13=1+4+8
 SocketChannel channel = serverChannel.accept();
 if (channel == null) {return;}
        System.out.println("新的连贯申请");
 channel.configureBlocking(false);
 // 如果是阻塞通道进行注册,会抛出 IllegalBlockingModeException 异样
 channel.register(this.selector, SelectionKey.OP_READ);
 }
    @Override
 protected void read(SelectionKey key) throws IOException {SocketChannel channel = (SocketChannel) key.channel();
 try {ByteBuffer buffer = ByteBuffer.allocate(1024);
 int len = channel.read(buffer);
 buffer.flip();
 if (len > 0) {String str = Charset.forName(CHARSET).decode(buffer).toString();
 System.out.println("客户端音讯:" + str);
 String msg = "音讯已收到";
 byte[] sendData = msg.getBytes(CHARSET);
 ByteBuffer sendBuffer = ByteBuffer.wrap(sendData);
 channel.write(sendBuffer);
 super.resumeInterOpsRead(key);
 } else if (len == -1) {System.out.println("socket client close");
 key.cancel();
 channel.close();}
        } catch (IOException ex) {key.cancel();
 channel.close();}
    }
}

单选择器多线程

public class MulitpleNioServer extends AbstractNioServer {public static void main(String[] args) throws IOException {MulitpleNioServer server = new MulitpleNioServer("127.0.0.1", 1008);
 server.start();}
    /**
 * 线程池
 */
 private ExecutorService executorService = Executors.newFixedThreadPool(5);
 public MulitpleNioServer(String ip, Integer port) {super(ip, port);
 }
    @Override
 protected void accept(SelectionKey key) throws IOException {ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
 //SocketChannel 反对 read、write、connect 事件注册,validOps 返回 13=1+4+8
 SocketChannel channel = serverChannel.accept();
 System.out.println("新的连贯申请");
 channel.configureBlocking(false);
 channel.register(this.selector, SelectionKey.OP_READ);
 }
    @Override
 protected void read(SelectionKey key) throws IOException {executorService.submit(new Runnable() {
            @Override
 public void run() {readData(key);
 }
        });
 }
    private void readData(SelectionKey key) {SocketChannel channel = (SocketChannel) key.channel();
 try {ByteBuffer buffer = ByteBuffer.allocate(1024);
 if (channel.isOpen()) {if (channel.isConnected()) {int len = channel.read(buffer);
 buffer.flip();
 if (len > 0) {String str = Charset.forName(CHARSET).decode(buffer).toString();
 System.out.println("客户端音讯:" + str);
 String msg = "音讯已收到";
 byte[] sendData = msg.getBytes(CHARSET);
 ByteBuffer sendBuffer = ByteBuffer.wrap(sendData);
 channel.write(sendBuffer);
 } else if (len == -1) {System.out.println("socket client close1");
 key.cancel();
 channel.close();}
                    super.resumeInterOpsRead(key);
 }
            }
        } catch (IOException ex) {System.out.println("client is close2");
 key.cancel();
 try {channel.close();
 } catch (IOException e) {e.printStackTrace();
 }
        }
    }
}
正文完
 0