前言
上篇 [【从入门到放弃 -Java】并发编程 -NIO 使用]() 简单介绍了 nio 的基础使用,本篇将深入源码分析 nio 中 channel 的实现。
简介
channel 即通道,可以用来读、写数据,它是全双工的可以同时用来读写操作。这也是它与 stream 流的最大区别。
channel 需要与 buffer 配合使用,channel 通道的一端是 buffer,一端是数据源实体,如文件、socket 等。在 nio 中,通过 channel 的不同实现来处理 不同实体与数据 buffer 中的数据传输。
channel 接口:
package java.nio.channels;
import java.io.IOException;
import java.io.Closeable;
/**
* A nexus for I/O operations.
*
* <p> A channel represents an open connection to an entity such as a hardware
* device, a file, a network socket, or a program component that is capable of
* performing one or more distinct I/O operations, for example reading or
* writing.
*
* <p> A channel is either open or closed. A channel is open upon creation,
* and once closed it remains closed. Once a channel is closed, any attempt to
* invoke an I/O operation upon it will cause a {@link ClosedChannelException}
* to be thrown. Whether or not a channel is open may be tested by invoking
* its {@link #isOpen isOpen} method.
*
* <p> Channels are, in general, intended to be safe for multithreaded access
* as described in the specifications of the interfaces and classes that extend
* and implement this interface.
*
*
* @author Mark Reinhold
* @author JSR-51 Expert Group
* @since 1.4
*/
public interface Channel extends Closeable {
/**
* Tells whether or not this channel is open.
*
* @return <tt>true</tt> if, and only if, this channel is open
*/
public boolean isOpen();
/**
* Closes this channel.
*
* <p> After a channel is closed, any further attempt to invoke I/O
* operations upon it will cause a {@link ClosedChannelException} to be
* thrown.
*
* <p> If this channel is already closed then invoking this method has no
* effect.
*
* <p> This method may be invoked at any time. If some other thread has
* already invoked it, however, then another invocation will block until
* the first invocation is complete, after which it will return without
* effect. </p>
*
* @throws IOException If an I/O error occurs
*/
public void close() throws IOException;}
常见的 channel 实现有:
- FileChannel:文件读写数据通道
- SocketChannel:TCP 读写网络数据通道
- ServerSocketChannel:服务端网络数据读写通道,可以监听 TCP 连接。对每一个新进来的连接都会创建一个 SocketChannel。
- DatagramChannel:UDP 读写网络数据通道
FileChannel
FileChannel 是一个抽象类,它继承了 AbstractInterruptibleChannel 类,并实现了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel 接口。
具体的实现类主要是 sun.nio.ch.FileChannelImpl。下面详细分析下 FileChannelImpl 中每个方法的具体实现。
open
private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) {
// 主要记载操作系统维护的文件描述符
this.fd = var1;
// 是否可读
this.readable = var3;
// 是否可写
this.writable = var4;
// 是否以追加的方式打开
this.append = var5;
this.parent = var6;
this.path = var2;
// 底层使用 native 的 read 和 write 来处理文件的
this.nd = new FileDispatcherImpl(var5);
}
//FileInputStream::getChannel 调用 FileChannelImpl.open(fd, path, true, false, this) 获取只读 channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) {return new FileChannelImpl(var0, var1, var2, var3, false, var4);
}
//FileOutputStream::getChannel 调用 FileChannelImpl.open(fd, path, false, true, append, this) 获取只写 channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) {return new FileChannelImpl(var0, var1, var2, var3, var4, var5);
}
private FileChannelImpl(FileDescriptor fd, String path, boolean readable,
boolean writable, boolean direct, Object parent)
{
this.fd = fd;
// 是否可读
this.readable = readable;
// 是否可写
this.writable = writable;
// 对于从流创建的 channel,在结束时要做不同的清理动作,(openJDK 中才有,sun 的 jdk 中没有)this.parent = parent;
// 源文件的 path
this.path = path;
// 是否使用 DirectIO
this.direct = direct;
this.nd = new FileDispatcherImpl();
if (direct) {
assert path != null;
this.alignment = nd.setDirectIO(fd, path);
} else {this.alignment = -1;}
// 当 parent 不存在时,则注册一个 cleaner,否则交由 parent 做清理动作。// Register a cleaning action if and only if there is no parent
// as the parent will take care of closing the file descriptor.
// FileChannel is used by the LambdaMetaFactory so a lambda cannot
// be used here hence we use a nested class instead.
this.closer = parent != null ? null :
CleanerFactory.cleaner().register(this, new Closer(fd));
}
// Used by FileInputStream.getChannel(), FileOutputStream.getChannel
// and RandomAccessFile.getChannel()
public static FileChannel open(FileDescriptor fd, String path,
boolean readable, boolean writable,
boolean direct, Object parent)
{return new FileChannelImpl(fd, path, readable, writable, direct, parent);
}
- open 方法主要是返回一个新 new 的 FileChannelImpl 对象,初始化时设置 fileDescriptor、readable、writable、append、parent、path 等属性,看变量名很容易理解,在此不赘述变量含义。
read
// 实现自 SeekableByteChannel 接口的方法,将文件中的内容读取到给定的 byteBuffer 中
public int read(ByteBuffer dst) throws IOException {
// 保证读写时,channel 处于开启状态
ensureOpen();
// 判断是否可读
if (!readable)
throw new NonReadableChannelException();
synchronized (positionLock) {if (direct)
Util.checkChannelPositionAligned(position(), alignment);
int n = 0;
int ti = -1;
try {
// 开始阻塞,并注册为 Interruptible,可以被中断
beginBlocking();
// 将当前线程添加到 NativeThreadSet 中,并返回索引,方便后续操作。//NativeThreadSet 是一个线程安全的本地线程集合,方便管理,用来发送信号
ti = threads.add();
if (!isOpen())
return 0;
do {
// 当未被系统中断(即读取完毕)或 channel 未被关闭,则一直读,将内容写入到 byteBuffer(dst)中
n = IOUtil.read(fd, dst, -1, direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
// 把当前线程从 set 中移出
threads.remove(ti);
// 结束,释放锁
endBlocking(n > 0);
assert IOStatus.check(n);
}
}
}
// 实现自 ScatteringByteChannel 接口的方法,将文件中的内容依次读取到给定的 byteBuffer 数组中。public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
// 保证读写时,channel 处于开启状态
ensureOpen();
// 判断是否可读
if (!readable)
throw new NonReadableChannelException();
synchronized (positionLock) {if (direct)
Util.checkChannelPositionAligned(position(), alignment);
long n = 0;
int ti = -1;
try {
// 开始阻塞,并注册为 Interruptible,可以被中断
beginBlocking();
// 将当前线程添加到 NativeThreadSet 中,并返回索引,方便后续操作。//NativeThreadSet 是一个线程安全的本地线程集合,方便管理,用来发送信号
ti = threads.add();
if (!isOpen())
return 0;
do {
// 当未被系统中断(即读取完毕)或 channel 未被关闭,则一直读,将内容写入到 byteBuffer(dst)中
n = IOUtil.read(fd, dsts, offset, length,
direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
// 把当前线程从 set 中移出
threads.remove(ti);
// 结束,释放锁
endBlocking(n > 0);
assert IOStatus.check(n);
}
}
}
write
// 实现自 SeekableByteChannel 接口的方法,将 byteBuffer 中的内容写入到文件中
public int write(ByteBuffer src) throws IOException {
// 保证写时,channel 处于开启状态
ensureOpen();
// 判断是否可写
if (!writable)
throw new NonWritableChannelException();
synchronized (positionLock) {if (direct)
Util.checkChannelPositionAligned(position(), alignment);
int n = 0;
int ti = -1;
try {
// 开始阻塞,并注册为 Interruptible,可以被中断
beginBlocking();
// 将当前线程添加到 NativeThreadSet 中,并返回索引,方便后续操作。//NativeThreadSet 是一个线程安全的本地线程集合,方便管理,用来发送信号
ti = threads.add();
if (!isOpen())
return 0;
do {
// 当未被系统中断(即写入完毕)或 channel 未被关闭,则一直写,将内容写入到文件中
n = IOUtil.write(fd, src, -1, direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
// 把当前线程从 set 中移出
threads.remove(ti);
// 结束,释放锁
assert IOStatus.check(n);
}
}
}
// 实现自 GatheringByteChannel 接口的方法,将 byteBuffer 数组中的内容依次写入到文件中
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
// 保证写时,channel 处于开启状态
ensureOpen();
// 判断是否可写
if (!writable)
throw new NonWritableChannelException();
synchronized (positionLock) {if (direct)
Util.checkChannelPositionAligned(position(), alignment);
long n = 0;
int ti = -1;
try {
// 开始阻塞,并注册为 Interruptible,可以被中断
beginBlocking();
// 将当前线程添加到 NativeThreadSet 中,并返回索引,方便后续操作。//NativeThreadSet 是一个线程安全的本地线程集合,方便管理,用来发送信号
ti = threads.add();
if (!isOpen())
return 0;
do {
// 当未被系统中断(即写入完毕)或 channel 未被关闭,则一直写,将内容写入到文件中
n = IOUtil.write(fd, srcs, offset, length,
direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
// 把当前线程从 set 中移出
threads.remove(ti);
// 结束,释放锁
assert IOStatus.check(n);
}
}
}
position
// 实现自 SeekableByteChannel 接口的方法,获取当前 channel 的 position
public long position() throws IOException {ensureOpen();
synchronized (positionLock) {
long p = -1;
int ti = -1;
try {beginBlocking();
ti = threads.add();
if (!isOpen())
return 0;
boolean append = fdAccess.getAppend(fd);
do {
//append 模式下,position 在 channel 的末尾
// in append-mode then position is advanced to end before writing
p = (append) ? nd.size(fd) : nd.seek(fd, -1);
} while ((p == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(p);
} finally {threads.remove(ti);
endBlocking(p > -1);
assert IOStatus.check(p);
}
}
}
// 实现自 SeekableByteChannel 接口的方法,设置当前 channel 的 position 为 newPosition
public FileChannel position(long newPosition) throws IOException {ensureOpen();
if (newPosition < 0)
throw new IllegalArgumentException();
synchronized (positionLock) {
long p = -1;
int ti = -1;
try {beginBlocking();
ti = threads.add();
if (!isOpen())
return null;
do {
// 设置当前 position 为 newPosition
p = nd.seek(fd, newPosition);
} while ((p == IOStatus.INTERRUPTED) && isOpen());
return this;
} finally {threads.remove(ti);
endBlocking(p > -1);
assert IOStatus.check(p);
}
}
}
size
实现自 SeekableByteChannel 接口的方法,返回当前实体(文件)的大小
truncate
实现自 SeekableByteChannel 接口的方法,用来截取文件至 newSize 大小
force
实现自 SeekableByteChannel 接口的方法,用来将 channel 中尚未写入磁盘的数据强制落盘
transferTo
将 fileChannel 中的数据传递至另一个 channel
transferFrom
从其它 channel 读取数据至 fileChannel
SocketChannel
open
/**
* Opens a socket channel.
*
* <p> The new channel is created by invoking the {@link
* java.nio.channels.spi.SelectorProvider#openSocketChannel
* openSocketChannel} method of the system-wide default {@link
* java.nio.channels.spi.SelectorProvider} object. </p>
*
* @return A new socket channel
*
* @throws IOException
* If an I/O error occurs
*/
public static SocketChannel open() throws IOException {return SelectorProvider.provider().openSocketChannel();}
open 方法是调用 SelectorProvider 中实现了 java.nio.channels.spi.SelectorProvider#openSocketChannel 的方法,底层实际是 new SocketChannelImpl,调用 native 方法创建 socket
connect
public boolean connect(SocketAddress sa) throws IOException {
// 校验 Address 是否合法
InetSocketAddress isa = Net.checkAddress(sa);
// 获取系统安全管理器
SecurityManager sm = System.getSecurityManager();
if (sm != null)
// 校验 IP 和端口是否被允许连接
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
InetAddress ia = isa.getAddress();
// 如果是本机地址,则获取本机的 host
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
try {
// 加读锁
readLock.lock();
try {
// 加写锁
writeLock.lock();
try {
int n = 0;
// 是否阻塞
boolean blocking = isBlocking();
try {
// 开启 connect 前的校验并设置为 ST_CONNECTIONPENDING,如果 blocking 是 true 即阻塞模式,则记录当前线程的 ID,以便接收信号处理。beginConnect(blocking, isa);
do {
// 调用 native connect 方法
n = Net.connect(fd, ia, isa.getPort());
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
// 结束连接
endConnect(blocking, (n > 0));
}
assert IOStatus.check(n);
return n > 0;
} finally {
// 释放写锁
writeLock.unlock();}
} finally {
// 释放读锁
readLock.unlock();}
} catch (IOException ioe) {
// connect failed, close the channel
close();
throw SocketExceptions.of(ioe, isa);
}
}
configureBlocking
实现自 SelectableChannel 的接口方法,调用 native 方法设置 socket 的阻塞状态
register
在 AbstractSelectableChannel 中定义,注册要监听的事件。
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (!isOpen())
throw new ClosedChannelException();
synchronized (regLock) {if (isBlocking())
throw new IllegalBlockingModeException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
SelectionKey k = findKey(sel);
if (k != null) {k.attach(att);
k.interestOps(ops);
} else {
// 向 Selector 中注册事件
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
read
// 实现自 ReadableByteChannel 接口的方法,从 socket 中读取数据至 ByteBuffer
@Override
public int read(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);
readLock.lock();
try {boolean blocking = isBlocking();
int n = 0;
try {
// 检查 channel 是否开启并已经是 connected 的状态。如果 blocking 是 true 即阻塞模式,则记录当前线程的 ID,以便接收信号处理。beginRead(blocking);
// check if input is shutdown
if (isInputClosed)
return IOStatus.EOF;
// 如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用 native 方法不需要等待。if (blocking) {
do {n = IOUtil.read(fd, buf, -1, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {n = IOUtil.read(fd, buf, -1, nd);
}
} finally {endRead(blocking, n > 0);
if (n <= 0 && isInputClosed)
return IOStatus.EOF;
}
return IOStatus.normalize(n);
} finally {readLock.unlock();
}
}
// 实现自 ScatteringByteChannel 接口的方法,从 socket 中依次读取数据至 ByteBuffer 数组
@Override
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{Objects.checkFromIndexSize(offset, length, dsts.length);
readLock.lock();
try {boolean blocking = isBlocking();
long n = 0;
try {beginRead(blocking);
// check if input is shutdown
if (isInputClosed)
return IOStatus.EOF;
// 如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用 native 方法不需要等待。if (blocking) {
do {n = IOUtil.read(fd, dsts, offset, length, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {n = IOUtil.read(fd, dsts, offset, length, nd);
}
} finally {endRead(blocking, n > 0);
if (n <= 0 && isInputClosed)
return IOStatus.EOF;
}
return IOStatus.normalize(n);
} finally {readLock.unlock();
}
}
write
// 实现自 ReadableByteChannel 接口的方法,将 ByteBuffer 中的数据写入 socket
@Override
public int write(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);
writeLock.lock();
try {boolean blocking = isBlocking();
int n = 0;
try {beginWrite(blocking);
// 如果是阻塞模式,则一直读取直到数据读取完毕;非阻塞模式则直接调用 native 方法不需要等待。if (blocking) {
do {n = IOUtil.write(fd, buf, -1, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {n = IOUtil.write(fd, buf, -1, nd);
}
} finally {endWrite(blocking, n > 0);
if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();}
return IOStatus.normalize(n);
} finally {writeLock.unlock();
}
}
@Override
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{Objects.checkFromIndexSize(offset, length, srcs.length);
writeLock.lock();
try {boolean blocking = isBlocking();
long n = 0;
try {beginWrite(blocking);
// 如果是阻塞模式,则一直等待直到数据写入完毕;非阻塞模式则直接调用 native 方法不需要等待。if (blocking) {
do {n = IOUtil.write(fd, srcs, offset, length, nd);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {n = IOUtil.write(fd, srcs, offset, length, nd);
}
} finally {endWrite(blocking, n > 0);
if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();}
return IOStatus.normalize(n);
} finally {writeLock.unlock();
}
}
// 实现自 ReadableByteChannel 接口的方法,将 ByteBuffer 数组中的数据依次写入 socket
/**
* Writes a byte of out of band data.
*/
int sendOutOfBandData(byte b) throws IOException {writeLock.lock();
try {boolean blocking = isBlocking();
int n = 0;
try {beginWrite(blocking);
// 如果是阻塞模式,则一直等待直到数据写入完毕;非阻塞模式则直接调用 native 方法不需要等待。if (blocking) {
do {n = sendOutOfBandData(fd, b);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {n = sendOutOfBandData(fd, b);
}
} finally {endWrite(blocking, n > 0);
if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();}
return IOStatus.normalize(n);
} finally {writeLock.unlock();
}
}
ServerSocketChannel
socket
@Override
public ServerSocket socket() {synchronized (stateLock) {if (socket == null)
socket = ServerSocketAdaptor.create(this);
return socket;
}
}
bind
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {synchronized (stateLock) {ensureOpen();
if (localAddress != null)
throw new AlreadyBoundException();
InetSocketAddress isa = (local == null)
? new InetSocketAddress(0)
: Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
// 绑定前做一些前置处理,如将 tcp socket 文件描述符转换成 SDP
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
// 绑定 IP 和地址
Net.bind(fd, isa.getAddress(), isa.getPort());
// 开始监听,设置 socket 上最多可以挂起 backlog 个连接,若 backlog 小于 1 则默认设置 50 个
Net.listen(fd, backlog < 1 ? 50 : backlog);
localAddress = Net.localAddress(fd);
}
return this;
}
accept
@Override
public SocketChannel accept() throws IOException {acceptLock.lock();
try {
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
boolean blocking = isBlocking();
try {begin(blocking);
do {
// 阻塞等待接收客户端链接
n = accept(this.fd, newfd, isaa);
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {end(blocking, n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
// 新接收的 socket 初始设置为阻塞模式(因此非阻塞模式的每次需要显示设置)// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
// 用新接收的 socket 创建 SocketChannel
SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
// check permitted to accept connections from the remote address
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException x) {sc.close();
throw x;
}
}
return sc;
} finally {acceptLock.unlock();
}
}
ServerSocketChannel 并没有 read 和 write 方法,只是继承了 AbstractSelectableChannel,以便在 selector 中使用
DatagramChannel
open
public DatagramChannelImpl(SelectorProvider sp)
throws IOException
{super(sp);
ResourceManager.beforeUdpCreate();
try {
// 如果不支持 IPv6 则使用 IPv4
this.family = Net.isIPv6Available()
? StandardProtocolFamily.INET6
: StandardProtocolFamily.INET;
// 设置非流式的 socket(tcp 是流模式协议,udp 是数据报模式协议)this.fd = Net.socket(family, false);
this.fdVal = IOUtil.fdVal(fd);
} catch (IOException ioe) {ResourceManager.afterUdpClose();
throw ioe;
}
}
receive
public SocketAddress receive(ByteBuffer dst) throws IOException {if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
readLock.lock();
try {boolean blocking = isBlocking();
int n = 0;
ByteBuffer bb = null;
try {SocketAddress remote = beginRead(blocking, false);
boolean connected = (remote != null);
SecurityManager sm = System.getSecurityManager();
if (connected || (sm == null)) {
// connected or no security manager
do {n = receive(fd, dst, connected);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNAVAILABLE)
return null;
} else {
// Cannot receive into user's buffer when running with a
// security manager and not connected
bb = Util.getTemporaryDirectBuffer(dst.remaining());
for (;;) {
do {n = receive(fd, bb, connected);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNAVAILABLE)
return null;
InetSocketAddress isa = (InetSocketAddress)sender;
try {sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException se) {
// Ignore packet
bb.clear();
n = 0;
continue;
}
bb.flip();
dst.put(bb);
break;
}
}
//sender: 发送方地址,Set by receive0 (## ugh)
assert sender != null;
return sender;
} finally {if (bb != null)
Util.releaseTemporaryDirectBuffer(bb);
endRead(blocking, n > 0);
assert IOStatus.check(n);
}
} finally {readLock.unlock();
}
}
send
public int send(ByteBuffer src, SocketAddress target)
throws IOException
{Objects.requireNonNull(src);
InetSocketAddress isa = Net.checkAddress(target, family);
writeLock.lock();
try {boolean blocking = isBlocking();
int n = 0;
try {
// 当 connect 后,remote 会设置为连接的地址
SocketAddress remote = beginWrite(blocking, false);
if (remote != null) {
// connected
if (!target.equals(remote)) {throw new AlreadyConnectedException();
}
do {n = IOUtil.write(fd, src, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
} else {
// not connected
SecurityManager sm = System.getSecurityManager();
if (sm != null) {InetAddress ia = isa.getAddress();
if (ia.isMulticastAddress()) {sm.checkMulticast(ia);
} else {sm.checkConnect(ia.getHostAddress(), isa.getPort());
}
}
do {n = send(fd, src, isa);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
}
} finally {endWrite(blocking, n > 0);
assert IOStatus.check(n);
}
return IOStatus.normalize(n);
} finally {writeLock.unlock();
}
}
connect
@Override
public DatagramChannel connect(SocketAddress sa) throws IOException {InetSocketAddress isa = Net.checkAddress(sa, family);
SecurityManager sm = System.getSecurityManager();
if (sm != null) {InetAddress ia = isa.getAddress();
if (ia.isMulticastAddress()) {sm.checkMulticast(ia);
} else {sm.checkConnect(ia.getHostAddress(), isa.getPort());
sm.checkAccept(ia.getHostAddress(), isa.getPort());
}
}
readLock.lock();
try {writeLock.lock();
try {synchronized (stateLock) {ensureOpen();
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
int n = Net.connect(family,
fd,
isa.getAddress(),
isa.getPort());
if (n <= 0)
throw new Error(); // Can't happen
// connected
remoteAddress = isa;
state = ST_CONNECTED;
// refresh local address
localAddress = Net.localAddress(fd);
// flush any packets already received.
boolean blocking = isBlocking();
if (blocking) {IOUtil.configureBlocking(fd, false);
}
try {ByteBuffer buf = ByteBuffer.allocate(100);
while (receive(buf) != null) {buf.clear();
}
} finally {if (blocking) {IOUtil.configureBlocking(fd, true);
}
}
}
} finally {writeLock.unlock();
}
} finally {readLock.unlock();
}
return this;
}
udp 是数据报模式的协议,是没有 connect 的。这里的 connect 实际上是在底层忽略了与其他地址的数据传输。
在 connect 后,就可以像 socketChannel 似得使用 read 和 write 了
总结
本文学习了各种 channel 的实现,主要是对底层 native 方法的一些封装,针对不同属性的实体(文件、socket),使用对应的 channel 与 byteBuffer 传输数据。再通过 byteBuffer 与 byte 数据进行转换。
channel 的实现中,封装了大量的 native 方法,重要的底层实现全在 native 中,后续可以深入学习下。
本文中出现的 byteBuffer 和 selector 将在接下来的文章中,单独分析。
更多文章见:https://nc2era.com
本文作者:aloof_
阅读原文
本文为云栖社区原创内容,未经允许不得转载。