乐趣区

关于netty:Netty源码解析-客户端启动过程

上一篇文章分享了 Netty 服务端启动过程,本文持续分享 Netty 客户端启动过程。
源码剖析基于 Netty 4.1

Connect

客户端启动过程比较简单,次要是 Connect 操作。
Netty 客户端启动疏导类是 Bootstrap,同样继承了 AbstractBootstrap,它只有一个 EventLoopGroup,下文称为 ConnectGroup。

Bootstrap#connect -> doResolveAndConnect -> doResolveAndConnect0

private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                           final SocketAddress localAddress, final ChannelPromise promise) {
    try {final EventLoop eventLoop = channel.eventLoop();
        // #1
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
        
        ...
        
        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // #2
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        ...
    } catch (Throwable cause) {promise.tryFailure(cause);
    }
    return promise;
}

#1
AddressResolver 负责解析 SocketAddress。它能够做一些地址转换工作。如 Netty 提供了 RoundRobinInetAddressResolver,能够对上游服务集群进行轮询调用。
Bootstrap#resolver 是一个 AddressResolverGroup,它负责结构 AddressResolver,默认应用 DefaultAddressResolverGroup。
#2 调用 doConnect,执行 Connect 操作。

doConnect -> AbstractChannel#connect -> DefaultChannelPipeline#connect -> HeadContext#connect -> AbstractNioUnsafe#connect
(这里波及 DefaultChannelPipeline 的内容后续有文章解析)

public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...

    try {
        ...

        boolean wasActive = isActive();
        // #1
        if (doConnect(remoteAddress, localAddress)) {fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // #2
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out:" + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            // #3
            promise.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {if (future.isCancelled()) {if (connectTimeoutFuture != null) {connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();}
}

#1 调用 SocketChannel#connect,如果是非阻塞 Socket 调用,该办法返回 false。
#2 给 EventLoop 增加一个定时工作,如果连贯超时则敞开 Channel。
Netty 中也提供了 ReadTimeoutHandler 解决读超时的场景。
#3 给 promise 增加一个回调办法,connect 操作实现时,如果 connect 操作被勾销了,则敞开 Channel。

NioSocketChannel#doConnect

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    ...

    boolean success = false;
    try {
        // #1
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        // #2
        if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {if (!success) {doClose();
        }
    }
}

#1 调用(jvm)SocketChannel#connect 办法,同样,非阻塞 SocketChannel 调用该办法,返回 false。
#2 关注 OP_CONNECT 事件。

EventLoop 中负责解决 OP_CONNECT 事件(EventLoop 前面有文章解析),调用 AbstractNioUnsafe#finishConnect 实现连贯操作。

public final void finishConnect() {
    ...
    try {boolean wasActive = isActive();
        // #1
        doFinishConnect();
        // #2
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    } finally {
        // #3
        if (connectTimeoutFuture != null) {connectTimeoutFuture.cancel(false);
        }
        connectPromise = null;
    }
}

#1 doFinishConnect 办法由子类 NioSocketChannel 实现,就是调用 (jvm)SocketChannel#finishConnect() 办法
#2 设置 connectPromise 解决胜利
#3 勾销 connectTimeoutFuture 提早工作

注册关注 Read 事件
AbstractNioUnsafe#fulfillConnectPromise -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
后面解析服务端启动过程时说过,HeadContext#channelActive 会调用 readIfIsAutoRead 办法,判断是否开启 autoRead,开启则主动触发 read 事件处理办法。
HeadContext#readIfIsAutoRead -> AbstractChannel#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
AbstractNioChannel#doBeginRead 在解析服务端启动过程时也说过,这里会注册关注 Read 事件。

客户端启动实现后,客户端和服务端就能够开始进行 Read/Write 操作了。

如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!

退出移动版