大家好,我是 「后端技术进阶」 作者,一个酷爱技术的少年。
@[toc]
感觉不错的话,欢送 star!ღ(´・ᴗ・`)比心
- Netty 从入门到实战系列文章地址:https://github.com/Snailclimb/netty-practical-tutorial。
- RPC 框架源码地址:https://github.com/Snailclimb/guide-rpc-framework
上面,我会带着大家搭建本人的第一个 Netty 版的 Hello World 小程序。
首先,让咱们来创立服务端。
服务端
咱们能够通过 ServerBootstrap
来疏导咱们启动一个简略的 Netty 服务端,为此,你必须要为其指定上面三类属性:
- 线程组(_个别须要两个线程组,一个负责接解决客户端的连贯,一个负责具体的 IO 解决_)
- IO 模型(_BIO/NIO_)
- 自定义
ChannelHandler
(_解决客户端发过来的数据并返回数据给客户端_)
创立服务端
/**
* @author shuang.kou
* @createTime 2020 年 05 月 14 日 20:28:00
*/
public final class HelloServer {
private final int port;
public HelloServer(int port) {this.port = port;}
private void start() throws InterruptedException {
// 1.bossGroup 用于接管连贯,workerGroup 用于具体的解决
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2. 创立服务端启动疏导 / 辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3. 给疏导类配置两大线程组, 确定了线程模型
b.group(bossGroup, workerGroup)
// (非必备)打印日志
.handler(new LoggingHandler(LogLevel.INFO))
// 4. 指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();
//5. 能够自定义客户端音讯的业务解决逻辑
p.addLast(new HelloServerHandler());
}
});
// 6. 绑定端口, 调用 sync 办法阻塞晓得绑定实现
ChannelFuture f = b.bind(port).sync();
// 7. 阻塞期待直到服务器 Channel 敞开 (closeFuture() 办法获取 Channel 的 CloseFuture 对象, 而后调用 sync()办法)
f.channel().closeFuture().sync();} finally {
//8. 优雅敞开相干线程组资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();}
}
public static void main(String[] args) throws InterruptedException {new HelloServer(8080).start();}
}
简略解析一下服务端的创立过程具体是怎么的:
1. 创立了两个 NioEventLoopGroup
对象实例:bossGroup
和 workerGroup
。
bossGroup
: 用于解决客户端的 TCP 连贯申请。workerGroup
:负责每一条连贯的具体读写数据的解决逻辑,真正负责 I/O 读写操作,交由对应的 Handler 解决。
举个例子:咱们把公司的老板当做 bossGroup,员工当做 workerGroup,bossGroup 在里面接完活之后,扔给 workerGroup 去解决。个别状况下咱们会指定 bossGroup 的 线程数为 1(并发连贯量不大的时候),workGroup 的线程数量为 CPU 外围数 *2。另外,依据源码来看,应用 NioEventLoopGroup
类的无参构造函数设置线程数量的默认值就是 CPU 外围数 *2。
2. 创立一个服务端启动疏导 / 辅助类:ServerBootstrap
,这个类将疏导咱们进行服务端的启动工作。
3. 通过 .group()
办法给疏导类 ServerBootstrap
配置两大线程组,确定了线程模型。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
4. 通过 channel()
办法给疏导类 ServerBootstrap
指定了 IO 模型为NIO
NioServerSocketChannel
:指定服务端的 IO 模型为 NIO,与 BIO 编程模型中的ServerSocket
对应NioSocketChannel
: 指定客户端的 IO 模型为 NIO,与 BIO 编程模型中的Socket
对应
5. 通过 .childHandler()
给疏导类创立一个ChannelInitializer
,而后指定了服务端音讯的业务解决逻辑也就是自定义的ChannelHandler
对象
6. 调用 ServerBootstrap
类的 bind()
办法绑定端口。
//bind()是异步的,然而,你能够通过 `sync()` 办法将其变为同步。ChannelFuture f = b.bind(port).sync();
自定义服务端 ChannelHandler 解决音讯
HelloServerHandler.java
/**
* @author shuang.kou
* @createTime 2020 年 05 月 14 日 20:39:00
*/
@Sharable
public class HelloServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {ByteBuf in = (ByteBuf) msg;
System.out.println("message from client:" + in.toString(CharsetUtil.UTF_8));
// 发送音讯给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("你也好!", CharsetUtil.UTF_8));
} finally {ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();}
}
这个逻辑处理器继承自ChannelInboundHandlerAdapter
并重写了上面 2 个办法:
channelRead()
:服务端接管客户端发送数据调用的办法exceptionCaught()
:解决客户端音讯产生异样的时候被调用
客户端
创立客户端
public final class HelloClient {
private final String host;
private final int port;
private final String message;
public HelloClient(String host, int port, String message) {
this.host = host;
this.port = port;
this.message = message;
}
private void start() throws InterruptedException {
//1. 创立一个 NioEventLoopGroup 对象实例
EventLoopGroup group = new NioEventLoopGroup();
try {
//2. 创立客户端启动疏导 / 辅助类:Bootstrap
Bootstrap b = new Bootstrap();
//3. 指定线程组
b.group(group)
//4. 指定 IO 模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
// 5. 这里能够自定义音讯的业务解决逻辑
p.addLast(new HelloClientHandler(message));
}
});
// 6. 尝试建设连贯
ChannelFuture f = b.connect(host, port).sync();
// 7. 期待连贯敞开(阻塞,直到 Channel 敞开)f.channel().closeFuture().sync();} finally {group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {new HelloClient("127.0.0.1",8080, "你好, 你真帅啊!哥哥!").start();}
}
持续剖析一下客户端的创立流程:
1. 创立一个 NioEventLoopGroup
对象实例(_服务端创立了两个 NioEventLoopGroup
对象_)
2. 创立客户端启动的疏导类是 Bootstrap
3. 通过 .group()
办法给疏导类 Bootstrap
配置一个线程组
4. 通过 channel()
办法给疏导类 Bootstrap
指定了 IO 模型为NIO
5. 通过 .childHandler()
给疏导类创立一个ChannelInitializer
,而后指定了客户端音讯的业务解决逻辑也就是自定义的ChannelHandler
对象
6. 调用 Bootstrap
类的 connect()
办法连贯服务端,这个办法须要指定两个参数:
inetHost
: ip 地址inetPort
: 端口号
public ChannelFuture connect(String inetHost, int inetPort) {return this.connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
this.validate();
return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
}
connect
办法返回的是一个 Future
类型的对象
public interface ChannelFuture extends Future<Void> {......}
也就是说这个方是异步的,咱们通过 addListener
办法能够监听到连贯是否胜利,进而打印出连贯信息。具体做法很简略,只须要对代码进行以下改变:
ChannelFuture f = b.connect(host, port).addListener(future -> {if (future.isSuccess()) {System.out.println("连贯胜利!");
} else {System.err.println("连贯失败!");
}
}).sync();
自定义客户端 ChannelHandler 解决音讯
HelloClientHandler.java
/**
* @author shuang.kou
* @createTime 2020 年 05 月 14 日 20:46:00
*/
@Sharable
public class HelloClientHandler extends ChannelInboundHandlerAdapter {
private final String message;
public HelloClientHandler(String message) {this.message = message;}
@Override
public void channelActive(ChannelHandlerContext ctx) {System.out.println("client sen msg to server" + message);
ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf in = (ByteBuf) msg;
try {System.out.println("client receive msg from server:" + in.toString(CharsetUtil.UTF_8));
} finally {ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();
ctx.close();}
}
这个逻辑处理器继承自 ChannelInboundHandlerAdapter
,并且笼罩了上面三个办法:
channelActive()
: 客户端和服务端的连贯建设之后就会被调用channelRead
: 客户端接管服务端发送数据调用的办法exceptionCaught
: 解决音讯产生异样的时候被调用
运行程序
首先运行服务端,而后再运行客户端。
如果你看到,服务端控制台打印出:
message from client: 你好, 你真帅啊!哥哥!
客户端控制台打印出:
client sen msg to server 你好, 你真帅啊!哥哥!client receive msg from server: 你也好!
阐明你的 Netty 版的 Hello World 曾经实现了!
总结
这篇文章咱们本人实现了一个 Netty 版的 Hello World,并且具体介绍了服务端和客户端的创立流程。客户端和服务端这块的创立流程,套路根本都差不多,差异可能就在相干配置方面。
文中波及的代码,你能够在这里找到:https://github.com/Snailclimb/guide-rpc-framework-learning/tree/master/src/main/java/github/javaguide/netty/echo。