思维导图
前言
本文次要讲述Netty框架的一些个性以及重要组件,心愿看完之后能对Netty框架有一个比拟直观的感触,心愿能帮忙读者疾速入门Netty,缩小一些弯路。
一、Netty概述
官网的介绍:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是 一个异步事件驱动的网络应用程序框架,用于疾速开发可保护的高性能协定服务器和客户端。
二、为什么应用Netty
从官网上介绍,Netty是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket就不谈了,为什么不必NIO呢?
2.1 NIO的毛病
对于这个问题,之前我写了一篇文章《NIO入门》对NIO有比拟具体的介绍,NIO的次要问题是:
- NIO的类库和API繁冗,学习老本高,你须要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 须要相熟Java多线程编程。这是因为NIO编程波及到Reactor模式,你必须对多线程和网络编程十分相熟,能力写出高质量的NIO程序。
- 臭名远扬的epoll bug。它会导致Selector空轮询,最终导致CPU 100%。直到JDK1.7版本仍然没失去根本性的解决。
2.2 Netty的长处
绝对地,Netty的长处有很多:
- API应用简略,学习成本低。
- 功能强大,内置了多种解码编码器,反对多种协定。
- 性能高,比照其余支流的NIO框架,Netty的性能最优。
- 社区沉闷,发现BUG会及时修复,迭代版本周期短,一直退出新的性能。
- Dubbo、Elasticsearch都采纳了Netty,品质失去验证。
三、架构图
下面这张图就是在官网首页的架构图,咱们从上到下剖析一下。
绿色的局部Core外围模块,包含零拷贝、API库、可扩大的事件模型。
橙色局部Protocol Support协定反对,包含Http协定、webSocket、SSL(安全套接字协定)、谷歌Protobuf协定、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等。
红色的局部Transport Services传输服务,包含Socket、Datagram、Http Tunnel等等。
以上可看出Netty的性能、协定、传输方式都比拟全,比拟弱小。
四、永远的Hello Word
首先搭建一个HelloWord工程,先相熟一下API,还有为前面的学习做铺垫。以上面这张图为根据:
4.1 引入Maven依赖
应用的版本是4.1.20,绝对比较稳定的一个版本。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
4.2 创立服务端启动类
public class MyServer {
public static void main(String[] args) throws Exception {
//创立两个线程组 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创立服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//设置线程队列失去连贯个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置放弃流动连贯状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//应用匿名外部类的模式初始化通道对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("java技术爱好者的服务端曾经准备就绪...");
//绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//对敞开通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
4.3 创立服务端处理器
/**
* 自定义的Handler须要继承Netty规定好的HandlerAdapter
* 能力被Netty框架所关联,有点相似SpringMVC的适配器模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过去的音讯
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送音讯给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到音讯,并给你发送一个问号?", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//产生异样,敞开通道
ctx.close();
}
}
4.4 创立客户端启动类
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创立bootstrap对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的通道实现类型
.channel(NioSocketChannel.class)
//应用匿名外部类初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//增加客户端通道的处理器
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("客户端准备就绪,随时能够腾飞~");
//连贯服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//对通道敞开进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//敞开线程组
eventExecutors.shutdownGracefully();
}
}
}
4.5 创立客户端处理器
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送音讯到服务端
ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接管服务端发送过去的音讯
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
}
}
4.6 测试
先启动服务端,再启动客户端,就能够看到后果:
MyServer打印后果:
MyClient打印后果:
五、Netty的个性与重要组件
5.1 taskQueue工作队列
如果Handler处理器有一些长时间的业务解决,能够交给taskQueue异步解决。怎么用呢,请看代码演示:
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取到线程池eventLoop,增加线程,执行
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
//长时间操作,不至于长时间的业务操作导致Handler阻塞
Thread.sleep(1000);
System.out.println("长时间的业务解决");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
咱们打一个debug调试,是能够看到增加进去的taskQueue有一个工作。
5.2 scheduleTaskQueue延时工作队列
延时工作队列和下面介绍的工作队列十分类似,只是多了一个可提早肯定工夫再执行的设置,请看代码演示:
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
//长时间操作,不至于长时间的业务操作导致Handler阻塞
Thread.sleep(1000);
System.out.println("长时间的业务解决");
} catch (Exception e) {
e.printStackTrace();
}
}
},5, TimeUnit.SECONDS);//5秒后执行
仍然关上debug进行调试查看,咱们能够有一个scheduleTaskQueue工作待执行中
5.3 Future异步机制
在搭建HelloWord工程的时候,咱们看到有一行这样的代码:
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);
很多操作都返回这个ChannelFuture对象,到底这个ChannelFuture对象是用来做什么的呢?
ChannelFuture提供操作实现时一种异步告诉的形式。个别在Socket编程中,期待响应后果都是同步阻塞的,而Netty则不会造成阻塞,因为ChannelFuture是采取相似观察者模式的模式进行获取后果。请看一段代码演示:
//增加监听器
channelFuture.addListener(new ChannelFutureListener() {
//应用匿名外部类,ChannelFutureListener接口
//重写operationComplete办法
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//判断是否操作胜利
if (future.isSuccess()) {
System.out.println("连贯胜利");
} else {
System.out.println("连贯失败");
}
}
});
5.4 Bootstrap与ServerBootStrap
Bootstrap和ServerBootStrap是Netty提供的一个创立客户端和服务端启动器的工厂类,应用这个工厂类十分便当地创立启动类,依据下面的一些例子,其实也看得出来能大大地缩小了开发的难度。首先看一个类图:
能够看出都是继承于AbstractBootStrap抽象类,所以大抵上的配置办法都雷同。
一般来说,应用Bootstrap创立启动器的步骤可分为以下几步:
5.4.1 group()
在上一篇文章《Reactor模式》中,咱们就讲过服务端要应用两个线程组:
- bossGroup 用于监听客户端连贯,专门负责与客户端创立连贯,并把连贯注册到workerGroup的Selector中。
- workerGroup用于解决每一个连贯产生的读写事件。
个别创立线程组间接应用以下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
有点好奇的是,既然是线程组,那线程数默认是多少呢?深刻源码:
//应用一个常量保留
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//NettyRuntime.availableProcessors() * 2,cpu核数的两倍赋值给常量
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//如果不传入,则应用常量的值,也就是cpu核数的两倍
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
通过源码能够看到,默认的线程数是cpu核数的两倍。假如想自定义线程数,能够应用有参结构器:
//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
5.4.2 channel()
这个办法用于设置通道类型,当建设连贯后,会依据这个设置创立对应的Channel实例。
应用debug模式能够看到
通道类型有以下:
NioSocketChannel: 异步非阻塞的客户端 TCP Socket 连贯。
NioServerSocketChannel: 异步非阻塞的服务器端 TCP Socket 连贯。
罕用的就是这两个通道类型,因为是异步非阻塞的。所以是首选。
OioSocketChannel: 同步阻塞的客户端 TCP Socket 连贯。
OioServerSocketChannel: 同步阻塞的服务器端 TCP Socket 连贯。
略微在本地调试过,用起来和Nio有一些不同,是阻塞的,所以API调用也不一样。因为是阻塞的IO,简直没什么人会抉择应用Oio,所以也很难找到例子。我略微推敲了一下,通过几次报错之后,总算调通了。代码如下:
//server端代码,跟下面简直一样,只需改三个中央
//这个中央应用的是OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//只须要设置一个线程组boosGroup
.channel(OioServerSocketChannel.class)//设置服务端通道实现类型
//client端代码,只需改两个中央
//应用的是OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//通道类型设置为OioSocketChannel
bootstrap.group(eventExecutors)//设置线程组
.channel(OioSocketChannel.class)//设置客户端的通道实现类型
NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协定)连贯。
NioSctpServerChannel: 异步的 Sctp 服务器端连贯。
本地没启动胜利,网上看了一些网友的评论,说是只能在linux环境下才能够启动。从报错信息看:SCTP not supported on this platform,不反对这个平台。因为我电脑是window零碎,所以网友说的有点情理。
5.4.3 option()与childOption()
首先说一下这两个的区别。
option()设置的是服务端用于接管进来的连贯,也就是boosGroup线程。
childOption()是提供给父管道接管到的连贯,也就是workerGroup线程。
搞清楚了之后,咱们看一下罕用的一些设置有哪些:
SocketChannel参数,也就是childOption()罕用的参数:
SO_RCVBUF Socket参数,TCP数据接收缓冲区大小。
TCP_NODELAY TCP参数,立刻发送数据,默认值为Ture。
SO_KEEPALIVE Socket参数,连贯保活,默认值为False。启用该性能时,TCP会被动探测闲暇连贯的有效性。
ServerSocketChannel参数,也就是option()罕用参数:
SO_BACKLOG Socket参数,服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝。默认值,Windows为200,其余为128。
因为篇幅限度,其余就不列举了,大家能够去网上找材料看看,理解一下。
5.4.4 设置流水线(重点)
ChannelPipeline是Netty解决申请的责任链,ChannelHandler则是具体解决申请的处理器。实际上每一个channel都有一个处理器的流水线。
在Bootstrap中childHandler()办法须要初始化通道,实例化一个ChannelInitializer,这时候须要重写initChannel()初始化通道的办法,拆卸流水线就是在这个中央进行。代码演示如下:
//应用匿名外部类的模式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置自定义的处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});
处理器Handler次要分为两种:
ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)
入站指的是数据从底层java NIO Channel到Netty的Channel。
出站指的是通过Netty的Channel来操作底层的java NIO Channel。
ChannelInboundHandlerAdapter处理器罕用的事件有:
- 注册事件 fireChannelRegistered。
- 连贯建设事件 fireChannelActive。
- 读事件和读实现事件 fireChannelRead、fireChannelReadComplete。
- 异样告诉事件 fireExceptionCaught。
- 用户自定义事件 fireUserEventTriggered。
- Channel 可写状态变动事件 fireChannelWritabilityChanged。
- 连贯敞开事件 fireChannelInactive。
ChannelOutboundHandler处理器罕用的事件有:
- 端口绑定 bind。
- 连贯服务端 connect。
- 写事件 write。
- 刷新工夫 flush。
- 读事件 read。
- 被动断开连接 disconnect。
- 敞开 channel 事件 close。
还有一个相似的handler(),次要用于拆卸parent通道,也就是bossGroup线程。个别状况下,都用不上这个办法。
5.4.5 bind()
提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。如果加上sync()办法则是同步。
有五个同名的重载办法,作用都是用于绑定地址端口号。不一一介绍了。
5.4.6 优雅地敞开EventLoopGroup
//开释掉所有的资源,包含创立的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
会敞开所有的child Channel。敞开之后,开释掉底层的资源。
5.5 Channel
Channel是什么?无妨看一下官网文档的阐明:
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
翻译粗心:一种连贯到网络套接字或能进行读、写、连贯和绑定等I/O操作的组件。
如果下面这段阐明比拟形象,上面还有一段阐明:
A channel provides a user:
the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.
翻译粗心:
channel为用户提供:
- 通道以后的状态(例如它是关上?还是已连贯?)
- channel的配置参数(例如接收缓冲区的大小)
- channel反对的IO操作(例如读、写、连贯和绑定),以及解决与channel相关联的所有IO事件和申请的ChannelPipeline。
5.5.1 获取channel的状态
boolean isOpen(); //如果通道关上,则返回true
boolean isRegistered();//如果通道注册到EventLoop,则返回true
boolean isActive();//如果通道处于活动状态并且已连贯,则返回true
boolean isWritable();//当且仅当I/O线程将立刻执行申请的写入操作时,返回true。
以上就是获取channel的四种状态的办法。
5.5.2 获取channel的配置参数
获取单条配置信息,应用getOption(),代码演示:
ChannelConfig config = channel.config();//获取配置参数
//获取ChannelOption.SO_BACKLOG参数,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//因为我启动器配置的是128,所以我这里获取的soBackLogConfig=128
获取多条配置信息,应用getOptions(),代码演示:
ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
/**
SO_REUSEADDR : false
WRITE_BUFFER_LOW_WATER_MARK : 32768
WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)
SO_BACKLOG : 128
以下省略...
*/
5.5.3 channel反对的IO操作
写操作,这里演示从服务端写音讯发送到客户端:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));
}
客户端控制台:
//收到服务端/127.0.0.1:6666的音讯:这波啊,这波是肉蛋葱鸡~
连贯操作,代码演示:
ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//个别应用启动器,这种形式不罕用
通过channel获取ChannelPipeline,并做相干的解决:
//获取ChannelPipeline对象
ChannelPipeline pipeline = ctx.channel().pipeline();
//往pipeline中增加ChannelHandler处理器,拆卸流水线
pipeline.addLast(new MyServerHandler());
5.6 Selector
在NioEventLoop中,有一个成员变量selector,这是nio包的Selector,在之前《NIO入门》中,我曾经讲过Selector了。
Netty中的Selector也和NIO的Selector是一样的,就是用于监听事件,治理注册到Selector中的channel,实现多路复用器。
5.7 PiPeline与ChannelPipeline
在后面介绍Channel时,咱们晓得能够在channel中拆卸ChannelHandler流水线处理器,那一个channel不可能只有一个channelHandler处理器,必定是有很多的,既然是很多channelHandler在一个流水线工作,必定是有程序的。
于是pipeline就呈现了,pipeline相当于处理器的容器。初始化channel时,把channelHandler按程序装在pipeline中,就能够实现按序执行channelHandler了。
在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被创立的时候创立。ChannelPipeline蕴含了一个ChannelHander造成的列表,且所有ChannelHandler都会注册到ChannelPipeline中。
5.8 ChannelHandlerContext
在Netty中,Handler处理器是有咱们定义的,下面讲过通过集成入站处理器或者出站处理器实现。这时如果咱们想在Handler中获取pipeline对象,或者channel对象,怎么获取呢。
于是Netty设计了这个ChannelHandlerContext上下文对象,就能够拿到channel、pipeline等对象,就能够进行读写等操作。
通过类图,ChannelHandlerContext是一个接口,上面有三个实现类。
实际上ChannelHandlerContext在pipeline中是一个链表的模式。看一段源码就明确了:
//ChannelPipeline实现类DefaultChannelPipeline的结构器办法
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//设置头结点head,尾结点tail
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
上面我用一张图来示意,会更加清晰一点:
5.9 EventLoopGroup
咱们先看一下EventLoopGroup的类图:
其中包含了罕用的实现类NioEventLoopGroup。OioEventLoopGroup在后面的例子中也有应用过。
从Netty的架构图中,能够晓得服务器是须要两个线程组进行配合工作的,而这个线程组的接口就是EventLoopGroup。
每个EventLoopGroup里包含一个或多个EventLoop,每个EventLoop中保护一个Selector实例。
5.9.1 轮询机制的实现原理
咱们无妨看一段DefaultEventExecutorChooserFactory的源码:
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
@Override
public EventExecutor next() {
//idx.getAndIncrement()相当于idx++,而后对工作长度取模
return executors[idx.getAndIncrement() & executors.length - 1];
}
这段代码能够确定执行的形式是轮询机制,接下来debug调试一下:
它这里还有一个判断,如果线程数不是2的N次方,则采纳取模算法实现。
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
原文链接
本文为阿里云原创内容,未经容许不得转载。思维导图
前言
本文次要讲述Netty框架的一些个性以及重要组件,心愿看完之后能对Netty框架有一个比拟直观的感触,心愿能帮忙读者疾速入门Netty,缩小一些弯路。
一、Netty概述
官网的介绍:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是 一个异步事件驱动的网络应用程序框架,用于疾速开发可保护的高性能协定服务器和客户端。
二、为什么应用Netty
从官网上介绍,Netty是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket就不谈了,为什么不必NIO呢?
2.1 NIO的毛病
对于这个问题,之前我写了一篇文章《NIO入门》对NIO有比拟具体的介绍,NIO的次要问题是:
- NIO的类库和API繁冗,学习老本高,你须要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 须要相熟Java多线程编程。这是因为NIO编程波及到Reactor模式,你必须对多线程和网络编程十分相熟,能力写出高质量的NIO程序。
- 臭名远扬的epoll bug。它会导致Selector空轮询,最终导致CPU 100%。直到JDK1.7版本仍然没失去根本性的解决。
2.2 Netty的长处
绝对地,Netty的长处有很多:
- API应用简略,学习成本低。
- 功能强大,内置了多种解码编码器,反对多种协定。
- 性能高,比照其余支流的NIO框架,Netty的性能最优。
- 社区沉闷,发现BUG会及时修复,迭代版本周期短,一直退出新的性能。
- Dubbo、Elasticsearch都采纳了Netty,品质失去验证。
三、架构图
下面这张图就是在官网首页的架构图,咱们从上到下剖析一下。
绿色的局部Core外围模块,包含零拷贝、API库、可扩大的事件模型。
橙色局部Protocol Support协定反对,包含Http协定、webSocket、SSL(安全套接字协定)、谷歌Protobuf协定、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等。
红色的局部Transport Services传输服务,包含Socket、Datagram、Http Tunnel等等。
以上可看出Netty的性能、协定、传输方式都比拟全,比拟弱小。
四、永远的Hello Word
首先搭建一个HelloWord工程,先相熟一下API,还有为前面的学习做铺垫。以上面这张图为根据:
4.1 引入Maven依赖
应用的版本是4.1.20,绝对比较稳定的一个版本。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
4.2 创立服务端启动类
public class MyServer {
public static void main(String[] args) throws Exception {
//创立两个线程组 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创立服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//设置线程队列失去连贯个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置放弃流动连贯状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//应用匿名外部类的模式初始化通道对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("java技术爱好者的服务端曾经准备就绪...");
//绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//对敞开通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
4.3 创立服务端处理器
/**
* 自定义的Handler须要继承Netty规定好的HandlerAdapter
* 能力被Netty框架所关联,有点相似SpringMVC的适配器模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过去的音讯
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送音讯给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到音讯,并给你发送一个问号?", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//产生异样,敞开通道
ctx.close();
}
}
4.4 创立客户端启动类
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创立bootstrap对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的通道实现类型
.channel(NioSocketChannel.class)
//应用匿名外部类初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//增加客户端通道的处理器
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("客户端准备就绪,随时能够腾飞~");
//连贯服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//对通道敞开进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//敞开线程组
eventExecutors.shutdownGracefully();
}
}
}
4.5 创立客户端处理器
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送音讯到服务端
ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接管服务端发送过去的音讯
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
}
}
4.6 测试
先启动服务端,再启动客户端,就能够看到后果:
MyServer打印后果:
MyClient打印后果:
五、Netty的个性与重要组件
5.1 taskQueue工作队列
如果Handler处理器有一些长时间的业务解决,能够交给taskQueue异步解决。怎么用呢,请看代码演示:
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取到线程池eventLoop,增加线程,执行
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
//长时间操作,不至于长时间的业务操作导致Handler阻塞
Thread.sleep(1000);
System.out.println("长时间的业务解决");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
咱们打一个debug调试,是能够看到增加进去的taskQueue有一个工作。
5.2 scheduleTaskQueue延时工作队列
延时工作队列和下面介绍的工作队列十分类似,只是多了一个可提早肯定工夫再执行的设置,请看代码演示:
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
//长时间操作,不至于长时间的业务操作导致Handler阻塞
Thread.sleep(1000);
System.out.println("长时间的业务解决");
} catch (Exception e) {
e.printStackTrace();
}
}
},5, TimeUnit.SECONDS);//5秒后执行
仍然关上debug进行调试查看,咱们能够有一个scheduleTaskQueue工作待执行中
5.3 Future异步机制
在搭建HelloWord工程的时候,咱们看到有一行这样的代码:
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);
很多操作都返回这个ChannelFuture对象,到底这个ChannelFuture对象是用来做什么的呢?
ChannelFuture提供操作实现时一种异步告诉的形式。个别在Socket编程中,期待响应后果都是同步阻塞的,而Netty则不会造成阻塞,因为ChannelFuture是采取相似观察者模式的模式进行获取后果。请看一段代码演示:
//增加监听器
channelFuture.addListener(new ChannelFutureListener() {
//应用匿名外部类,ChannelFutureListener接口
//重写operationComplete办法
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//判断是否操作胜利
if (future.isSuccess()) {
System.out.println("连贯胜利");
} else {
System.out.println("连贯失败");
}
}
});
5.4 Bootstrap与ServerBootStrap
Bootstrap和ServerBootStrap是Netty提供的一个创立客户端和服务端启动器的工厂类,应用这个工厂类十分便当地创立启动类,依据下面的一些例子,其实也看得出来能大大地缩小了开发的难度。首先看一个类图:
能够看出都是继承于AbstractBootStrap抽象类,所以大抵上的配置办法都雷同。
一般来说,应用Bootstrap创立启动器的步骤可分为以下几步:
5.4.1 group()
在上一篇文章《Reactor模式》中,咱们就讲过服务端要应用两个线程组:
- bossGroup 用于监听客户端连贯,专门负责与客户端创立连贯,并把连贯注册到workerGroup的Selector中。
- workerGroup用于解决每一个连贯产生的读写事件。
个别创立线程组间接应用以下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
有点好奇的是,既然是线程组,那线程数默认是多少呢?深刻源码:
//应用一个常量保留
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//NettyRuntime.availableProcessors() * 2,cpu核数的两倍赋值给常量
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//如果不传入,则应用常量的值,也就是cpu核数的两倍
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
通过源码能够看到,默认的线程数是cpu核数的两倍。假如想自定义线程数,能够应用有参结构器:
//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
5.4.2 channel()
这个办法用于设置通道类型,当建设连贯后,会依据这个设置创立对应的Channel实例。
应用debug模式能够看到
通道类型有以下:
NioSocketChannel: 异步非阻塞的客户端 TCP Socket 连贯。
NioServerSocketChannel: 异步非阻塞的服务器端 TCP Socket 连贯。
罕用的就是这两个通道类型,因为是异步非阻塞的。所以是首选。
OioSocketChannel: 同步阻塞的客户端 TCP Socket 连贯。
OioServerSocketChannel: 同步阻塞的服务器端 TCP Socket 连贯。
略微在本地调试过,用起来和Nio有一些不同,是阻塞的,所以API调用也不一样。因为是阻塞的IO,简直没什么人会抉择应用Oio,所以也很难找到例子。我略微推敲了一下,通过几次报错之后,总算调通了。代码如下:
//server端代码,跟下面简直一样,只需改三个中央
//这个中央应用的是OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//只须要设置一个线程组boosGroup
.channel(OioServerSocketChannel.class)//设置服务端通道实现类型
//client端代码,只需改两个中央
//应用的是OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//通道类型设置为OioSocketChannel
bootstrap.group(eventExecutors)//设置线程组
.channel(OioSocketChannel.class)//设置客户端的通道实现类型
NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协定)连贯。
NioSctpServerChannel: 异步的 Sctp 服务器端连贯。
本地没启动胜利,网上看了一些网友的评论,说是只能在linux环境下才能够启动。从报错信息看:SCTP not supported on this platform,不反对这个平台。因为我电脑是window零碎,所以网友说的有点情理。
5.4.3 option()与childOption()
首先说一下这两个的区别。
option()设置的是服务端用于接管进来的连贯,也就是boosGroup线程。
childOption()是提供给父管道接管到的连贯,也就是workerGroup线程。
搞清楚了之后,咱们看一下罕用的一些设置有哪些:
SocketChannel参数,也就是childOption()罕用的参数:
SO_RCVBUF Socket参数,TCP数据接收缓冲区大小。
TCP_NODELAY TCP参数,立刻发送数据,默认值为Ture。
SO_KEEPALIVE Socket参数,连贯保活,默认值为False。启用该性能时,TCP会被动探测闲暇连贯的有效性。
ServerSocketChannel参数,也就是option()罕用参数:
SO_BACKLOG Socket参数,服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝。默认值,Windows为200,其余为128。
因为篇幅限度,其余就不列举了,大家能够去网上找材料看看,理解一下。
5.4.4 设置流水线(重点)
ChannelPipeline是Netty解决申请的责任链,ChannelHandler则是具体解决申请的处理器。实际上每一个channel都有一个处理器的流水线。
在Bootstrap中childHandler()办法须要初始化通道,实例化一个ChannelInitializer,这时候须要重写initChannel()初始化通道的办法,拆卸流水线就是在这个中央进行。代码演示如下:
//应用匿名外部类的模式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置自定义的处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});
处理器Handler次要分为两种:
ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)
入站指的是数据从底层java NIO Channel到Netty的Channel。
出站指的是通过Netty的Channel来操作底层的java NIO Channel。
ChannelInboundHandlerAdapter处理器罕用的事件有:
- 注册事件 fireChannelRegistered。
- 连贯建设事件 fireChannelActive。
- 读事件和读实现事件 fireChannelRead、fireChannelReadComplete。
- 异样告诉事件 fireExceptionCaught。
- 用户自定义事件 fireUserEventTriggered。
- Channel 可写状态变动事件 fireChannelWritabilityChanged。
- 连贯敞开事件 fireChannelInactive。
ChannelOutboundHandler处理器罕用的事件有:
- 端口绑定 bind。
- 连贯服务端 connect。
- 写事件 write。
- 刷新工夫 flush。
- 读事件 read。
- 被动断开连接 disconnect。
- 敞开 channel 事件 close。
还有一个相似的handler(),次要用于拆卸parent通道,也就是bossGroup线程。个别状况下,都用不上这个办法。
5.4.5 bind()
提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。如果加上sync()办法则是同步。
有五个同名的重载办法,作用都是用于绑定地址端口号。不一一介绍了。
5.4.6 优雅地敞开EventLoopGroup
//开释掉所有的资源,包含创立的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
会敞开所有的child Channel。敞开之后,开释掉底层的资源。
5.5 Channel
Channel是什么?无妨看一下官网文档的阐明:
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
翻译粗心:一种连贯到网络套接字或能进行读、写、连贯和绑定等I/O操作的组件。
如果下面这段阐明比拟形象,上面还有一段阐明:
A channel provides a user:
the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.
翻译粗心:
channel为用户提供:
- 通道以后的状态(例如它是关上?还是已连贯?)
- channel的配置参数(例如接收缓冲区的大小)
- channel反对的IO操作(例如读、写、连贯和绑定),以及解决与channel相关联的所有IO事件和申请的ChannelPipeline。
5.5.1 获取channel的状态
boolean isOpen(); //如果通道关上,则返回true
boolean isRegistered();//如果通道注册到EventLoop,则返回true
boolean isActive();//如果通道处于活动状态并且已连贯,则返回true
boolean isWritable();//当且仅当I/O线程将立刻执行申请的写入操作时,返回true。
以上就是获取channel的四种状态的办法。
5.5.2 获取channel的配置参数
获取单条配置信息,应用getOption(),代码演示:
ChannelConfig config = channel.config();//获取配置参数
//获取ChannelOption.SO_BACKLOG参数,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//因为我启动器配置的是128,所以我这里获取的soBackLogConfig=128
获取多条配置信息,应用getOptions(),代码演示:
ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
/**
SO_REUSEADDR : false
WRITE_BUFFER_LOW_WATER_MARK : 32768
WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)
SO_BACKLOG : 128
以下省略...
*/
5.5.3 channel反对的IO操作
写操作,这里演示从服务端写音讯发送到客户端:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));
}
客户端控制台:
//收到服务端/127.0.0.1:6666的音讯:这波啊,这波是肉蛋葱鸡~
连贯操作,代码演示:
ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//个别应用启动器,这种形式不罕用
通过channel获取ChannelPipeline,并做相干的解决:
//获取ChannelPipeline对象
ChannelPipeline pipeline = ctx.channel().pipeline();
//往pipeline中增加ChannelHandler处理器,拆卸流水线
pipeline.addLast(new MyServerHandler());
5.6 Selector
在NioEventLoop中,有一个成员变量selector,这是nio包的Selector,在之前《NIO入门》中,我曾经讲过Selector了。
Netty中的Selector也和NIO的Selector是一样的,就是用于监听事件,治理注册到Selector中的channel,实现多路复用器。
5.7 PiPeline与ChannelPipeline
在后面介绍Channel时,咱们晓得能够在channel中拆卸ChannelHandler流水线处理器,那一个channel不可能只有一个channelHandler处理器,必定是有很多的,既然是很多channelHandler在一个流水线工作,必定是有程序的。
于是pipeline就呈现了,pipeline相当于处理器的容器。初始化channel时,把channelHandler按程序装在pipeline中,就能够实现按序执行channelHandler了。
在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被创立的时候创立。ChannelPipeline蕴含了一个ChannelHander造成的列表,且所有ChannelHandler都会注册到ChannelPipeline中。
5.8 ChannelHandlerContext
在Netty中,Handler处理器是有咱们定义的,下面讲过通过集成入站处理器或者出站处理器实现。这时如果咱们想在Handler中获取pipeline对象,或者channel对象,怎么获取呢。
于是Netty设计了这个ChannelHandlerContext上下文对象,就能够拿到channel、pipeline等对象,就能够进行读写等操作。
通过类图,ChannelHandlerContext是一个接口,上面有三个实现类。
实际上ChannelHandlerContext在pipeline中是一个链表的模式。看一段源码就明确了:
//ChannelPipeline实现类DefaultChannelPipeline的结构器办法
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//设置头结点head,尾结点tail
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
上面我用一张图来示意,会更加清晰一点:
5.9 EventLoopGroup
咱们先看一下EventLoopGroup的类图:
其中包含了罕用的实现类NioEventLoopGroup。OioEventLoopGroup在后面的例子中也有应用过。
从Netty的架构图中,能够晓得服务器是须要两个线程组进行配合工作的,而这个线程组的接口就是EventLoopGroup。
每个EventLoopGroup里包含一个或多个EventLoop,每个EventLoop中保护一个Selector实例。
5.9.1 轮询机制的实现原理
咱们无妨看一段DefaultEventExecutorChooserFactory的源码:
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
@Override
public EventExecutor next() {
//idx.getAndIncrement()相当于idx++,而后对工作长度取模
return executors[idx.getAndIncrement() & executors.length - 1];
}
这段代码能够确定执行的形式是轮询机制,接下来debug调试一下:
它这里还有一个判断,如果线程数不是2的N次方,则采纳取模算法实现。
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
原文链接
本文为阿里云原创内容,未经容许不得转载。
发表回复