1. 用 Netty 实现群聊零碎
2. 用 Netty 实现心跳检测
3. 用 Netty 编程实现客户端与服务器端的长链接
4.Netty 外围模块组件剖析
咱们先通过代码示例,感受一下 Netty 的应用,而后再分析这些工具类每一个的作用。
1. 用 Netty 实现群聊零碎
咱们用 Netty 实现一个群聊零碎,实现 客户端和服务器端之间的简略通信。
性能点如下:
1)服务器端:能够检测用户上线,离线,并实现音讯转发
2)客户端:通过 channel 能够把音讯发送给所有其它用户,同时也能接管其它用户发送的信息。
server 端:
client 端:
server:
package com.example.demo.netty.nettyDemo.chat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
/**
* @author sulingfeng
* @title: GroupChatServer
* @projectName netty-learn
* @description: TODO
* @date 2022/7/12 17:12
*/
@Slf4j
public class GroupChatServer {
private int port;
public GroupChatServer(int port) {this.port = port;}
public void run() throws Exception{
// 两个工作线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)// 应用 NioSocketChannel 作为通道的实现
.option(ChannelOption.SO_BACKLOG, 128)// 设置线程队列的连贯数量下限
.childOption(ChannelOption.SO_KEEPALIVE, true)// 放弃流动连贯状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 获取到 pipeline
ChannelPipeline pipeline = ch.pipeline();
// 向 pipeline 退出解码器
pipeline.addLast("decoder", new StringDecoder());
// 向 pipeline 退出编码器
pipeline.addLast("encoder", new StringEncoder());
// 退出本人的业务解决 handler
pipeline.addLast(new GroupChatServerHandler());
}
});
log.info("netty 服务器启动!");
ChannelFuture channelFuture = bootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();}
}
public static void main(String[] args) throws Exception{
// 启动
GroupChatServer groupChatServer = new GroupChatServer(10087);
groupChatServer.run();}
}
serverHandler:
package com.example.demo.netty.nettyDemo.chat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author sulingfeng
* @title: GroupChatServerHandler
* @projectName netty-learn
* @description: TODO
* @date 2022/7/12 17:21
*/
@Slf4j
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"退出聊天"+"\n");
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"来到了 \n");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info(ctx.channel().remoteAddress()+"上线了~");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info(ctx.channel().remoteAddress()+"离线了~");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {Channel channel = ctx.channel();
// 遍历 channelGroup,依据不同的状况,回送不同的信息
channelGroup.forEach(ch->{if(channel!=ch){ch.writeAndFlush("[客户]:["+channel.remoteAddress()+"]发送了信息:"+msg+"\n");
}else{ch.writeAndFlush("[本人]发送了信息:"+msg+"\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();
}
}
client:
package com.example.demo.netty.nettyDemo.chat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.util.Scanner;
/**
* @author sulingfeng
* @title: GroupChatClient
* @projectName netty-learn
* @description: TODO
* @date 2022/7/12 17:33
*/
@Slf4j
public class GroupChatClient {
// 属性
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception{EventLoopGroup group = new NioEventLoopGroup();
try {Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 失去 pipeline
ChannelPipeline pipeline = ch.pipeline();
// 退出相干 handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 退出自定义的 handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 客户端须要输出信息,创立一个输出循环
Scanner scanner = new Scanner(System.in);
while (!scanner.hasNext("#")) {String msg = scanner.nextLine();
// 通过 channel 发送到服务器端
channel.writeAndFlush(msg + "\r\n");
}
}finally {group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {new GroupChatClient("127.0.0.1", 10087).run();}
}
clientHandler:
package com.example.demo.netty.nettyDemo.chat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
/**
* @author sulingfeng
* @title: GroupChatClientHandler
* @projectName netty-learn
* @description: TODO
* @date 2022/7/12 17:36
*/
@Slf4j
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {log.info(s.trim());
}
}
2. 用 Netty 实现心跳检测
咱们编写一个服务器:
当服务器超过 5 秒没有读时,就输入读闲暇。
当服务器超过 6 秒没有写时,就提醒写闲暇。
当服务器超过 7 秒没有读写时,就提醒读写闲暇。
server:
package com.example.demo.netty.nettyDemo.chat.heartbeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* @author sulingfeng
* @title: BeatServer
* @projectName netty-learn
* @description: TODO
* @date 2022/7/26 13:35
*/
public class BeatServer {public static void main(String[] args) throws Exception {
// 两个工作线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
/**
* IdleStateHandler 是 netty 提供的解决闲暇状态的处理器
* long readerIdleTime 示意长时间没有读,就会发送心跳检测包是否连贯
* long writerIdleTime 示意长时间没有写,就会发送心跳检测包是否连贯
* long allIdleTime 示意长时间没有读写,就会发送心跳检测包是否连贯
*/
pipeline.addLast(new IdleStateHandler(5,6,7, TimeUnit.SECONDS));
pipeline.addLast(new MyHeartHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(10087).sync();
channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();}
}
}
serverHandler:
package com.example.demo.netty.nettyDemo.chat.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
/**
* @author sulingfeng
* @title: MyHeartHandler
* @projectName netty-learn
* @description: TODO
* @date 2022/7/26 13:57
*/
public class MyHeartHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if(evt instanceof IdleStateEvent){IdleStateEvent event = (IdleStateEvent)evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType="读闲暇";
break;
case WRITER_IDLE:
eventType="写闲暇";
break;
case ALL_IDLE:
eventType="读写闲暇";
break;
}
System.out.println(ctx.channel().remoteAddress()+"-- 超时工夫 --"+eventType);
System.out.println("服务器做相应解决");
}
}
}
3. 用 Netty 编程实现客户端与服务器端的长链接
实现一个基于 webSocket 的长链接全双工交互,从页面发动调用,客户端就能够和服务端互发信息了。
server:
package com.example.demo.netty.nettyDemo.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
/**
* @author sulingfeng
* @title: MyServer
* @projectName netty-learn
* @description: TODO
* @date 2022/7/13 13:57
*/
@Slf4j
public class MyServer {public static void main(String[] args) throws Exception {
// 创立两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 8 个 NioEventLoop
try{ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
// 因为 Http 协定,应用 http 的编码和解码器
pipeline.addLast(new HttpServerCodec());
// 以块的形式写,增加 ChunkedWriteHandler 处理器
pipeline.addLast(new ChunkedWriteHandler());
//http 数据如果很大,就会将数据分段发送,收回屡次 http 申请
// 参数示意 byte 数组最大长度
pipeline.addLast(new HttpObjectAggregator(8192));
// 浏览器申请的对应接口:localhost:7000/hello
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 自定义的 handler,解决业务逻辑
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
// 启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();}
}
}
handler:
package com.example.demo.netty.nettyDemo.socket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* @author sulingfeng
* @title: MyTextWebSocketFrameHandler
* @projectName netty-learn
* @description: TODO
* @date 2022/7/13 14:02
*/
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {System.out.println("服务器收到音讯" + msg.text());
// 回复音讯
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器工夫" + LocalDateTime.now() + " " + msg.text()));
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("异样产生" + cause.getMessage());
ctx.close(); // 敞开连贯}
}
html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
// 判断以后浏览器是否反对 websocket
if(window.WebSocket) {
//go on
socket = new WebSocket("ws://localhost:7000/hello");
// 相当于 channelReado, ev 收到服务器端回送的音讯
socket.onmessage = function (ev) {var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
// 相当于连贯开启(感知到连贯开启)
socket.onopen = function (ev) {var rt = document.getElementById("responseText");
rt.value = "连贯开启了.." }
// 相当于连贯敞开(感知到连贯敞开)
socket.onclose = function (ev) {var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连贯敞开了.." }
} else {alert("以后浏览器不反对 websocket")
}
// 发送音讯到服务器
function send(message) {if(!window.socket) { // 先判断 socket 是否创立好
return;
}
if(socket.readyState == WebSocket.OPEN) {
// 通过 socket 发送音讯
socket.send(message)
} else {alert("连贯没有开启");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="产生音讯" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
4.Netty 外围模块组件剖析
咱们通过案例大抵学了一下 Netty 的应用,然而咱们还是只会 api 的调用,对于其中的每一个组件,以及每一个组件的原理,咱们都知之甚少,接下来咱们对每个组件都进行分析一番:
4.1)Bootstrap、ServerBootstrap
这是一个疏导类,一个 Netty 程序都是从 Bootstrap 开始的,次要作用是配置以及启动整个程序,串联所有 netty 的组件。Bootstrap 是客户端的疏导类,ServerBootstrap 是服务器端的疏导类。
4.2)Future,ChannelFuture
Netty 所有的 IO 操作都是异步的,不能立即得悉音讯是否被解决。只能过一会儿等它执行实现,当操作胜利,应用程序天然就会监听到了。
4.3)channel
channel 次要用于执行网络 IO 操作。
4.3.1)通过 channel 能够取得以后通道的状态。
4.3.2)通过 channel 能进行 IO 操作。
4.3.3)通过 channel 能取得缓冲区的大小
4.4)selector
Netty 基于 selector 的 I / O 多路复用,通过 selector 一个线程能够监听多个链接的事件。
当一个 selector 中注册 channel 后,selector 外部能够始终查问这些 channel 是否有就绪的 I / O 事件,这样程序就能够简略地应用一个线程高效治理多个 channel。
4.5)channelHandler 及其实现类
它是用来解决 I / O 工夫,并将其转发到 ChannelPipeline(业务解决链路)的下一个处理程序。
咱们个别都定义一个 Handler 来减少咱们自定义的业务解决类。
4.6)Pipeline 和 ChannelPipeline
这是咱们次要的业务逻辑解决类,咱们通常通过继承类重写办法来退出咱们想要的业务逻辑。
4.7)ChannelHandlerContext
保留 Channel 相干的所有上下文信息,能够取得以后的 channel 等信息。
4.8)ChannelOption
Netty 在创立 Channel 实例后, 个别都须要设置 ChannelOption 参数
4.9)EventLoopGroup 和其实现类 NioEventLoopGroup
是一组工作线程,Netty 为了更好地利用多核 cpu 资源,个别会有多个 EventLoop 同时工作,每个 EentLoop 保护着一个 seelctor 实例。
4.10)Unpooled 类
Netty 提供一个专门用来操作缓冲区 (即 Netty 的数据容器) 的工具类。