从最根底的层面上来看,选择器提供了问询通道是否就绪操作I/O的能力,选择器能够监控注册在下面的多个通道,通道注册时会返回选择键(记录通道与选择器之间的关联关系),选择器管理者这些注册的键、和就绪状态键的汇合

SelectableChannel
所有继承SelectableChannel的通道都能够在选择器中注册,FileChannel没有继承这个类,所以无奈应用选择器

选择键(SelectionKey) 选择键是选择器的重点内容,选择器就绪的通道通过返回选择键汇合来告诉

public abstract class SelectionKey {    public static final int OP_READ    public static final int OP_WRITE    public static final int OP_CONNECT    public static final int OP_ACCEPT    public abstract SelectableChannel channel();    public abstract Selector selector();    public abstract void cancel();    public abstract boolean isValid();    public abstract int interestOps();    public abstract void interestOps(int ops);    public abstract int readyOps();    public final boolean isReadable()    public final boolean isWritable()    public final boolean isConnectable()    public final boolean isAcceptable()    public final Object attach(Object ob)    public final Object attachment()}

选择键保护了通道和选择器之间的关联,能够通过选择键获取Channel或Selector,键对象示意一种非凡的关联关系,当这种关系须要终止时,能够调用cancel()办法勾销,调用这个办法时,不会立刻被勾销,而是将这个键放到被勾销的汇合里,当Selector下次调用select()办法时会真正被清理掉。当通道敞开时,选择键会主动被勾销,当选择器敞开时,所有键都会被清理掉。

一个选择器键蕴含有两个筹备好的操作汇合,包含感兴趣的事件汇合instrest和就绪的操作汇合ready,通过掩码保留

感兴趣的事件汇合interestOps()
通常一个键的instrest注册时就曾经确认,然而能够在注册后通过interestOps(newOps)传入一个新的ops来扭转这个值

channel.register(this.selector, SelectionKey.OP_READ);

下面的代码注册的键interest蕴含read事件,能够在对通道IO异步解决时,扭转这个ops来长期勾销对read事件的关注,以避免反复解决未解决完的通道

就绪的操作汇合readyOps()
通过这个办法返回就绪的操作,isReadable( ),isWritable( ),isConnectable( ), 和isAcceptable( )用来判断这些操作是否就绪,进行下一步的解决

上面列举两个示例来示范选择器的应用
单选择器单线程

public abstract class AbstractNioServer {    protected final static String CHARSET = "utf-8"; protected String ip; protected Integer port; protected Selector selector; public AbstractNioServer(String ip, Integer port) {        this.ip = ip; this.port = port; }    /** * 客户端连贯申请 * * @param key */ protected abstract void accept(SelectionKey key) throws IOException; /** * 读取数据 * * @param key */ protected abstract void read(SelectionKey key) throws IOException; /** * 初始化服务器 * * @throws IOException */ public void init() throws IOException {        //设置服务器地址端口 SocketAddress address = new InetSocketAddress(this.ip, this.port); //创立服务端通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //绑定服务器地址 serverSocketChannel.bind(address); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //创立一个选择器 this.selector = Selector.open(); //将服务器通道注册到选择器中,ServerSocketChannel只反对accept事件注册,validOps返回16 serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); }    public void start() throws IOException {        this.init(); while (true) {            int count = this.selector.select(); if (count == 0) {                //没有就绪的选择键 continue; }            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator(); while (iterator.hasNext()) {                SelectionKey key = iterator.next(); if (!key.isValid()) {                    continue; }                if (key.isAcceptable()) {                    //连贯申请 accept(key); } else if (key.isReadable()) {                    //批改键的感兴趣事件,避免被 select 反复调用,解决完事件后及时复原 key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); //读取音讯 read(key); }                iterator.remove(); }        }    }    /** * 复原键的感兴趣事件 * @param key */ protected void resumeInterOpsRead(SelectionKey key) {        //还原key的感兴趣事件 key.interestOps(key.interestOps() | SelectionKey.OP_READ); //唤醒selector的select事件 key.selector().wakeup(); }}
public class SingleNioServer extends AbstractNioServer {    public static void main(String[] args) {        SingleNioServer server = new SingleNioServer("127.0.0.1", 1008); try {            server.start(); } catch (IOException e) {            e.printStackTrace(); }    }    public SingleNioServer(String ip, Integer port) {        super(ip, port); }    @Override protected void accept(SelectionKey key) throws IOException {        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); //SocketChannel反对 read、write、connect 事件注册,validOps返回13=1+4+8 SocketChannel channel = serverChannel.accept(); if (channel == null) {            return; }        System.out.println("新的连贯申请"); channel.configureBlocking(false); //如果是阻塞通道进行注册,会抛出 IllegalBlockingModeException 异样 channel.register(this.selector, SelectionKey.OP_READ); }    @Override protected void read(SelectionKey key) throws IOException {        SocketChannel channel = (SocketChannel) key.channel(); try {            ByteBuffer buffer = ByteBuffer.allocate(1024); int len = channel.read(buffer); buffer.flip(); if (len > 0) {                String str = Charset.forName(CHARSET).decode(buffer).toString(); System.out.println("客户端音讯:" + str); String msg = "音讯已收到"; byte[] sendData = msg.getBytes(CHARSET); ByteBuffer sendBuffer = ByteBuffer.wrap(sendData); channel.write(sendBuffer); super.resumeInterOpsRead(key); } else if (len == -1) {                System.out.println("socket client close"); key.cancel(); channel.close(); }        } catch (IOException ex) {            key.cancel(); channel.close(); }    }}

单选择器多线程

public class MulitpleNioServer extends AbstractNioServer {    public static void main(String[] args) throws IOException {        MulitpleNioServer server = new MulitpleNioServer("127.0.0.1", 1008); server.start(); }    /** * 线程池 */ private ExecutorService executorService = Executors.newFixedThreadPool(5); public MulitpleNioServer(String ip, Integer port) {        super(ip, port); }    @Override protected void accept(SelectionKey key) throws IOException {        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); //SocketChannel反对 read、write、connect 事件注册,validOps返回13=1+4+8 SocketChannel channel = serverChannel.accept(); System.out.println("新的连贯申请"); channel.configureBlocking(false); channel.register(this.selector, SelectionKey.OP_READ); }    @Override protected void read(SelectionKey key) throws IOException {        executorService.submit(new Runnable() {            @Override public void run() {                readData(key); }        }); }    private void readData(SelectionKey key) {        SocketChannel channel = (SocketChannel) key.channel(); try {            ByteBuffer buffer = ByteBuffer.allocate(1024); if (channel.isOpen()) {                if (channel.isConnected()) {                    int len = channel.read(buffer); buffer.flip(); if (len > 0) {                        String str = Charset.forName(CHARSET).decode(buffer).toString(); System.out.println("客户端音讯:" + str); String msg = "音讯已收到"; byte[] sendData = msg.getBytes(CHARSET); ByteBuffer sendBuffer = ByteBuffer.wrap(sendData); channel.write(sendBuffer); } else if (len == -1) {                        System.out.println("socket client close1"); key.cancel(); channel.close(); }                    super.resumeInterOpsRead(key); }            }        } catch (IOException ex) {            System.out.println("client is close2"); key.cancel(); try {                channel.close(); } catch (IOException e) {                e.printStackTrace(); }        }    }}