上一篇文章分享了Netty服务端启动过程,本文持续分享Netty客户端启动过程。
源码剖析基于Netty 4.1

Connect

客户端启动过程比较简单,次要是Connect操作。
Netty客户端启动疏导类是Bootstrap,同样继承了AbstractBootstrap,它只有一个EventLoopGroup,下文称为ConnectGroup。

Bootstrap#connect -> doResolveAndConnect -> doResolveAndConnect0

private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,                                           final SocketAddress localAddress, final ChannelPromise promise) {    try {        final EventLoop eventLoop = channel.eventLoop();        // #1        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);                ...                final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);        if (resolveFuture.isDone()) {            final Throwable resolveFailureCause = resolveFuture.cause();            if (resolveFailureCause != null) {                channel.close();                promise.setFailure(resolveFailureCause);            } else {                // #2                doConnect(resolveFuture.getNow(), localAddress, promise);            }            return promise;        }        ...    } catch (Throwable cause) {        promise.tryFailure(cause);    }    return promise;}

#1
AddressResolver负责解析SocketAddress。它能够做一些地址转换工作。如Netty提供了RoundRobinInetAddressResolver,能够对上游服务集群进行轮询调用。
Bootstrap#resolver是一个AddressResolverGroup,它负责结构AddressResolver,默认应用DefaultAddressResolverGroup。
#2 调用doConnect,执行Connect操作。

doConnect -> AbstractChannel#connect -> DefaultChannelPipeline#connect -> HeadContext#connect -> AbstractNioUnsafe#connect
(这里波及DefaultChannelPipeline的内容后续有文章解析)

public final void connect(        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {    ...    try {        ...        boolean wasActive = isActive();        // #1        if (doConnect(remoteAddress, localAddress)) {            fulfillConnectPromise(promise, wasActive);        } else {            connectPromise = promise;            requestedRemoteAddress = remoteAddress;            // #2            int connectTimeoutMillis = config().getConnectTimeoutMillis();            if (connectTimeoutMillis > 0) {                connectTimeoutFuture = eventLoop().schedule(new Runnable() {                    public void run() {                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;                        ConnectTimeoutException cause =                                new ConnectTimeoutException("connection timed out: " + remoteAddress);                        if (connectPromise != null && connectPromise.tryFailure(cause)) {                            close(voidPromise());                        }                    }                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);            }            // #3            promise.addListener(new ChannelFutureListener() {                public void operationComplete(ChannelFuture future) throws Exception {                    if (future.isCancelled()) {                        if (connectTimeoutFuture != null) {                            connectTimeoutFuture.cancel(false);                        }                        connectPromise = null;                        close(voidPromise());                    }                }            });        }    } catch (Throwable t) {        promise.tryFailure(annotateConnectException(t, remoteAddress));        closeIfClosed();    }}

#1 调用SocketChannel#connect,如果是非阻塞Socket调用,该办法返回false。
#2 给EventLoop增加一个定时工作,如果连贯超时则敞开Channel。
Netty中也提供了ReadTimeoutHandler解决读超时的场景。
#3 给promise增加一个回调办法,connect操作实现时,如果connect操作被勾销了,则敞开Channel。

NioSocketChannel#doConnect

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {    ...    boolean success = false;    try {        // #1        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);        // #2        if (!connected) {            selectionKey().interestOps(SelectionKey.OP_CONNECT);        }        success = true;        return connected;    } finally {        if (!success) {            doClose();        }    }}

#1 调用(jvm)SocketChannel#connect办法,同样,非阻塞SocketChannel调用该办法,返回false。
#2 关注OP_CONNECT事件。

EventLoop中负责解决OP_CONNECT事件(EventLoop前面有文章解析),调用AbstractNioUnsafe#finishConnect实现连贯操作。

public final void finishConnect() {    ...    try {        boolean wasActive = isActive();        // #1        doFinishConnect();        // #2        fulfillConnectPromise(connectPromise, wasActive);    } catch (Throwable t) {        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));    } finally {        // #3        if (connectTimeoutFuture != null) {            connectTimeoutFuture.cancel(false);        }        connectPromise = null;    }}

#1 doFinishConnect办法由子类NioSocketChannel实现,就是调用(jvm)SocketChannel#finishConnect()办法
#2 设置connectPromise解决胜利
#3 勾销connectTimeoutFuture提早工作

注册关注Read事件
AbstractNioUnsafe#fulfillConnectPromise -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
后面解析服务端启动过程时说过,HeadContext#channelActive会调用readIfIsAutoRead办法,判断是否开启autoRead,开启则主动触发read事件处理办法。
HeadContext#readIfIsAutoRead -> AbstractChannel#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
AbstractNioChannel#doBeginRead在解析服务端启动过程时也说过,这里会注册关注Read事件。

客户端启动实现后,客户端和服务端就能够开始进行Read/Write操作了。

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!