共计 13196 个字符,预计需要花费 33 分钟才能阅读完成。
前言
后面小飞曾经解说了 NIO
和Netty
服务端启动,这一讲是 Client
的启动过程。
源码系列的文章仍旧还是遵循大白话 + 画图的格调来解说,本文 Netty
源码及当前的文章版本都基于:4.1.22.Final
本篇是以 NettyClient
启动为切入点,带大家一步步进入 Netty
源码的世界。
Client 启动流程揭秘
1、探秘的入口:netty-client demo
这里用 netty-exmaple
中的 EchoClient
来作为例子:
public final class EchoClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();
try {Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(HOST, PORT).sync();
f.channel().closeFuture().sync();} finally {group.shutdownGracefully();
}
}
}
代码没有什么独特的中央,咱们上一篇文章时也梳理过 Netty
网络编程的一些套路,这里就不再赘述了。
(遗记的小朋友能够查看 Netty
系列文章中查找~)
下面的客户端代码尽管简略, 然而却展现了Netty
客户端初始化时所需的所有内容:
EventLoopGroup
:Netty
服务端或者客户端,都必须指定EventLoopGroup
,客户端指定的是NioEventLoopGroup
Bootstrap
:Netty
客户端启动类,负责客户端的启动和初始化过程channel()
类型:指定Channel
的类型,因为这里是客户端,所以应用的是NioSocketChannel
,服务端会应用NioServerSocketChannel
Handler
:设置数据的处理器bootstrap.connect()
: 客户端连贯netty
服务的办法
2、NioEventLoopGroup 流程解析
咱们先从 NioEventLoopGroup
开始,一行行代码解析,先看看其类构造:
下面是大抵的类构造,而 EventLoop
又继承自 EventLoopGroup
,所以类的大抵构造咱们可想而知。这里一些外围逻辑会在MultithreadEventExecutorGroup
中,蕴含 EventLoopGroup
的创立和初始化操作等。
接着从 NioEventLoopGroup
构造方法开始看起,一步步往下跟(代码都只展现重点的局部,省去很多临时不须要关怀的代码,以下代码都遵循这个准则):
EventLoopGroup group = new NioEventLoopGroup();
public NioEventLoopGroup() {this(0);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这里通过调用 this()
和super()
办法一路往下传递,期间会结构一些默认属性,始终传递到 MultithreadEventExecutorGroup
类中,接着往西看。
2.1、MultithreadEventExecutorGroup
下面构造函数有一个重要的参数传递:DEFAULT_EVENT_LOOP_THREADS
,这个值默认是CPU 核数 * 2
。
为什么要传递这个参数呢?咱们之前说过 EventLoopGroup
能够了解成一个线程池,MultithreadEventExecutorGroup
有一个线程数组 EventExecutor[] children
属性,而传递过去的 DEFAULT_EVENT_LOOP_THREADS
就是数组的长度。
先看下 MultithreadEventExecutorGroup
中的构造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {children[i] = newChild(executor, args);
}
// ... 省略
}
这段代码执行逻辑能够了解为:
- 通过
ThreadPerTaskExecutor
结构一个Executor
执行器,前面会细说,外面蕴含了线程执行的execute()
办法 - 接着创立一个
EventExecutor
数组对象,大小为传递进来的threads
数量,这个所谓的EventExecutor
能够了解为咱们的EventLoop
,在这个 demo 中就是NioEventLoop
对象 - 最初调用
newChild
办法一一初始化EventLoopGroup
中的EventLoop
对象
下面只是大略说了下 MultithreadEventExecutorGroup
中的构造方法做的事件,前面还会一个个具体开展,先不必焦急,咱们先有个整体的认知就好。
再回到 MultithreadEventExecutorGroup
中的构造方法入参中,有个 EventExecutorChooserFactory
对象,这外面是有个很亮眼的细节设计,通过它咱们来洞悉 Netty
的良苦用心。
2.1、亮点设计:DefaultEventExecutorChooserFactory
EventExecutorChooserFactory
这个类的作用是用来抉择 EventLoop
执行器的,咱们晓得 EventLoopGroup
是一个蕴含了 CPU * 2
个数量的 EventLoop
数组对象,那每次抉择 EventLoop
来执行工作是抉择数组中的哪一个呢?
咱们看一下这个类的具体实现,红框中
都是须要重点查看的中央:
DefaultEventExecutorChooserFactory
是一个选择器工厂类,调用外面的 next()
办法达到一个轮询抉择的目标。
数组的长度是 length,执行第 n 次,取数组中的哪个元素就是对 length 取余
持续回到代码的实现,这里的优化就是在于先通过 isPowerOfTwo()
办法判断数组的长度是否为 2 的 n 次幂,判断的形式很奇妙,应用 val & -val == val
,这里我不做过多的解释,网上还有很多判断 2 的 n 次幂的优良解法,我就不班门弄斧了。( 可参考:https://leetcode-cn.com/probl…)
当然我认为这里还有更容易了解的一个算法:x & (x - 1) == 0
大家能够看上面的图就懂了,这里就不延展了:
BUT!!! 这里为什么要去殚精竭虑的判断数组的长度是 2 的 n 次幂?
不晓得小伙伴们是否还记得 大明湖畔 的HashMap
?个别咱们要求 HashMap
数组的长度须要是 2 的 n 次幂,因为在 key
值寻找数组地位的办法:(n - 1) & hash
n 是数组长度,这里如果数组长度是 2 的 n 次幂就能够通过位运算来晋升性能,当 length
为 2 的 n 次幂时上面公式是等价的:
n & (length – 1) <=> n % length
还记得下面说过,数组的长度默认都是CPU * 2
,而个别服务器 CPU 外围数都是 2、4、8、16 等等,所以这一个小优化就很实用了,再认真想想,原来数组长度的初始化也是很考究的。
这里位运算的益处就是效率远远高于与运算,Netty
针对于这个小细节都做了优化,真是太棒了。
2.3、线程执行器:ThreadPerTaskExecutor
接着看下 ThreadPerTaskExecutor
线程执行器,每次执行工作都会通过它来创立一个线程实体。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {threadFactory.newThread(command).start();}
}
传递进来的 threadFactory
为DefaultThreadFactory
,这外面会结构 NioEventLoop
线程命名规定为 nioEventLoop-1-xxx
,咱们就不细看这个了。当线程执行的时候会调用execute()
办法,这里会创立一个 FastThreadLocalThread
线程,具体看代码:
public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
return t;
}
protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);
}
}
这里通过 newThread()
来创立一个线程,而后初始化线程对象数据,最终会调用到 Thread.init()
中。
2.4、EventLoop 初始化
接着持续看 MultithreadEventExecutorGroup
构造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {children[i] = newChild(executor, args);
// .... 省略局部代码
}
}
下面代码的最初一部分是 newChild
办法, 这个是一个形象办法, 它的工作是实例化 EventLoop
对象. 咱们跟踪一下它的代码, 能够发现, 这个办法在 NioEventLoopGroup
类中实现了, 其内容很简略:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {throw new NullPointerException("selectorProvider");
}
if (strategy == null) {throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
其实就是实例化一个 NioEventLoop
对象, 而后返回。NioEventLoop
构造函数中会保留 provider
和事件轮询器selector
,在其父类中还会创立一个MpscQueue 队列
,而后保留线程执行器executor
。
再回过头来想一想,MultithreadEventExecutorGroup
外部保护了一个 EventExecutor[] children
数组, Netty
的 EventLoopGroup
的实现机制其实就建设在 MultithreadEventExecutorGroup
之上。
每当 Netty
须要一个 EventLoop
时, 会调用 next()
办法从 EventLoopGroup
数组中获取一个可用的 EventLoop
对象。其中 next
办法的实现是通过 NioEventLoopGroup.next()
来实现的,就是用的下面有过解说的通过轮询算法来计算得出的。
最初总结一下整个 EventLoopGroup
的初始化过程:
EventLoopGroup
(其实是MultithreadEventExecutorGroup
) 外部保护一个类型为EventExecutor children
数组,数组长度是nThreads
- 如果咱们在实例化
NioEventLoopGroup
时, 如果指定线程池大小, 则nThreads
就是指定的值, 反之是处理器外围数 * 2
MultithreadEventExecutorGroup
中会调用newChild
形象办法来初始化children
数组- 形象办法
newChild
是在NioEventLoopGroup
中实现的, 它返回一个NioEventLoop
实例. -
NioEventLoop
属性:SelectorProvider provider
属性:NioEventLoopGroup
结构器中通过SelectorProvider.provider()
获取一个SelectorProvider
Selector selector
属性:NioEventLoop
结构器中通过调用通过selector = provider.openSelector()
获取一个selector
对象.
2.5、NioSocketChannel
在 Netty
中,Channel
是对 Socket
的形象,每当 Netty
建设一个连贯后,都会有一个与其对应的 Channel
实例。
咱们在结尾的 Demo
中,设置了 channel(NioSocketChannel.class)
,NioSocketChannel
的类构造如下:
接着剖析代码,当咱们调用 b.channel()
时实际上会进入 AbstractBootstrap.channel()
逻辑,接着看 AbstractBootstrap
中代码:
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();}
能够看到,这里 ReflectiveChannelFactory
其实就是返回咱们指定的 channelClass:NioSocketChannel
, 而后指定AbstractBootstrap
中的channelFactory = new ReflectiveChannelFactory()
。
2.6、Channel 初始化流程
到了这一步,咱们曾经晓得 NioEventLoopGroup
和channel()
的流程,接着来看看 Channel
的 初始化流程,这也是 Netty
客户端启动的的外围流程之一:
ChannelFuture f = b.connect(HOST, PORT).sync();
接着就开始从 b.connect()
为入口一步步往后跟,先看下 NioSocketChannel
结构的整体流程:
从 connet
往后梳理下整体流程:
Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
为了更易读,这里代码都做了简化,只保留了一些重要的代码。
紧接着咱们看看 channelFactory.newChannel()
做了什么,这里 channelFactory
是ReflectiveChannelFactory
,咱们在下面的章节剖析过:
@Override
public T newChannel() {
try {return clazz.getConstructor().newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class" + clazz, t);
}
}
这里的 clazz
是NioSocketChannel
,同样是在下面章节讲到过,这里是调用 NioSocketChannel
的构造函数而后初始化一个 Channel
实例。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {public NioSocketChannel() {this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {this(newSocket(provider));
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {return provider.openSocketChannel();
} catch (IOException e) {throw new ChannelException("Failed to open a socket.", e);
}
}
}
这里其实也很简略,就是创立一个 Java NIO SocketChannel
而已,接着看看 NioSocketChannel
的父类还做了哪些事件,这里梳理下类的关系:
NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
ch.configureBlocking(false);
}
}
这里会调用父类的结构参数,并且传递 readInterestOp = SelectionKey.OP_READ:
,这里还有一个很重要的点,配置 Java NIO SocketChannel
为非阻塞的,咱们之前在NIO
章节的时候解说过,这里也不再赘述。
接着持续看 AbstractChannel
的构造函数:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();}
}
这里创立一个 ChannelId
,创立一个Unsafe
对象,这里的 Unsafe
并不是 Java 中的 Unsafe,前面也会讲到。而后创立一个ChannelPipeline
,前面也会讲到,到了这里,一个残缺的NioSocketChannel
就初始化实现了,咱们再来总结一下:
Netty
的SocketChannel
会与Java
原生的SocketChannel
绑定在一起;- 会注册
Read
事件; - 会为每一个
Channel
调配一个channelId
; - 会为每一个
Channel
创立一个Unsafe
对象; - 会为每一个
Channel
调配一个ChannelPipeline
;
2.7、Channel 注册流程
还是回到最下面 initAndRegister
办法,咱们下面都是在剖析外面 newChannel
的操作,这个办法是 NioSocketChannel
创立的一个流程,接着咱们在持续跟 init()
和register()
的过程:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {final ChannelFuture initAndRegister() {Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
}
}
init()
就是将一些参数 options
和attrs
设置到 channel
中,咱们重点须要看的是 register
办法,其调用链为:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
这里最初到了 unsafe
的register()
办法,最终调用到AbstractNioChannel.doRegister()
:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}
javaChannel()
就是 Java NIO
中的 SocketChannel
,这里是将SocketChannel
注册到与 eventLoop
相关联的 selector
上。
最初咱们整顿一下服务启动的整体流程:
-
initAndRegister()
初始化并注册什么呢?channelFactory.newChannel()
- 通过反射创立一个
NioSocketChannel
- 将
Java
原生Channel
绑定到NettyChannel
中 - 注册
Read
事件 - 为
Channel
调配id
- 为
Channel
创立unsafe
对象 - 为
Channel
创立ChannelPipeline
(默认是head<=>tail
的双向链表)
-
init(channel)
`- 把
Bootstrap
中的配置设置到Channel
中
- 把
-
register(channel)
- 把
Channel
绑定到一个EventLoop
上 - 把
Java
原生Channel、Netty
的Channel、Selector
绑定到SelectionKey
中 - 触发
Register
相干的事件
- 把
2.8 unsafe 初始化
下面有提到过在初始化 Channel
的过程中会创立一个 Unsafe
的对象,而后绑定到 Channel
上:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();}
newUnsafe
间接调用到了 NioSocketChannel
中的办法:
@Override
protected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe();
}
NioSocketChannelUnsafe
是 NioSocketChannel
中的一个外部类,而后向上还有几个父类继承,这里次要是对应到相干 Java
底层的 Socket
操作。
2.9 pipeline 初始化
咱们还是回到 pipeline
初始化的过程,来看一下 newChannelPipeline()
的具体实现:
protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
咱们调用 DefaultChannelPipeline
的结构器, 传入了一个 channel
, 而这个 channel
其实就是咱们实例化的 NioSocketChannel
。
DefaultChannelPipeline
会将这个 NioSocketChannel
对象保留在channel
字段中. DefaultChannelPipeline
中, 还有两个非凡的字段, 即 head
和 tail
, 而这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline
中, 保护了一个以 AbstractChannelHandlerContext
为节点的双向链表, 这个链表是 Netty
实现 Pipeline
机制的要害.
对于 DefaultChannelPipeline
中的双向链表以及它所起的作用, 咱们会在后续章节具体解说。这里只是对 pipeline
做个初步的意识。
HeadContext
的继承层次结构如下所示:
TailContext
的继承层次结构如下所示:
咱们能够看到, 链表中 head
是一个 ChannelOutboundHandler
, 而 tail
则是一个 ChannelInboundHandler
.
3.0、客户端 connect 过程
客户端连贯的入口办法还是在 Bootstrap.connect()
中,下面也剖析过一部分内容,申请的具体流程是:
Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {throw (IOException) e.getCause();}
}
看到这里,还是用 Java NIO SocketChannel
发送的 connect
申请进行客户端连贯申请。
总结
本篇文章以一个 Netty Client demo
为入口,而后解析了 NioEventLoopGroup
创立的流程、Channel
的创立和注册的流程,以及客户端发动 connect
的具体流程,这里对于很多细节并没有很深的深刻上来,这些会放到后续的源码剖析文章,敬请期待~