1.用Netty实现群聊零碎

2.用Netty实现心跳检测

3.用Netty编程实现客户端与服务器端的长链接

4.Netty外围模块组件剖析

咱们先通过代码示例,感受一下Netty的应用,而后再分析这些工具类每一个的作用。

1.用Netty实现群聊零碎

咱们用Netty实现一个群聊零碎,实现客户端和服务器端之间的简略通信

性能点如下:
1)服务器端:能够检测用户上线,离线,并实现音讯转发
2)客户端:通过channel能够把音讯发送给所有其它用户,同时也能接管其它用户发送的信息。

server端:

client端:


server:

package com.example.demo.netty.nettyDemo.chat;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.extern.slf4j.Slf4j;/** * @author sulingfeng * @title: GroupChatServer * @projectName netty-learn * @description: TODO * @date 2022/7/12 17:12 */@Slf4jpublic class GroupChatServer {    private int port;    public GroupChatServer(int port) {        this.port = port;    }    public void run() throws Exception{        //两个工作线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try{            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)//应用NioSocketChannel作为通道的实现                    .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列的连贯数量下限                    .childOption(ChannelOption.SO_KEEPALIVE, true)//放弃流动连贯状态                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //获取到pipeline                            ChannelPipeline pipeline = ch.pipeline();                            //向pipeline退出解码器                            pipeline.addLast("decoder", new StringDecoder());                            //向pipeline退出编码器                            pipeline.addLast("encoder", new StringEncoder());                            //退出本人的业务解决handler                            pipeline.addLast(new GroupChatServerHandler());                        }                    });            log.info("netty 服务器启动!");            ChannelFuture channelFuture = bootstrap.bind(port).sync();            channelFuture.channel().closeFuture().sync();        }finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception{        //启动        GroupChatServer groupChatServer = new GroupChatServer(10087);        groupChatServer.run();    }}

serverHandler:

package com.example.demo.netty.nettyDemo.chat;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;import lombok.extern.slf4j.Slf4j;import java.text.SimpleDateFormat;import java.util.Date;/** * @author sulingfeng * @title: GroupChatServerHandler * @projectName netty-learn * @description: TODO * @date 2022/7/12 17:21 */@Slf4jpublic class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        Channel channel = ctx.channel();        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"退出聊天"+"\n");        channelGroup.add(channel);    }    @Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {        Channel channel = ctx.channel();        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"来到了\n");    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        log.info(ctx.channel().remoteAddress()+"上线了~");    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        log.info(ctx.channel().remoteAddress()+"离线了~");    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {        Channel channel = ctx.channel();        //遍历channelGroup,依据不同的状况,回送不同的信息        channelGroup.forEach(ch->{            if(channel!=ch){                ch.writeAndFlush("[客户]:["+channel.remoteAddress()+"]发送了信息:"+msg+"\n");            }else{                ch.writeAndFlush("[本人]发送了信息:"+msg+"\n");            }        });    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();    }}

client:

package com.example.demo.netty.nettyDemo.chat;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.extern.slf4j.Slf4j;import java.util.Scanner;/** * @author sulingfeng * @title: GroupChatClient * @projectName netty-learn * @description: TODO * @date 2022/7/12 17:33 */@Slf4jpublic class GroupChatClient {    //属性    private final String host;    private final int port;    public GroupChatClient(String host, int port) {        this.host = host;        this.port = port;    }    public void run() throws Exception{        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap bootstrap = new Bootstrap()                    .group(group)                    .channel(NioSocketChannel.class)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //失去pipeline                            ChannelPipeline pipeline = ch.pipeline();                            //退出相干handler                            pipeline.addLast("decoder", new StringDecoder());                            pipeline.addLast("encoder", new StringEncoder());                            //退出自定义的handler                            pipeline.addLast(new GroupChatClientHandler());                        }                    });            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();            Channel channel = channelFuture.channel();            //客户端须要输出信息,创立一个输出循环            Scanner scanner = new Scanner(System.in);            while (!scanner.hasNext("#")) {                String msg = scanner.nextLine();                //通过channel 发送到服务器端                channel.writeAndFlush(msg + "\r\n");            }        }finally {            group.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        new GroupChatClient("127.0.0.1", 10087).run();    }}

clientHandler:

package com.example.demo.netty.nettyDemo.chat;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import lombok.extern.slf4j.Slf4j;/** * @author sulingfeng * @title: GroupChatClientHandler * @projectName netty-learn * @description: TODO * @date 2022/7/12 17:36 */@Slf4jpublic class GroupChatClientHandler  extends SimpleChannelInboundHandler<String> {    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {        log.info(s.trim());    }}

2.用Netty实现心跳检测

咱们编写一个服务器:
当服务器超过5秒没有读时,就输入读闲暇。
当服务器超过6秒没有写时,就提醒写闲暇。
当服务器超过7秒没有读写时,就提醒读写闲暇。

server:

package com.example.demo.netty.nettyDemo.chat.heartbeat;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/** * @author sulingfeng * @title: BeatServer * @projectName netty-learn * @description: TODO * @date 2022/7/26 13:35 */public class BeatServer {    public static void main(String[] args) throws Exception {        //两个工作线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .handler(new LoggingHandler())                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline pipeline = ch.pipeline();                            /**                             * IdleStateHandler 是netty提供的解决闲暇状态的处理器                             * long readerIdleTime 示意长时间没有读,就会发送心跳检测包是否连贯                             * long writerIdleTime 示意长时间没有写,就会发送心跳检测包是否连贯                             * long allIdleTime    示意长时间没有读写,就会发送心跳检测包是否连贯                             */                            pipeline.addLast(new IdleStateHandler(5,6,7, TimeUnit.SECONDS));                            pipeline.addLast(new MyHeartHandler());                        }                    });            ChannelFuture channelFuture = bootstrap.bind(10087).sync();            channelFuture.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}

serverHandler:

package com.example.demo.netty.nettyDemo.chat.heartbeat;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.timeout.IdleStateEvent;import io.netty.handler.timeout.IdleStateHandler;/** * @author sulingfeng * @title: MyHeartHandler * @projectName netty-learn * @description: TODO * @date 2022/7/26 13:57 */public class MyHeartHandler extends ChannelInboundHandlerAdapter {    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if(evt instanceof IdleStateEvent){            IdleStateEvent event = (IdleStateEvent)evt;            String eventType = null;            switch (event.state()){                case READER_IDLE:                    eventType="读闲暇";                    break;                case WRITER_IDLE:                    eventType="写闲暇";                    break;                case ALL_IDLE:                    eventType="读写闲暇";                    break;            }            System.out.println(ctx.channel().remoteAddress()+"--超时工夫--"+eventType);            System.out.println("服务器做相应解决");        }    }}

3.用Netty编程实现客户端与服务器端的长链接

实现一个基于webSocket的长链接全双工交互,从页面发动调用,客户端就能够和服务端互发信息了。

server:

package com.example.demo.netty.nettyDemo.socket;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.stream.ChunkedWriteHandler;import lombok.extern.slf4j.Slf4j;/** * @author sulingfeng * @title: MyServer * @projectName netty-learn * @description: TODO * @date 2022/7/13 13:57 */@Slf4jpublic class MyServer {    public static void main(String[] args) throws Exception {        //创立两个线程组        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop        try{            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap                    .group(bossGroup,workerGroup)                    .channel(NioServerSocketChannel.class)                    .handler(new LoggingHandler(LogLevel.INFO))                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline pipeline = ch.pipeline();                            //因为Http协定,应用http的编码和解码器                            pipeline.addLast(new HttpServerCodec());                            //以块的形式写,增加ChunkedWriteHandler处理器                            pipeline.addLast(new ChunkedWriteHandler());                            //http数据如果很大,就会将数据分段发送,收回屡次http申请                            //参数示意byte数组最大长度                            pipeline.addLast(new HttpObjectAggregator(8192));                            //浏览器申请的对应接口 : localhost:7000/hello                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));                            //自定义的handler ,解决业务逻辑                            pipeline.addLast(new MyTextWebSocketFrameHandler());                        }                    });            //启动服务器            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();            channelFuture.channel().closeFuture().sync();        }finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}

handler:

package com.example.demo.netty.nettyDemo.socket;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import java.time.LocalDateTime;/** * @author sulingfeng * @title: MyTextWebSocketFrameHandler * @projectName netty-learn * @description: TODO * @date 2022/7/13 14:02 */public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {        System.out.println("服务器收到音讯 " + msg.text());        //回复音讯        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器工夫" + LocalDateTime.now() + " " + msg.text()));    }    @Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {        System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        System.out.println("异样产生 " + cause.getMessage());        ctx.close(); //敞开连贯    }}

html:

<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><title>Title</title></head><body><script>var socket;//判断以后浏览器是否反对 websocketif(window.WebSocket) {//go onsocket = new WebSocket("ws://localhost:7000/hello");//相当于 channelReado, ev 收到服务器端回送的音讯socket.onmessage = function (ev) {var rt = document.getElementById("responseText");rt.value = rt.value + "\n" + ev.data;}//相当于连贯开启(感知到连贯开启)socket.onopen = function (ev) {var rt = document.getElementById("responseText");rt.value = "连贯开启了.." }//相当于连贯敞开(感知到连贯敞开)socket.onclose = function (ev) {var rt = document.getElementById("responseText");rt.value = rt.value + "\n" + "连贯敞开了.." }} else {alert("以后浏览器不反对 websocket")}//发送音讯到服务器function send(message) {if(!window.socket) { //先判断 socket 是否创立好return;}if(socket.readyState == WebSocket.OPEN) {//通过 socket 发送音讯socket.send(message)} else {alert("连贯没有开启");}}</script><form onsubmit="return false"><textarea name="message" style="height: 300px; width: 300px"></textarea><input type="button" value="产生音讯" onclick="send(this.form.message.value)"><textarea id="responseText" style="height: 300px; width: 300px"></textarea><input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''"></form></body></html>

4.Netty外围模块组件剖析

咱们通过案例大抵学了一下Netty的应用,然而咱们还是只会api的调用,对于其中的每一个组件,以及每一个组件的原理,咱们都知之甚少,接下来咱们对每个组件都进行分析一番:

4.1)Bootstrap、ServerBootstrap

这是一个疏导类,一个Netty程序都是从Bootstrap开始的,次要作用是配置以及启动整个程序,串联所有netty的组件。Bootstrap是客户端的疏导类,ServerBootstrap是服务器端的疏导类。

4.2)Future,ChannelFuture

Netty所有的IO操作都是异步的,不能立即得悉音讯是否被解决。只能过一会儿等它执行实现,当操作胜利,应用程序天然就会监听到了。

4.3)channel
channel次要用于执行网络IO操作。
4.3.1)通过channel能够取得以后通道的状态。
4.3.2)通过channel能进行IO操作。
4.3.3)通过channel能取得缓冲区的大小

4.4)selector
Netty基于selector的I/O多路复用,通过selector一个线程能够监听多个链接的事件。

当一个selector中注册channel后,selector外部能够始终查问这些channel是否有就绪的I/O事件,这样程序就能够简略地应用一个线程高效治理多个channel。

4.5)channelHandler及其实现类
它是用来解决I/O工夫,并将其转发到ChannelPipeline(业务解决链路)的下一个处理程序。

咱们个别都定义一个Handler来减少咱们自定义的业务解决类。

4.6)Pipeline 和 ChannelPipeline
这是咱们次要的业务逻辑解决类,咱们通常通过继承类重写办法来退出咱们想要的业务逻辑。

4.7)ChannelHandlerContext
保留 Channel 相干的所有上下文信息,能够取得以后的channel等信息。

4.8)ChannelOption

Netty 在创立 Channel 实例后,个别都须要设置 ChannelOption 参数

4.9)EventLoopGroup 和其实现类 NioEventLoopGroup
是一组工作线程,Netty为了更好地利用多核cpu资源,个别会有多个EventLoop同时工作,每个EentLoop保护着一个seelctor实例。

4.10)Unpooled 类

Netty 提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类。