我的项目构造
服务端
public class MyServer {public static void main(String[] args) throws Exception{
// 负责连贯的 NioEventLoopGroup 线程数为 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup wokerGroup = new NioEventLoopGroup();
try{ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
// 退出日志
.handler(new LoggingHandler(LogLevel.INFO))
// 自定义 channel 初始化器
.childHandler(new WebSocketChannelInitializer());
// 绑定本机的 8005 端口
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8005)).sync();
// 异步回调 - 敞开事件
channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();}
}
channel 初始化器
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
//websocket 协定自身是基于 http 协定的,所以这边也要应用 http 解编码器
pipeline.addLast(new HttpServerCodec());
// 以块的形式来写的处理器
pipeline.addLast(new ChunkedWriteHandler());
//netty 是基于分段申请的,HttpObjectAggregator 的作用是将申请分段再聚合, 参数是聚合字节的最大长度
pipeline.addLast(new HttpObjectAggregator(8192));
// 应用 websocket 协定
//ws://server:port/context_path
// 参数指的是 contex_path
pipeline.addLast(new WebSocketServerProtocolHandler("/world"));
//websocket 定义了传递数据的中 frame 类型
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
WebSocketChannelInitializer
channel 初始化器
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
//websocket 协定自身是基于 http 协定的,所以这边也要应用 http 解编码器
pipeline.addLast(new HttpServerCodec());
// 以块的形式来写的处理器
pipeline.addLast(new ChunkedWriteHandler());
//netty 是基于分段申请的,HttpObjectAggregator 的作用是将申请分段再聚合, 参数是聚合字节的最大长度
pipeline.addLast(new HttpObjectAggregator(8192));
// 应用 websocket 协定
//ws://server:port/context_path
// 参数指的是 contex_path
pipeline.addLast(new WebSocketServerProtocolHandler("/world"));
//websocket 定义了传递数据的中 frame 类型,这里应用 TextWebSocketFrame 并自定义一个 handler
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 通道列表 用于寄存通道
*/
public static CopyOnWriteArrayList<Channel> channelList = new CopyOnWriteArrayList<Channel>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
/**
* writeAndFlush 接管的参数类型是 Object 类型,然而个别咱们都是要传入管道中传输数据的类型,比方咱们以后的 demo
* 传输的就是 TextWebSocketFrame 类型的数据
*/
System.out.println(msg.text());
// 遍历通道 list,向非以后通道发送音讯
channelList.forEach(channel -> {if (channel != ctx.channel()){channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+":" +msg.text()));
}
else {channel.writeAndFlush(new TextWebSocketFrame("我:" +msg.text()));
}
}
);
}
/**
* channel add 后触发, 向 channelList 增加新退出的通道
*
* @param ctx ctx
* @throws Exception 异样
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {channelList.forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+"上线了")));
channelList.add(ctx.channel());
}
/**
* channel 连贯断开时触发,猜想是 TCP 断开时触发回调 发送连贯断开事件
*
* @param ctx ctx
* @throws Exception 异样
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {channelList.forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+"下线了")));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("异样产生");
ctx.close();}
TextWebSocketFrameHandler
继承了 SimpleChannelInboundHandler
,SimpleChannelInboundHandler
实际上是一个 ChannelHandlerAdapter
,其办法会在入站的时候被调用。这里咱们通过重写handlerAdded
办法,在通道创立后将其退出当 channelList 中,并向其余通道发送成员上线的提示信息。重写 channelRead0
办法,向所有非以后 channel 发送读取到音讯。
前端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket 客户端 </title>
</head>
<body>
<script type="text/javascript">
var socket;
// 如果浏览器反对 WebSocket
if(window.WebSocket){
// 参数就是与服务器连贯的地址
socket = new WebSocket("ws://localhost:8005/world");
// 客户端收到服务器音讯的时候就会执行这个回调办法
socket.onmessage = function (event) {var ta = document.getElementById("responseText");
ta.value = ta.value + "n"+event.data;
}
// 连贯建设的回调函数
socket.onopen = function(event){var ta = document.getElementById("responseText");
ta.value = "连贯开启";
}
// 连贯断掉的回调函数
socket.onclose = function (event) {var ta = document.getElementById("responseText");
ta.value = ta.value +"n"+"连贯敞开";
}
}else{alert("浏览器不反对 WebSocket!");
}
// 发送数据
function send(message){if(!window.WebSocket){return;}
// 当 websocket 状态关上
if(socket.readyState == WebSocket.OPEN){socket.send(message);
}else{alert("连贯没有开启");
}
}
</script>
<form onsubmit="return false">
<textarea name = "message" style="width: 400px;height: 200px"></textarea>
<input type ="button" value="发送数据" onclick="send(this.form.message.value);">
<h3> 服务器输入:</h3>
<textarea id ="responseText" style="width: 400px;height: 300px;"></textarea>
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空数据">
</form>
</body>
</html>
测试