我的项目构造

服务端

public class MyServer { public static void main(String[] args) throws Exception{ // 负责连贯的NioEventLoopGroup线程数为1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup wokerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class) // 退出日志 .handler(new LoggingHandler(LogLevel.INFO)) // 自定义channel初始化器 .childHandler(new WebSocketChannelInitializer()); // 绑定本机的8005端口 ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8005)).sync(); // 异步回调-敞开事件 channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } }

channel初始化器

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //websocket协定自身是基于http协定的,所以这边也要应用http解编码器 pipeline.addLast(new HttpServerCodec()); //以块的形式来写的处理器 pipeline.addLast(new ChunkedWriteHandler()); //netty是基于分段申请的,HttpObjectAggregator的作用是将申请分段再聚合,参数是聚合字节的最大长度 pipeline.addLast(new HttpObjectAggregator(8192)); // 应用websocket协定 //ws://server:port/context_path //参数指的是contex_path pipeline.addLast(new WebSocketServerProtocolHandler("/world")); //websocket定义了传递数据的中frame类型 pipeline.addLast(new TextWebSocketFrameHandler()); }}

WebSocketChannelInitializerchannel初始化器

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //websocket协定自身是基于http协定的,所以这边也要应用http解编码器 pipeline.addLast(new HttpServerCodec()); //以块的形式来写的处理器 pipeline.addLast(new ChunkedWriteHandler()); //netty是基于分段申请的,HttpObjectAggregator的作用是将申请分段再聚合,参数是聚合字节的最大长度 pipeline.addLast(new HttpObjectAggregator(8192)); // 应用websocket协定 //ws://server:port/context_path //参数指的是contex_path pipeline.addLast(new WebSocketServerProtocolHandler("/world")); //websocket定义了传递数据的中frame类型,这里应用TextWebSocketFrame并自定义一个handler pipeline.addLast(new TextWebSocketFrameHandler()); }}
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { /** * 通道列表 用于寄存通道 */ public static CopyOnWriteArrayList<Channel> channelList = new CopyOnWriteArrayList<Channel>(); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { /** * writeAndFlush接管的参数类型是Object类型,然而个别咱们都是要传入管道中传输数据的类型,比方咱们以后的demo * 传输的就是TextWebSocketFrame类型的数据 */ System.out.println(msg.text()); // 遍历通道list,向非以后通道发送音讯 channelList.forEach(channel -> { if (channel != ctx.channel()){ channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+":" +msg.text())); } else { channel.writeAndFlush(new TextWebSocketFrame("我:" +msg.text())); } } ); } /** * channel add后触发,向channelList增加新退出的通道 * * @param ctx ctx * @throws Exception 异样 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { channelList.forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+"上线了"))); channelList.add(ctx.channel()); } /** * channel连贯断开时触发,猜想是TCP断开时触发回调 发送连贯断开事件 * * @param ctx ctx * @throws Exception 异样 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { channelList.forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+"下线了"))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异样产生"); ctx.close(); }

TextWebSocketFrameHandler继承了SimpleChannelInboundHandler,SimpleChannelInboundHandler实际上是一个ChannelHandlerAdapter,其办法会在入站的时候被调用。这里咱们通过重写handlerAdded办法,在通道创立后将其退出当channelList中,并向其余通道发送成员上线的提示信息。重写channelRead0办法,向所有非以后channel发送读取到音讯。

前端

<!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>WebSocket客户端</title></head><body><script type="text/javascript"> var socket; //如果浏览器反对WebSocket if(window.WebSocket){        //参数就是与服务器连贯的地址 socket = new WebSocket("ws://localhost:8005/world"); //客户端收到服务器音讯的时候就会执行这个回调办法 socket.onmessage = function (event) {            var ta = document.getElementById("responseText"); ta.value = ta.value + "n"+event.data; }        //连贯建设的回调函数 socket.onopen = function(event){            var ta = document.getElementById("responseText"); ta.value = "连贯开启"; }        //连贯断掉的回调函数 socket.onclose = function (event) {            var ta = document.getElementById("responseText"); ta.value = ta.value +"n"+"连贯敞开"; }    }else{        alert("浏览器不反对WebSocket!"); }    //发送数据 function send(message){        if(!window.WebSocket){            return; }        //当websocket状态关上 if(socket.readyState == WebSocket.OPEN){            socket.send(message); }else{            alert("连贯没有开启"); }    }</script><form onsubmit="return false"> <textarea name = "message" style="width: 400px;height: 200px"></textarea> <input type ="button" value="发送数据" onclick="send(this.form.message.value);"> <h3>服务器输入:</h3> <textarea id ="responseText" style="width: 400px;height: 300px;"></textarea> <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空数据"></form></body></html>

测试