宜人贷蜂巢API网关技术解密之Netty使用实践

78次阅读

共计 9621 个字符,预计需要花费 25 分钟才能阅读完成。

宜人贷蜂巢团队,由 Michael 创立于 2013 年,通过使用互联网科技手段助力金融生态和谐健康发展。自成立起一直致力于多维度数据闭环平台建设。目前团队规模超过百人,涵盖征信、电商、金融、社交、五险一金和保险等用户授信数据的抓取解析业务,辅以先进的数据分析、挖掘和机器学习等技术对用户信用级别、欺诈风险进行预测评定,全面对外输出金融反欺诈、社交图谱、自动化模型定制等服务或产品。
目前宜人贷蜂巢基于用户授权数据实时抓取解析技术,并结合顶尖大数据技术,快速迭代和自主的创新, 已形成了强大而领先的聚合和输出能力。
为了适应完成宜人贷蜂巢强大的服务输出能力,蜂巢设计开发了自己的 API 网关系统,集中实现了鉴权、加解密、路由、限流等功能,使各业务抓取团队关注其核心抓取和分析工作,而 API 网关系统更专注于安全、流量、路由等问题,从而更好的保障蜂巢服务系统的质量。今天带着大家解密 API 网关的 Netty 线程池技术实践细节。
API 网关作为宜人贷蜂巢数据开放平台的统一入口,所有的客户端及消费端通过统一的 API 来使用各类抓取服务。从面向对象设计的角度看,它与外观模式类似,包装各类不同的实现细节,对外表现出统一的调用形式。
本文首先,简要地介绍 API 网关的项目框架,其次对比 BIO 和 NIO 的特点,再引入 Netty 作为项目的基础框架,然后介绍 Netty 线程池的原理,最后深入 Netty 线程池的初始化、ServerBootstrap 的初始化与启动及 channel 与线程池的绑定过程,让读者了解 Netty 在承载高并发访问的设计路思。
项目框架

图 1 – API 网关项目框架
图中描绘了 API 网关系统的处理流程,以及与服务注册发现、日志分析、报警系统、各类爬虫的关系。其中 API 网关系统接收请求,对请求进行编解码、鉴权、限流、加解密,再基于 Eureka 服务注册发现模块,将请求发送到有效的服务节点上;网关及抓取系统的日志,会被收集到 elk 平台中,做业务分析及报警处理。
BIO vs NIO
API 网关承载数倍于爬虫的流量,提升服务器的并发处理能力、缩短系统的响应时间,通信模型的选择是至关重要的,是选择 BIO,还是 NIO?
Streamvs Buffer & 阻塞 vs 非阻塞
BIO 是面向流的,io 的读写,每次只能处理一个或者多个 bytes,如果数据没有读写完成,线程将一直等待于此,而不能暂时跳过 io 或者等待 io 读写完成异步通知,线程滞留在 io 读写上,不能充分利用机器有限的线程资源,造成 server 的吞吐量较低,见图 2。而 NIO 与此不同,面向 Buffer,线程不需要滞留在 io 读写上,采用操作系统的 epoll 模式,在 io 数据准备好了,才由线程来处理,见图 3。
图 2 – BIO 从流中读取数据
图 3 – NIO 从 Buffer 中读取数据
Selectors
NIO 的 selector 使一个线程可以监控多个 channel 的读写,多个 channel 注册到一个 selector 上,这个 selector 可以监测到各个 channel 的数据准备情况,从而使用有限的线程资源处理更多的连接,见图 4。所以可以这样说,NIO 极大的提升了服务器接受并发请求的能力,而服务器性能还是要取决于业务处理时间和业务线程池模型。

图 4 – NIO 单一线程管理多个连接
而 BIO 采用的是 request-per-thread 模式,用一个线程负责接收 TCP 连接请求,并建立链路,然后将请求 dispatch 给负责业务逻辑处理的线程,见图 5。一旦访问量过多,就会造成机器的线程资源紧张,造成请求延迟,甚至服务宕机。

图 5 – BIO 一连接一线程
对比 JDK NIO 与诸多 NIO 框架后,鉴于 Netty 优雅的设计、易用的 API、优越的性能、安全性支持、API 网关使用 Netty 作为通信模型,实现了基础框架的搭建。
Netty 线程池
考虑到 API 网关的高并发访问需求,线程池设计,见图 6。

图 6 – API 网关线程池设计
Netty 的线程池理念有点像 ForkJoinPool,不是一个线程大池子并发等待一条任务队列,而是每条线程都有一个任务队列。而且 Netty 的线程,并不只是简单的阻塞地拉取任务,而是在每个循环中做三件事情:

先 SelectKeys() 处理 NIO 的事件
然后获取本线程的定时任务,放到本线程的任务队列里
最后执行其他线程提交给本线程的任务

每个循环里处理 NIO 事件与其他任务的时间消耗比例,还能通过 ioRatio 变量来控制,默认是各占 50%。可见,Netty 的线程根本没有阻塞等待任务的清闲日子,所以也不使用有锁的 BlockingQueue 来做任务队列了,而是使用无锁的 MpscLinkedQueue(Mpsc 是 Multiple Producer, Single Consumer 的缩写)
NioEventLoopGroup 初始化
下面分析下 Netty 线程池 NioEventLoopGroup 的设计与实现细节,NioEventLoopGroup 的类层次关系见图 7

图 7 –NioEvenrLoopGroup 类层次关系
其创建过程——方法调用,见下图

图 8 –NioEvenrLoopGroup 创建调用关系

NioEvenrLoopGroup 的创建,具体执行过程是执行类 MultithreadEventExecutorGroup 的构造方法

/**

* Create a new instance.

*

* @param nThreads the number of threads that will be used by this instance.

* @param executor the Executor to use, or {@code null} if the default should be used.

* @param chooserFactory the {@link EventExecutorChooserFactory} to use.

* @param args arguments which will passed to each {@link #newChild(Executor, Object…)} call

*/

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,

EventExecutorChooserFactory chooserFactory, Object… args) {

if (nThreads <= 0) {

throw new IllegalArgumentException(String.format(“nThreads: %d (expected: > 0)”, nThreads));

}

if (executor == null) {

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {

boolean success = false;

try {

children[i] = newChild(executor, args);

success = true;

} catch (Exception e) {

throw new IllegalStateException(“failed to create a child event loop”, e);

} finally {

if (!success) {

for (int j = 0; j < i; j ++) {

children[j].shutdownGracefully();

}

for (int j = 0; j < i; j ++) {

EventExecutor e = children[j];

try {

while (!e.isTerminated()) {

e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

}

} catch (InterruptedException interrupted) {

// Let the caller handle the interruption.

Thread.currentThread().interrupt();

break;

}

}

}

}

}

chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {

@Override

public void operationComplete(Future<Object> future) throws Exception {

if (terminatedChildren.incrementAndGet() == children.length) {

terminationFuture.setSuccess(null);

}

}

};

for (EventExecutor e: children) {

e.terminationFuture().addListener(terminationListener);

}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);

Collections.addAll(childrenSet, children);

readonlyChildren = Collections.unmodifiableSet(childrenSet);

}

其中,创建细节见下:

线程池中的线程数 nThreads 必须大于 0;
如果 executor 为 null,创建默认 executor,executor 用于创建线程(newChild 方法使用 executor 对象);
依次创建线程池中的每一个线程即 NioEventLoop,如果其中有一个创建失败,将关闭之前创建的所有线程;
chooser 为线程池选择器,用来选择下一个 EventExecutor,可以理解为,用来选择一个线程来执行 task;

chooser 的创建细节,见下
DefaultEventExecutorChooserFactory 根据线程数创建具体的 EventExecutorChooser,线程数如果等于 2^n,可使用按位与替代取模运算,节省 cpu 的计算资源,见源码

@SuppressWarnings(“unchecked”)

@Override

public EventExecutorChooser newChooser(EventExecutor[] executors) {

if (isPowerOfTwo(executors.length)) {

return new PowerOfTowEventExecutorChooser(executors);

} else {

return new GenericEventExecutorChooser(executors);

}

}

private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {

private final AtomicInteger idx = new AtomicInteger();

private final EventExecutor[] executors;

PowerOfTowEventExecutorChooser(EventExecutor[] executors) {

this.executors = executors;

}

@Override

public EventExecutor next() {

return executors[idx.getAndIncrement() & executors.length – 1];

}

}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {

private final AtomicInteger idx = new AtomicInteger();

private final EventExecutor[] executors;

GenericEventExecutorChooser(EventExecutor[] executors) {

this.executors = executors;

}

@Override

public EventExecutor next() {

return executors[Math.abs(idx.getAndIncrement() % executors.length)];

}

}
newChild(executor, args) 的创建细节,见下
MultithreadEventExecutorGroup 的 newChild 方法是一个抽象方法,故使用 NioEventLoopGroup 的 newChild 方法,即调用 NioEventLoop 的构造函数

@Override

protected EventLoop newChild(Executor executor, Object… args) throws Exception {

return new NioEventLoop(this, executor, (SelectorProvider) args[0],

((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);

}
在这里先看下 NioEventLoop 的类层次关系

NioEventLoop 的继承关系比较复杂,在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.
通常来说, NioEventLoop 肩负着两种任务, 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括调用 select 等待就绪的 IO 事件、读写数据与数据的处理等; 而第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.
具体的构造过程,见下

创建任务队列 tailTasks(内部为有界的 LinkedBlockingQueue)

创建线程的任务队列 taskQueue(内部为有界的 LinkedBlockingQueue),以及任务过多防止系统宕机的拒绝策略 rejectedHandler
其中 tailTasks 和 taskQueue 均是任务队列,而优先级不同,taskQueue 的优先级高于 tailTasks,定时任务的优先级高于 taskQueue。
ServerBootstrap 初始化及启动
了解了 Netty 线程池 NioEvenrLoopGroup 的创建过程后,下面看下 API 网关服务 ServerBootstrap 的是如何使用线程池引入服务中,为高并发访问服务的。
API 网关 ServerBootstrap 初始化及启动代码,见下

serverBootstrap = new ServerBootstrap();

bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());

workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())

.option(ChannelOption.SO_BACKLOG, config.getBacklogSize())

.option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())

// Memory pooled

.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

.childHandler(channelInitializer);

ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();

log.info(“API-gateway started on port: {}”, config.getPort());

future.channel().closeFuture().sync();

API 网关系统使用 netty 自带的线程池,共有三组线程池,分别为 bossGroup、workerGroup 和 executorGroup(使用在 channelInitializer 中,本文暂不作介绍)。其中,bossGroup 用于接收客户端的 TCP 连接,workerGroup 用于处理 I /O、执行系统 task 和定时任务,executorGroup 用于处理网关业务加解密、限流、路由,及将请求转发给后端的抓取服务等业务操作。
Channel 与线程池的绑定
ServerBootstrap 初始化后,通过调用 bind(port) 方法启动 Server,bind 的调用链如下
AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister

其中,ChannelFuture regFuture = config().group().register(channel); 中的 group() 方法返回 bossGroup,而 channel 在 serverBootstrap 的初始化过程指定 channel 为 NioServerSocketChannel.class,至此将 NioServerSocketChannel 与 bossGroup 绑定到一起,bossGroup 负责客户端连接的建立。那么 NioSocketChannel 是如何与 workerGroup 绑定到一起的?
调用链 AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {

final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

for (Entry<ChannelOption<?>, Object> e: childOptions) {

try {

if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {

logger.warn(“Unknown channel option: ” + e);

}

} catch (Throwable t) {

logger.warn(“Failed to set a channel option: ” + child, t);

}

}

for (Entry<AttributeKey<?>, Object> e: childAttrs) {

child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

}

try {

childGroup.register(child).addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

if (!future.isSuccess()) {

forceClose(child, future.cause());

}

}

});

} catch (Throwable t) {

forceClose(child, t);

}

}
其中,childGroup.register(child) 就是将 NioSocketChannel 与 workderGroup 绑定到一起,那又是什么触发了 ServerBootstrapAcceptor 的 channelRead 方法?
其实当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages 方法

@Override

protected int doReadMessages(List<Object> buf) throws Exception {

SocketChannel ch = javaChannel().accept();

try {

if (ch != null) {

buf.add(new NioSocketChannel(this, ch));

return 1;

}

} catch (Throwable t) {

}

return 0;

}

javaChannel().accept() 会获取到客户端新连接的 SocketChannel, 实例化为一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象 (即 this), 由此可知, 我们创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .
接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦。
至此,分析了 Netty 线程池的初始化、ServerBootstrap 的启动及 channel 与线程池的绑定过程,能够看出 Netty 中线程池的优雅设计,使用不同的线程池负责连接的建立、IO 读写等,为 API 网关项目的高并发访问提供了技术基础。
总结
至此,对 API 网关技术的 Netty 实践分享就到这里,各位如果对中间的各个环节有什么疑问和建议,欢迎大家指正,我们一起讨论,共同学习提高。
参考
http://tutorials.jenkov.com/j…
http://netty.io/wiki/user-gui…
http://netty.io/
http://www.tuicool.com/articl…
https://segmentfault.com/a/11…
https://segmentfault.com/a/11…
作者:蜂巢团队 宜信技术学院

正文完
 0