共计 4048 个字符,预计需要花费 11 分钟才能阅读完成。
上一篇文章分享了 Netty 服务端启动过程,本文持续分享 Netty 客户端启动过程。
源码剖析基于 Netty 4.1
Connect
客户端启动过程比较简单,次要是 Connect 操作。
Netty 客户端启动疏导类是 Bootstrap,同样继承了 AbstractBootstrap,它只有一个 EventLoopGroup,下文称为 ConnectGroup。
Bootstrap#connect -> doResolveAndConnect -> doResolveAndConnect0
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {final EventLoop eventLoop = channel.eventLoop();
// #1
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
...
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {channel.close();
promise.setFailure(resolveFailureCause);
} else {
// #2
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
...
} catch (Throwable cause) {promise.tryFailure(cause);
}
return promise;
}
#1
AddressResolver 负责解析 SocketAddress。它能够做一些地址转换工作。如 Netty 提供了 RoundRobinInetAddressResolver,能够对上游服务集群进行轮询调用。
Bootstrap#resolver 是一个 AddressResolverGroup,它负责结构 AddressResolver,默认应用 DefaultAddressResolverGroup。#2
调用 doConnect,执行 Connect 操作。
doConnect -> AbstractChannel#connect -> DefaultChannelPipeline#connect -> HeadContext#connect -> AbstractNioUnsafe#connect
(这里波及 DefaultChannelPipeline 的内容后续有文章解析)
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
try {
...
boolean wasActive = isActive();
// #1
if (doConnect(remoteAddress, localAddress)) {fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// #2
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out:" + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// #3
promise.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {if (future.isCancelled()) {if (connectTimeoutFuture != null) {connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();}
}
#1
调用 SocketChannel#connect,如果是非阻塞 Socket 调用,该办法返回 false。#2
给 EventLoop 增加一个定时工作,如果连贯超时则敞开 Channel。
Netty 中也提供了 ReadTimeoutHandler 解决读超时的场景。#3
给 promise 增加一个回调办法,connect 操作实现时,如果 connect 操作被勾销了,则敞开 Channel。
NioSocketChannel#doConnect
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
...
boolean success = false;
try {
// #1
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
// #2
if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {if (!success) {doClose();
}
}
}
#1
调用(jvm)SocketChannel#connect 办法,同样,非阻塞 SocketChannel 调用该办法,返回 false。#2
关注 OP_CONNECT 事件。
EventLoop 中负责解决 OP_CONNECT 事件(EventLoop 前面有文章解析),调用 AbstractNioUnsafe#finishConnect 实现连贯操作。
public final void finishConnect() {
...
try {boolean wasActive = isActive();
// #1
doFinishConnect();
// #2
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// #3
if (connectTimeoutFuture != null) {connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
#1
doFinishConnect 办法由子类 NioSocketChannel 实现,就是调用 (jvm)SocketChannel#finishConnect() 办法#2
设置 connectPromise 解决胜利#3
勾销 connectTimeoutFuture 提早工作
注册关注 Read 事件
AbstractNioUnsafe#fulfillConnectPromise -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
后面解析服务端启动过程时说过,HeadContext#channelActive 会调用 readIfIsAutoRead 办法,判断是否开启 autoRead,开启则主动触发 read 事件处理办法。
HeadContext#readIfIsAutoRead -> AbstractChannel#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
AbstractNioChannel#doBeginRead 在解析服务端启动过程时也说过,这里会注册关注 Read 事件。
客户端启动实现后,客户端和服务端就能够开始进行 Read/Write 操作了。
如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!