1 模仿单机连贯瓶颈
咱们晓得,通常启动一个服务端会绑定一个端口,例如8000端口,当然客户端连贯端口是有限度的,除去最大端口65535和默认的1024端口及以下的端口,就只剩下1 024~65 535个,再扣除一些罕用端口,理论可用端口只有6万个左右。那么,咱们如何实现单机百万连贯呢? 假如在服务端启动[8000,8100)这100个端口,100×6万就能够实现600万左右的连贯,这是TCP的一个基础知识,尽管对于客户端来说是同一个端口号,然而对于服务端来说是不同的端口号,因为TCP是一个私源组概念,也就是说它是由源IP地址、源端口号、目标IP地址和目标端口号确定的,当源IP地址和源端口号是一样的,然而目标端口号不一样,那么最终零碎底层会把它当作两条TCP连贯来解决,所以这里取巧给服务端开启了100个端口号,这就是单机百万连贯的筹备工作,如下图所示。
单机1024及以下的端口只能给ROOT保留应用,客户端端口范畴为1025~65535,接下来用代码实现单机百万连贯的模仿场景。先看服务端类,循环开启 [8000~8100)这100个监听端口,期待客户端连贯。上面已Netty为例编写代码如下。
package com.tom.netty.connection;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * @author Tom */public final class Server { public static final int BEGIN_PORT = 8000; public static final int N_PORT = 8100; public static void main(String[] args) { new Server().start(Server.BEGIN_PORT, Server.N_PORT); } public void start(int beginPort, int nPort) { System.out.println("服务端启动中..."); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childHandler(new ConnectionCountHandler()); for (int i = 0; i <= (nPort - beginPort); i++) { final int port = beginPort + i; bootstrap.bind(port).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("胜利绑定监听端口: " + port); } }); } System.out.println("服务端已启动!"); }}
而后看ConnectionCountHandler类的实现逻辑,次要用来统计单位工夫内的申请数,每接入一个连贯则自增一个数字,每2s统计一次,代码如下。
package com.tom.netty.connection;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * Created by Tom. */@ChannelHandler.Sharablepublic class ConnectionCountHandler extends ChannelInboundHandlerAdapter { private AtomicInteger nConnection = new AtomicInteger(); public ConnectionCountHandler() { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("以后客户端连接数: " + nConnection.get()); } },0, 2, TimeUnit.SECONDS); } @Override public void channelActive(ChannelHandlerContext ctx) { nConnection.incrementAndGet(); } @Override public void channelInactive(ChannelHandlerContext ctx) { nConnection.decrementAndGet(); }}
再看客户端类代码,次要性能是循环顺次往服务端开启的100个端口发动申请,直到服务端无响应、线程挂起为止,代码如下。
package com.tom.netty.connection;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * Created by Tom. */public class Client { private static final String SERVER_HOST = "127.0.0.1"; public static void main(String[] args) { new Client().start(Server.BEGIN_PORT, Server.N_PORT); } public void start(final int beginPort, int nPort) { System.out.println("客户端已启动..."); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { } }); int index = 0; int port; while (!Thread.interrupted()) { port = beginPort + index; try { ChannelFuture channelFuture = bootstrap.connect(SERVER_HOST, port); channelFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { System.out.println("连贯失败,程序敞开!"); System.exit(0); } } }); channelFuture.get(); } catch (Exception e) { } if (port == nPort) { index = 0; }else { index ++; } } }}
最初,将服务端程序打包公布到Linux服务器上,同样将客户端程序打包公布到另一台Linux服务器上。接下来别离启动服务端和客户端程序。运行一段时间之后,会发现服务端监听的连接数定格在一个值不再变动,如下所示。
以后客户端连接数: 870以后客户端连接数: 870以后客户端连接数: 870以后客户端连接数: 870以后客户端连接数: 870以后客户端连接数: 870以后客户端连接数: 870以后客户端连接数: 870以后客户端连接数: 870...
并且抛出如下异样。
Exception in thread "nioEventLoopGroup-2-1" java.lang.InternalError: java.io.FileNotFoundException: /usr/java/jdk1.8.0_121/jre/lib/ext/cldrdata.jar (Too many open files) at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1040) at sun.misc.URLClassPath.getResource(URLClassPath.java:239) at java.net.URLClassLoader$1.run(URLClassLoader.java:365) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:411) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.util.ResourceBundle$RBClassLoader.loadClass(ResourceBundle.java:503) at java.util.ResourceBundle$Control.newBundle(ResourceBundle.java:2640) at java.util.ResourceBundle.loadBundle(ResourceBundle.java:1501) at java.util.ResourceBundle.findBundle(ResourceBundle.java:1465) at java.util.ResourceBundle.findBundle(ResourceBundle.java:1419) at java.util.ResourceBundle.getBundleImpl(ResourceBundle.java:1361) at java.util.ResourceBundle.getBundle(ResourceBundle.java:845) at java.util.logging.Level.computeLocalizedLevelName(Level.java:265) at java.util.logging.Level.getLocalizedLevelName(Level.java:324) at java.util.logging.SimpleFormatter.format(SimpleFormatter.java:165) at java.util.logging.StreamHandler.publish(StreamHandler.java:211) at java.util.logging.ConsoleHandler.publish(ConsoleHandler.java:116) at java.util.logging.Logger.log(Logger.java:738) at io.netty.util.internal.logging.JdkLogger.log(JdkLogger.java:606) at io.netty.util.internal.logging.JdkLogger.warn(JdkLogger.java:482) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run (SingleThreadEventExecutor.java:876) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run (DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:745)
这个时候,咱们就应该要晓得,这曾经是服务器所能承受客户端连贯数量的瓶颈值,也就是服务端最大反对870个连贯。接下来要做的事件是想方法冲破这个瓶颈,让单台服务器也能反对100万连贯,这是一件如许激动人心的事件。
2 单机百万连贯调优解决思路
2.1 冲破部分文件句柄限度
首先在服务端输出命令,看一下单个过程所能反对的最大句柄数。
ulimit -n
输出命令后,会呈现1 024的数字,示意Linux零碎中一个过程可能关上的最大文件数,因为开启一个TCP连贯就会在Linux零碎中对应创立一个文件,所以就是受这个文件的最大文件数限度。那为什么后面演示的服务端连接数最终定格在870,比1 024小呢?其实是因为除了连接数,还有JVM关上的文件Class类也算作过程内关上的文件,所以,1 024减去JVM关上的文件数剩下的就是TCP所能反对的连接数。接下来想方法冲破这个限度,首先在服务器命令行输出以下命令,关上/etc/security/limits.conf文件。
sudo vi /etc/security/limits.conf
而后在这个文件开端加上上面两行代码。
* hard nofile 1000000* soft nofile 1000000
后面的*示意以后用户,hard和soft别离示意限度和正告限度,nofile示意最大的文件数标识,前面的数字1 000 000示意任何用户都能关上100万个文件,这也是操作系统所能反对的最大值,如下图所示。
接下来,输出以下命令。
ulimit -n
这时候,咱们发现还是1 024,没变,重启服务器。将服务端程序和客户端程序别离从新运行,这时候只需静静地察看连接数的变动,最终连接数停留在137 920,同时抛出了异样,如下所示。
以后客户端连接数: 137920以后客户端连接数: 137920以后客户端连接数: 137920以后客户端连接数: 137920以后客户端连接数: 137920Exception in thread "nioEventLoopGroup-2-1" java.lang.InternalError: java.io.FileNotFoundException: /usr/java/jdk1.8.0_121/jre/lib/ext/cldrdata.jar (Too many open files)...
这又是为什么呢?必定还有中央限度了连接数,想要冲破这个限度,就须要冲破全局文件句柄数的限度。
2.2 冲破全局文件句柄限度
首先在Linux命令行输出以下命令,能够查看Linux零碎所有用户过程所能关上的文件数。
cat /proc/sys/fs/file-max
通过下面这个命令能够看到全局的限度,发现失去的后果是10 000。可想而知,部分文件句柄数不能大于全局的文件句柄数。所以,必须将全局的文件句柄数限度调大,冲破这个限度。首先切换为ROOT用户,不然没有权限。
sudo -secho 2000> /proc/sys/fs/file-maxexit
咱们改成20 000来测试一下,持续试验。别离启动服务端程序和客户端程序,发现连接数曾经超出了20 000的限度。
后面应用echo来配置/proc/sys/fs/file-max的话,重启服务器就会生效,还会变回原来的10 000,因而,间接用vi命令批改,输出以下命令行。
sodu vi /etc/sysctl.conf
在/etc/sysctl.conf文件开端加上上面的内容。
fs.file-max=1000000
后果如下图所示。
接下来重启 Linux服务器,再启动服务端程序和客户端程序。
以后客户端连接数: 9812451以后客户端连接数: 9812462以后客户端连接数: 9812489以后客户端连接数: 9812501以后客户端连接数: 9812503...
最终连接数定格在 98万左右。咱们发现次要受限于本机自身的性能。用htop命令查看一下,发现CPU都靠近100%,如下图所示。
以上是操作系统层面的调优和性能晋升,上面次要介绍基于Netty利用层面的调优。
3 Netty利用级别的性能调优
3.1 Netty利用级别的性能瓶颈复现
首先来看一下利用场景,上面是一段规范的服务端利用程序代码。
package com.tom.netty.thread;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.FixedLengthFrameDecoder;/** * Created by Tom. */public class Server { private static final int port = 8000; public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EventLoopGroup businessGroup = new NioEventLoopGroup(1000); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { //自定义长度的解码,每次发送一个long类型的长度数据 //每次传递一个零碎的工夫戳 ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES)); ch.pipeline().addLast(businessGroup, ServerHandler.INSTANCE); } }); ChannelFuture channelFuture = bootstrap.bind(port).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("服务端启动胜利,绑定端口为: " + port); } }); }}
咱们重点关注服务端的逻辑解决ServerHandler类。
package com.tom.netty.thread;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import java.util.concurrent.ThreadLocalRandom;/** * Created by Tom. */@ChannelHandler.Sharablepublic class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> { public static final ChannelHandler INSTANCE = new ServerHandler(); //channelread0是主线程 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { ByteBuf data = Unpooled.directBuffer(); //从客户端读一个工夫戳 data.writeBytes(msg); //模仿一次业务解决,有可能是数据库操作,也有可能是逻辑解决 Object result = getResult(data); //从新写回给客户端 ctx.channel().writeAndFlush(result); } //模仿去数据库获取一个后果 protected Object getResult(ByteBuf data) { int level = ThreadLocalRandom.current().nextInt(1, 1000); //计算出每次响应须要的工夫,用来作为QPS的参考数据 //90.0% == 1ms 1000 100 > 1ms int time; if (level <= 900) { time = 1; //95.0% == 10ms 1000 50 > 10ms } else if (level <= 950) { time = 10; //99.0% == 100ms 1000 10 > 100ms } else if (level <= 990) { time = 100; //99.9% == 1000ms 1000 1 > 1000ms } else { time = 1000; } try { Thread.sleep(time); } catch (InterruptedException e) { } return data; }}
下面代码中有一个getResult()办法。能够把getResult()办法看作是在数据库中查问数据的一个办法,把每次查问的后果返回给客户端。实际上,为了模仿查问数据性能,getResult()传入的参数是由客户端传过来的工夫戳,最终返回的还是客户端传过来的值。只不过返回之前做了一次随机的线程休眠解决,以模仿实在的业务解决性能。如下表所示是模仿场景的性能参数。
上面来看客户端,也是一段规范的代码。
package com.tom.netty.thread;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.FixedLengthFrameDecoder;/** * Created by Tom. */public class Client { private static final String SERVER_HOST = "127.0.0.1"; public static void main(String[] args) throws Exception { new Client().start(8000); } public void start(int port) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_REUSEADDR, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES)); ch.pipeline().addLast(ClientHandler.INSTANCE); } }); //客户端每秒钟向服务端发动1 000次申请 for (int i = 0; i < 1000; i++) { bootstrap.connect(SERVER_HOST, port).get(); } }}
从下面代码中看到,客户端会向服务端发动1 000次申请。重点来看客户端逻辑解决ClientHandler类。
package com.tom.netty.thread;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicLong;/** * Created by Tom. */@ChannelHandler.Sharablepublic class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { public static final ChannelHandler INSTANCE = new ClientHandler(); private static AtomicLong beginTime = new AtomicLong(0); //总响应工夫 private static AtomicLong totalResponseTime = new AtomicLong(0); //总申请数 private static AtomicInteger totalRequest = new AtomicInteger(0); public static final Thread THREAD = new Thread(){ @Override public void run() { try { while (true) { long duration = System.currentTimeMillis() - beginTime.get(); if (duration != 0) { System.out.println("QPS: " + 1000 * totalRequest.get() / duration + ", " + "均匀响应工夫: " + ((float) totalResponseTime.get()) / totalRequest.get() + "ms."); Thread.sleep(2000); } } } catch (InterruptedException ignored) { } } }; @Override public void channelActive(final ChannelHandlerContext ctx) { ctx.executor().scheduleAtFixedRate(new Runnable() { public void run() { ByteBuf byteBuf = ctx.alloc().ioBuffer(); //将以后零碎工夫发送到服务端 byteBuf.writeLong(System.currentTimeMillis()); ctx.channel().writeAndFlush(byteBuf); } }, 0, 1, TimeUnit.SECONDS); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { //获取一个响应时间差,本次申请的响应工夫 totalResponseTime.addAndGet(System.currentTimeMillis() - msg.readLong()); //每次自增 totalRequest.incrementAndGet(); if (beginTime.compareAndSet(0, System.currentTimeMillis())) { THREAD.start(); } }}
下面代码次要模仿了Netty实在业务环境下的解决耗时状况,QPS大略在1 000次,每2s统计一次。接下来,启动服务端和客户端查看控制台日志。首先运行服务端,看到控制台日志如下图所示。
而后运行客户端,看到控制台日志如下图所示,一段时间之后,发现QPS放弃在1 000次以内,均匀响应工夫越来越长。
回到服务端ServerHander的getResul()办法,在getResult()办法中有线程休眠导致阻塞,不难发现,它最终会阻塞主线程,导致所有的申请挤压在一个线程中。如果把上面的代码放入线程池中,成果将齐全不同。
Object result =getResult(data);ctx.channel().wrteAndFlush(result);
把这两行代码放到业务线程池里,一直在后盾运行,运行实现后即时返回后果。
3.2 Netty利用级别的性能调优计划
上面来革新一下代码,在服务端的代码中新建一个ServerThreadPoolHander类。
package com.tom.netty.thread;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * Created by Tom. */@ChannelHandler.Sharablepublic class ServerThreadPoolHandler extends ServerHandler { public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler(); private static ExecutorService threadPool = Executors.newFixedThreadPool(1000); @Override protected void channelRead0(final ChannelHandlerContext ctx, ByteBuf msg) { final ByteBuf data = Unpooled.directBuffer(); data.writeBytes(msg); threadPool.submit(new Runnable() { public void run() { Object result = getResult(data); ctx.channel().writeAndFlush(result); } }); }}
而后在服务端的Handler解决注册为ServerThreadPoolHander,删除原来的ServerHandler,代码如下。
ch.pipeline().addLast(ServerThreadPoolHandler.INSTANCE);
随后,启动服务端和客户端程序,查看控制台日志,如下图所示。
最终耗时稳固在15ms左右,QPS也超过了1 000次。实际上这个后果还不是最优的状态,持续调整。将ServerThreadPoolHander的线程个数调整到20,代码如下。
而后启动程序,发现均匀响应工夫相差也不是太多,如下图所示。
由此得出的论断是:具体的线程数须要在实在的环境下一直地调整、测试,能力确定最合适的数值。本章旨在通知大家优化的办法,而不是后果。
本篇由Tom老师分享,如果本文对您有帮忙,欢送关注和点赞!