上一篇文章分享了Netty服务端启动过程,本文持续分享Netty客户端启动过程。
源码剖析基于Netty 4.1
Connect
客户端启动过程比较简单,次要是Connect操作。
Netty客户端启动疏导类是Bootstrap,同样继承了AbstractBootstrap,它只有一个EventLoopGroup,下文称为ConnectGroup。
Bootstrap#connect -> doResolveAndConnect -> doResolveAndConnect0
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try { final EventLoop eventLoop = channel.eventLoop(); // #1 final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop); ... final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress); if (resolveFuture.isDone()) { final Throwable resolveFailureCause = resolveFuture.cause(); if (resolveFailureCause != null) { channel.close(); promise.setFailure(resolveFailureCause); } else { // #2 doConnect(resolveFuture.getNow(), localAddress, promise); } return promise; } ... } catch (Throwable cause) { promise.tryFailure(cause); } return promise;}
#1
AddressResolver负责解析SocketAddress。它能够做一些地址转换工作。如Netty提供了RoundRobinInetAddressResolver,能够对上游服务集群进行轮询调用。
Bootstrap#resolver是一个AddressResolverGroup,它负责结构AddressResolver,默认应用DefaultAddressResolverGroup。#2
调用doConnect,执行Connect操作。
doConnect -> AbstractChannel#connect -> DefaultChannelPipeline#connect -> HeadContext#connect -> AbstractNioUnsafe#connect
(这里波及DefaultChannelPipeline的内容后续有文章解析)
public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ... try { ... boolean wasActive = isActive(); // #1 if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // #2 int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } // #3 promise.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); }}
#1
调用SocketChannel#connect,如果是非阻塞Socket调用,该办法返回false。#2
给EventLoop增加一个定时工作,如果连贯超时则敞开Channel。
Netty中也提供了ReadTimeoutHandler解决读超时的场景。#3
给promise增加一个回调办法,connect操作实现时,如果connect操作被勾销了,则敞开Channel。
NioSocketChannel#doConnect
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { ... boolean success = false; try { // #1 boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); // #2 if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } }}
#1
调用(jvm)SocketChannel#connect办法,同样,非阻塞SocketChannel调用该办法,返回false。#2
关注OP_CONNECT事件。
EventLoop中负责解决OP_CONNECT事件(EventLoop前面有文章解析),调用AbstractNioUnsafe#finishConnect实现连贯操作。
public final void finishConnect() { ... try { boolean wasActive = isActive(); // #1 doFinishConnect(); // #2 fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // #3 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; }}
#1
doFinishConnect办法由子类NioSocketChannel实现,就是调用(jvm)SocketChannel#finishConnect()办法#2
设置connectPromise解决胜利#3
勾销connectTimeoutFuture提早工作
注册关注Read事件
AbstractNioUnsafe#fulfillConnectPromise -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
后面解析服务端启动过程时说过,HeadContext#channelActive会调用readIfIsAutoRead办法,判断是否开启autoRead,开启则主动触发read事件处理办法。
HeadContext#readIfIsAutoRead -> AbstractChannel#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
AbstractNioChannel#doBeginRead在解析服务端启动过程时也说过,这里会注册关注Read事件。
客户端启动实现后,客户端和服务端就能够开始进行Read/Write操作了。
如果您感觉本文不错,欢送关注我的微信公众号,您的关注是我保持的能源!