史上最强Java-NIO入门担心从入门到放弃的请读这篇

本文原题“《NIO 入门》,作者为“Gregory M. Travis”,他是《JDK 1.4 Tutorial》等书籍的作者。 1、引言Java NIO是Java 1.4版加入的新特性,虽然Java技术日新月异,但历经10年,NIO依然为Java技术领域里最为重要的基础技术栈,而且依据现实的应用趋势,在可以预见的未来,它仍将继续在Java技术领域占据重要位置。 网上有关Java NIO的技术文章,虽然写的也不错,但通常是看完一篇马上懵逼。接着再看!然后,会更懵逼。。。 哈哈哈! 本文作者厚积薄发,以远比一般的技术博客或技术作者更深厚的Java技术储备,为你由浅入深,从零讲解到底什么是Java NIO。本文即使没有多少 Java 编程经验的读者也能很容易地开始学习 NIO。 (本文同步发布于:http://www.52im.net/thread-26...) 2、关于作者Gregory M. Travis:技术顾问、多产的技术作家,现居纽约。他从Java语言发布的第1天起,就已经是Java程序员啦! Gregory M. Travis是《JDK 1.4 Tutorial》一书的作者,Java程序员应该都清楚,能写好JDK Tutorial这种书籍或手册的,除了SUN(现在是Oracle)公司的Java创建者们,余下的也只有各路实打实的Java大牛们才能hold住。 3、在开始之前3.1 关于本教程 新的输入/输出 (NIO) 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 I/O 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。通过定义包含数据的类,以及通过以块的形式处理这些数据,NIO 不用使用本机代码就可以利用低级优化,这是原来的 I/O 包所无法做到的。 在本教程中,我们将讨论 NIO 库的几乎所有方面,从高级的概念性内容到底层的编程细节。除了学习诸如缓冲区和通道这样的关键 I/O 元素外,您还有机会看到在更新后的库中标准 I/O 是如何工作的。您还会了解只能通过 NIO 来完成的工作,如异步 I/O 和直接缓冲区。 在本教程中,我们将使用展示 NIO 库的不同方面的代码示例。几乎每一个代码示例都是一个大的 Java 程序的一部分,您可以在本文末的附件中下载到这个 Java 程序。在做这些练习时,我们推荐您在自己的系统上下载、编译和运行这些程序。在您学习了本教程以后,这些代码将为您的 NIO 编程努力提供一个起点。 本教程是为希望学习更多关于 Java NIO 库的知识的所有程序员而写的。为了最大程度地从这里的讨论中获益,您应该理解基本的 Java 编程概念,如类、继承和使用包。多少熟悉一些原来的 I/O 库(来自java.io.* 包)也会有所帮助。 ...

June 29, 2019 · 9 min · jiezi

少啰嗦一分钟带你读懂Java的NIO和经典IO的区别

1、引言很多初涉网络编程的程序员,在研究Java NIO(即异步IO)和经典IO(也就是常说的阻塞式IO)的API时,很快就会发现一个问题:我什么时候应该使用经典IO,什么时候应该使用NIO? 在本文中,将尝试用简明扼要的文字,阐明Java NIO和经典IO之间的差异、典型用例,以及这些差异如何影响我们的网络编程或数据传输代码的设计和实现的。 本文没有复杂理论,也没有像网上基它文章一样千篇一律的复制粘贴,有的只是接地气的通俗易懂,希望能给你带来帮助。 (本文同步发布于:http://www.52im.net/thread-26...) 2、相关文章《Java新一代网络编程模型AIO原理及Linux系统AIO介绍》《Java NIO基础视频教程、MINA视频教程、Netty快速入门视频》 3、Java NIO和IO的主要区别下表总结了Java NIO和IO之间的主要区别。我将在表格后面的部分中详细介绍每个区别。 3.1 Stream Oriented vs. Buffer OrientedJava NIO和IO之间的第一个重要区别是IO是面向流的,其中NIO是面向缓冲区的。那么,这意味着什么? 面向流的Java IO意味着您可以从流中一次读取一个或多个字节。你对读取的字节做什么取决于你。它们不会缓存在任何地方。此外,您无法在流中的数据中前后移动。如果需要在从流中读取的数据中前后移动,则需要先将其缓存在缓冲区中。 Java NIO的面向缓冲区的方法略有不同。数据被读入缓冲区,稍后处理该缓冲区。你可以根据需要在缓冲区中前后移动。这使你在处理过程中具有更大的灵活性。但是,你还需要检查缓冲区是否包含完整处理所需的所有数据。并且,你需要确保在将更多数据读入缓冲区时,不要覆盖尚未处理的缓冲区中的数据。 3.2 Blocking vs. Non-blocking IOJava IO的各种流都是blocking的。这意味着,当线程调用read()或write()时,该线程将被阻塞,直到有一些数据要读取,或者数据被完全写入,在此期间,该线程无法执行任何其他操作。 Java NIO的非阻塞模式允许线程请求从通道读取数据,并且只获取当前可用的内容,或者根本没有数据,如果当前没有数据可用。线程可以继续使用其他内容,而不是在数据可供读取之前保持阻塞状态。 非阻塞写入也是如此,线程可以请求将某些数据写入通道,但不要等待它完全写入。然后线程可以继续并在同一时间做其他事情。 线程在IO调用中没有阻塞时花费空闲时间,通常在此期间在其他通道上执行IO。也就是说,单个线程现在可以管理多个输入和输出通道。 4、SelectorsJava NIO的选择器允许单个线程监视多个输入通道。你可以使用选择器注册多个通道,然后使用单个线程“选择”具有可用于处理的输入的通道,或者选择准备写入的通道。这种选择器机制使单个线程可以轻松管理多个通道。 5、NIO和经典IO如何影响应用程序的设计?选择NIO或IO作为IO工具包可能会影响应用程序设计的以下方面: 1)API调用NIO或IO类; 2)处理数据; 3)用于处理数据的线程数。 5.1 API调用当然,使用NIO时的API调用看起来与使用IO时不同。这并不奇怪。而不是仅仅从例如InputStream读取字节的数据字节,必须首先将数据读入缓冲区,然后从那里进行处理。 5.2 数据处理使用纯NIO设计与IO设计时,数据处理也会受到影响。 在IO设计中,您从InputStream或Reader中读取字节的数据字节。想象一下,您正在处理基于行的文本数据流。 例如: Name: AnnaAge: 25Email: [url=mailto:anna@mailserver.com]anna@mailserver.com[/url]Phone: 1234567890这个文本行流可以像这样处理: InputStream input = ... ; // get the InputStream from the client socketBufferedReader reader = newBufferedReader(newInputStreamReader(input));String nameLine = reader.readLine();String ageLine = reader.readLine();String emailLine = reader.readLine();String phoneLine = reader.readLine();注意处理状态是如何,由程序执行的程度决定的。换句话说,一旦第一个reader.readLine()方法返回,您就确定已经读取了整行文本。readLine()会阻塞直到读取整行,这就是原因。您还知道此行包含名称。同样,当第二个readLine()调用返回时,您知道此行包含年龄等。 ...

June 25, 2019 · 1 min · jiezi

dubbo源码解析(十五)远程通信——Mina

远程通讯——Mina目标:介绍基于Mina的来实现的远程通信、介绍dubbo-remoting-mina内的源码解析。前言Apache MINA是一个网络应用程序框架,可帮助用户轻松开发高性能和高可扩展性的网络应用程序。它通过Java NIO在各种传输(如TCP / IP和UDP / IP)上提供抽象的事件驱动异步API。它通常被称为NIO框架库、客户端服务器框架库或者网络套接字库。那么本问就要讲解在dubbo项目中,基于mina的API实现服务端和客户端来完成远程通讯这件事情。下面是mina实现的包结构:源码分析(一)MinaChannel该类继承了AbstractChannel,是基于mina实现的通道。1.属性private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);/** * 通道的key /private static final String CHANNEL_KEY = MinaChannel.class.getName() + “.CHANNEL”;/* * mina中的一个句柄,表示两个端点之间的连接,与传输类型无关 /private final IoSession session;该类的属性除了封装了一个CHANNEL_KEY以外,还用到了mina中的IoSession,它封装着一个连接所需要的方法,比如获得远程地址等。2.getOrAddChannelstatic MinaChannel getOrAddChannel(IoSession session, URL url, ChannelHandler handler) { // 如果连接session为空,则返回空 if (session == null) { return null; } // 获得MinaChannel实例 MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY); // 如果不存在,则创建 if (ret == null) { // 创建一个MinaChannel实例 ret = new MinaChannel(session, url, handler); // 如果两个端点连接 if (session.isConnected()) { // 把新创建的MinaChannel添加到session 中 MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY, ret); // 如果属性的旧值不为空,则重新设置旧值 if (old != null) { session.setAttribute(CHANNEL_KEY, old); ret = old; } } } return ret;}该方法是一个获得MinaChannel对象的方法,其中每一个MinaChannel都会被放在session的属性值中。3.removeChannelIfDisconnectedstatic void removeChannelIfDisconnected(IoSession session) { if (session != null && !session.isConnected()) { session.removeAttribute(CHANNEL_KEY); }}该方法是当没有连接时移除该通道,比较简单。4.send@Overridepublic void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { // 发送消息,返回future WriteFuture future = session.write(message); // 如果已经发送过了 if (sent) { // 获得延迟时间 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 等待timeout的连接时间后查看是否发送成功 success = future.join(timeout); } } catch (Throwable e) { throw new RemotingException(this, “Failed to send message " + message + " to " + getRemoteAddress() + “, cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, “Failed to send message " + message + " to " + getRemoteAddress() + “in timeout(” + timeout + “ms) limit”); }}该方法是最关键的发送消息,其中调用到了session的write方法,就是mina封装的发送消息。并且根据返回的WriteFuture对象来判断是否发送成功。(二)MinaHandler该类继承了IoHandlerAdapter,是通道处理器实现类,其中就是mina项目中IoHandler接口的几个 方法。/* * url对象 /private final URL url;/* * 通道处理器对象 /private final ChannelHandler handler;该类有两个属性,上述提到的实现IoHandler接口方法都是调用了handler来实现的,我就举例讲一个,其他的都一样的写法:@Overridepublic void sessionOpened(IoSession session) throws Exception { // 获得MinaChannel对象 MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler); try { // 调用接连该通道 handler.connected(channel); } finally { // 如果没有连接则移除通道 MinaChannel.removeChannelIfDisconnected(session); }}该方法在IoHandler中叫做sessionOpened,其实就是连接方法,所以调用的是handler.connected。其他方法也一样,请自行查看。(三)MinaClient该类继承了AbstractClient类,是基于mina实现的客户端类。1.属性/* * 套接字连接集合 /private static final Map<String, SocketConnector> connectors = new ConcurrentHashMap<String, SocketConnector>();/* * 连接的key /private String connectorKey;/* * 套接字连接者 /private SocketConnector connector;/* * 一个句柄 /private volatile IoSession session; // volatile, please copy reference to use该类中的属性都跟mina项目中封装类有关系。2.doOpen@Overrideprotected void doOpen() throws Throwable { // 用url来作为key connectorKey = getUrl().toFullString(); // 先从集合中取套接字连接 SocketConnector c = connectors.get(connectorKey); if (c != null) { connector = c; // 如果为空 } else { // set thread pool. 设置线程池 connector = new SocketConnector(Constants.DEFAULT_IO_THREADS, Executors.newCachedThreadPool(new NamedThreadFactory(“MinaClientWorker”, true))); // config 获得套接字连接配置 SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig(); cfg.setThreadModel(ThreadModel.MANUAL); // 启用TCP_NODELAY cfg.getSessionConfig().setTcpNoDelay(true); // 启用SO_KEEPALIVE cfg.getSessionConfig().setKeepAlive(true); int timeout = getConnectTimeout(); // 设置连接超时时间 cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000); // set codec. // 设置编解码器 connector.getFilterChain().addLast(“codec”, new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this))); // 加入集合 connectors.put(connectorKey, connector); }}该方法是打开客户端,在mina中用套接字连接者connector来表示。其中的操作就是新建一个connector,并且设置相应的属性,然后加入集合。3.doConnect@Overrideprotected void doConnect() throws Throwable { // 连接服务器 ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this)); long start = System.currentTimeMillis(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); // 用于对线程的阻塞和唤醒 final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock // 加入监听器 future.addListener(new IoFutureListener() { @Override public void operationComplete(IoFuture future) { try { // 如果已经读完了 if (future.isReady()) { // 创建获得该连接的IoSession实例 IoSession newSession = future.getSession(); try { // Close old channel 关闭旧的session IoSession oldSession = MinaClient.this.session; // copy reference if (oldSession != null) { try { if (logger.isInfoEnabled()) { logger.info(“Close old mina channel " + oldSession + " on create new mina channel " + newSession); } // 关闭连接 oldSession.close(); } finally { // 移除通道 MinaChannel.removeChannelIfDisconnected(oldSession); } } } finally { // 如果MinaClient关闭了 if (MinaClient.this.isClosed()) { try { if (logger.isInfoEnabled()) { logger.info(“Close new mina channel " + newSession + “, because the client closed.”); } // 关闭session newSession.close(); } finally { MinaClient.this.session = null; MinaChannel.removeChannelIfDisconnected(newSession); } } else { // 设置新的session MinaClient.this.session = newSession; } } } } catch (Exception e) { exception.set(e); } finally { // 减少数量,释放所有等待的线程 finish.countDown(); } } }); try { // 当前线程等待,直到锁存器倒计数到零,除非线程被中断,或者指定的等待时间过去 finish.await(getConnectTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RemotingException(this, “client(url: " + getUrl() + “) failed to connect to server " + getRemoteAddress() + " client-side timeout " + getConnectTimeout() + “ms (elapsed: " + (System.currentTimeMillis() - start) + “ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + “, cause: " + e.getMessage(), e); } Throwable e = exception.get(); if (e != null) { throw e; }}该方法是客户端连接服务器的实现方法。其中用到了CountDownLatch来代表完成完成事件,它来做一个线程等待,直到1个线程完成上述的动作,也就是连接完成结束,才释放等待的线程。保证每次只有一条线程去连接,解决future.awaitUninterruptibly()死锁问题。其他方法请自行查看我写的注释。(四)MinaServer该类继承了AbstractServer,是基于mina实现的服务端实现类。1.属性private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);/* * 套接字接收者对象 /private SocketAcceptor acceptor;2.doOpen@Overrideprotected void doOpen() throws Throwable { // set thread pool. // 创建套接字接收者对象 acceptor = new SocketAcceptor(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), Executors.newCachedThreadPool(new NamedThreadFactory(“MinaServerWorker”, true))); // config // 设置配置 SocketAcceptorConfig cfg = (SocketAcceptorConfig) acceptor.getDefaultConfig(); cfg.setThreadModel(ThreadModel.MANUAL); // set codec. 设置编解码器 acceptor.getFilterChain().addLast(“codec”, new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this))); // 开启服务器 acceptor.bind(getBindAddress(), new MinaHandler(getUrl(), this));}该方法是创建服务器,并且打开服务器。关键就是调用了acceptor的方法。3.doClose@Overrideprotected void doClose() throws Throwable { try { if (acceptor != null) { // 取消绑定,也就是关闭服务器 acceptor.unbind(getBindAddress()); } } catch (Throwable e) { logger.warn(e.getMessage(), e); }}该方法是关闭服务器,就是调用了acceptor.unbind方法。4.getChannels@Overridepublic Collection<Channel> getChannels() { // 获得连接到该服务器到所有连接句柄 Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress()); Collection<Channel> channels = new HashSet<Channel>(); for (IoSession session : sessions) { if (session.isConnected()) { // 每次都用一个连接句柄创建一个通道 channels.add(MinaChannel.getOrAddChannel(session, getUrl(), this)); } } return channels;}该方法是获得所有连接该服务器的通道。5.getChannel@Overridepublic Channel getChannel(InetSocketAddress remoteAddress) { // 获得连接到该服务器到所有连接句柄 Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress()); // 遍历所有句柄,找到要找的通道 for (IoSession session : sessions) { if (session.getRemoteAddress().equals(remoteAddress)) { return MinaChannel.getOrAddChannel(session, getUrl(), this); } } return null;}该方法是获得地址对应的单个通道。(五)MinaTransporterpublic class MinaTransporter implements Transporter { public static final String NAME = “mina”; @Override public Server bind(URL url, ChannelHandler handler) throws RemotingException { // 创建MinaServer实例 return new MinaServer(url, handler); } @Override public Client connect(URL url, ChannelHandler handler) throws RemotingException { // 创建MinaClient实例 return new MinaClient(url, handler); }}该类实现了Transporter接口,是基于mina的传输层实现。可以看到,bind和connect方法分别就是创建了MinaServer和MinaClient实例。这里我建议查看一下《dubbo源码解析(九)远程通信——Transport层》。(六)MinaCodecAdapter该类是基于mina实现的编解码类,实现了ProtocolCodecFactory。1.属性/* * 编码对象 /private final ProtocolEncoder encoder = new InternalEncoder();/* * 解码对象 /private final ProtocolDecoder decoder = new InternalDecoder();/* * 编解码器 /private final Codec2 codec;/* * url对象 /private final URL url;/* * 通道处理器对象 /private final ChannelHandler handler;/* * 缓冲区大小 */private final int bufferSize;属性比较好理解,该编解码器用到了ProtocolEncoder和ProtocolDecoder,而InternalEncoder和InternalDecoder两个类是该类的内部类,它们实现了ProtocolEncoder和ProtocolDecoder,关键的编解码逻辑在这两个类中实现。2.构造方法public MinaCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) { this.codec = codec; this.url = url; this.handler = handler; int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE); // 如果缓存区大小在16字节以内,则设置配置大小,如果不是,则设置8字节的缓冲区大小 this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;}3.InternalEncoderprivate class InternalEncoder implements ProtocolEncoder { @Override public void dispose(IoSession session) throws Exception { } @Override public void encode(IoSession session, Object msg, ProtocolEncoderOutput out) throws Exception { // 动态分配一个1k的缓冲区 ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024); // 获得通道 MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler); try { // 编码 codec.encode(channel, buffer, msg); } finally { // 检测是否断开连接,如果断开,则移除 MinaChannel.removeChannelIfDisconnected(session); } // 写数据到out中 out.write(ByteBuffer.wrap(buffer.toByteBuffer())); out.flush(); }}该内部类是编码类,其中的encode方法中写到了编码核心调用的是codec.encode。4.InternalDecoderprivate class InternalDecoder implements ProtocolDecoder { private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER; @Override public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { int readable = in.limit(); if (readable <= 0) return; ChannelBuffer frame; // 如果缓冲区还有可读字节数 if (buffer.readable()) { // 如果缓冲区是DynamicChannelBuffer类型的 if (buffer instanceof DynamicChannelBuffer) { // 往buffer中写入数据 buffer.writeBytes(in.buf()); frame = buffer; } else { // 缓冲区大小 int size = buffer.readableBytes() + in.remaining(); // 动态分配一个缓冲区 frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize); // buffer的数据把写到frame frame.writeBytes(buffer, buffer.readableBytes()); // 把流中的数据写到frame frame.writeBytes(in.buf()); } } else { // 否则是基于Java NIO的ByteBuffer生成的缓冲区 frame = ChannelBuffers.wrappedBuffer(in.buf()); } // 获得通道 Channel channel = MinaChannel.getOrAddChannel(session, url, handler); Object msg; int savedReadIndex; try { do { // 获得读索引 savedReadIndex = frame.readerIndex(); try { // 解码 msg = codec.decode(channel, frame); } catch (Exception e) { buffer = ChannelBuffers.EMPTY_BUFFER; throw e; } // 拆包 if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { frame.readerIndex(savedReadIndex); break; } else { if (savedReadIndex == frame.readerIndex()) { buffer = ChannelBuffers.EMPTY_BUFFER; throw new Exception(“Decode without read data.”); } if (msg != null) { // 把数据写到输出流里面 out.write(msg); } } } while (frame.readable()); } finally { // 如果frame还有可读数据 if (frame.readable()) { //丢弃可读数据 frame.discardReadBytes(); buffer = frame; } else { buffer = ChannelBuffers.EMPTY_BUFFER; } MinaChannel.removeChannelIfDisconnected(session); } } @Override public void dispose(IoSession session) throws Exception { } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { }}该内部类是解码类,其中decode方法中关键的是调用了codec.decode,其余的操作是利用缓冲区对数据的冲刷流转。后记该部分相关的源码解析地址:https://github.com/CrazyHZM/i…该文章讲解了基于mina的来实现的远程通信、介绍dubbo-remoting-mina内的源码解析,关键需要对mina有所了解。下一篇我会讲解基于netty3实现远程通信部分。 ...

December 26, 2018 · 6 min · jiezi