关于nio:大话网络通信

1、术语并发 vs 并行并发和并行是相干的概念,但有一些小的区别。并发意味着两个或多个工作正在获得停顿,即便它们可能不会同时执行。例如,这能够通过工夫切片来实现,其中局部工作按程序执行,并与其余工作的局部混合。另一方面,当执行的工作能够真正同时进行时,就会呈现并行简略说启动一个线程在一个core上就是并行,启动两个线程在一个core上就是并发异步 vs 同步如果调用者在办法返回值或引发异样之前无奈获得停顿,则认为办法调用是同步的。另一方面,异步调用容许调用者在无限的步骤之后持续进行,并且能够通过一些附加机制 (它可能是已注册的回调、Future 或音讯)来告诉办法的实现简略来说Java API层来说的,如下 :ExecutorService executorService = Executors.newFixedThreadPool(2); Future<Boolean> future = executorService.submit(new Callable<Boolean>() { @Override public Boolean call() throws Exception { System.out.println("执行业务逻辑"); // 依据业务逻辑判断给定返回 return true; } }); future.get(); // 同步API,必须等到返回 if(future.isDone()) { future.get();// 异步API,只有执行完,再get后果 } 同步 API 能够应用阻塞来实现同步,但这不是必要的。CPU 密集型工作可能会产生相似 于阻塞的行为。一般来说,最好应用异步 API,因为它们保证系统可能进行非阻塞 vs 阻塞如果一个线程的提早能够无限期地提早其余一些线程,这就是咱们探讨的阻塞。一个很好的例子是,一个线程能够应用互斥来独占应用一个资源。如果一个线程无限期地占用资源(例如意外运行有限循环),则期待该资源的其余线程将无奈进行。相同,非阻塞意味着没有线程可能无限期地提早其余线程非阻塞操作优先于阻塞操作,因为当零碎蕴含阻塞操作时,零碎的总体进度并不能失去很好的保障 死锁 vs 饥饿 vs 活锁当多个线程在期待对方达到某个特定的状态以便可能获得停顿时,就会呈现死锁。因为没有其余线程达到某种状态,所有受影响的子系统都无奈持续运行。死锁与阻塞密切相关,因为线程可能无限期地提早其余线程的过程在死锁的状况下,没有线程能够获得停顿,相同,当有线程能够获得停顿,但可能有一个或多个线程不能获得停顿时,就会产生饥饿。典型的场景是一个调度算法,它总是抉择高优先级的工作而不是低优先级的工作。如果传入的高优先级工作的数量始终足够多,那么低优先级工作将永远不会实现活锁相似于死锁,因为没有线程获得停顿。不同之处在于,线程不会被解冻在期待别人停顿的状态中,而是一直地扭转本人的状态。一个示例场景是,两个线程有两个雷同资源可用时。他们每一个都试图取得资源,但他们也会查看对方是否也须要资源。 如果资源是由另一个线程申请的,他们会尝试获取该资源的另一个实例。在可怜的情 况下,两个线程可能会在两种资源之间“反弹”,从不获取资源,但总是屈服于另一种资源2、BIO vs NIOBIOserverSocket.accept(),这里会阻塞socket.getInputStream.read(),也会阻塞 尽管能够应用了线程池,因为read()办法的阻塞,其实线程池也是不能复用的,说白了,就是须要一个客户端一个线程进行服务 思考:那BIO就没有应用场景了吗?其实不是,BIO在建设长连贯的流式传输场景还是很有用的,比如说HDSF,客户端向DataNode传输数据应用的就是建设一个BIO的管道,流式上传数据的。此时引入一个问题,那HDFS DataNode就不思考到线程阻塞么?是这样的,其实要晓得你不可能多个客户端上传文件都是针对某个DataNode(NameNode会进行抉择DataNode),所以线程阻塞的压力是会摊派的。NIO还是善于小数据量的RPC申请,能承受百万客户端的连贯 NIONIO中有三个重要组件 : Buffer(ByteBuffer次要应用)、Channel(双向通道,可读可写)和Selector(多路复用选择器) Buffer罕用的就是 ByteBuffer,缓冲池,能够作为channel写的单位,也能够承受channel读取的返回外面重要的属性 :position、capacity、flip、limit和hasRemain 每个channel都须要记录可能切分的音讯,因为ByteBuffer不能被多个channel应用,因而须要为每个channel保护一个独立的ByteBuffer。ByteBuffer不能太大,比方一个ByteBuffer 1M的话,须要反对百万连贯要1TB内存,因而须要设计大小可变的ByteBuffer 1、首先调配一个较小的buffer,比方4k,如果发现不够的话,再调配8kb的buffer,将4kb buffer内容拷贝到8kb buffer,有点是音讯间断容易解决,毛病是数据拷贝消耗性能 2、多个数组组成buffer,一个数组不够,把多进去的内容写入新的数组,毛病不间断解析简单,有点防止了拷贝引起的性能损耗 ...

March 29, 2023 · 1 min · jiezi

关于nio:分布式-令人头疼的堆外内存泄露怎么排查

作者:鲍凤其 爱可生 dble 团队开发成员,次要负责 dble 需要开发,故障排查和社区问题解答。少说废话,放码过去。 本文起源:原创投稿 *爱可生开源社区出品,原创内容未经受权不得随便应用,转载请分割小编并注明起源。 大家在应用 Java NIO 的过程中,是不是也遇到过堆外内存泄露的问题?是不是也苦恼过如何排查? 上面就给大家介绍一个在dble中排查堆外内存泄露的案例。 景象有客户在应用dble之后,有一天dble对后端MySQL实例的心跳检测全副超时,导致业务中断,最初通过重启解决。 剖析过程dble 日志首先当然是剖析dble日志。从dble日志中能够发现: 故障工夫点所有后端 MySQL 实例心跳都超时日志中呈现大量“You may need to turn up page size. The maximum size of the DirectByteBufferPool that can be allocated at one time is 2097152, and the size that you would like to allocate is 4194304”的日志日志片段: //心跳超时2022-08-15 11:40:32.147 WARN [TimerScheduler-0] (com.actiontech.dble.backend.heartbeat.MySQLHeartbeat.setTimeout(MySQLHeartbeat.java:251)) - heartbeat to [xxxx:3306] setTimeout, previous status is 1 // 堆外内存可能泄露的可疑日志2022-08-15 11:40:32.153 WARN [$_NIO_REACTOR_BACKEND-20-RW] (com.actiontech.dble.buffer.DirectByteBufferPool.allocate(DirectByteBufferPool.java:76)) - You may need to turn up page size. The maximum size of the DirectByteBufferPool that can be allocated at one time is 2097152, and the size that you would like to allocate is 4194304通过下面的日志猜想: ...

December 15, 2022 · 3 min · jiezi

关于nio:NIO源码JavaNIO源码-JNI分析二Java-NIO源码分析

没看过的倡议先看上一篇,原本打算讲讲linux内核,也看了一些书籍,可是c放了太久了,看代码切实头疼,就先放弃了,写写业务也没必要卷这么深吧。就讲到调用底层api为止我感觉刚刚好。不太善于将源码联合讲故事,所以整片略显干燥,将就看下吧~~demopublic class ServerConnect{ public static void main(String[] args) { selector(); } public static void handleAccept(SelectionKey key) throws IOException{ ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel(); SocketChannel sc = ssChannel.accept(); sc.configureBlocking(false); sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024)); } public static void handleRead(SelectionKey key) throws IOException{ SocketChannel sc = (SocketChannel)key.channel(); ByteBuffer buf = (ByteBuffer)key.attachment(); long bytesRead = sc.read(buf); while(bytesRead>0){ buf.flip(); while(buf.hasRemaining()){ System.out.print((char)buf.get()); } buf.clear(); bytesRead = sc.read(buf); } if(bytesRead == -1){ sc.close(); } } public static void handleWrite(SelectionKey key) throws IOException{ ByteBuffer buf = (ByteBuffer)key.attachment(); buf.flip(); SocketChannel sc = (SocketChannel) key.channel(); while(buf.hasRemaining()){ sc.write(buf); } buf.compact(); } public static void selector() { Selector selector = null; ServerSocketChannel ssc = null; try{ selector = Selector.open(); ssc= ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(8080)); ssc.configureBlocking(false); ssc.register(selector, SelectionKey.OP_ACCEPT); while(true){ if(selector.select(3000) == 0){ continue; } Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while(iter.hasNext()){ SelectionKey key = iter.next(); if(key.isAcceptable()){ handleAccept(key); } if(key.isReadable()){ handleRead(key); } if(key.isWritable() && key.isValid()){ handleWrite(key); } if(key.isConnectable()){ System.out.println("isConnectable = true"); } iter.remove(); } } }catch(IOException e){ e.printStackTrace(); }finally{ try{ if(selector!=null){ selector.close(); } if(ssc!=null){ ssc.close(); } }catch(IOException e){ e.printStackTrace(); } } }}源码剖析Selector.open()此处会有SelectorProvider是个模板办法,不必零碎的实现不同,这就很蛋疼,没有EPollSelectorProvider,莫的方法,只能去github找了一份源码迁徙到gitee,jdk8感兴趣的能够自行下载,想看其余版本的同学就要自行寻找了。 ...

September 23, 2022 · 9 min · jiezi

关于nio:Netty网络编程NIO与零拷贝

1.什么是DMA2.什么是用户态和内核态3.一般BIO的拷贝流程剖析4.mmap零碎函数5.sendFile零碎函数(零拷贝)6.java堆外内存如何回收 1.什么是DMA DMA(Direct Memory Access间接存储器拜访),咱们先从一张图来理解一下DMA是一个什么安装。 假如在什么没有DMA的状况下,如果CPU想从内存里读取数据并发送到网卡中,在读的过程中,咱们能够晓得:1.1)CPU的速度最快。1.2)当CPU在内存中读取数据的时候,读取的速度瓶颈在于内存的读写速度。1.3)当CPU实现读取,将数据写入网卡的时候,写入的速度瓶颈在于网卡的速度。1.4)CPU在读写的时候,是无奈做其它事件的。 这个时候咱们就能够得出结论: 1.5)cpu的速度取决于这一系列操作环上最慢的那一个。1.6)cpu利用率极低,大部分工夫都在期待IO。 此时如果有了DMA,那么咱们的读写就会变得和如图一样: CPU只须要把读写工作委托给DMA,帮助CPU搬运数据,这些操作都由DMA主动执行,而不须要依赖于CPU的大量中断负载,此时cpu就能够去做其它的事件了。 2.什么是用户态和内核态其实最初咱们的服务器程序,都是要在linux上运行的,Linux依据命令的重要水平,也分为不同的权限。Linux操作系统就将权限分成了2个等级,别离就是用户态和内核态。 用户态:用户态的过程可能拜访的资源就有极大的限度,有些指令操作无关痛痒,随便执行都没事,大部分都是属于用户态的。 内核态:运行在内核态的过程能够“随心所欲”,能够间接调用操作系统内核函数。 比方:咱们调用malloc函数申请动态内存,此时cpu就要从用户态切换到内核态,调用操作系统底层函数来申请空间。 3.一般BIO的拷贝流程剖析 咱们来看一下一般IO的拷贝流程: 咱们来看这一段代码: 咱们先从服务器上读取了一个文件,而后通过连贯和流传输到申请客户端上,咱们能够看到大抵的申请流程是这样的: 当程序或者操作者对CPU收回指令,这些指令和数据暂存在内存里,在CPU闲暇时传送给CPU,CPU解决后把后果输入到输出设备上 3.1)用户态程序接到申请,要从磁盘上读取文件,切换到内核态,这里是第1次用户态内核态切换。3.2)当要读取的文件通过DMA复制到内核缓冲区的时候,咱们还要把这些数据传送给CPU,CPU之后再把这些数据送到输出设备上,这里是第1次cpu拷贝。3.3)当内核态程序数据读取结束,切换回用户态,这里是第2次内核态用户态切换。3.4)当程序创立一个缓冲区,并将数据写入socket缓冲区,这里是第3次用户态内核态切换。3.5)此时cpu要把数据拷贝到socket缓冲区,这里是第2次cpu拷贝。3.6)实现所有操作之后,应用程序从内核态切换回用户态,继续执行后续操作(程序到此为止)。这里是第4次用户态内核态切换。 此时咱们能够看出,传统的IO拷贝流程,经验了4次用户态和内核态的切换,进行了2次cpu复制,性能消耗微小,咱们有没有更节俭资源的做法呢? 4.mmap零碎函数 linux的底层内核函数mmap函数对底层进行了一个优化: 4.1)用户态程序接到申请,要从磁盘上读取文件,切换到内核态,这里是第1次用户态内核态切换。4.2)当要读取的文件通过DMA复制到内核缓冲区实现,此时内核缓冲区,用户数据缓冲区共享一块物理内存空间,这里就无需cpu拷贝到用户空间中。4.3)此时读取文件结束,用户切换回用户态,这是第2次用户态内核态切换。4.4)申请一块缓冲区,须要调用内核函数,这是第3次用户态内核态切换。4.5)内核态通过cpu复制,将共享空间的数据内容拷贝到socket缓冲区中,这是第1次cpu拷贝。4.6)实现所有操作之后,应用程序从内核态切换回用户态,继续执行后续操作(程序到此为止)。这里是第4次用户态内核态切换。 咱们能够看出,mmap函数少了一次cpu复制,对于空间的利用率进步了,不过还是须要4次用户态和内核态的切换。 5.sendFile零碎函数(零拷贝) 零拷贝:指的是没有cpu拷贝,数据还是须要通过DMA拷贝到内存中,再发送进来的。 4.1)用户态程序接到申请,要从磁盘上读取文件,切换到内核态,这里是第1次用户态内核态切换。4.2)当数据通过DMA复制进入内核缓冲区并且实现,咱们还是通过cpu复制把数据复制到socket缓冲区,不过这里的cpu复制只复制很大量的内容,能够简直忽略不计。 4.3)此时数据通过DMA复制发送给目的地。4.4)程序切换回用户态,这是第2次用户态内核态切换。 咱们发现,sendFile零碎函数,只须要两次用户态到内核态的切换,而且一次cpu复制都不须要,大大节约了资源。 6.java堆外内存如何回收 介绍了零拷贝技术,其实Netty底层是应用堆外内存来实现零拷贝技术的,api:ByteBuffer.allocateDirect(),这条命令间接在堆外内存开拓了一块空间,咱们都晓得GC是收集堆内存垃圾的,那堆外内存又是如何收集的呢? 堆外内存的劣势:堆外内存的劣势在于IO上,java在应用socket发送数据的时候,如果应用堆外内存,就能够间接应用堆外内存往socket上发送数据,就节俭了先把堆外数据拷贝到堆内数据的开销。 咱们先来看看ByteBuffer.allocateDirect()的源码:咱们能够看出,java应用unsafe类来调配了一块堆外内存 那么堆外内存是如何回收的呢?咱们来看这样一行代码: cleaner就是用来回收堆外内存的,然而它是如何工作的呢?咱们认真钻研一下cleaner这个类,它是一个链表构造: 通过create(Object,Runnable)办法创立cleaner对象,调用本身的add办法,将其退出链表中。 clean有个重要的clean办法, 它首先将对象从本身链表中删除:而后执行this.thunk的run办法,thunk就是由创立的时候传入的Runnable函数:能够看出,run办法是一个开释堆外内存的函数。 逻辑咱们曾经梳理完,然而JVM如何开释其占用的堆外内存呢?如何跟Cleaner关联起来? 首先,Cleaner继承了PhantomReference(虚援用),对于强脆弱虚援用,在后面的博客曾经赘述过:深刻了解JVM(八)——强脆弱虚援用 简略地再介绍一下虚援用,当GC某个对象的时候,如果此对象上有虚援用,会将其退出PhantomReference退出到ReferenceQueue队列。 Cleaner继承PhantomReference,而PhantomReference又继承Reference,Reference初始化的时候,会运行一个动态代码块: 咱们能够看出,ReferenceHandler作为一个优先级比拟高的守护线程被启动了。 在看他的解决逻辑之前,咱们先理解一下对象的四种状态; Active:激活。创立ref对象时就是激活状态Pending:期待入援用队列。所对应的援用被GC,就要入队。Enqueued:入队状态。 如果指定了refQueue生产pending挪动到enqueued状态。refQueue.poll时进入生效状态如果没有指定refQueue,间接到生效状态。Inactive:生效接下来咱们能够看业务逻辑了: 这是一个死循环,咱们再往里点: static boolean tryHandlePending(boolean waitForNotify) { Reference<Object> r; Cleaner c; try { //可能有多线程对一个援用队列操作,所以要加锁 synchronized (lock) { //如果以后对象是 期待入援用队列 的状态 if (pending != null) { r = pending; // 'instanceof' might throw OutOfMemoryError sometimes // so do this before un-linking 'r' from the 'pending' chain... //转化为clean对象 c = r instanceof Cleaner ? (Cleaner) r : null; // unlink 'r' from 'pending' chain //解除援用 pending = r.discovered; r.discovered = null; } else { //如果没有,期待唤醒 // The waiting on the lock may cause an OutOfMemoryError // because it may try to allocate exception objects. if (waitForNotify) { lock.wait(); } // retry if waited return waitForNotify; } } } catch (OutOfMemoryError x) { // Give other threads CPU time so they hopefully drop some live references // and GC reclaims some space. // Also prevent CPU intensive spinning in case 'r instanceof Cleaner' above // persistently throws OOME for some time... Thread.yield(); // retry return true; } catch (InterruptedException x) { // retry return true; } // Fast path for cleaners //革除内存 if (c != null) { c.clean(); return true; } ReferenceQueue<? super Object> q = r.queue; if (q != ReferenceQueue.NULL) q.enqueue(r); return true; }咱们能够得出:1)当对象状态是Pending的时候,就会进入if,将这个对象转化为clean对象,并将这个援用置空2)进行clean的垃圾收集3)这个线程始终在后盾启动,如果有援用,就会唤醒该线程。 ...

July 22, 2022 · 2 min · jiezi

关于nio:线程安全

本文次要记录最近在解决一个兄弟的代码bug中的几点比较突出的问题. 多线程注意事项1、HashMap 不是线程平安的 在应用多线程的时候 , 肯定要留神本人所应用和设计的数据结构是否是线程平安的.比方 Java中平时用的最多的Map汇合就是HashMap了,它是线程不平安的为了避免出现线程平安的问题,不能应用HashMap作为成员变量,要寻求应用线程平安的Map 如 HashTable 、SynchronizedMap、ConcurrentHashMap ; 当然,也能够本人写锁来管制hashMap的平安. eg: 读写锁如下,是咱们现成的我的项目中的代码,多线程的参数传递,应用了HashMap,导致了很大的生产事变 2、多线程在进行对象的拷贝时,务必应用深拷贝模式 存在对象时 , 倡议应用序列化对象的形式进行深拷贝 . 因为对象的数据结构较为简单 , hashMap的putAll() 甚至是apache开源的CloneUtils.clone() 都不可能完全正确的进行深拷贝.2.1 此处,插播一个问题,不晓得大家留神到没有,下面的代码中,HashMap 在进行初始化的时候,并没有指定map的大小,这里也就出了一个新的问题: 服务始终在占用高CPU HashMap 在并发的环境下进行rehash的时候会造成链表的闭环,因而在进行get()操作的时候导致了CPU占用100% . 为什么进行rehash ? 具体去看源码 , 大略就是map的大小触碰到了HashMap的阈值threshold(map实现的时候就有了),当HashMap的容量达到 threshold 时就须要进行扩容,这个时候就要进行ReHash操作了,能够看addEntry函数的实现,当size达到threshold时会调用 resize 进行扩容2.2 在排查CPU 占用的问题的时候,咱们定位到一个jdk的bug : Selector BUG上面咱们对这个进行一个解说 2.2.1 Selector BUG呈现的起因 Selector的轮询后果为空,也没有wakeup或新音讯解决,则产生空轮询,CPU使用率100%,(因为selector的select办法,返回numKeys是0,所以上面本应该对key值进行遍历的事件处理基本执行不了,又回到最下面的while(true)循环,周而复始,一直的轮询,直到linux零碎呈现100%的CPU状况,其它执行工作干不了活) 2.2.2 Netty的解决办法 对Selector的select操作周期进行统计,每实现一次空的select操作进行一次计数 若在某个周期内间断产生N(默认是512)次空轮询,则触发了epoll死循环bug 重建Selector,判断是否是其余线程发动的重建申请,若不是则将原SocketChannel从旧的Selector上去除注册,从新注册到新的Selector上,并将原来的Selector敞开。 2.2.3 该bug相干信息 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933 这个bug的形容内容为,在NIO的selector中,即便是关注的select轮询事件的key为0的话,NIO照样一直的从select本应该阻塞的状况中wake up出.3、发现Cpu 打满 常用命令(三板斧) 1、top命令,按P依照CPU使用率排序找到占用率最高的过程pid2、top -Hp pid 命令找到占用率最高的线程tid3、printf "%x\n" tid 命令把十进制线程tid转化为十六进制4、jstack -l tid > stack.txt 打印栈信息5、用十六进制的tid在堆栈信息中搜寻6、如果没有搜寻到,反复执行几遍第4步,肯定能有本文参考: https://pdai.tech/md/java/io/... ...

April 20, 2022 · 1 min · jiezi

关于nio:Java里的零拷贝

对于linux零拷贝技术能够先看下后面一篇文章IO零拷贝,因为java里的零拷贝底层也是依赖的操作系统实现,须要阐明下,Linux提供的零拷贝技术Java并不是全反对,只反对2种:mmap内存映射、sendfile,别离是由FileChannel.map()与FileChannel.transferTo()/transferFrom()实现。波及的类次要有FileChannel,MappedByteBuffer,DirectByteBuffer。 MappedByteBuffer先看下ChannelFile的map办法: public abstract MappedByteBuffer map(MapMode mode, long position, long size)throws IOException;mode 限定内存映射区域(MappedByteBuffer)对内存映像文件的拜访模式,有只读,读写与写时拷贝三种。position 文件映射的起始地址,对应内存映射区域的首地址size 文件映射的字节长度,从position往后的字节数,对应内存映射区域的大小map办法正是NIO基于内存映射(mmap)这种零拷贝形式的一种实现形式。办法返回一个MappedByteBuffer,MappedByteBuffer继承于ByteBuffer,扩大的办法有force(),load(),isLoad()这三个办法: force(),对于处于READ_WRITE模式下的缓冲区,将对缓冲区内容共性强制刷新到本地文件load(),将缓冲区的内容载入物理内存中,并返回这个缓冲区的援用isLoad(),判断缓冲区的内容是否在物理内存中,是返回true,不是返回false看个示例 public class MappedByteBufferDemo { public static final String CONTENT = "zero copy by MappedByteBuffer"; public static final String FILE_NAME= "zero_copy/mmap.txt"; public static final String CHARSET = "UTF-8"; /** * 写文件数据:关上文件通道 fileChannel 并提供读权限、写权限和数据清空权限, * 通过 fileChannel 映射到一个可写的内存缓冲区 mappedByteBuffer, * 将指标数据写入 mappedByteBuffer,通过 force() 办法把缓冲区更改的内容强制写入本地文件。 */ @Test public void writeToFileByMappedByteBuffer(){ //文件门路依据理论来定,我是放在我的项目的resources目录下 Path path = Paths.get(getClass().getResource("/"+FILE_NAME).getPath()); byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET)); try(FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE,StandardOpenOption.TRUNCATE_EXISTING)){ MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, bytes.length); if (mappedByteBuffer != null){ mappedByteBuffer.put(bytes); mappedByteBuffer.force(); } }catch (IOException e){ e.printStackTrace(); } } /** * * 读文件数据:关上文件通道 fileChannel 并提供只读权限,通过 fileChannel 映射到一个 * 只可读的内存缓冲区 mappedByteBuffer,读取 mappedByteBuffer 中的字节数组即可失去文件数据。 */ @Test public void readFileFromMappedByteBuffer(){ Path path = Paths.get(getClass().getResource("/"+FILE_NAME).getPath()); int length = CONTENT.getBytes(Charset.forName(CHARSET)).length; try(FileChannel fileChannel = FileChannel.open(path,StandardOpenOption.READ)){ MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, length); if (mappedByteBuffer != null){ byte[] bytes = new byte[length]; mappedByteBuffer.get(bytes); String content = new String(bytes, StandardCharsets.UTF_8); assertEquals(content,"zero copy by MappedByteBuffer"); } }catch (IOException e){ e.printStackTrace(); } }}这里咱们再来看看map()办法,它是在FileChannelImpl类里实现的,来看下外围代码: ...

January 11, 2022 · 5 min · jiezi

关于nio:IO零拷贝

用户态与内核态Linux操作系统体系架构分为用户态与内核态。内核次要管制计算机的硬件资源,为下层利用提供运行反对。用户态为下层利用的流动空间,应用程序的执行须要内核的反对,如CPU资源,存储资源,IO资源等,用户态通过内核提供的拜访接口也就是零碎调用来应用这些资源。 基于间接外在(DMA)实现的文件传输 应用程序调用read(),上下文切换到内核,DMA将磁盘数据复制到内核的缓存空间read()返回,上下文切换到用户态,CPU将数据复制到用户的缓存空间应用程序调用write(),上下文再次切换到内核,CPU将数据复制到内核socket缓存write()返回,上下文再次切换到用户态,DMA将socket缓存数据复制到网卡缓存上这里一共呈现了4次上下文切换,4次数据拷贝通过sendfile实现的零拷贝 应用程序发现sendfile零碎调用,用户空间切换到内核态通过DMA将磁盘文件拷贝到内核缓冲区DMA收回中断,CPU解决中断,将数据从内核缓冲区拷贝到内核中与socket相干的缓冲区。而后sendfile零碎调用返回,从内核态切换到用户态DMA将内核空间socket缓冲区数据拷贝到网卡这里一共呈现了2次高低切换,3次数据拷贝未完。。。

January 6, 2022 · 1 min · jiezi

关于nio:RPC的通信NettyNetty的底层是NioJava的Io模型你了解多少

RPC的通信Netty,Netty的底层是Nio,Java的Io模型你理解多少? I/O 模型简略的了解:就是用什么样的通道进行数据的发送和接管,很大水平上决定了程序通信的性能,Java 共反对 3 种网络编程模型/IO 模式:BIO、NIO、AIO。 什么是BIO?同步并阻塞(传统阻塞型),服务器实现模式为一个连贯一个线程,即客户端有连贯申请时服务器 端就须要启动一个线程进行解决,如果这个连贯不做任何事件会造成不必要的线程开销、BIO 形式实用于连贯数目比拟小且固定的架构,这种形式对服务器资源要求比拟高,并发局限于利用中,JDK1.4以前的惟一抉择,但程序简略易了解。 同步非阻塞IO同步非阻塞,服务器实现模式为一个线程解决多个申请(连贯),即客户端发送的连贯申请都会注 册到多路复用器上,多路复用器轮询到连贯有 I/O 申请就进行解决,NIO 形式实用于连贯数目多且连贯比拟短(轻操作)的架构,比方聊天服务器,弹幕零碎,服务器间通信等。编程比较复杂,JDK1.4 开始反对。 Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采纳了 Proactor 模式,简化了程序编写,无效 的申请才启动线程,它的特点是先由操作系统实现后才告诉服务端程序启动线程去解决,个别实用于连接数较 多且连接时间较长的利用。AIO 形式应用于连贯数目多且连贯比拟长(重操作)的架构,比方相册服务器,充沛调用 OS 参加并发操作,编程比较复杂,JDK7 开始反对。Java BIO 问题剖析每个申请都须要创立独立的线程,与对应的客户端进行数据 Read,业务解决,数据 Write 。当并发数较大时,须要创立大量线程来解决连贯,系统资源占用较大。连贯建设后,如果以后线程临时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源节约nio介绍Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改良的输出/输入的新个性,被统称为 NIO(即 New IO),是同步非阻塞的NIO 相干类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。【根本案例】NIO 有三大外围局部:Channel(通道),**Buffer(缓冲区), Selector(**选择器)Selector 、 Channel 和 Buffer 的关系图 Selector 、 Channel 和 Buffer 的关系图 ...

August 17, 2021 · 1 min · jiezi

关于nio:linuxbio与nio的区别

BIO 一个线程对应一个连贯,read时阻塞,直到有数据返回. NIOselectapplication晓得有I/O工夫产生,但并不知道哪几个流,所以只能轮询所有流读取数据,工夫复杂度O(n),同时解决的流越多,工夫越多,且有最大连贯限度 poll application将文件描述符数组拷贝到内核空间,内核轮询每个fd对应的设施状态,工夫复杂度O(n),没有最大连接数的限度,基于链表来存储文件描述符fd epoll事件驱动,epoll会把哪个流产生了怎么的I/O事件告诉咱们,首先通过create函数创立一个epoll实例,而后应用ctl add/delete函数新增删除epoll实例上要监听的事件类型,调用wait函数期待事件产生,当产生事件时,内核会告诉application处理事件,application再去读取流epoll应用mmap文件映射,能够实现application和内核空间的音讯零拷贝,缩小复制开销. select,poll,epoll之间的区别过程最大连接数:select:有,受linux最大连接数影响poll:无,基于链表来存储epoll:有链接限度,但数量很大 1G内存机器能够关上10万连贯,fd剧增的IO效率select:线性遍历,不论链接是否沉闷均会遍历poll:同selectepoll:基于事件回调,沉闷socket才会被动调起callback,沉闷链接少时,性能很好,但当所有连贯都沉闷时,有性能问题消息传递形式select: 内核与用户空间之间须要互相拷贝poll:同上epoll:通过mmap共享内存

July 5, 2021 · 1 min · jiezi

关于nio:Java-NIO-基础四-选择器

从最根底的层面上来看,选择器提供了问询通道是否就绪操作I/O的能力,选择器能够监控注册在下面的多个通道,通道注册时会返回选择键(记录通道与选择器之间的关联关系),选择器管理者这些注册的键、和就绪状态键的汇合 SelectableChannel所有继承SelectableChannel的通道都能够在选择器中注册,FileChannel没有继承这个类,所以无奈应用选择器 选择键(SelectionKey) 选择键是选择器的重点内容,选择器就绪的通道通过返回选择键汇合来告诉 public abstract class SelectionKey { public static final int OP_READ public static final int OP_WRITE public static final int OP_CONNECT public static final int OP_ACCEPT public abstract SelectableChannel channel(); public abstract Selector selector(); public abstract void cancel(); public abstract boolean isValid(); public abstract int interestOps(); public abstract void interestOps(int ops); public abstract int readyOps(); public final boolean isReadable() public final boolean isWritable() public final boolean isConnectable() public final boolean isAcceptable() public final Object attach(Object ob) public final Object attachment()}选择键保护了通道和选择器之间的关联,能够通过选择键获取Channel或Selector,键对象示意一种非凡的关联关系,当这种关系须要终止时,能够调用cancel()办法勾销,调用这个办法时,不会立刻被勾销,而是将这个键放到被勾销的汇合里,当Selector下次调用select()办法时会真正被清理掉。当通道敞开时,选择键会主动被勾销,当选择器敞开时,所有键都会被清理掉。 ...

November 19, 2020 · 3 min · jiezi

关于nio:NIO优化原理和Tomcat线程模型

1、I/O阻塞书上说BIO、NIO等都属于I/O模型,然而I/O模型这个范畴有点含混,我为此走了不少弯路。咱们日常开发过程中波及到NIO模型利用,如Tomcat、Netty中等线程模型,能够间接将其视为网络I/O模型。本文还是在根底篇章中介绍几种I/O模型形式,前面就默认只解说网络I/O模型了。 1.1、I/O分类BIO、NIO、AIO等都属于I/O模型,所以它们优化的都是零碎I/O的性能,因而首先,咱们要分明常见的I/O有哪些分类: I/O品种场景java中到利用内存I/O从内存中读取数据,将数据写入内存线程从内存中将数据读取到工作空间,将值在工作空间实现更改后,将值由工作空间刷新到内存中(jmm)磁盘I/O读取磁盘文件,写文件到磁盘线程从内存中将数据读取到工作空间,将值在工作空间实现更改后,将值由工作空间刷新到内存中(jmm)网络I/O网络数据的读写和传输tcp/udp的形象api即socket 通信 (java.net)1.2、I/O过程和性能I/O(Input/Output)即数据的输出/输入,为什么大家很关怀I/O的性能呢?因为I/O存在的范畴很广,在高并发的场景下,这部分性能会被有限放大。而且与业务无关,是能够有对立解决方案的。 所有的零碎I/O都分为两个阶段:期待就绪和数据操作。举例来说,读函数,分为期待零碎可读和真正的读;同理,写函数分为期待网卡能够写和真正的写: 期待就绪:期待数据就绪,个别是将数据加载到内核缓存区。无论是从磁盘、网络读取数据,程序能解决的都是进入内核态之后的数据,在这之前,cpu会阻塞住,期待数据进入内核态。数据操作:数据就绪后,个别是将内核缓存中的数据加载到用户缓存区。须要阐明的是期待就绪的阻塞是不应用CPU的,是在“空等”;而真正的读写操作的阻塞是应用CPU的,真正在”干活”,而且这个过程十分快,属于memory copy,带宽通常在1GB/s级别以上,能够了解为根本不耗时。这就呈现一个奇怪的景象 -- 不应用CPU的“期待就绪”,却比理论应用CPU的“数据操作”,占用CPU工夫更多。 传统阻塞I/O模型,即在读写数据过程中会产生阻塞景象。当用户线程收回I/O申请之后,内核会去查看数据是否就绪,如果没有就绪就会期待数据就绪,而用户线程就会处于阻塞状态,用户线程交出CPU。当数据就绪之后,内核会将数据拷贝到用户线程,并返回后果给用户线程,用户线程才会解除block状态。 明确的是,让当前工作线程阻塞,期待数据就绪,是很节约线程资源的事件,上述三种I/O都有肯定的优化计划: 磁盘I/O:古代电脑中都有一个DMA(Direct Memory Access 间接内存拜访) 的外设组件,能够将I/O数据间接传送到主存储器中并且传输不须要CPU的参加,以此将CPU解放出来去实现其余的事件。网络I/O:NIO、AIO等I/O模型,通过向事件选择器注册I/O事件,基于就绪的事件来驱动执行I/O操作,防止的期待过程。内存I/O:内存局部没波及到太多阻塞,优化点在于缩小用户态和内核态之间的数据拷贝。nio中的零拷贝就有mmap和sendfile等实现计划。1.3、网络I/O阻塞这里认真的讲讲网络I/O模型中的阻塞,即socket的阻塞。在计算机通信畛域,socket 被翻译为“套接字”,它是计算机之间进行通信的一种约定或一种形式,是在tcp/ip协定上,形象进去的一层网络通讯协定。 同下面I/O的过程一样,网络I/O也同样分成两个局部: 期待网络数据达到网卡,读取到内核缓冲区。从内核缓冲区复制数据到用户态空间。每个 socket 被创立后,都会调配两个缓冲区,输出缓冲区和输入缓冲区: 输出缓冲区:当应用 read()/recv() 读取数据时,(1)首先会查看缓冲区,如果缓冲区中有数据,那么就读取,否则函数会被阻塞,直到网络上有数据到来。(2)如果要读取的数据长度小于缓冲区中的数据长度,那么就不能一次性将缓冲区中的所有数据读出,残余数据将一直积压,直到有 read()/recv() 函数再次读取。(3)直到读取到数据后 read()/recv() 函数才会返回,否则就始终被阻塞。输入缓冲区:当应用 write()/send() 发送数据时,(1)首先会查看缓冲区,如果缓冲区的可用空间长度小于要发送的数据,那么 write()/send() 会被阻塞(暂停执行),直到缓冲区中的数据被发送到指标机器,腾出足够的空间,才唤醒 write()/send() 函数持续写入数据。(2) 如果TCP协定正在向网络发送数据,那么输入缓冲区会被锁定,不容许写入,write()/send() 也会被阻塞,直到数据发送结束缓冲区解锁,write()/send() 才会被唤醒。(3)如果要写入的数据大于缓冲区的最大长度,那么将分批写入。(4)直到所有数据被写入缓冲区 write()/send() 能力返回。由此可见在网络I/O中,会有很多的因素导致数据的读取和写入过程呈现阻塞,创立socket连贯也一样。socket.accept()、socket.read()、socket.write()这类函数都是同步阻塞的,当一个连贯在解决I/O的时候,零碎是阻塞的,该线程以后的cpu工夫片就节约了。 2、阻塞优化2.1、BIO、NIO、AIOBIO、NIO、AIO比照以socket.read()为例子: 传统的BIO外面socket.read(),如果TCP RecvBuffer里没有数据,函数会始终阻塞,直到收到数据,返回读到的数据。对于NIO,如果TCP RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则间接返回0,永远不会阻塞。最新的AIO(Async I/O)外面会更进一步:岂但期待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。换句话说,BIO里用户最关怀“我要读”,NIO里用户最关怀”我能够读了”,在AIO模型里用户更须要关注的是“读完了”。 NIONIO的优化体现在两个方面: 网络I/O模式的优化,通过非阻塞的模式,进步了CPU的使用性能。内存I/O的优化,零拷贝等形式,让数据在内核态和用户态之前的传输耗费升高了。NIO一个重要的特点是:socket次要的读、写、注册和接管函数,在期待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(耗费CPU但性能十分高)。 NIO的次要事件有几个:读就绪、写就绪、有新连贯到来。 咱们首先须要注册当这几个事件到来的时候所对应的处理器。而后在适合的机会通知事件选择器:我对这个事件感兴趣。对于写操作,就是写不进来的时候对写事件感兴趣;对于读操作,就是实现连贯和零碎没有方法承载新读入的数据的时;对于accept,个别是服务器刚启动的时候;而对于connect,个别是connect失败须要重连或者间接异步调用connect的时候。 其次,用一个死循环抉择就绪的事件,会执行零碎调用(Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP),还会阻塞的期待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连贯到来。 2.2、Reactor模式Reactor模式称之为响应器模式,通常用于NIO非阻塞IO的网络通信框架中。Reactor设计模式用于解决由一个或多个客户端并发传递给应用程序的的服务申请,能够了解成,Reactor模式是用来实现网络NIO的形式。 Reactor是一种事件驱动机制,是解决并发I/O常见的一种模式,用于同步I/O,其中心思想是将所有要解决的I/O事件注册到一个核心I/O多路复用器上,同时主线程阻塞在多路复用器上,一旦有I/O事件到来或是准备就绪,多路复用器将返回并将相应I/O事件散发到对应的处理器中。 Reactor模式次要分为上面三个局部: 事件接收器Acceptor:次要负责接管申请连贯,接管申请后,会将建设的连贯注册到分离器中。事件分离器Reactor:依赖于循环监听多路复用器Selector,是阻塞的,一旦监听到事件,就会将事件散发到事件处理器。(例如:监听读事件,等到内核态数据就绪后,将事件散发到Handler,Handler将数据读到用户态再做解决)事件处理器Handler:事件处理器次要实现相干的事件处理,比方读写I/O操作。2.3、三种Reactor模式单线程Reactor模式一个线程: 单线程:建设连贯(Acceptor)、监听accept、read、write事件(Reactor)、处理事件(Handler)都只用一个单线程。多线程Reactor模式一个线程 + 一个线程池: 单线程:建设连贯(Acceptor)和 监听accept、read、write事件(Reactor),复用一个线程。工作线程池:处理事件(Handler),由一个工作线程池来执行业务逻辑,包含数据就绪后,用户态的数据读写。主从Reactor模式三个线程池: 主线程池:建设连贯(Acceptor),并且将accept事件注册到从线程池。从线程池:监听accept、read、write事件(Reactor),包含期待数据就绪时,内核态的数据I读写。工作线程池:处理事件(Handler),由一个工作线程池来执行业务逻辑,包含数据就绪后,用户态的数据读写。3、Tomcat线程模型3.1、Api网络申请过程咱们先补一下基础知识,解说后端接口的响应过程。一个http连贯里,残缺的网络处理过程个别分为accept、read、decode、process、encode、send这几步: accept:接管客户端的连贯申请,创立socket连贯(tcp三次握手,创立连贯)。read:从socket读取数据,包含期待读就绪,和理论读数据。decode:解码,因为网络上的数据都是以byte的模式进行传输的,要想获取真正的申请,必然须要解码。process:业务解决,即服务端程序的业务逻辑实现。encode:编码,同理,因为网络上的数据都是以byte的模式进行传输的,也就是socket只接管byte,所以必然须要编码。send:往网络socket写回数据,包含理论写数据,和期待写就绪。3.2、各个线程模型在tomcat的各个版本中,所反对的线程模型也产生了一步步演变。一方面,间接将默认线程模型,从BIO变成了NIO。另一方面,在后续几个版本中,退出了对AIO和APR线程模型的反对,这里要留神,仅仅是反对,而非默认线程模型。 BIO:阻塞式IO,tomcat7之前默认,采纳传统的java IO进行操作,该模式下每个申请都会创立一个线程,实用于并发量小的场景。NIO:同步非阻塞,比传统BIO能更好的反对大并发,tomcat 8.0 后默认采纳该模式。AIO:异步非阻塞 (NIO2),tomcat8.0后反对。多用于连贯数目多且连贯比拟长(重操作)的架构,比方相册服务器,充沛调用OS参加并发操作,编程比较复杂。APR:tomcat 以JNI模式调用http服务器的外围动态链接库来解决文件读取或网络传输操作,须要编译装置APR库(也就是说IO操作的局部间接调用native代码实现)。各个线程模型中,NIO是作为目前最实用的线程模型,因而也是目前Tomcat默认的线程模型,因而本文对此着重解说。 ...

September 22, 2020 · 1 min · jiezi

从入门到放弃Java并发编程NIOBuffer

前言上篇【从入门到放弃-Java】并发编程-NIO-Channel中我们学习到channel是双向通道,数据通过channel在实体(文件、socket)和缓冲区(buffer)中可以双向传输。 本文我们就来学习下buffer 简介buffer即缓冲区,实际上是一块内存,可以用来写入、读取数据。是一个线性的、大小有限的、顺序承载基础数据类型的内存块。 buffer有三个重要的属性: capacity:缓冲池大小,是不可变的。当buffer写满时,需要先清空才能继续写入。limit:是buffer中不可以被读或者写的第一个元素的位置,limit的大小永远不会超过capacity(在写模式下,limit等于capacity)position:是buffer中可以被读或者写的第一个元素的位置,position的大小永远不会超过limit除了boolean外,每一个基础数据类型都有对应的buffer。如:ByteBuffer、CharBuffer、LongBuffer等 buffer不是线程安全的,如果要在多线程中使用 需要加锁控制 接下来以ByteBuffer为例开始学习。 ByteBufferallocateDirectpublic static ByteBuffer allocateDirect(int capacity) { //会创建一个容量大小为capacity的DirectByteBuffer(ByteBuffer的子类) return new DirectByteBuffer(capacity);}allocatepublic static ByteBuffer allocate(int capacity) { if (capacity < 0) throw createCapacityException(capacity); //会创建一个容量大小为capacity的HeapByteBuffer(ByteBuffer的子类) return new HeapByteBuffer(capacity, capacity);}HeapByteBuffer和DirectByteBuffer的区别: DirectByteBuffer是直接调用native方法在本机os::malloc()创建堆外内存;HeapByteBuffer是直接在jvm的堆中分配内存。当buffer中的数据和磁盘、网络等的交互都在操作系统的内核中发生时,使用DirectByteBuffer能避免从内核态->用户态->内核态的切换开销,所有的处理都在内核中进行,性能会比较好当频繁创建操作数据量比较小的buffer时,使用HeapByteBuffer在jvm堆中分配内存能抵消掉使用DirectByteBuffer带来的好处。wrappublic static ByteBuffer wrap(byte[] array, int offset, int length){ try { return new HeapByteBuffer(array, offset, length); } catch (IllegalArgumentException x) { throw new IndexOutOfBoundsException(); }}public static ByteBuffer wrap(byte[] array) { return wrap(array, 0, array.length); }将byte数组包装成一个ByteBuffer ...

July 8, 2019 · 2 min · jiezi

从入门到放弃Java并发编程NIOChannel

前言上篇[【从入门到放弃-Java】并发编程-NIO使用]()简单介绍了nio的基础使用,本篇将深入源码分析nio中channel的实现。 简介channel即通道,可以用来读、写数据,它是全双工的可以同时用来读写操作。这也是它与stream流的最大区别。 channel需要与buffer配合使用,channel通道的一端是buffer,一端是数据源实体,如文件、socket等。在nio中,通过channel的不同实现来处理 不同实体与数据buffer中的数据传输。 channel接口: package java.nio.channels;import java.io.IOException;import java.io.Closeable;/** * A nexus for I/O operations. * * <p> A channel represents an open connection to an entity such as a hardware * device, a file, a network socket, or a program component that is capable of * performing one or more distinct I/O operations, for example reading or * writing. * * <p> A channel is either open or closed. A channel is open upon creation, * and once closed it remains closed. Once a channel is closed, any attempt to * invoke an I/O operation upon it will cause a {@link ClosedChannelException} * to be thrown. Whether or not a channel is open may be tested by invoking * its {@link #isOpen isOpen} method. * * <p> Channels are, in general, intended to be safe for multithreaded access * as described in the specifications of the interfaces and classes that extend * and implement this interface. * * * @author Mark Reinhold * @author JSR-51 Expert Group * @since 1.4 */public interface Channel extends Closeable { /** * Tells whether or not this channel is open. * * @return <tt>true</tt> if, and only if, this channel is open */ public boolean isOpen(); /** * Closes this channel. * * <p> After a channel is closed, any further attempt to invoke I/O * operations upon it will cause a {@link ClosedChannelException} to be * thrown. * * <p> If this channel is already closed then invoking this method has no * effect. * * <p> This method may be invoked at any time. If some other thread has * already invoked it, however, then another invocation will block until * the first invocation is complete, after which it will return without * effect. </p> * * @throws IOException If an I/O error occurs */ public void close() throws IOException;}常见的channel实现有: ...

July 8, 2019 · 12 min · jiezi

史上最强Java-NIO入门担心从入门到放弃的请读这篇

本文原题“《NIO 入门》,作者为“Gregory M. Travis”,他是《JDK 1.4 Tutorial》等书籍的作者。 1、引言Java NIO是Java 1.4版加入的新特性,虽然Java技术日新月异,但历经10年,NIO依然为Java技术领域里最为重要的基础技术栈,而且依据现实的应用趋势,在可以预见的未来,它仍将继续在Java技术领域占据重要位置。 网上有关Java NIO的技术文章,虽然写的也不错,但通常是看完一篇马上懵逼。接着再看!然后,会更懵逼。。。 哈哈哈! 本文作者厚积薄发,以远比一般的技术博客或技术作者更深厚的Java技术储备,为你由浅入深,从零讲解到底什么是Java NIO。本文即使没有多少 Java 编程经验的读者也能很容易地开始学习 NIO。 (本文同步发布于:http://www.52im.net/thread-26...) 2、关于作者Gregory M. Travis:技术顾问、多产的技术作家,现居纽约。他从Java语言发布的第1天起,就已经是Java程序员啦! Gregory M. Travis是《JDK 1.4 Tutorial》一书的作者,Java程序员应该都清楚,能写好JDK Tutorial这种书籍或手册的,除了SUN(现在是Oracle)公司的Java创建者们,余下的也只有各路实打实的Java大牛们才能hold住。 3、在开始之前3.1 关于本教程 新的输入/输出 (NIO) 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 I/O 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。通过定义包含数据的类,以及通过以块的形式处理这些数据,NIO 不用使用本机代码就可以利用低级优化,这是原来的 I/O 包所无法做到的。 在本教程中,我们将讨论 NIO 库的几乎所有方面,从高级的概念性内容到底层的编程细节。除了学习诸如缓冲区和通道这样的关键 I/O 元素外,您还有机会看到在更新后的库中标准 I/O 是如何工作的。您还会了解只能通过 NIO 来完成的工作,如异步 I/O 和直接缓冲区。 在本教程中,我们将使用展示 NIO 库的不同方面的代码示例。几乎每一个代码示例都是一个大的 Java 程序的一部分,您可以在本文末的附件中下载到这个 Java 程序。在做这些练习时,我们推荐您在自己的系统上下载、编译和运行这些程序。在您学习了本教程以后,这些代码将为您的 NIO 编程努力提供一个起点。 本教程是为希望学习更多关于 Java NIO 库的知识的所有程序员而写的。为了最大程度地从这里的讨论中获益,您应该理解基本的 Java 编程概念,如类、继承和使用包。多少熟悉一些原来的 I/O 库(来自java.io.* 包)也会有所帮助。 ...

June 29, 2019 · 9 min · jiezi

NIO之Reactor模式Netty序章

Reactor模式反应堆模式:“反应”器名字中”反应“的由来: “反应”即“倒置”,“控制逆转”,具体事件处理程序不调用反应器,而向反应器注册一个事件处理器,表示自己对某些事件感兴趣,有时间来了,具体事件处理程序通过事件处理器对某个指定的事件发生做出反应。单线程Reactor模式流程: ①服务器端的Reactor是一个线程对象,该线程会启动事件循环,并使用Selector(选择器)来实现IO的多路复用。channel注册一个Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。②客户端向服务器端发起一个连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ事件以及对应的READ事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ事件了。③当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过SocketChannel的read()方法读取数据,此时read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。④每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。注意,Reactor的单线程模式的单线程主要是针对于I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。 基于单线程反应器模式手写一个NIO通信服务端处理器: /** * 类说明:nio通信服务端处理器 */public class NioServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 构造方法 * @param port 指定要监听的端口号 */ public NioServerHandle(int port) { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector,SelectionKey.OP_ACCEPT); started = true; System.out.println("服务器已启动,端口号:"+port); } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { //循环遍历selector while(started){ try{ //阻塞,只有当至少一个注册的事件发生的时候才会继续. selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //处理新接入的请求消息 if(key.isAcceptable()){ //获得关心当前事件的channel ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); //通过ServerSocketChannel的accept创建SocketChannel实例 //完成该操作意味着完成TCP三次握手,TCP物理链路正式建立 SocketChannel sc = ssc.accept(); System.out.println("======socket channel 建立连接" ); //设置为非阻塞的 sc.configureBlocking(false); //连接已经完成了,可以开始关心读事件了 sc.register(selector,SelectionKey.OP_READ); } //读消息 if(key.isReadable()){ System.out.println("======socket channel 数据准备完成," + "可以去读==读取======="); SocketChannel sc = (SocketChannel) key.channel(); //创建ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节进行编解码 if(readBytes>0){ //将缓冲区当前的limit设置为position=0, // 用于后续对缓冲区的读取操作 buffer.flip(); //根据缓冲区可读字节数创建字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String message = new String(bytes,"UTF-8"); System.out.println("服务器收到消息:" + message); //处理数据 String result = response(message) ; //发送应答消息 doWrite(sc,result); } //链路已经关闭,释放资源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //发送应答消息 private void doWrite(SocketChannel channel,String response) throws IOException { //将消息编码为字节数组 byte[] bytes = response.getBytes(); //根据数组容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); }}public class NioServer { private static NioServerHandle nioServerHandle; public static void start(){ if(nioServerHandle !=null) nioServerHandle.stop(); nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } public static void main(String[] args){ start(); }}客户端处理器: ...

June 13, 2019 · 3 min · jiezi

Netty如何接入新连接

欢迎关注公众号:【爱编程】如果有需要后台回复2019赠送1T的学习资料哦!!前文再续,书接上一回【NioEventLoop】。在研究NioEventLoop执行过程的时候,检测IO事件(包括新连接),处理IO事件,执行所有任务三个过程。其中检测IO事件中通过持有的selector去轮询事件,检测出新连接。这里复用同一段代码。 Channel的设计在开始分析前,先了解一下Channel的设计 顶层Channel接口定义了socket事件如读、写、连接、绑定等事件,并使用AbstractChannel作为骨架实现了这些方法。查看器成员变量,发现大多数通用的组件,都被定义在这里 第二层AbstractNioChannel定义了以NIO,即Selector的方式进行读写事件的监听。其成员变量保存了selector相关的一些属性。 第三层内容比较多,定义了服务端channel(左边继承了AbstractNioMessageChannel的NioServerSocketChannel)以及客户端channel(右边继承了AbstractNioByteChannel的NioSocketChannel)。 如何接入新连接?本文开始探索一下Netty是如何接入新连接?主要分为四个部分 1.检测新连接2.创建NioSocketChannel3.分配线程和注册Selector4.向Selector注册读事件1.检测新连接Netty服务端在启动的时候会绑定一个bossGroup,即NioEventLoop,在bind()绑定端口的时候注册accept(新连接接入)事件。扫描到该事件后,便处理。因此入口从:NioEventLoop#processSelectedKeys()开始。 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //省略代码 // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop //如果当前NioEventLoop是workGroup 则可能是OP_READ,bossGroup是OP_ACCEPT if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //新连接接入以及读事件处理入口 unsafe.read(); } }关键的新连接接入以及读事件处理入口unsafe.read(); a).这里的unsafe是在Channel创建过程的时候,调用了父类AbstractChannel#AbstractChannel()的构造方法,和pipeline一起初始化的。 protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }服务端:unsafe 为NioServerSockeChannel的父类AbstractNioMessageChannel#newUnsafe()创建,可以看到对应的是AbstractNioMessageChannel的内部类NioMessageUnsafe; ...

June 7, 2019 · 7 min · jiezi

如何利用Java-NIO实现高性能高并发的http服务器

在学习Java NIO的过程中,我一直理解不了Java NIO是怎么用来实现高并发的服务器的,网上给出的例子里,基本上没有多少说到这一点的,Tomcat,Jetty这些的源码又太庞大了,导致我无从下手。 后来搜了下才发现,JDK自带了一个httpserver的实现,看了下代码,非常简洁,非常规范,一下子就让我搞懂了Java NIO是怎么实现高并发的了。 NIO是同步非阻塞模型,同步是指用NIO读取数据,需要你的线程一直在运行着,直到数据读写完毕。非阻塞是指监听通道的时候是非阻塞的,比如向通道询问有没有数据可读的时候,可以马上就返回有或者没有。如果有数据,你的线程就可以去处理读数据这部分的逻辑;如果没有数据,你的线程可以去忙其他的事情,这样子单个线程的处理能力就会高很多,不用总是等着数据。 在jdk的httpserver里,处理请求的过程大致如下图:

June 5, 2019 · 1 min · jiezi

【J2SE】java NIO 基础学习

NIO与IO的区别IONIO阻塞式非阻塞式、选择器selectors面向流:单向流动,直接将数据从一方流向另一方面向缓存:将数据放到缓存区中进行存取,经通道进行数据的传输缓冲Buffer根据数据类型的不同,提供了对应的类型缓冲区(boolean类型除外),每一个Buffer类都是Buffer接口的一个实例。通过Buffer类.allocate()方法获取缓冲区;对缓冲区的数据进行操作可以使用put方法和get方法。四个核心属性// Invariants: mark <= position <= limit <= capacityprivate int mark = -1;private int position = 0;private int limit;private int capacity;capacity:容量,表示缓冲区中最大存储容量,一旦声明不可更改。limit:界限,表示限制可对缓冲区操作数据的范围,范围外的数据不可被操作。position:位置,表示当前操作的数据位于缓冲区中的位置。mark:标记,表示记录当前position的位置。常用方法(以ByteBuffer为例)public static ByteBuffer allocateDirect(int capacity):分配一个直接缓冲区public static ByteBuffer allocate(int capacity):分配一个间接缓冲区当分配一个缓冲区时,capacity=capacity,mark=-1, position=0, limit=capacity,源码分析如下:public static ByteBuffer allocate(int capacity) { … return new HeapByteBuffer(capacity, capacity);}// class HeapByteBuffer extends ByteBufferHeapByteBuffer(int cap, int lim) { // 调用ByteBuffer的构造函数传入默认参数:mark=-1, position=0, limit=capacity super(-1, 0, lim, cap, new byte[cap], 0);};// public abstract class ByteBuffer extends BufferByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) { super(mark, pos, lim, cap); this.hb = hb; // final byte[] hb; this.offset = offset; // final int offset;}Buffer(int mark, int pos, int lim, int cap) { … this.capacity = cap; limit(lim); // 设置limit position(pos); // 设置position if (mark >= 0) { … this.mark = mark; }}public final ByteBuffer put(byte[] src):将一个字节数组放入缓冲区。每当放置一个字节时,position将会+1,保证position的值就是下一个可插入数据的buffer单元位置。源码分析如下:public final ByteBuffer put(byte[] src) { return put(src, 0, src.length); }// 由allocate方法调用分配缓冲区可知,返回的是Buffer的实现类HeapByteBuffer对象public ByteBuffer put(byte[] src, int offset, int length) { checkBounds(offset, length, src.length); // 检查是否下标越界 if (length > remaining()) // 检查是否超出了可操作的数据范围= limit-position throw new BufferOverflowException(); System.arraycopy(src, offset, hb, ix(position()), length); position(position() + length); // 重设position return this;}public ByteBuffer get(byte[] dst):从缓冲区中读取数据到 dst中。应在 flip() 方法后调用。获取数据,是在缓冲区字节数组中的position位置处开始,读取一次完毕后,并会记录当前读取的位置,即position,以便于下一次调用get方法继续读取。public ByteBuffer get(byte[] dst) { return get(dst, 0, dst.length);}// 调用HeapByteBuffer对象的get方法public ByteBuffer get(byte[] dst, int offset, int length) { … // 从缓冲区的字节数组final byte[] hb中,拷贝从 hb的 offset+position(注:offset=0) 处的长度为length的数据到 dst中 System.arraycopy(hb, ix(position()), dst, offset, length); position(position() + length); // 设置position return this;}通过源码分析可知,当put操作后,position记录的是下一个可用的buffer单元,而get会从position位置处开始获取数据,这显然是无法获得的,因此需要重新设置 position, 即 flip()方法。public final Buffer flip() :翻转缓冲区,在一个通道读取或PUT操作序列之后,调用此方法以准备一个通道写入或相对获取操作的序列将此通道的缓冲区的界限设置为当前position,保证了有可操作的数据。public final Buffer flip() { limit = position; position = 0; mark = -1; return this;}public final Buffer mark():标记当前position可用于在put操作转get操作时标记当前的position位置,以便于调用reset方法从该位置继续操作public final Buffer mark() { mark = position; return this;}public final Buffer reset():回到mark标记的位置public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this;}public final Buffer clear():清除缓冲,重置初始化原始状态public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this;}public final Buffer rewind():倒回,用于重新读取数据public final Buffer rewind() { position = 0; mark = -1; return this;}直接缓冲区与间接缓冲区间接缓冲:通过allocate方法分配的缓冲区。当程序发起read请求获取磁盘文件时,该文件首先被OS读取到内核地址空间中,并copy一份原始数据传入JVM用户地址空间,再传给应用程序。增加了一个copy操作,导致效率降低。直接缓冲:通过allocateDirecr方法分配的缓冲区,此缓冲区建立在物理内存中。直接在两个空间中开辟内存空间,创建映射文件,去除了在内核地址空间和用户地址空间中的copy操作,使得直接通过物理内存传输数据。虽然有效提高了效率,但是分配和销毁该缓冲区的成本高于间接缓冲,且对于缓冲区中的数据将交付给OS管理,程序员无法控制。通道Channel用于源节点与目标节点之间的连接,负责对缓冲区中的数据提供传输服务。常用类 FileChannel:用于读取、写入、映射和操作文件的通道。 SocketChannel:通过 TCP 读写网络中的数据。 ServerSocketChannerl:通过 UDP 读写网络中的数据通道。 DatagramChannel:通过 UDP 读写网络中的数据通道。 本地IO:FileInputStream、FileOutputStream、RandomAccessFile 网络IO:Socket、ServerSocket、DatagramSocket获取Channel方式(以FileChannel为例) 1. Files.newByteChannel工具类静态方法 2. getChannel方法:通过对象动态获取,使用间接缓冲区。FileInputStream fis = new FileInputStream(ORIGINAL_FILE);FileOutputStream fos = new FileOutputStream(OUTPUT_FILE);// 获取通道FileChannel inChannel = fis.getChannel();FileChannel outChannel = fos.getChannel();// 提供缓冲区(间接缓冲区)ByteBuffer buffer = ByteBuffer.allocate(1024);while (inChannel.read(buffer) != -1) { buffer.flip(); outChannel.write(buffer); buffer.clear();} 3. 静态open方法:使用open获取到的Channel通道,使用直接缓冲区。FileChannel inChannel = FileChannel.open(Paths.get(ORIGINAL_FILE), StandardOpenOption.READ);FileChannel outChannel = FileChannel.open(Paths.get(OUTPUT_FILE), StandardOpenOption.READ, StandardOpenOption.CREATE, StandardOpenOption.WRITE); // 使用物理内存 内存映射文件 MappedByteBuffer inBuffer = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());MappedByteBuffer outBuffer = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size()); byte[] dst = new byte[inBuffer.limit()];inBuffer.get(dst);outBuffer.put(dst);// 使用DMA 直接存储器存储inChannel.transferTo(0, inChannel.size(), outChannel);outChannel.transferFrom(inChannel, 0, inChannel.size());public static FileChannel open(Path path, OpenOption… options):从path路径中以某种方式获取文件的ChannelStandardOpenOption描述CREATE创建一个新的文件,如果存在,则覆盖。CREATE_NEW创建一个新的文件,如果该文件已经存在则失败。DELETE_ON_CLOSE关闭时删除。DSYNC要求将文件内容的每次更新都与底层存储设备同步写入。READ读方式SPARSE稀疏文件SYNC要求将文件内容或元数据的每次更新都同步写入底层存储设备。TRUNCATE_EXISTING如果文件已经存在,并且打开 wirte访问,则其长度将截断为0。WRITE写方式APPEND如果文件以wirte访问打开,则字节将被写入文件的末尾而不是开头。public abstract MappedByteBuffer map(MapMode mode, long position, long size):将通道的文件区域映射到内从中。当操作较大的文件时,将数据映射到物理内存中才是值得的,因为映射到内存是需要开销的。FileChannel.MapMode描述PRIVATE专用映射模式(写入时拷贝)READ_ONLY只读模式READ_WRIT读写模式public abstract long transferFrom(ReadableByteChannel src, long position, long count):从给定的可读取通道src,传输到本通道中。直接使用直接存储器(DMA)对数据进行存储。public abstract long transferTo(long position, long count, WritableByteChannel target):将本通道的文件传输到可写入的target通道中。分散(Scatter)与聚集(Gather) 分散读取:将通道中的数据分散到多个缓冲区中。 public final long read(ByteBuffer[] dsts) 聚集写入:将多个缓冲区中的数据聚集到一个Channel通道中。public final long write(ByteBuffer[] srcs)字符集(Charset)public final ByteBuffer encode(CharBuffer cb):编码public final CharBuffer decode(ByteBuffer bb):解码网络通信的阻塞与非阻塞阻塞是相对网络传输而言的。传统的IO流都是阻塞的,在网络通信中,由于 IO 阻塞,需要为每一个客户端创建一个独立的线程来进行数据传输,性能大大降低;而NIO是非阻塞的,当存在空闲线程时,可以转去操作其他通道,因此不必非要创建一个独立的线程来服务每一个客户端请求。选择器(Selector)SelectableChannle对象的多路复用器,可同时对多个SelectableChannle对象的 IO 状态监听,每当创建一个Channel时,就向Selector进行注册,交由Selector进行管理,只有Channel准备就绪时,Selector可会将任务分配给一个或多个线程去执行。Selector可以同时管理多个Channel,是非阻塞 IO 的核心。NIO 阻塞式服务器Server不断监听客户端Client的请求,当建立了一个Channel时,服务器进行read操作,接收客户端发送的数据,只有当客户端断开连接close,或者执行shutdownOutput操作时,服务器才知晓没有数据了,否则会一直进行read操作;当客户端在read操作获取服务器的反馈时,若服务器没有关闭连接或者shutdownInput时也会一直阻塞。示例代码如下: static final String ORIGINAL_FILE = “F:/1.png”; static final String OUTPUT_FILE = “F:/2.jpg”;public void server() throws Exception { // 打开TCP通道,绑定端口监听 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(9988)); ByteBuffer buf = ByteBuffer.allocate(1024); // 获取连接 SocketChannel accept = null; while ((accept= serverChannel.accept()) != null) { FileChannel fileChannel = FileChannel.open( Paths.get(OUTPUT_FILE), StandardOpenOption.CREATE, StandardOpenOption.WRITE); // 读取客户端的请求数据 while (accept.read(buf) != -1) { buf.flip(); fileChannel.write(buf); buf.clear(); } // 发送执行结果 buf.put(“成功接收”.getBytes()); buf.flip(); accept.write(buf); buf.clear(); fileChannel.close(); // 关闭连接,否则客户端会一直等待读取导致阻塞,可使用shutdownInput,但任务已结束,该close accept.close(); } serverChannel.close();} public void client() throws Exception { // 打开一个socket通道 SocketChannel clientChannel = SocketChannel.open( new InetSocketAddress(“127.0.0.1”, 9988)); // 创建缓冲区和文件传输通道 FileChannel fileChannel = FileChannel.open(Paths.get(ORIGINAL_FILE), StandardOpenOption.READ); ByteBuffer buf = ByteBuffer.allocate(1024); while ( fileChannel.read(buf) != -1) { buf.flip(); clientChannel.write(buf); buf.clear(); } // 关闭输出(不关闭通道),告知服务器已经发送完毕,去掉下面一行代码服务区将一直读取导致阻塞 clientChannel.shutdownOutput(); int len = 0; while ((len = clientChannel.read(buf)) != -1) { buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } fileChannel.close(); clientChannel.close();}NIO 非阻塞式通过在通道Channel中调用configureBlocking将blocking设置为false,让Channel可以进行异步 I/O 操作。public void client() throws Exception { // 打开一个socket通道 SocketChannel clientChannel = SocketChannel.open( new InetSocketAddress(“127.0.0.1”, 9988)); ByteBuffer buf = ByteBuffer.allocate(1024); // 告知服务器,已经发送完毕 // clientChannel.shutdownOutput(); // 设置非阻塞 clientChannel.configureBlocking(Boolean.FALSE); buf.put(“哈哈”.getBytes()); buf.flip(); clientChannel.write(buf); clientChannel.close();}public void server() throws Exception { // 打开TCP通道,绑定端口监听 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(Boolean.FALSE); serverChannel.bind(new InetSocketAddress(9988)); // 创建一个Selector用于管理Channel Selector selector = Selector.open(); // 将服务器的Channel注册到selector中,并添加 OP_ACCEPT 事件,让selector监听通道的请求 serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 一直判断是否有已经准备就绪的Channel while (selector.select() > 0) { // 存在一个已经准备就绪的Channel,获取SelectionKey集合中获取触发该事件的所有key Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey sk = keys.next(); SocketChannel accept = null; ByteBuffer buffer = null; // 针对不同的状态进行操作 if (sk.isAcceptable()) { // 可被连接,设置非阻塞并注册到selector中 accept = serverChannel.accept(); accept.configureBlocking(Boolean.FALSE); accept.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) { // 可读,获取该选择器上的 Channel进行读操作 accept = (SocketChannel) sk.channel(); buffer = ByteBuffer.allocate(1024); int len = 0; while ((len = accept.read(buffer)) != -1) { buffer.flip(); System.out.println(new String(buffer.array(), 0, len)); buffer.clear(); } } } // 移除本次操作的SelectionKey keys.remove(); } serverChannel.close();} 方法使用说明ServerSocketChannel对象只能注册accept 事件。设置configureBlocking为false,才能使套接字通道中进行异步 I/O 操作。调用selectedKeys方法,返回发生了SelectionKey对象的集合。调用remove方法,用于从SelectionKey集合中移除已经被处理的key,若不处理,那么它将继续以当前的激活事件状态继续存在。Pipe管道Channel都是双向通道传输,而Pipe就是为了实现单向管道传送的通道对,有一个source通道(Pipe.SourceChannel)和一个sink通道(Pipe.SinkChannel)。sink用于写数据,source用于读数据。直接使用Pipe.open()获取Pipe对象,操作和FileChannel一样。 ...

March 16, 2019 · 4 min · jiezi

如何理解I/O多路复用

java nio提供了一套称为I/O多路复用的编程范式,那么什么叫做I/O多路复用呢?所谓的I/O多路复用,从字面意思上来理解,就是:有多个I/O操作(或是写,或是读,或是请求),这多个I/O操作都共用一个逻辑流。为了讲清复用的是什么,首先得先说明一下逻辑流的概念。逻辑流是什么?这里的逻辑流和操作系统中"线程是进程的一个逻辑流"是一个意思。下面的就是一个逻辑流:{int a = 5;int b aa;double c = a/b;}下面又是一个逻辑流:{long b = 5;int c = b+3;}如果在一个进程中,如果没有线程,那么程序是顺序执行的,那么所有的代码都是属于一个逻辑流。比如说,上面的两端代码,如果合在一个进程当中,它们一定是这种结构:{int a = 5;int b aa;double c = a/b;}….{long b = 5;int c = b+3;}或是{long b = 5;int c = b+3;}….{int a = 5;int b aa;double c = a/b;}也就是说它们一定属于一个逻辑流(一个顺序结构)。理解了这个,那么所谓的I/O复用,指的就是在一个逻辑流里处理多个I/O事件!!!如何做到?利用Selector多路复用器,轮询监听各路I/O,如果一旦有I/O事件发生,那么就去处理,否则程序阻塞。来看一个程序,加深理解:package qiuqi.filedownloadtest;import java.io.FileInputStream;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.;import java.util.Iterator;public class FileServer { public static void main(String[] args) throws IOException { startServer(); } public static void startServer() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(9999)); serverSocketChannel.configureBlocking(false); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if(key.isAcceptable()) { System.out.println(“收到连接这个I/O事件”); catch (IOException e){e.printStackTrace();} } } } }}这是一个监听网络I/O的多路复用程序,java中只能监听网络I/O,不能监听标准输入输出等I/O(不过这些在linux里都可以)。我们发现,这个程序的原理就是开启一个网络I/O类,ServerSocketChannel,把它注册到Selector(选择器)上,然后选择器就开始轮询,直到发现一个I/O事件,于是就进入第一个while循环进行处理,否则一直阻塞在select()>0处。这是一个极其简陋的程序,但是它揭示了多路复用的真正内涵,也就是用一个逻辑流监听,处理多个I/O(不过处理程序其实可以开启多线程,也就是指第一个while循环里的部分)。这就是I/O多路复用!!! ...

March 5, 2019 · 1 min · jiezi

基于零拷贝技术的的java NIO文件下载服务器

什么是零拷贝?我们首先来认识一下传统的I/O操作。假如说用户进程现在要把一个文件复制到另一个地方。那么用户程序必须先把这个文件读入内存,然后再把内存里的数据写入另一个文件。不过文件读入内存也不是直接读入用户进程的内存,而是先读入操作系统内核的内存,然后再从操作系统内核的内存区读到用户进程的内存。与之对应的是,写文件也不是直接写到磁盘上的文件,而是用户进程先把自己内存的数据传到操作系统内核的内存,然后再从操作系统内核的内存区写到磁盘。而这其中涉及到诸多的系统调用。因此看上去简单的操作至少要分为四部1磁盘文件读入操作系统2操作系统读到用户进程3用户进程写到操作系统4操作系统写入磁盘文件零拷贝和传统I/O有和不同?零拷贝就是指,传输一个文件的时候,不需要把文件读到用户进程再处理,而是直接把文件读到操作系统一个内存区,然后再移动到操作系统的另一个内存区,最后写入文件。这样一来,步骤变成这样:1磁盘文件读入操作系统2操作系统把数据写入操作系统另一个区域3操作系统写入磁盘文件虽然只少了一步,但是这里不仅减少了数据移动的时间损耗,而且减少了系统调用的次数,因此大大缩短了时间。更加详细的解释请看https://blog.csdn.net/u010530…java里如何实现零拷贝呢?这就要说起java nio中的FileChannel.transferTo()方法了,该方法是把FileChannel中的数据利用零靠的技术转移到另一个channel。这另一个channel往往是FileChannel,不过SocketChannel也是可以的:)。简单实现(静态下载文件,不能根据用户指令来更改下载的文件。)代码如下:单线程版本:package qiuqi.filedownloadtest;import java.io.FileInputStream;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.;import java.util.Iterator;public class FileServer { public static void main(String[] args) throws IOException { startServer(); } public static void startServer() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(9999)); serverSocketChannel.configureBlocking(false); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if(key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); try (FileInputStream in = new FileInputStream(“C:\Users\dell\Desktop\ZOL手机数据(1).rar”)){ in.getChannel().transferTo(0,in.getChannel().size(),socketChannel); socketChannel.close(); } catch (IOException e){e.printStackTrace();} } } } }}多线程版本:package qiuqi.filedownloadtest;import java.io.FileInputStream;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.;import java.util.Iterator;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class FileServer { static ExecutorService threadpool = Executors.newCachedThreadPool(); public static void main(String[] args) throws IOException { startServer(); } public static void startServer() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(9999)); serverSocketChannel.configureBlocking(false); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if(key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); threadpool.execute(new Runnable() { @Override public void run() { try (FileInputStream in = new FileInputStream(“C:\Users\dell\Desktop\ZOL手机数据(1).rar”)){ in.getChannel().transferTo(0,in.getChannel().size(),socketChannel); socketChannel.close(); } catch (IOException e){e.printStackTrace();} } }); } } } }}代码就不讲解了。如果学过java nio,那么理解上面的程序轻而易举。如果不熟悉java nio的服务器编程那么请先学习再来观看。最后我想说,java NIO真的是NEW IO即新的IO,而不是NonBlocking IO即非阻塞IO。因为在这套体系里,不仅仅提供了非阻塞的编程模型,而且提供了类似零拷贝,内存映射这样的新技术(对于操作系统来说早就有了)。 ...

March 4, 2019 · 1 min · jiezi

java nio中,为什么客户端一方正常关闭了Socket,而服务端的isReadable()还总是返回true?

我这篇文章想讲的是编程时如何正确关闭tcp连接。首先给出一个网络上绝大部分的java nio代码示例:服务端:1首先实例化一个多路I/O复用器Selector2然后实例化一个ServerSocketChannel3ServerSocketChannel注册为非阻塞(channel.configureBlocking(false);)4ServerSocketChannel注册到Selector,并监听连接事件(serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);)5Selector开始轮询,如果监听到了isAcceptable()事件,就建立一个连接,如果监听到了isReadable()事件,就读数据。6处理完或者在处理每个事件之前将SelectionKey移除出Selector.selectedKeys()代码:package qiuqi.main;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;public class NioServer { public static void main(String[] args) throws IOException { startServer(); } static void startServer() throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(999)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey sk = iterator.next(); iterator.remove(); if (sk.isAcceptable()) { SocketChannel channel = serverSocketChannel.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) { System.out.println(“读事件!!!”); SocketChannel channel = (SocketChannel) sk.channel(); try { ByteBuffer byteBuffer = ByteBuffer.allocate(200); //这里只读数据,未作任何处理 channel.read(byteBuffer); } catch (IOException e) { //手动关闭channel System.out.println(e.getMessage()); sk.cancel(); if (channel != null) channel.close(); } } } } }}还有说明一下,为什么在if (sk.isReadable()){}这个里面加上异常捕捉,因为可能读数据的时候客户端突然断掉,如果不捕捉这个异常,将会导致整个程序结束。而客户端如果使用NIO编程,那么和服务端很像,然鹅,我们并不需要使用NIO编程,因为这里我想讲的问题和NIO或是普通IO无关,在我想讲的问题上,他俩是一样的,那么我就用普通socket编程来讲解,因为这个好写:)。直接给代码如下:package qiuqi.main;import java.io.IOException;import java.net.InetSocketAddress;import java.net.Socket;public class TraditionalSocketClient { public static void main(String[] args) throws IOException { startClient(); } static void startClient() throws IOException { Socket socket = new Socket(); socket.connect(new InetSocketAddress(999)); socket.getOutputStream().write(new byte[100]); //要注意这个close方法,这是正常关闭socket的方法 //也是导致这个错误的根源 socket.close(); }}我们运行客户端和服务端的代码,输出的结果是:读事件!!!读事件!!!读事件!!!读事件!!!读事件!!!读事件!!!….读事件!!!读事件!!!无限个读事件!!!why???客户端正常关闭,然后显然客户端不可能再给服务端发送任何数据了,服务端怎么可能还有读响应呢?我们现在把客户端代码的最后一行socket.close();这个去掉,再运行一次!输出结果是:读事件!!!读事件!!!远程主机强迫关闭了一个现有的连接。然后。。。就正常了(当然代码里会有异常提示的),这里的正常指的是不会输出多余的读事件!!!了。这又是怎么回事?我们知道如果去掉socket.close();那么客户端是非正常关闭,服务端这边会引发IOException。引发完IOExpection之后,我们的程序在catch{}语句块中手动关闭了channel。既然非正常关闭会引发异常,那么正常关闭呢?什么都不引发?但是这样服务端怎么知道客户端已经关闭了呢?显然服务端会收到客户端的关闭信号(可读数据),而网络上绝大多数代码并没有根据这个关闭信号来结束channel。那么关闭信号是什么?channel.read(byteBuffer);这个语句是有返回值的,大多数情况是返回一个大于等于0的值,表示将多少数据读入byteBuffer缓冲区。然鹅,当客户端正常断开连接的时候,它就会返回-1。虽然这个断开连接信号也是可读数据(会使得isReadable()为true),但是这个信号无法被读入byteBuffer,也就是说一旦返回-1,那么无论再继续读多少次都是-1,并且会引发可读事件isReadable()。因此,这样写问题就能得到解决,下面的代码在try语句块里。 SocketChannel channel = (SocketChannel) sk.channel(); try { ByteBuffer byteBuffer = ByteBuffer.allocate(200); int num; //这里只读数据,未作任何处理 num = channel.read(byteBuffer); if(num == -1) throw new IOException(“读完成”); } catch (IOException e) { System.out.println(e.getMessage()); sk.cancel(); if (channel != null) channel.close(); }这里我根据返回值-1来抛出异常,使得下面的catch语句块捕捉并关闭连接,也可以不抛出异常,直接在try{}里处理。 ...

March 4, 2019 · 1 min · jiezi

Java NIO

1.Java NIO 简介2.Java NIO 与IO 的主要区别3.缓冲区(Buffer)和通道(Channel)4.文件通道(FileChannel)5.NIO 的非阻塞式网络通信选择器(Selector)SocketChannel、ServerSocketChannel、DatagramChannel面向流面向缓冲区Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。Java NIO 与IO 的主要区别import java.nio.ByteBuffer;import org.junit.Test;/一、缓冲区(Buffer):在 Java NIO 中负责数据的存取。缓冲区就是数组。用于存储不同数据类型的数据根据数据类型不同(boolean 除外),提供了相应类型的缓冲区:ByteBufferCharBufferShortBufferIntBufferLongBufferFloatBufferDoubleBuffer上述缓冲区的管理方式几乎一致,通过 allocate() 获取缓冲区二、缓冲区存取数据的两个核心方法:put() : 存入数据到缓冲区中get() : 获取缓冲区中的数据三、缓冲区中的四个核心属性:capacity : 容量,表示缓冲区中最大存储数据的容量。一旦声明不能改变。limit : 界限,表示缓冲区中可以操作数据的大小。(limit 后数据不能进行读写)position : 位置,表示缓冲区中正在操作数据的位置。mark : 标记,表示记录当前 position 的位置。可以通过 reset() 恢复到 mark 的位置0 <= mark <= position <= limit <= capacity四、直接缓冲区与非直接缓冲区:非直接缓冲区:通过 allocate() 方法分配缓冲区,将缓冲区建立在 JVM 的内存中直接缓冲区:通过 allocateDirect() 方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率/public class TestBuffer {@Testpublic void test3(){ //分配直接缓冲区 ByteBuffer buf = ByteBuffer.allocateDirect(1024); System.out.println(buf.isDirect());}@Testpublic void test2(){ String str = “abcde”; ByteBuffer buf = ByteBuffer.allocate(1024); buf.put(str.getBytes()); buf.flip(); byte[] dst = new byte[buf.limit()]; buf.get(dst, 0, 2); System.out.println(new String(dst, 0, 2)); System.out.println(buf.position()); //mark() : 标记 buf.mark(); buf.get(dst, 2, 2); System.out.println(new String(dst, 2, 2)); System.out.println(buf.position()); //reset() : 恢复到 mark 的位置 buf.reset(); System.out.println(buf.position()); //判断缓冲区中是否还有剩余数据 if(buf.hasRemaining()){ //获取缓冲区中可以操作的数量 System.out.println(buf.remaining()); }}@Testpublic void test1(){ String str = “abcde”; //1. 分配一个指定大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); System.out.println("—————–allocate()—————-"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //2. 利用 put() 存入数据到缓冲区中 buf.put(str.getBytes()); System.out.println("—————–put()—————-"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //3. 切换读取数据模式 buf.flip(); System.out.println("—————–flip()—————-"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //4. 利用 get() 读取缓冲区中的数据 byte[] dst = new byte[buf.limit()]; buf.get(dst); System.out.println(new String(dst, 0, dst.length)); System.out.println("—————–get()—————-"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //5. rewind() : 可重复读 buf.rewind(); System.out.println("—————–rewind()—————-"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //6. clear() : 清空缓冲区. 但是缓冲区中的数据依然存在,但是处于“被遗忘”状态 buf.clear(); System.out.println("—————–clear()—————-"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); System.out.println((char)buf.get());}}1-通道(Channel)与缓冲区(Buffer)通道和缓冲区Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到IO 设备(例如:文件、套接字)的连接。若需要使用NIO 系统,需要获取用于连接IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。缓冲区(Buffer) 缓冲区(Buffer):一个用于特定基本数据类型的容器。由java.nio 包定义的,所有缓冲区都是Buffer 抽象类的子类。 Java NIO 中的Buffer 主要用于与NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的。缓冲区(Buffer)Buffer 就像一个数组,可以保存多个相同类型的数据。根据数据类型不同(boolean 除外) ,有以下Buffer 常用子类: ByteBuffer CharBuffer ShortBuffer IntBuffer LongBuffer FloatBuffer DoubleBuffer上述Buffer 类他们都采用相似的方法进行管理数据,只是各自管理的数据类型不同而已。都是通过如下方法获取一个Buffer对象:缓冲区的基本属性Buffer 中的重要概念: 容量(capacity) :表示Buffer 最大数据容量,缓冲区容量不能为负,并且创建后不能更改。 限制(limit):第一个不应该读取或写入的数据的索引,即位于limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量。 位置(position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制 标记(mark)与重置(reset):标记是一个索引,通过Buffer 中的mark() 方法指定Buffer 中一个特定的position,之后可以通过调用reset() 方法恢复到这个position.缓冲区的基本属性Buffer 的常用方法缓冲区的数据操作Buffer 所有子类提供了两个用于数据操作的方法:get()与put() 方法获取Buffer 中的数据get() :读取单个字节get(byte[] dst):批量读取多个字节到dst 中get(int index):读取指定索引位置的字节(不会移动position)放入数据到Buffer 中put(byte b):将给定单个字节写入缓冲区的当前位置put(byte[] src):将src 中的字节写入缓冲区的当前位置put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动position) 直接与非直接缓冲区字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则Java 虚拟机会尽最大努力直接在此缓冲区上执行本机I/O 操作。也就是说,在每次调用基础操作系统的一个本机I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。直接字节缓冲区可以通过调用此类的allocateDirect() 工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。所以,建议将直接缓冲区主要分配给那些易受基础系统的本机I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。直接字节缓冲区还可以通过FileChannel 的map() 方法将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer 。Java 平台的实现有助于通过JNI 从本机代码创建直接字节缓冲区。如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其isDirect() 方法来确定。提供此方法是为了能够在性能关键型代码中执行显式缓冲区管理。非直接缓冲区直接缓冲区通道(Channel)通道(Channel):由java.nio.channels 包定义的。Channel 表示IO 源与目标打开的连接。Channel 类似于传统的“流”。只不过Channel本身不能直接访问数据,Channel 只能与Buffer 进行交互。通道(Channel)通道(Channel)Java 为Channel 接口提供的最主要实现类如下:•FileChannel:用于读取、写入、映射和操作文件的通道。•DatagramChannel:通过UDP 读写网络中的数据通道。•SocketChannel:通过TCP 读写网络中的数据。•ServerSocketChannel:可以监听新进来的TCP 连接,对每一个新进来的连接都会创建一个SocketChannel。获取通道获取通道的一种方式是对支持通道的对象调用getChannel() 方法。支持通道的类如下: FileInputStream FileOutputStream RandomAccessFile DatagramSocket Socket ServerSocket获取通道的其他方式是使用Files 类的静态方法newByteChannel() 获取字节通道。或者通过通道的静态方法open() 打开并返回指定通道。import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.io.RandomAccessFile;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.MappedByteBuffer;import java.nio.channels.FileChannel;import java.nio.channels.FileChannel.MapMode;import java.nio.charset.CharacterCodingException;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;import java.nio.charset.CharsetEncoder;import java.nio.file.Paths;import java.nio.file.StandardOpenOption;import java.util.Map;import java.util.Map.Entry;import java.util.Set;import org.junit.Test;/* * 一、通道(Channel):用于源节点与目标节点的连接。在 Java NIO 中负责缓冲区中数据的传输。Channel 本身不存储数据,因此需要配合缓冲区进行传输。 * * 二、通道的主要实现类 * java.nio.channels.Channel 接口: * |–FileChannel * |–SocketChannel * |–ServerSocketChannel * |–DatagramChannel * * 三、获取通道 * 1. Java 针对支持通道的类提供了 getChannel() 方法 * 本地 IO: * FileInputStream/FileOutputStream * RandomAccessFile * * 网络IO: * Socket * ServerSocket * DatagramSocket * * 2. 在 JDK 1.7 中的 NIO.2 针对各个通道提供了静态方法 open() * 3. 在 JDK 1.7 中的 NIO.2 的 Files 工具类的 newByteChannel() * * 四、通道之间的数据传输 * transferFrom() * transferTo() * * 五、分散(Scatter)与聚集(Gather) * 分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中 * 聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中 * * 六、字符集:Charset * 编码:字符串 -> 字节数组 * 解码:字节数组 -> 字符串 * */public class TestChannel { //字符集 @Test public void test6() throws IOException{ Charset cs1 = Charset.forName(“GBK”); //获取编码器 CharsetEncoder ce = cs1.newEncoder(); //获取解码器 CharsetDecoder cd = cs1.newDecoder(); CharBuffer cBuf = CharBuffer.allocate(1024); cBuf.put(“威武!”); cBuf.flip(); //编码 ByteBuffer bBuf = ce.encode(cBuf); for (int i = 0; i < 12; i++) { System.out.println(bBuf.get()); } //解码 bBuf.flip(); CharBuffer cBuf2 = cd.decode(bBuf); System.out.println(cBuf2.toString()); System.out.println("——————————————————"); Charset cs2 = Charset.forName(“GBK”); bBuf.flip(); CharBuffer cBuf3 = cs2.decode(bBuf); System.out.println(cBuf3.toString()); } @Test public void test5(){ Map<String, Charset> map = Charset.availableCharsets(); Set<Entry<String, Charset>> set = map.entrySet(); for (Entry<String, Charset> entry : set) { System.out.println(entry.getKey() + “=” + entry.getValue()); } } //分散和聚集 @Test public void test4() throws IOException{ RandomAccessFile raf1 = new RandomAccessFile(“1.txt”, “rw”); //1. 获取通道 FileChannel channel1 = raf1.getChannel(); //2. 分配指定大小的缓冲区 ByteBuffer buf1 = ByteBuffer.allocate(100); ByteBuffer buf2 = ByteBuffer.allocate(1024); //3. 分散读取 ByteBuffer[] bufs = {buf1, buf2}; channel1.read(bufs); for (ByteBuffer byteBuffer : bufs) { byteBuffer.flip(); } System.out.println(new String(bufs[0].array(), 0, bufs[0].limit())); System.out.println("—————–"); System.out.println(new String(bufs[1].array(), 0, bufs[1].limit())); //4. 聚集写入 RandomAccessFile raf2 = new RandomAccessFile(“2.txt”, “rw”); FileChannel channel2 = raf2.getChannel(); channel2.write(bufs); } //通道之间的数据传输(直接缓冲区) @Test public void test3() throws IOException{ FileChannel inChannel = FileChannel.open(Paths.get(“d:/1.mkv”), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get(“d:/2.mkv”), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);// inChannel.transferTo(0, inChannel.size(), outChannel); outChannel.transferFrom(inChannel, 0, inChannel.size()); inChannel.close(); outChannel.close(); } //使用直接缓冲区完成文件的复制(内存映射文件) @Test public void test2() throws IOException{//2127-1902-1777 long start = System.currentTimeMillis(); FileChannel inChannel = FileChannel.open(Paths.get(“d:/1.mkv”), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get(“d:/2.mkv”), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE); //内存映射文件 MappedByteBuffer inMappedBuf = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size()); MappedByteBuffer outMappedBuf = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size()); //直接对缓冲区进行数据的读写操作 byte[] dst = new byte[inMappedBuf.limit()]; inMappedBuf.get(dst); outMappedBuf.put(dst); inChannel.close(); outChannel.close(); long end = System.currentTimeMillis(); System.out.println(“耗费时间为:” + (end - start)); } //利用通道完成文件的复制(非直接缓冲区) @Test public void test1(){//10874-10953 long start = System.currentTimeMillis(); FileInputStream fis = null; FileOutputStream fos = null; //①获取通道 FileChannel inChannel = null; FileChannel outChannel = null; try { fis = new FileInputStream(“d:/1.mkv”); fos = new FileOutputStream(“d:/2.mkv”); inChannel = fis.getChannel(); outChannel = fos.getChannel(); //②分配指定大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); //③将通道中的数据存入缓冲区中 while(inChannel.read(buf) != -1){ buf.flip(); //切换读取数据的模式 //④将缓冲区中的数据写入通道中 outChannel.write(buf); buf.clear(); //清空缓冲区 } } catch (IOException e) { e.printStackTrace(); } finally { if(outChannel != null){ try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(inChannel != null){ try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(fos != null){ try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } if(fis != null){ try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } } long end = System.currentTimeMillis(); System.out.println(“耗费时间为:” + (end - start)); }} ...

February 4, 2019 · 4 min · jiezi

Spring单例模式与NIO思考

本人java开发,今天在研究秒杀问题的时候,突然间脑子里就产生了这样的思考。 众所周知,Spring默认是单例模式,那么单例模式有什么优缺点呢? 优点一:创建的对象较少!在Tomcat模型为BIO时,会对每一个请求创建一个线程,如果创建的线程数较大,每一个线程中都有@Autowire这种注解,那么创建的对象就会非常之多。另外,如果对象B注入到对象A的属性中,如果对象A被创建了多个对象,那么对象B也会被创建多个,层层依赖。 当然如果是NIO,就没有单例的必要了,因为它只会用一个线程了来处理数据请求,这也注定NIO不能用来IO密集型操作。 缺点一:由于只有一个对象,如果属于类的成员变量,则会被多次调用,类似于类的静态变量。 缺点二:BIO中,如果你在这个对象中的方法上使用了Synchronized,代表锁住的是该对象。

January 28, 2019 · 1 min · jiezi

I/O模型和Java NIO源码分析

最近在学习Java网络编程和Netty相关的知识,了解到Netty是NIO模式的网络框架,但是提供了不同的Channel来支持不同模式的网络通信处理,包括同步、异步、阻塞和非阻塞。学习要从基础开始,所以我们就要先了解一下相关的基础概念和Java原生的NIO。这里,就将最近我学习的知识总结一下,以供大家了解。 为了节约你的时间,本文主要内容如下:异步,阻塞的概念操作系统I/O的类型Java NIO的底层实现异步,同步,阻塞,非阻塞 同步和异步关注的是消息通信机制,所谓同步就是调用者进行调用后,在没有得到结果之前,该调用一直不会返回,但是一旦调用返回,就得到了返回值,同步就是指调用者主动等待调用结果;而异步则相反,执行调用之后直接返回,所以可能没有返回值,等到有返回值时,由被调用者通过状态,通知来通知调用者.异步就是指被调用者来通知调用者调用结果就绪.所以,二者在消息通信机制上有所不同,一个是调用者检查调用结果是否就绪,一个是被调用者通知调用者结果就绪 阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.阻塞调用是指在调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会继续执行.非阻塞调用是指在不能立刻得到结构之前,调用线程不会被挂起,还是可以执行其他事情. 两组概念相互组合就有四种情况,分别是同步阻塞,同步非阻塞,异步阻塞,异步非阻塞.我们来举个例子来分别类比上诉四种情况. 比如你要从网上下载一个1G的文件,按下下载按钮之后,如果你一直在电脑旁边,等待下载结束,这种情况就是同步阻塞;如果你不需要一直呆在电脑旁边,你可以去看一会书,但是你还是隔一段时间来查看一下下载进度,这种情况就是同步非阻塞;如果你一直在电脑旁边,但是下载器在下载结束之后会响起音乐来提醒你,这就是异步阻塞;但是如果你不呆在电脑旁边,去看书,下载器下载结束后响起音乐来提醒你,那么这种情况就是异步非阻塞.Unix的I/O类型 知道上述两组概念之后,我们来看一下Unix下可用的5种I/O模型:阻塞I/O(bloking IO)非阻塞I/O(nonblocking IO)多路复用I/O(IO multiplexing)信号驱动I/O(signal driven IO)异步I/O(asynchronous IO) 前4种都是同步,只有最后一种是异步I/O.需要注意的是Java NIO依赖于Unix系统的多路复用I/O,对于I/O操作来说,它是同步I/O,但是对于编程模型来说,它是异步网络调用.下面我们就以系统read的调用来介绍不同的I/O类型. 当一个read发生时,它会经历两个阶段:1 等待数据准备2 将数据从内核内存空间拷贝到进程内存空间中 不同的I/O类型,在这两个阶段中有不同的行为.但是由于这块内容比较多,而且多为表述性的知识,所以这里我们只给出几张图片来解释,感觉兴趣的同学可以去具体了解一下。Java NIO的底层实现 我们都知道Netty通过JNI的方式提供了Native Socket Transport,为什么Netty要提供自己的Native版本的NIO呢?明明Java NIO底层也是基于epoll调用(最新的版本)的.这里,我们先不明说,大家想一想可能的情况.下列的源码都来自于OpenJDK-8u40-b25版本.open方法 如果我们顺着Selector.open()方法一个类一个类的找下去,很容易就发现Selector的初始化是由DefaultSelectorProvider根据不同操作系统平台生成的不同的SelectorProvider,对于Linux系统,它会生成EPollSelectorProvider实例,而这个实例会生成EPollSelectorImpl作为最终的Selector实现.class EPollSelectorImpl extends SelectorImpl{ ….. // The poll object EPollArrayWrapper pollWrapper; ….. EPollSelectorImpl(SelectorProvider sp) throws IOException { ….. pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); ….. } …..} EpollArrayWapper将Linux的epoll相关系统调用封装成了native方法供EpollSelectorImpl使用. private native int epollCreate(); private native void epollCtl(int epfd, int opcode, int fd, int events); private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException; 上述三个native方法就对应Linux下epoll相关的三个系统调用//创建一个epoll句柄,size是这个监听的数目的最大值.int epoll_create(int size);//事件注册函数,告诉内核epoll监听什么类型的事件,参数是感兴趣的事件类型,回调和监听的fdint epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);//等待事件的产生,类似于select调用,events参数用来从内核得到事件的集合int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); 所以,我们会发现在EpollArrayWapper的构造函数中调用了epollCreate方法,创建了一个epoll的句柄.这样,Selector对象就算创造完毕了.register方法 与open类似,ServerSocketChannel的register函数底层是调用了SelectorImpl类的register方法,这个SelectorImpl就是EPollSelectorImpl的父类.protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment){ if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); //生成SelectorKey来存储到hashmap中,一共之后获取 SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); //attach用户想要存储的对象 k.attach(attachment); //调用子类的implRegister方法 synchronized (publicKeys) { implRegister(k); } //设置关注的option k.interestOps(ops); return k;} EpollSelectorImpl的相应的方法实现如下,它调用了EPollArrayWrapper的add方法,记录下Channel所对应的fd值,然后将ski添加到keys变量中.在EPollArrayWrapper中有一个byte数组eventLow记录所有的channel的fd值. protected void implRegister(SelectionKeyImpl ski) { if (closed) throw new ClosedSelectorException(); SelChImpl ch = ski.channel; //获取Channel所对应的fd,因为在linux下socket会被当作一个文件,也会有fd int fd = Integer.valueOf(ch.getFDVal()); fdToKey.put(fd, ski); //调用pollWrapper的add方法,将channel的fd添加到监控列表中 pollWrapper.add(fd); //保存到HashSet中,keys是SelectorImpl的成员变量 keys.add(ski); } 我们会发现,调用register方法并没有涉及到EpollArrayWrapper中的native方法epollCtl的调用,这是因为他们将这个方法的调用推迟到Select方法中去了.Select方法 和register方法类似,SelectorImpl中的select方法最终调用了其子类EpollSelectorImpl的doSelect方法protected int doSelect(long timeout) throws IOException { ….. try { …. //调用了poll方法,底层调用了native的epollCtl和epollWait方法 pollWrapper.poll(timeout); } finally { …. } …. //更新selectedKeys,为之后的selectedKeys函数做准备 int numKeysUpdated = updateSelectedKeys(); …. return numKeysUpdated;} 由上述的代码,可以看到,EPollSelectorImpl先调用EPollArrayWapper的poll方法,然后在更新SelectedKeys.其中poll方法会先调用epollCtl来注册先前在register方法中保存的Channel的fd和感兴趣的事件类型,然后epollWait方法等待感兴趣事件的生成,导致线程阻塞.int poll(long timeout) throws IOException { updateRegistrations(); ////先调用epollCtl,更新关注的事件类型 ////导致阻塞,等待事件产生 updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); ….. return updated;} 等待关注的事件产生之后(或在等待时间超过预先设置的最大时间),epollWait函数就会返回.select函数从阻塞状态恢复.selectedKeys方法 我们先来看SelectorImpl中的selectedKeys方法.//是通过Util.ungrowableSet生成的,不能添加,只能减少private Set<SelectionKey> publicSelectedKeys;public Set<SelectionKey> selectedKeys() { …. return publicSelectedKeys;} 很奇怪啊,怎麽直接就返回publicSelectedKeys了,难道在select函数的执行过程中有修改过这个变量吗? publicSelectedKeys这个对象其实是selectedKeys变量的一份副本,你可以在SelectorImpl的构造函数中找到它们俩的关系,我们再回头看一下select中updateSelectedKeys方法.private int updateSelectedKeys() { //更新了的keys的个数,或在说是产生的事件的个数 int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i<entries; i++) { //对应的channel的fd int nextFD = pollWrapper.getDescriptor(i); //通过fd找到对应的SelectionKey SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); if (ski != null) { int rOps = pollWrapper.getEventOps(i); //更新selectedKey变量,并通知响应的channel来做响应的处理 if (selectedKeys.contains(ski)) { if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; } } else { ski.channel.translateAndSetReadyOps(rOps, ski); if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { selectedKeys.add(ski); numKeysUpdated++; } } } } return numKeysUpdated;}后记 看到这里,详细大家都已经了解到了NIO的底层实现了吧.这里我想在说两个问题. 一是为什么Netty自己又从新实现了一边native相关的NIO底层方法? 听听Netty的创始人是怎麽说的吧链接。因为Java的版本使用的epoll的level-triggered模式,而Netty则希望使用edge-triggered模式,而且Java版本没有将epoll的部分配置项暴露出来,比如说TCP_CORK和SO_REUSEPORT。 二是看这么多源码,花费这么多时间有什么作用呢?我感觉如果从非功利的角度来看,那么就是纯粹的希望了解的更多,有时候看完源码或在理解了底层原理之后,都会用一种恍然大悟的感觉,比如说AQS的原理.如果从目的性的角度来看,那么就是你知道底层原理之后,你的把握性就更强了,如果出了问题,你可以更快的找出来,并且解决.除此之外,你还可以按照具体的现实情况,以源码为模板在自己造轮子,实现一个更加符合你当前需求的版本. 后续如果有时间,我希望好好了解一下epoll的操作系统级别的实现原理. ...

January 7, 2019 · 2 min · jiezi

Java NIO中Write事件和Connect事件

NIO Server端多路复用开发的一般步骤是://打开选择器Selector selector = Selector.open();//打开通到ServerSocketChannel socketChannel = ServerSocketChannel.open();//配置非阻塞模型socketChannel.configureBlocking(false);//绑定端口socketChannel.bind(new InetSocketAddress(8080));//注册事件,OP_ACCEPT只适用于ServerSocketChannel socketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iter = selectionKeys.iterator(); while(iter.hasNext()) { SelectionKey key = iter.next(); if(key.isAcceptable()) { SocketChannel channel = ((ServerSocketChannel)key.channel()).accept(); channel.configureBlocking(false); channel.register(selector,SelectionKey.OP_READ); } if(key.isWritable()) { } if(key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(512); channel.read(readBuffer); readBuffer.flip(); //handler Buffer //一般是响应客户端的数据 //直接是write写不就完事了嘛,为啥需要write事件? //channel.write(…) } iter.remove(); }}刚开始对NIO的写操作理解的不深,不知道为什么要注册写事件,何时注册写事件,为什么写完之后要取消注册写事件。如果有channel在Selector上注册了SelectionKey.OP_WRITE事件,在调用selector.select();时,系统会检查内核写缓冲区是否可写(什么时候是不可写的呢,比如缓冲区已满,channel调用了shutdownOutPut等等),如果可写,selector.select();立即返回,随后进入key.isWritable()分支。当然你在channel上可以直接调用write(…),也可以将数据发送出去,但这样不够灵活,而且可能浪费CPU。看一个场景,服务端需要发送一个200M的Buffer,看看使用OP_WRITE事件和不使用的区别。//不使用事件,缺点是,程序运行到这会等到200M文件发送完成后才继续往下执行,不符合异步事件模型//的编程思想,如果缓冲区一直处于不可写状态,那么该过程一直在这里死循环,浪费了CPU资源。ByteBuffer buffer = …. //200M的Bufferwhile(buffer.hasRemaining()) { //该方法只会写入小于socket’s output buffer空闲区域的任何字节数 //并返回写入的字节数,可能是0字节。 channel.write(buffer);}//使用事件的方式,谁好谁坏,一看便知if(key.isReadable()) { ByteBuffer buffer = …. //200M的Buffer //注册写事件 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); //绑定Buffer key.attach(buffer);}//isWritable分支if(key.isWritable()) { ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel channel = (SocketChannel) key.channel(); if (buffer.hasRemaining()) { channel.write(buffer) } else { //发送完了就取消写事件,否则下次还会进入该分支 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); }}客户端开发的一般步骤://打开选择器Selector selector = Selector.open();//打开通道SocketChannel socketChannel = SocketChannel.open();//配置非阻塞模型socketChannel.configureBlocking(false);//连接远程主机socketChannel.connect(new InetSocketAddress(“127.0.0.1”,8080));//注册事件socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);//循环处理while (true) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while(iter.hasNext()) { SelectionKey key = iter.next(); if(key.isConnectable()) { //连接建立或者连接建立不成功 SocketChannel channel = (SocketChannel) key.channel(); //完成连接的建立 if(channel.finishConnect()) { } } if(key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(500 * 1024 * 1024); buffer.clear(); channel.read(buffer); //buffer Handler } iter.remove(); }}起初对OP_CONNECT事件还有finishConnect不理解,OP_CONNECT事件何时触发,特别是为什么要在key.isConnectable()分支里调用finishConnect方法后才能进行读写操作。首先,在non-blocking模式下调用socketChannel.connect(new InetSocketAddress(“127.0.0.1”,8080));连接远程主机,如果连接能立即建立就像本地连接一样,该方法会立即返回true,否则该方法会立即返回false,然后系统底层进行三次握手建立连接。连接有两种结果,一种是成功连接,第二种是异常,但是connect方法已经返回,无法通过该方法的返回值或者是异常来通知用户程序建立连接的情况,所以由OP_CONNECT事件和finishConnect方法来通知用户程序。不管系统底层三次连接是否成功,selector都会被唤醒继而触发OP_CONNECT事件,如果握手成功,并且该连接未被其他线程关闭,finishConnect会返回true,然后就可以顺利的进行channle读写。如果网络故障,或者远程主机故障,握手不成功,用户程序可以通过finishConnect方法获得底层的异常通知,进而处理异常。 ...

January 5, 2019 · 1 min · jiezi

从零讲解搭建一个NIO消息服务端

本文首发于猫叔的博客 | MySelf,如需转载,请申明出处.假设假设你已经了解并实现过了一些OIO消息服务端,并对异步消息服务端更有兴趣,那么本文或许能带你更好的入门,并了解JDK部分源码的关系流程,正如题目所说,笔者将竟可能还原,以初学者能理解的角度,讲诉并构建一个NIO消息服务端。启动通道并注册选择器启动模式感谢Java一直在持续更新,对应的各个API也做得越来越好了,我们本次生成 服务端套接字通道 也是使用到JDK提供的一个方式 open ,我们将启动一个 ServerSocketChannel ,他是一个 支持同步异步模式 的 服务端套接字通道 。它是一个抽象类,官方给了推荐的方式 open 来开启一个我们需要的 服务端套接字通道实例 。(如下的官方源码相关注释)/** * A selectable channel for stream-oriented listening sockets. / public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{ /* * Opens a server-socket channel. / public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); }}那么好了,我们现在可以确定我们第一步的代码是什么样子的了!没错,和你想象中的一样,这很简单。public class NioServer { public void server(int port) throws IOException{ //1、打开服务器套接字通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); }}本节的重点是 启动模式 ,那么这意味着,我们需要向 ServerSocketChannel 进行标识,那么它是否提供了对用的方法设置 同步异步(阻塞非阻塞) 呢?这很明显,它是提供的,这也是它的核心功能之一,其实应该是它继承的 父抽象类AbstractSelectableChannel 的实现方法: configgureBlocking(Boolean),这个方法将标识我们的 服务端套接字通道 是否阻塞模式。(如下的官方源码相关注释)/* * Base implementation class for selectable channels. / public abstract class AbstractSelectableChannel extends SelectableChannel{ /* * Adjusts this channel’s blocking mode. / public final SelectableChannel configureBlocking(boolean block) throws IOException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if (blocking == block) return this; if (block && haveValidKeys()) throw new IllegalBlockingModeException(); implConfigureBlocking(block); blocking = block; } return this; }}那么,我们现在可以进行 启动模式的配置 了,读者很聪明。我们的项目Demo可以这样写: false为非阻塞模式、true为阻塞模式 。public class NioServer { public void server(int port) throws IOException{ //1、打开服务器套接字通道 ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open(); //2、设定为非阻塞、调整此通道的阻塞模式。 serverSocketChannel.configureBlocking(false); }}若未配置阻塞模式,注册选择器 会报 java.nio.channels.IllegalBlockingModeException 异常,相关将于该小节大致讲解说明。套接字地址端口绑定做过消息通讯服务器的朋友应该都清楚,我们需要向服务端 指定IP与端口 ,即使是NIO服务器也是一样的,否则,我们的客户端会报 java.net.ConnectException: Connection refused: connect 异常对于NIO的地址端口绑定,我们也需要用到 ServerSocket服务器套接字 。我们知道在写OIO服务端的时候,我们可能仅仅需要写一句即可,如下。 //将服务器绑定到指定端口 final ServerSocket socket = new ServerSocket(port);当然,JDK在实现NIO的时候就已经想到了,同样,我们可以使用 服务器套接字通道 来获取一个 ServerSocket服务器套接字 。这时的它并没有绑定端口,我们需要对应绑定地址,这个类自身就有一个 bind 方法。(如下源码相关注释)/* * This class implements server sockets. A server socket waits for * requests to come in over the network. It performs some operation * based on that request, and then possibly returns a result to the requester. /public class ServerSocket implements java.io.Closeable { /* * * Binds the {@code ServerSocket} to a specific address * (IP address and port number). / public void bind(SocketAddress endpoint) throws IOException { bind(endpoint, 50); }}通过源码,我们知道,绑定iP与端口 需要一个SocketAddress类,我们仅需要将 IP与端口配置到对应的SocketAddress类 中即可。其实JDK中,已经有了一个更加方便且继承了SocketAddress的类:InetSocketAddress。InetSocketAddress有一个需要一个port为参数的构造方法,它将创建 一个ip为通配符、端口为指定值的套接字地址 。这很方便我们的开发,对吧?(如下源码相关注释)/* * * This class implements an IP Socket Address (IP address + port number) * It can also be a pair (hostname + port number), in which case an attempt * will be made to resolve the hostname. If resolution fails then the address * is said to be <I>unresolved</I> but can still be used on some circumstances * like connecting through a proxy. / public class InetSocketAddress extends SocketAddress{ /* * Creates a socket address where the IP address is the wildcard address * and the port number a specified value. / public InetSocketAddress(int port) { this(InetAddress.anyLocalAddress(), port); }}好了,那么接下来我们的项目代码可以继续添加绑定IP与端口了,我想聪明的你应该有所感觉了。public class NioServer { public void server(int port) throws IOException{ //1、打开服务器套接字通道 ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open(); //2、设定为非阻塞、调整此通道的阻塞模式。 serverSocketChannel.configureBlocking(false); //3、检索与此通道关联的服务器套接字。 ServerSocket serverSocket = serverSocketChannel.socket(); //4、此类实现 ip 套接字地址 (ip 地址 + 端口号) InetSocketAddress address = new InetSocketAddress(port); //5、将服务器绑定到选定的套接字地址 serverSocket.bind(address); }}正如开头我们所说的,你的项目中不添加3-5环节的代码并没有问题,但是当客户端接入时,则会报错,因为客户端将要 接入的地址是连接不到的 ,如会报这样的错误。java.net.ConnectException: Connection refused: connect at sun.nio.ch.Net.connect0(Native Method) at sun.nio.ch.Net.connect(Net.java:457) at sun.nio.ch.Net.connect(Net.java:449) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647) at com.github.myself.WebClient.main(WebClient.java:16)注册选择器接下来会是 NIO实现的重点 ,可能有点难理解,如果希望大家能一次理解,完全深入有点难讲明白,不过先大致点一下。首先要先介绍以下JDK实现NIO的核心:多路复用器(Selector)——选择器先简单并抽象的理解下,Java通过 选择器来实现处理多个Channel链接 ,将空闲未进行数据操作的搁置,优先执行有需求的数据传输,即 通过一个选择器来选择谁需要谁不需要使用共享的线程 。由此,理所当然,这样的选择器应该也有Java自己定义的获取方法, 其自身的 open 就是启动一个这样的选择器。(如下源码相关注释)/* * A multiplexor of {@link SelectableChannel} objects. / public abstract class Selector implements Closeable { /* * Opens a selector. / public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } }那么现在,我们还要考虑一件事情,我们的 服务器套接字通道 要如何与 选择器 相关联呢?ServerSocketChannel 有一个注册的方法,这个方法就是将它们两个进行了关联,同时这个注册方法 除了关联选择器外,还标识了注册的状态 ,让我们先看看源码吧。以下的 ServerSocketChannel 继承 —》 AbstractSelectableChannel 继承 —》 SelectableChannel/* * A channel that can be multiplexed via a {@link Selector}. / public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel{ /* * Registers this channel with the given selector, returning a selection * key. / public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException { return register(sel, ops, null); }}我们一般需要将选择器注册上去,并将 ServerSocketChannel 标识为 接受连接 的状态。我们先看看我们的项目代码应该如何写。public class NioServer { public void server(int port) throws IOException{ //1、打开服务器套接字通道 ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open(); //2、设定为非阻塞、调整此通道的阻塞模式。 serverSocketChannel.configureBlocking(false); //3、检索与此通道关联的服务器套接字。 ServerSocket serverSocket = serverSocketChannel.socket(); //4、此类实现 ip 套接字地址 (ip 地址 + 端口号) InetSocketAddress address = new InetSocketAddress(port); //5、将服务器绑定到选定的套接字地址 serverSocket.bind(address); //6、打开Selector来处理Channel Selector selector = Selector.open(); //7、将ServerSocket注册到Selector已接受连接,注册会判断是否为非阻塞模式 SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer readBuff = ByteBuffer.allocate(1024); final ByteBuffer msg = ByteBuffer.wrap(“Hi!\r\n”.getBytes()); while(true){ //下方代码….. } }}注意: 我们前面说到,如果 ServerSocketChannel 没有启动非阻塞模式,那么我们在启动的时候会报 java.lang.IllegalArgumentException 异常,这是为什么呢? 我想我们可能需要更深入底层去看看 register 这个方法(如下源码注释)/* * Base implementation class for selectable channels. / public abstract class AbstractSelectableChannel extends SelectableChannel{ /* * Registers this channel with the given selector, returning a selection key. / public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }}我想我们终于真相大白了,原来注册这个方法会对 ServerSocketChannel 的一系列参数进行 校验 ,只有通过,才能注册成功,所以我们也明白了,为什么 非阻塞是false,同时我们也可以看到,它还对我们所给的标识做了校验,一点要优先注册 接受连接(OP_ACCEPT) 这个状态才行,不然依旧会报 java.lang.IllegalArgumentException 异常。这里解释一下,之所以只接受 OP_ACCEPT ,是因为如果没有一个接受其他链接的主服务,那么通信根本无从说起,同时这样的标识在我们的NIO服务端中 只允许标识一次(一个ServerSocketChannel) 。 可能大家还会好奇有什么标识,我想源码的说明确实写的很清楚了。/* * Operation-set bit for read operations. / public static final int OP_READ = 1 << 0;/* * Operation-set bit for write operations. / public static final int OP_WRITE = 1 << 2;/* * Operation-set bit for socket-connect operations. / public static final int OP_CONNECT = 1 << 3;/* * Operation-set bit for socket-accept operations. / public static final int OP_ACCEPT = 1 << 4;好了,这里给一个调试截图,希望大家也可以慢慢的摸索一下。注意这里的服务端并没有构建完成哦,我们还需要下面的几个步骤。NIO选择实例与兴趣点客户端代码说到这里,我们暂时先休息下,转头看看 客户端的代码 吧,这里就简单的介绍下,我们将建立 一个针对服务地址端口的连接 ,然后不停的循环 写操作与读操作 ,没有对客户端进行 关闭操作。大家如果有兴趣的话,也可以自己调试,并看看部分类的JDK源码,如下给出本项目案例的客户端代码。public class WebClient { public static void main(String[] args) throws IOException { try { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress(“0.0.0.0”,8090)); ByteBuffer writeBuffer = ByteBuffer.allocate(32); ByteBuffer readBuffer = ByteBuffer.allocate(32); writeBuffer.put(“hello”.getBytes()); writeBuffer.flip(); while (true){ writeBuffer.rewind(); socketChannel.write(writeBuffer); readBuffer.clear(); socketChannel.read(readBuffer); readBuffer.flip(); System.out.println(new String(readBuffer.array())); } }catch (IOException e){ e.printStackTrace(); } }}准备IO接入操作这里有点复杂,我也尽可能的思考了表达的方式,首先我们先明确一下,所有的连接都会被Selector所囊括,即我们要获取新接入的连接,也要通过Selector来获取,我们一开始启动的 服务器套接字通道ServerSocketChannel 起到一个接入入口(或许不够准确)的作用,客户端连接通过IP与端口进入后,会 被注册的Selector所获取 到,成为 Selector 其中的一员。但是这里的一员 并不会包括一开始注册并被标志为接收连接 的 ServerSocketChannel 。Selector有这样一个方法,它会自动去等待新的连接事件,如果没有连接接入,那么它将一直处于阻塞状态。通过字面意思我们可以大致这样写代码。while(true){ try{ //1、等到需要处理的新事件:阻塞将一直持续到下一个传入事件 selector.select(); }catch(IOException e){ e.printStackTrace(); break; }}那么这样写好像有点像样,毕竟异常我们也捕获了,同时也使用了刚刚 开启并注册完毕的选择器Selector。让我们看看源码中对于这个方法 select 的注释吧。/* * A multiplexor of {@link SelectableChannel} objects. / public abstract class Selector implements Closeable { /* * Selects a set of keys whose corresponding channels are ready for I/O * operations. */ public abstract int select() throws IOException; }好的,看样子是对的,它将返回一组套接字通道已经准备好执行I/O操作的键。那么这个Key究竟是什么呢?这里可能直观的感受下会更好。如下图是我调试下看到的key对象,我想大家应该可以理解了,这个Key中也会 存放对应连接的Channel与Selector 。具体的内部更深层的就探讨了。那么这也解决了我们接下来的 一个疑问 ,我们要怎么向Selector拿连接进来的实例呢?答案很明显,我们仅需要 获取到这个Keys 就好了。选择键集合操作对于获取Keys这个现在应该已经不是什么问题了,通过上面章节的了解,我想大家也可以想到这样的大致语法。//获取所有接收事件的SelectionKey实例Set<SelectionKey> readykeys = selector.selectedKeys();大家或许会好奇,这里的Key对象居然是前面的 SelectionKey.OP_ACCEPT 对象,是的,这也是接下来要讲的,这很奇妙,也很好玩。前面说到的标识,这是每一个Key自有的,并且是可以 改变的状态 ,在刚刚连接的时候,或许我应该大致的描述一下 一个新连接进入选择器后的流程 :select方法将接受到新接入的连接事件,它会被Selector以Key的形式存储,这时我们需要 对其进行判断 ,是否是已经就绪可以被接受的连接,如果是,这时我们需要 获取这个连接 ,同时也将其设定为 非阻塞的状态 ,并将它 注册到选择器上(当然,这时的标识就不能是一开始的 OP_ACCEPT ),你可以选择性的 注册它的标识 ,之后我们可以通过循环遍历Keys来,让 某一标识的连接去执行对应的操作 。说到这里,我想部分新手可能会有点模糊,我想我还是把接下来的代码都一起放出来吧,大家先看看是否能够再次结合文本进行了解。while (true){ try { //等到需要处理的新事件:阻塞将一直持续到下一个传入事件 selector.select(); }catch (IOException e){ e.printStackTrace(); break; } //获取所有接收事件的SelectionKey实例 Set<SelectionKey> readykeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readykeys.iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); try { //检查事件是否是一个新的已经就绪可以被接受的连接 if (key.isAcceptable()){ //channel:返回为其创建此键的通道。 即使在取消密钥后, 此方法仍将继续返回通道。 ServerSocketChannel server = (ServerSocketChannel)key.channel(); //可选择的通道, 用于面向流的连接插槽。 SocketChannel client = server.accept(); //设定为非阻塞 client.configureBlocking(false); //接受客户端,并将它注册到选择器,并添加附件 client.register(selector,SelectionKey.OP_WRITE | SelectionKey.OP_READ,msg.duplicate()); System.out.println(“Accepted connection from " + client); } //检查套接字是否已经准备好读数据 if (key.isReadable()){ SocketChannel client = (SocketChannel)key.channel(); readBuff.clear(); client.read(readBuff); readBuff.flip(); System.out.println(“received:"+new String(readBuff.array())); //将此键的兴趣集设置为给定的值。 OP_WRITE key.interestOps(SelectionKey.OP_WRITE); } //检查套接字是否已经准备好写数据 if (key.isWritable()){ SocketChannel client = (SocketChannel)key.channel(); //attachment : 检索当前附件 ByteBuffer buffer = (ByteBuffer)key.attachment(); buffer.rewind(); client.write(buffer); //将此键的兴趣集设置为给定的值。 OP_READ key.interestOps(SelectionKey.OP_READ); } }catch (IOException e){ e.printStackTrace(); } }}提示:读到此处,还请各位读者能运行整个demo,并调试下,看看与自己理解的是否有差别。流程效果以下我简单叙述一下,我在调试时的理解与效果。1、启动服务端后,运行到 selector.select(); 后阻塞,因为没有监听到新的连接。2、启动客户端后,selector.select() 监听到新连接,往下执行获取到的Keys的size为1,进入Key标识分支判断3、key.isAcceptable() 首次接入为true,设置为非阻塞,并注释到选择器中修改标识为 SelectionKey.OP_WRITE | SelectionKey.OP_READ ,同时添加附件信息 msg.duplicate() ,首次循环结束4、二次循环,连接未关闭,获取到的Keys的size为1,进入Key标识分支判断。5、由于第一次该Key标识改变,所以这次 key.isAcceptable() 为false,而由于改了标识,所以接下来的 key.isReadable() 、 key.isWritable() 都为true,执行读写操作,循环结束。6、接下来的循环,基本上是key.isReadable() 、 key.isWritable() 都为true,执行读写操作。7、设想一下,如果多加一条链接是什么效果。回顾这里给出几个代码的注意点,希望大家可以自己去了解学习。1、关于 ByteBuffer 本文并不重点讲解,大家可以自行了解2、关于Key标识判断的代码,以下两句的删减是否会对代码有所影响呢?key.interestOps(SelectionKey.OP_WRITE);key.interestOps(SelectionKey.OP_READ);3、如果删除了2中的代码,并把客户端注册选择器并给标识的代码改为以下,那么项目运行效果怎么样呢?client.register(selector, SelectionKey.OP_READ,msg.duplicate());4、如果改了3的代码,可是不删除2的代码,那么效果又是怎么样呢?答案留给读者去揭晓吧,如果你有答案,欢迎留言。个人相关项目InChat : 一个轻量级、高效率的支持多端(应用与硬件Iot)的异步网络应用通讯框架 ...

December 26, 2018 · 5 min · jiezi

NIO 之 WatchService

NIO 之 WatchServiceJava 1.6版本以前是不存在目录监控的API的。如果要实现这种功能必须要自己遍历目录,记录各个文件的情况,然后定时全部遍历一次,从 JDK7 之后出现了 WatchService 类,实现了对目录下文件的监控。整体流程整个监控目录文件操作的流程大致如下:获取 WatchService注册指定目录的监视器 WatchService等待目录下的文件发生变化对发生变化的文件进行操作获取 WatchService 实例WatchService 类的实现实际上是对操作系统的文件监视器的封装,相比之前的手动实现,优雅了不少。因为不需要遍历文件整体而言效率也高很多。以下为获取 WatchService 实例的代码,通过 FileSystem.getDefault() 可看出并非是自己实现的。从 newWatchService() 方法名看, WatchService 可以获取多个。WatchService watchService = FileSystems.getDefault().newWatchService();实际上调用此方法后,程序会新开一个线程,监视文件变化发出的信号,此时线程尚未就绪。为目录注册监视器有了监视器,接下来我们需要注册监视器了。Path path = Paths.get(“src”);WatchKey watchKey = path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);注册监视器需要用到 Path 实例,该实例对应的必须是一个目录,不允许是一个文件。方法比较简单,就是说为目录注册一个监视器,监视目录下文件的变化。关于 StandardWatchEventKinds.ENTRY_MODIFY ,表示监视文件的修改事件,它是 WatchEvent.Kind<?> 的实现类。看起来它像是枚举,实际上它并不是。JDK 中帮我们定义了三种事件,新增、修改和删除。获取目录下的变化获取目录的变化需要使用 WatchService 的 take() 方法或 poll() 方法。WatchKey key = watchService.take();WatchKey pollKey = watchService.poll();take() 是一个阻塞方法,会等待监视器发出的信号才返回。poll() 是一个非阻塞方法,会立即返回当时监视器中是否有信号。返回的 WatchKey 对象,实际上是一个单例,和之前 path.register() 方法返回的实例是同一个。它只能保存某一时间点的文件变化信息。处理文件变化事件List<WatchEvent<?>> events = key.pollEvents();for (WatchEvent<?> pollEvent : events) { Object o = pollEvent.context(); WatchEvent.Kind kind = pollEvent.kind();}key.reset();pollEvents() 用于获取文件变化事件,只能获取一次,不能重复获取,类似队列的形式。context() 返回触发该事件的那个文件或目录的路径(相对路径)kind() 返回事件类型(ENTRY_CREATE、ENTRY_DELETE、ENTRY_MODIFY之一)reset() 每次调用 WatchService 的 take() 或 poll() 方法时需要通过本方法重置。一个简单的例子public void watchServiceExample() throws IOException, InterruptedException { WatchService watchService = FileSystems.getDefault().newWatchService(); Path path = Paths.get(“D:/code”); WatchKey watchKey = path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); while (true) { // 尝试获取下一个变化信息的监控池,如果没有变化则一直等待 WatchKey key = watchService.take(); for (WatchEvent<?> pollEvent : key.pollEvents()) { System.out.println(String.format("%s is %s.", pollEvent.context().toString(), pollEvent.kind().name().substring(6))); } if (!key.reset()) { break; } }}总结WatchService 的优点就不用多说了,这里就说一个缺点: 只能监视当前目录下的文件和目录,不能监视子目录 。参考[疯狂Java]NIO.2:WatchService、WatchKey(监控文件变化)本文如有问题,欢迎在评论区中指正。 ...

December 25, 2018 · 1 min · jiezi

一文让你彻底理解 Java NIO 核心组件

同步、异步、阻塞、非阻塞首先,这几个概念非常容易搞混淆,但NIO中又有涉及,所以总结一下[1]。同步:API调用返回时调用者就知道操作的结果如何了(实际读取/写入了多少字节)。异步:相对于同步,API调用返回时调用者不知道操作的结果,后面才会回调通知结果。阻塞:当无数据可读,或者不能写入所有数据时,挂起当前线程等待。非阻塞:读取时,可以读多少数据就读多少然后返回,写入时,可以写入多少数据就写入多少然后返回。对于I/O操作,根据Oracle官网的文档,同步异步的划分标准是“调用者是否需要等待I/O操作完成”,这个“等待I/O操作完成”的意思不是指一定要读取到数据或者说写入所有数据,而是指真正进行I/O操作时,比如数据在TCP/IP协议栈缓冲区和JVM缓冲区之间传输的这段时间,调用者是否要等待。所以,我们常用的 read() 和 write() 方法都是同步I/O,同步I/O又分为阻塞和非阻塞两种模式,如果是非阻塞模式,检测到无数据可读时,直接就返回了,并没有真正执行I/O操作。总结就是,Java中实际上只有 同步阻塞I/O、同步非阻塞I/O 与 异步I/O 三种机制,我们下文所说的是前两种,JDK 1.7才开始引入异步 I/O,那称之为NIO.2。传统IO我们知道,一个新技术的出现总是伴随着改进和提升,Java NIO的出现亦如此。传统 I/O 是阻塞式I/O,主要问题是系统资源的浪费。比如我们为了读取一个TCP连接的数据,调用 InputStream 的 read() 方法,这会使当前线程被挂起,直到有数据到达才被唤醒,那该线程在数据到达这段时间内,占用着内存资源(存储线程栈)却无所作为,也就是俗话说的占着茅坑不拉屎,为了读取其他连接的数据,我们不得不启动另外的线程。在并发连接数量不多的时候,这可能没什么问题,然而当连接数量达到一定规模,内存资源会被大量线程消耗殆尽。另一方面,线程切换需要更改处理器的状态,比如程序计数器、寄存器的值,因此非常频繁的在大量线程之间切换,同样是一种资源浪费。随着技术的发展,现代操作系统提供了新的I/O机制,可以避免这种资源浪费。基于此,诞生了Java NIO,NIO的代表性特征就是非阻塞I/O。紧接着我们发现,简单的使用非阻塞I/O并不能解决问题,因为在非阻塞模式下,read()方法在没有读取到数据时就会立即返回,不知道数据何时到达的我们,只能不停的调用read()方法进行重试,这显然太浪费CPU资源了,从下文可以知道,Selector组件正是为解决此问题而生。Java NIO 核心组件1.Channel概念Java NIO中的所有I/O操作都基于Channel对象,就像流操作都要基于Stream对象一样,因此很有必要先了解Channel是什么。以下内容摘自JDK 1.8的文档A channel represents an open connection to an entity such as ahardware device, a file, a network socket, or a program component thatis capable of performing one or more distinct I/O operations, forexample reading or writing.从上述内容可知,一个Channel(通道)代表和某一实体的连接,这个实体可以是文件、网络套接字等。也就是说,通道是Java NIO提供的一座桥梁,用于我们的程序和操作系统底层I/O服务进行交互。通道是一种很基本很抽象的描述,和不同的I/O服务交互,执行不同的I/O操作,实现不一样,因此具体的有FileChannel、SocketChannel等。通道使用起来跟Stream比较像,可以读取数据到Buffer中,也可以把Buffer中的数据写入通道。当然,也有区别,主要体现在如下两点:一个通道,既可以读又可以写,而一个Stream是单向的(所以分 InputStream 和 OutputStream)通道有非阻塞I/O模式实现Java NIO中最常用的通道实现是如下几个,可以看出跟传统的 I/O 操作类是一一对应的。FileChannel:读写文件DatagramChannel: UDP协议网络通信SocketChannel:TCP协议网络通信ServerSocketChannel:监听TCP连接2.BufferNIO中所使用的缓冲区不是一个简单的byte数组,而是封装过的Buffer类,通过它提供的API,我们可以灵活的操纵数据,下面细细道来。与Java基本类型相对应,NIO提供了多种 Buffer 类型,如ByteBuffer、CharBuffer、IntBuffer等,区别就是读写缓冲区时的单位长度不一样(以对应类型的变量为单位进行读写)。Buffer中有3个很重要的变量,它们是理解Buffer工作机制的关键,分别是capacity (总容量)position (指针当前位置)limit (读/写边界位置)Buffer的工作方式跟C语言里的字符数组非常的像,类比一下,capacity就是数组的总长度,position就是我们读/写字符的下标变量,limit就是结束符的位置。Buffer初始时3个变量的情况如下图在对Buffer进行读/写的过程中,position会往后移动,而 limit 就是 position 移动的边界。由此不难想象,在对Buffer进行写入操作时,limit应当设置为capacity的大小,而对Buffer进行读取操作时,limit应当设置为数据的实际结束位置。(注意:将Buffer数据 写入 通道是Buffer 读取 操作,从通道 读取 数据到Buffer是Buffer 写入 操作)在对Buffer进行读/写操作前,我们可以调用Buffer类提供的一些辅助方法来正确设置 position 和 limit 的值,主要有如下几个flip(): 设置 limit 为 position 的值,然后 position 置为0。对Buffer进行读取操作前调用。rewind(): 仅仅将 position 置0。一般是在重新读取Buffer数据前调用,比如要读取同一个Buffer的数据写入多个通道时会用到。clear(): 回到初始状态,即 limit 等于 capacity,position 置0。重新对Buffer进行写入操作前调用。compact(): 将未读取完的数据(position 与 limit 之间的数据)移动到缓冲区开头,并将 position 设置为这段数据末尾的下一个位置。其实就等价于重新向缓冲区中写入了这么一段数据。然后,看一个实例,使用 FileChannel 读写文本文件,通过这个例子验证通道可读可写的特性以及Buffer的基本用法(注意 FileChannel 不能设置为非阻塞模式)。FileChannel channel = new RandomAccessFile(“test.txt”, “rw”).getChannel();channel.position(channel.size()); // 移动文件指针到末尾(追加写入)ByteBuffer byteBuffer = ByteBuffer.allocate(20);// 数据写入BufferbyteBuffer.put(“你好,世界!\n”.getBytes(StandardCharsets.UTF_8));// Buffer -> ChannelbyteBuffer.flip();while (byteBuffer.hasRemaining()) { channel.write(byteBuffer);}channel.position(0); // 移动文件指针到开头(从头读取)CharBuffer charBuffer = CharBuffer.allocate(10);CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();// 读出所有数据byteBuffer.clear();while (channel.read(byteBuffer) != -1 || byteBuffer.position() > 0) { byteBuffer.flip(); // 使用UTF-8解码器解码 charBuffer.clear(); decoder.decode(byteBuffer, charBuffer, false); System.out.print(charBuffer.flip().toString()); byteBuffer.compact(); // 数据可能有剩余}channel.close();这个例子中使用了两个Buffer,其中 byteBuffer 作为通道读写的数据缓冲区,charBuffer 用于存储解码后的字符。clear() 和 flip() 的用法正如上文所述,需要注意的是最后那个 compact() 方法,即使 charBuffer 的大小完全足以容纳 byteBuffer 解码后的数据,这个 compact() 也必不可少,这是因为常用中文字符的UTF-8编码占3个字节,因此有很大概率出现在中间截断的情况,请看下图:当 Decoder 读取到缓冲区末尾的 0xe4 时,无法将其映射到一个 Unicode,decode()方法第三个参数 false 的作用就是让 Decoder 把无法映射的字节及其后面的数据都视作附加数据,因此 decode() 方法会在此处停止,并且 position 会回退到 0xe4 的位置。如此一来, 缓冲区中就遗留了“中”字编码的第一个字节,必须将其 compact 到前面,以正确的和后序数据拼接起来。BTW,例子中的 CharsetDecoder 也是 Java NIO 的一个新特性,所以大家应该发现了一点哈,NIO的操作是面向缓冲区的(传统I/O是面向流的)。至此,我们了解了 Channel 与 Buffer 的基本用法。接下来要说的是让一个线程管理多个Channel的重要组件。3.SelectorSelector 是什么Selector(选择器)是一个特殊的组件,用于采集各个通道的状态(或者说事件)。我们先将通道注册到选择器,并设置好关心的事件,然后就可以通过调用select()方法,静静地等待事件发生。通道有如下4个事件可供我们监听:Accept:有可以接受的连接Connect:连接成功Read:有数据可读Write:可以写入数据了为什么要用Selector前文说了,如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。使用方法如下所示,创建一个Selector,并注册一个Channel。注意:要将 Channel 注册到 Selector,首先需要将 Channel 设置为非阻塞模式,否则会抛异常。Selector selector = Selector.open();channel.configureBlocking(false);SelectionKey key = channel.register(selector, SelectionKey.OP_READ);register()方法的第二个参数名叫“interest set”,也就是你所关心的事件集合。如果你关心多个事件,用一个“按位或运算符”分隔,比如SelectionKey.OP_READ | SelectionKey.OP_WRITE复制代码这种写法一点都不陌生,支持位运算的编程语言里都这么玩,用一个整型变量可以标识多种状态,它是怎么做到的呢,其实很简单,举个例子,首先预定义一些常量,它们的值(二进制)如下可以发现,它们值为1的位都是错开的,因此对它们进行按位或运算之后得出的值就没有二义性,可以反推出是由哪些变量运算而来。怎么判断呢,没错,就是“按位与”运算。比如,现在有一个状态集合变量值为 0011,我们只需要判断 “0011 & OP_READ” 的值是 1 还是 0 就能确定集合是否包含 OP_READ 状态。然后,注意 register() 方法返回了一个SelectionKey的对象,这个对象包含了本次注册的信息,我们也可以通过它修改注册信息。从下面完整的例子中可以看到,select()之后,我们也是通过获取一个 SelectionKey 的集合来获取到那些状态就绪了的通道。一个完整实例概念和理论的东西阐述完了(其实写到这里,我发现没写出多少东西,好尴尬(⊙⊙)),看一个完整的例子吧。这个例子使用Java NIO实现了一个单线程的服务端,功能很简单,监听客户端连接,当连接建立后,读取客户端的消息,并向客户端响应一条消息。需要注意的是,我用字符 ‘0′(一个值为0的字节) 来标识消息结束。单线程Serverpublic class NioServer {public static void main(String[] args) throws IOException { // 创建一个selector Selector selector = Selector.open(); // 初始化TCP连接监听通道 ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.bind(new InetSocketAddress(9999)); listenChannel.configureBlocking(false); // 注册到selector(监听其ACCEPT事件) listenChannel.register(selector, SelectionKey.OP_ACCEPT); // 创建一个缓冲区 ByteBuffer buffer = ByteBuffer.allocate(100); while (true) { selector.select(); //阻塞,直到有监听的事件发生 Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); // 通过迭代器依次访问select出来的Channel事件 while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { // 有连接可以接受 SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); System.out.println(“与【” + channel.getRemoteAddress() + “】建立了连接!”); } else if (key.isReadable()) { // 有数据可以读取 buffer.clear(); // 读取到流末尾说明TCP连接已断开, // 因此需要关闭通道或者取消监听READ事件 // 否则会无限循环 if (((SocketChannel) key.channel()).read(buffer) == -1) { key.channel().close(); continue; } // 按字节遍历数据 buffer.flip(); while (buffer.hasRemaining()) { byte b = buffer.get(); if (b == 0) { // 客户端消息末尾的\0 System.out.println(); // 响应客户端 buffer.clear(); buffer.put(“Hello, Client!\0”.getBytes()); buffer.flip(); while (buffer.hasRemaining()) { ((SocketChannel) key.channel()).write(buffer); } } else { System.out.print((char) b); } } } // 已经处理的事件一定要手动移除 keyIter.remove(); } }}}Client这个客户端纯粹测试用,为了看起来不那么费劲,就用传统的写法了,代码很简短。要严谨一点测试的话,应该并发运行大量Client,统计服务端的响应时间,而且连接建立后不要立刻发送数据,这样才能发挥出服务端非阻塞I/O的优势。public class Client {public static void main(String[] args) throws Exception { Socket socket = new Socket(“localhost”, 9999); InputStream is = socket.getInputStream(); OutputStream os = socket.getOutputStream(); // 先向服务端发送数据 os.write(“Hello, Server!\0”.getBytes()); // 读取服务端发来的数据 int b; while ((b = is.read()) != 0) { System.out.print((char) b); } System.out.println(); socket.close();}}NIO vs IO学习了NIO之后我们都会有这样一个疑问:到底什么时候该用NIO,什么时候该用传统的I/O呢?其实了解他们的特性后,答案还是比较明确的,NIO擅长1个线程管理多条连接,节约系统资源,但是如果每条连接要传输的数据量很大的话,因为是同步I/O,会导致整体的响应速度很慢;而传统I/O为每一条连接创建一个线程,能充分利用处理器并行处理的能力,但是如果连接数量太多,内存资源会很紧张。总结就是:连接数多数据量小用NIO,连接数少用I/O(写起来也简单- -)。Next经过NIO核心组件的学习,了解了非阻塞服务端实现的基本方法。然而,细心的你们肯定也发现了,上面那个完整的例子,实际上就隐藏了很多问题。比如,例子中只是简单的将读取到的每个字节输出,实际环境中肯定是要读取到完整的消息后才能进行下一步处理,由于NIO的非阻塞特性,一次可能只读取到消息的一部分,这已经很糟糕了,如果同一条连接会连续发来多条消息,那不仅要对消息进行拼接,还需要切割,同理,例子中给客户端响应的时候,用了个while()循环,保证数据全部write完成再做其它工作,实际应用中为了性能,肯定不会这么写。另外,为了充分利用现代处理器多核心并行处理的能力,应该用一个线程组来管理这些连接的事件。要解决这些问题,需要一个严谨而繁琐的设计,不过幸运的是,我们有开源的框架可用,那就是优雅而强大的Netty,Netty基于Java NIO,提供异步调用接口,开发高性能服务器的一个很好的选择,之前在项目中使用过,但没有深入学习,打算下一步好好学学它,到时候再写一篇笔记。Java NIO设计的目标是为程序员提供API以享受现代操作系统最新的I/O机制,所以覆盖面较广,除了文中所涉及的组件与特性,还有很多其它的,比如 Pipe(管道)、Path(路径)、Files(文件) 等,有的是用于提升I/O性能的新组件,有的是简化I/O操作的工具,具体用法可以参看最后 References 里的链接。 ...

November 16, 2018 · 2 min · jiezi

从时间碎片角度理解阻塞IO模型及非阻塞模型

阻塞模型限制了服务器的并发处理能力(伸缩性或同时处理的客户端连接数)传统的网络服务器只支持阻塞模型,该模型下,针对每个客户端连接,服务器都必须创建一个线程来处理这个连接上的请求,服务器必须维持着这些线程直到线程中的处理工作结束。服务器上所能创建的线程数量是有限的,WHY?进程上下文切换是耗时的过程创建的进程本身占用资源,比如每个进程或线程占用一定容量的内存等待数据准备和内核缓存复制,导致IO阻塞,占用着线程所以当连接到服务器上的客户端的数量很大时,把服务器上所能创建的线程都占据了时,服务器就无法接受更多的连接了。这限制了服务器处理请求的伸缩性。并非所有客户端都是持续活跃的存在这样一个事实,就是虽然连接到服务器上的客户端很多,但并非所有客户端都是持续活跃着的。它们占据着阻塞式服务器的线程资源——即使它们处于非工作状态。这些线程占据了资源,却不工作。这会造成什么现象呢?就是线程时间的碎片化——一个线程大部分时间是在等待IO操作的结果。为了让服务器能接受更多客户端的连接,非阻塞模型就出现了。如何提升服务器的并发处理能力?消灭碎片化时间,可以提升服务器的并发处理能力。如何消灭碎片化时间? 让线程分工协作各司其职,是一个很好的手段。原来的阻塞模型下,一个线程要干所有的事情。分工协作机制下,一部分线程专门用于接受客户端的连接、一部分专门用于获取请求的数据、一部分专门执行计算工作、还有一部分线程专门用于响应客户端。接受客户端连接的线程在接收到客户端连接后,立即把连接交给后续工序的线程处理,而它自己则继续接受下一个连接。如此类推,各个线程无须等待,不存在碎片化时间,全负荷工作。这样一来,整体上需要的较少的线程,就可以完成以前需要较多线程才能达到的工作时间了。阻塞模型下的实现方式在阻塞模型下,利用异步处理的方式对线程进行分工协作。接收请求的线程可以满负荷工作,但处理IO操作的线程仍然是阻塞着的,仍然存在线程工作不饱和的现象。非阻塞模型彻底消灭线程工作不饱和非阻塞模型下,IO操作不再是阻塞的了,而是立即返回。这样的话,处理IO操作的线程,可以在空闲时对所有请求进行轮询,以便判断哪些IO操作已完成。比如判断某个请求是否可以进行“写”操作,如果还不可以,无须等待,继续判断下一个请求是否可以进行“读”操作,如果可以则立即读取数据,然后把数据转交给专职计算的线程。这样就让线程工作不饱和现象消失了。这是所谓的“同步非阻塞”。轮询的耗时如何消灭?这就要请出“IO复用”这尊大神了。IO复用模型下,线程一次性从操作系统那儿获得一批可以进行IO操作的请求,处理完毕后,再此获得新的一批。线程无须与操作系统交互多次以便轮询每个请求的状态,而是与操作系统交互一次即可获得批量信息。效率进一步提高啦。

November 14, 2018 · 1 min · jiezi

基于 mysql 异步驱动的非阻塞 Mybatis

虽然 spring5 也推出了 WebFlux 这一套异步技术栈,这种极大提升吞吐的玩法在 node 里玩的风生水起,但 java 世界里异步依旧不是主流,Vertx 倒是做了不少对异步的支持,但是其对于数据访问层的封装依旧还是挺精简的,传统的 javaer 还是受不了这种没有对象映射的工具库,于是我尝试将 Mybatis 移植到了异步驱动上,让数据访问层的工作变得更简单一些。给个例子:@Sql(User.class)public interface CommonMapper { @Select(columns = “id,age,username”) @OrderBy(“id desc”) @Page @ModelConditions({ @ModelCondition(field = “username”, criterion = Criterions.EQUAL), @ModelCondition(field = “maxAge”, column = “age”, criterion = Criterions.LESS), @ModelCondition(field = “minAge”, column = “age”, criterion = Criterions.GREATER) }) void query(UserSearch userSearch, DataHandler<List<User>> handler);}上面是 mapper 接口定义,方法的最后一个参数因为异步的原因所以变成了一个回调,不同的是有很多注解来表达 sql,看到这些注解应该不难猜出 sql 语句吧。如果不喜欢你当然可以继续使用 mapper.xml 的方式来写 sql。更多内容移步代码库吧~AsyncDaoasyncDao是一款异步非阻塞模型下的数据访问层工具。MySQL only. 基于MySQL的异步驱动借鉴了Mybatis的mapping 和 dynamicSQL的内容,Mybatiser可以无缝切换注解表达SQL的能力事务支持SpringBoot支持Mybatis like使用上与Mybatis几乎一致,由于异步非阻塞的关系,数据的返回都会通过回调DataHandler来完成,所以方法定义参数的最后一个一定是DataHandler类型。由于需要提取方法的参数名,于是需要加上编译参数-parameters,请将它在IDE和maven里配置上。public interface CommonDao { void query(User user, DataHandler<List<User>> handler); void querySingle(User user, DataHandler<User> handler); void querySingleMap(User user, DataHandler<Map> handler); void insert(User user,DataHandler<Long> handler); void update(User user,DataHandler<Long> handler); void delete(User user,DataHandler<Long> handler);}mapper.xml与Mybatis几乎一致的写法(覆盖常见标签,一些不常用标签可能不支持,动态SQL建议使用注解SQL功能)<?xml version=“1.0” encoding=“UTF-8”?><mapper namespace=“com.tg.async.mapper.CommonDao”> <resultMap id=“BaseResultMap” type=“com.tg.async.mapper.User”> <id column=“id” property=“id”/> <result column=“old_address” property=“oldAddress”/> <result column=“created_at” property=“createdAt”/> <result column=“password” property=“password”/> <result column=“now_address” property=“nowAddress”/> <result column=“state” property=“state”/> <result column=“age” property=“age”/> <result column=“username” property=“username”/> <result column=“updated_at” property=“updatedAt”/> </resultMap> <select id=“query” resultMap=“BaseResultMap”>select * from T_User <where> <if test=“user.username!=null and user.username!=’’">AND username = #{user.username}</if> <if test=“user.age != null”>OR age > #{user.age}</if> </where> order by id desc </select> <insert id=“insert” useGeneratedKeys=“true” keyProperty=“id”>insert into T_User <trim prefix=”(" suffix=")" suffixOverrides=","> <if test=“user.oldAddress != null”>old_address,</if> <if test=“user.createdAt != null”>created_at,</if> <if test=“user.password != null”>password,</if> <if test=“user.nowAddress != null”>now_address,</if> <if test=“user.state != null”>state,</if> <if test=“user.age != null”>age,</if> <if test=“user.username != null”>username,</if> <if test=“user.updatedAt != null”>updated_at,</if> </trim> <trim prefix=“values (” suffix=")" suffixOverrides=","> <if test=“user.oldAddress != null”>#{user.oldAddress},</if> <if test=“user.createdAt != null”>#{user.createdAt},</if> <if test=“user.password != null”>#{user.password},</if> <if test=“user.nowAddress != null”>#{user.nowAddress},</if> <if test=“user.state != null”>#{user.state},</if> <if test=“user.age != null”>#{user.age},</if> <if test=“user.username != null”>#{user.username},</if> <if test=“user.updatedAt != null”>#{user.updatedAt},</if> </trim> </insert> <update id=“update”> update T_User <set> <if test=“user.password != null”>password=#{user.password},</if> <if test=“user.age != null”>age=#{user.age},</if> </set> where id = #{user.id} </update></mapper>注解SQL在XML里写SQL对于一些常见SQL实在是重复劳动,so这里允许你利用注解来表达SQL,该怎么做呢?Table与Model关联@Table(name = “T_User”)public class User { @Id(“id”) private Long id; //建议全部用包装类型,并注意mysql中字段类型与java类型的对应关系,mysql的int不会自动装换到这里的long private String username; private Integer age; @Column(“now_address”) private String nowAddress; @Column(“created_at”) private LocalDateTime createdAt; //asyncDao 里sql的时间类型都用joda,注意不是JDK8提供的那个,而是第三方包org.joda.time @Ignore private String remrk;@Table记录数据表的名字 @Id记录主键信息 @Column映射了表字段和属性的关系,如果表字段和类属性同名,那么可以省略这个注解 @Ingore忽略这个类属性,没有哪个表字段与它关联。定义接口@Sql(User.class)public interface CommonDao { @Select(columns = “id,age,username”) @OrderBy(“id desc”) @Page @ModelConditions({ @ModelCondition(field = “username”, criterion = Criterions.EQUAL), @ModelCondition(field = “maxAge”, column = “age”, criterion = Criterions.LESS), @ModelCondition(field = “minAge”, column = “age”, criterion = Criterions.GREATER) }) void query(UserSearch userSearch, DataHandler<List<User>> handler); @Select(columns = “age,username”) @OrderBy(“id desc”) void queryParam(@Condition String username, @Condition(criterion = Criterions.GREATER) Integer age, @OffSet int offset, @Limit int limit, DataHandler<List<User>> handler); @Select(columns = “username,age”, sqlMode = SqlMode.COMMON) void queryList(@Condition(criterion = Criterions.IN, column = “id”) int[] ids, DataHandler<List<User>> handler); @Insert(useGeneratedKeys = true, keyProperty = “id”) void insert(User user, DataHandler<Long> handler); @Update @ModelConditions(@ModelCondition(field = “id”)) void update(User user, DataHandler<Long> handler); @Delete @ModelConditions(@ModelCondition(field = “id”)) void delete(User user, DataHandler<Long> handler);}看到这些注解你应该能猜出来SQL长什么样,接下来解释一下这些注解查询@Select(columns = “id,age,username”)@OrderBy(“id desc”)@Page@ModelConditions({ @ModelCondition(field = “username”, criterion = Criterions.EQUAL), @ModelCondition(field = “maxAge”, column = “age”, criterion = Criterions.LESS), @ModelCondition(field = “minAge”, column = “age”, criterion = Criterions.GREATER)})void query(UserSearch userSearch, DataHandler<List<User>> handler);@Selectcolumns:默认 select *可以配置columns(“username,age”)选择部分字段;SqlMode:有两个选择,SqlMode.SELECTIVE 和 SqlMode.COMMON,区别是selective会检查查询条件的字段是否为null来实现动态的查询,即值为null时不会成为查询条件。并且@Select,@Count,@Update,@Delete都有selective这个属性。@Conditioncriterion:查询条件,=,<,>,in等,具体见Criterionscolumn:与表字段的对应,若与字段名相同可不配置attach:连接 and,or, 默认是andtest:SqlMode为selective下的判断表达式,类似Mybatis<if test=“username != null”>里的test属性,动态化查询条件@Limit,@OffSet为分页字段。方法的参数不加任何注解一样会被当做查询条件,如下面两个函数效果是一样的:@Select()void queryUser(Integer age,DataHandler<List<User>> handler);@Select()void queryUser(@Condition(criterion = Criterions.EQUAL, column = “age”) Integer age,DataHandler<List<User>> handler);查询Model上面的例子在查询条件比较多时方法参数会比较多,我们可以把查询条件封装到一个类里,使用@ModelConditions来注解查询条件,注意被@ModelConditions注解的方法只能有两个参数,一个是查询model,一个是DataHandler。@Select@Page@ModelConditions({ @ModelCondition(field = “username”, criterion = Criterions.EQUAL), @ModelCondition(field = “minAge”, column = “age”, criterion = Criterions.GREATER), @ModelCondition(field = “maxAge”, column = “age”, criterion = Criterions.LESS), @ModelCondition(field = “ids”, column = “id”, criterion = Criterions.IN)})void queryUser5(UserSearch userSearch,DataHandler<List<User>> handler);@ModelConditionfield:必填,查询条件中类对应的属性column:对应的表字段test:动态SQL的判断表达式@Page只能用在ModelConditions下的查询,并且方法参数的那个类应该有offset,limit这两个属性,或者 使用@Page(offsetField = “offset”,limitField = “limit”)指定具体字段统计@Countvoid count(DataHandler<Integer> handler);//返回Long类型插入@Insert(useGeneratedKeys = true, keyProperty = “id”)//返回自增idvoid insert(User user, DataHandler<Long> handler);更新@Update(columns = “username,age”)//选择更新某几个列void update(User user, DataHandler<Long> handler);//返回affectedRows删除@Deleteint delete(@Condition(criterion = Criterions.GREATER, column = “age”) int min, @Condition(criterion = Criterions.LESS, column = “age”) int max, DataHandler<Long> handler);@Delete@ModelConditions(@ModelCondition(field = “id”))void delete(User user, DataHandler<Long> handler);使用简单的编程使用AsyncConfig asyncConfig = new AsyncConfig();PoolConfiguration configuration = new PoolConfiguration(“username”, “localhost”, 3306, “password”, “database-name”);asyncConfig.setPoolConfiguration(configuration);asyncConfig.setMapperPackages(“com.tg.async.mapper”);//mapper接口asyncConfig.setXmlLocations(“mapper/”);//xml目录,classpath的相对路径,不支持绝对路径AsyncDaoFactory asyncDaoFactory = AsyncDaoFactory.build(asyncConfig);CommonDao commonDao = asyncDaoFactory.getMapper(CommonDao.class); UserSearch userSearch = new UserSearch();userSearch.setUsername(“ha”);userSearch.setMaxAge(28);userSearch.setMinAge(8);userSearch.setLimit(5);CountDownLatch latch = new CountDownLatch(1);commonDao.query(user, users -> { System.out.println(users); latch.countDown();});latch.await(); 事务Mybatis和Spring体系里有一个非常好用的@Translactional注解,我们知道事务本质就是依赖connection的rollback等操作,那么一个事务下多个SQL就要共用这一个connection,如何共享呢?传统的阻塞体系下ThreadLocal就成了实现这一点的完美解决方案。那么在异步世界里,要实现mybatis-spring一样的上层Api来完成事务操作是一件非常困难的事,难点就在于Api太上层,以至于无法实现connection共享。于是这里自能退而求其次,使用编程式的方式来使用事务,抽象出一个Translaction,具体的mapper通过translaction.getMapper()来获取,这样通过同一个Translaction得到的Mapper都将共用一个connection。CountDownLatch latch = new CountDownLatch(1);AsyncConfig asyncConfig = new AsyncConfig();PoolConfiguration configuration = new PoolConfiguration(“username”, “localhost”, 3306, “password”, “database-name”);asyncConfig.setPoolConfiguration(configuration);asyncConfig.setMapperPackages(“com.tg.async.mapper”);asyncConfig.setXmlLocations(“mapper/”);asyncDaoFactory = AsyncDaoFactory.build(asyncConfig);asyncDaoFactory.startTranslation(res -> { Translaction translaction = res.result(); System.out.println(translaction); CommonDao commonDao = translaction.getMapper(CommonDao.class); User user = new User(); user.setUsername(“insert”); user.setPassword(“1234”); user.setAge(28); commonDao.insert(user, id -> { System.out.println(id); translaction.rollback(Void -> { latch.countDown(); }); });});latch.await();SpringBoot虽然Spring5推出了WebFlux,但异步体系在Spring里依旧不是主流。在异步化改造的过程中,大部分人也往往会保留Spring的IOC,而将其他交给Vertx,所以asyncDao对于Spring的支持就是将Mapper注入IOC容器。quick startYAML配置文件:async: dao: mapperLocations: /mapper #xml目录,classpath的相对路径,不支持绝对路径 basePackages: com.tg.mapper #mapper所在包 username: username host: localhost port: 3306 password: pass database: database-name maxTotal: 12 maxIdle: 12 minIdle: 1 maxWaitMillis: 10000添加@Mapper来实现注入@Mapper@Sql(User.class)public interface CommonDao { @Select(columns = “id,age,username”) @OrderBy(“id desc”) @Page(offsetField = “offset”, limitField = “limit”) @ModelConditions({ @ModelCondition(field = “username”, criterion = Criterions.EQUAL), @ModelCondition(field = “maxAge”, column = “age”, criterion = Criterions.LESS), @ModelCondition(field = “minAge”, column = “age”, criterion = Criterions.GREATER) }) void query(UserSearch userSearch, DataHandler<List<User>> handler);}通过@EnableAsyncDao来开启支持,简单示例:@SpringBootApplication@EnableAsyncDaopublic class DemoApplication { public static void main(String[] args){ ApplicationContext applicationContext = SpringApplication.run(DemoApplication.class); CommonDao commonDao = applicationContext.getBean(CommonDao.class); UserSearch userSearch = new UserSearch(); userSearch.setUsername(“ha”); userSearch.setMaxAge(28); userSearch.setMinAge(8); userSearch.setLimit(5); commonDao.query(userSearch, users -> { System.out.println(“result: " + users); }); }} ...

August 30, 2018 · 4 min · jiezi