共计 18275 个字符,预计需要花费 46 分钟才能阅读完成。
需求
实现基于 Netty 的“请求 - 响应”同步通信机制。
设计思路
Netty 提供了异步 IO 和同步 IO 的统一实现,但是我们的需求其实和 IO 的同步异步并无关系。我们的关键是要实现请求 - 响应这种典型的一问一答交互方式。要实现这个需求,需要解决两个问题:
请求和响应的正确匹配。
客户端发送数据后,服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢,(即一个请求对应一个自己的响应)?
解决思路:通过客户端唯一的 RequestId,服务端返回的响应中需要包含该 RequestId,这样客户端就可以通过 RequestId 来正确匹配请求响应。
请求线程和响应线程的通信。
请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty 客户端在接受到响应之后,怎么通知请求线程结果。
解决思路:客户端线程在发送请求后,进入等待,服务器返回响应后,根据 RequestId 来唤醒客户端的请求线程,并把结果返回给请求线程。
解决方案
利用 Java 中的 CountDownLatch 类来实现同步 Future。
具体过程是:客户端发送请求后将 < 请求 ID,Future> 的键值对保存到一个缓存中,这时候用 Future 等待结果,挂住请求线程;当 Netty 客户端收到服务端的响应后,响应线程根据请求 ID 从缓存中取出 Future,然后设置响应结果到 Future 中。这个时候利用 CountDownLatch 的通知机制,通知请求线程。请求线程从 Future 中拿到响应结果,然后做业务处理。
缓存使用 google 的 guava
具体代码
首先引入依赖
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
SyncFuture
SyncFuture:同步的 Future。这个是核心,通过这个工具类来实现线程等待。
package com.topinfo.ci.netty.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class SyncFuture<T> implements Future<T> {
// 因为请求和响应是一一对应的,因此初始化 CountDownLatch 值为 1。private CountDownLatch latch = new CountDownLatch(1);
// 需要响应线程设置的响应结果
private T response;
// Futrue 的请求时间,用于计算 Future 是否超时
private long beginTime = System.currentTimeMillis();
public SyncFuture() {}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {return false;}
@Override
public boolean isCancelled() {return false;}
@Override
public boolean isDone() {if (response != null) {return true;}
return false;
}
// 获取响应结果,直到有结果才返回。@Override
public T get() throws InterruptedException {latch.await();
return this.response;
}
// 获取响应结果,直到有结果或者超过指定时间就返回。@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException {if (latch.await(timeout, unit)) {return this.response;}
return null;
}
// 用于设置响应结果,并且做 countDown 操作,通知请求线程
public void setResponse(T response) {
this.response = response;
latch.countDown();}
public long getBeginTime() {return beginTime;}
}
客户端代码
NettyClient
NettyClient:有消息同步的和异步的方法,具体内容如下:
package com.topinfo.ci.netty.client;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
/**
*@Description: Netty 客户端
*@Author: 杨攀
*@Since:2019 年 9 月 26 日下午 8:54:59
*/
@Component
public class NettyClient {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);
private EventLoopGroup group = new NioEventLoopGroup();
/**
*@Fields DELIMITER : 自定义分隔符,服务端和客户端要保持一致
*/
public static final String DELIMITER = "@@";
/**
* @Fields hostIp : 服务端 ip
*/
private String hostIp = "192.168.90.96";
/**
* @Fields port : 服务端端口
*/
private int port= 8888;
/**
* @Fields socketChannel : 通道
*/
private SocketChannel socketChannel;
/**
*@Fields clientHandlerInitilizer : 初始化
*/
@Autowired
private NettyClientHandlerInitilizer clientHandlerInitilizer;
/**
* @Description: 启动客户端
* @Author: 杨攀
* @Since: 2019 年 9 月 12 日下午 4:43:21
*/
@SuppressWarnings("unchecked")
@PostConstruct
public void start() {Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
// 指定 Channel
.channel(NioSocketChannel.class)
// 服务端地址
.remoteAddress(hostIp, port)
// 将小的数据包包装成更大的帧进行传送,提高网络的负载, 即 TCP 延迟传输
.option(ChannelOption.SO_KEEPALIVE, true)
// 将小的数据包包装成更大的帧进行传送,提高网络的负载, 即 TCP 延迟传输
.option(ChannelOption.TCP_NODELAY, true)
.handler(clientHandlerInitilizer);
// 连接
ChannelFuture channelFuture = bootstrap.connect();
// 客户端断线重连逻辑
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {LOGGER.info("连接 Netty 服务端成功...");
}else {LOGGER.info("连接 Netty 服务端失败,进行断线重连...");
final EventLoop loop =future.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {LOGGER.info("连接正在重试...");
start();}
}, 20, TimeUnit.SECONDS);
}
}
});
socketChannel = (SocketChannel) channelFuture.channel();}
/**
*@Description: 消息发送
*@Author: 杨攀
*@Since: 2019 年 9 月 12 日下午 5:08:47
*@param message
*/
public void sendMsg(String message) {String msg = message.concat(NettyClient.DELIMITER);
ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
ChannelFuture future = socketChannel.writeAndFlush(byteBuf);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {System.out.println("=========== 发送成功");
}else {System.out.println("------------------ 发送失败");
}
}
});
}
/**
*@Description: 发送同步消息
*@Author: 杨攀
*@Since: 2019 年 9 月 12 日下午 5:08:47
*@param message
*/
public String sendSyncMsg(String message, SyncFuture<String> syncFuture) {
String result = "";
String msg = message.concat(NettyClient.DELIMITER);
ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
try {ChannelFuture future = socketChannel.writeAndFlush(byteBuf);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {System.out.println("=========== 发送成功");
}else {System.out.println("------------------ 发送失败");
}
}
});
// 等待 8 秒
result = syncFuture.get(8, TimeUnit.SECONDS);
} catch (InterruptedException e) {e.printStackTrace();
}
return result;
}
public String getHostIp() {return hostIp;}
public void setHostIp(String hostIp) {this.hostIp = hostIp;}
public int getPort() {return port;}
public void setPort(int port) {this.port = port;}
}
NettyClientHandlerInitilizer
NettyClientHandlerInitilizer:初始化
package com.topinfo.ci.netty.client;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
@Component
public class NettyClientHandlerInitilizer extends ChannelInitializer<Channel> {
/**
*@Fields clientHandler : 客户端处理
*/
@Autowired
private NettyClientHandler clientHandler;
@Override
protected void initChannel(Channel ch) throws Exception {
// 通过 socketChannel 去获得对应的管道
ChannelPipeline channelPipeline = ch.pipeline();
/*
* channelPipeline 中会有很多 handler 类(也称之拦截器类)* 获得 pipeline 之后,可以直接.addLast 添加 handler
*/
ByteBuf buf = Unpooled.copiedBuffer(NettyClient.DELIMITER.getBytes());
channelPipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024*1024*2, buf));
//channelPipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
//channelPipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
channelPipeline.addLast(clientHandler);
}
}
NettyClientHandler
NettyClientHandler:客户端处理类,实现了接收
package com.topinfo.ci.netty.client;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.topinfo.ci.netty.service.NettyClientService;
import com.topinfo.ci.netty.utils.ExceptionUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
@Component
@ChannelHandler.Sharable // 标注一个 channel handler 可以被多个 channel 安全地共享
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientHandler.class);
@Autowired
private NettyClientService service;
@Autowired
private NettyClient nettyClient;
/**
* @Description: 服务端发生消息给客户端,会触发该方法进行接收消息
* @Author: 杨攀
* @Since: 2019 年 9 月 12 日下午 5:03:31
* @param ctx
* @param byteBuf
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {String msg = byteBuf.toString(CharsetUtil.UTF_8);
LOGGER.info("客户端收到消息:{}", msg);
//service.ackMsg(msg);
service.ackSyncMsg(msg); // 同步消息返回
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {LOGGER.info("请求连接成功...");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {LOGGER.info("连接被断开...");
// 使用过程中断线重连
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
// 重连
nettyClient.start();}
}, 20, TimeUnit.SECONDS);
super.channelInactive(ctx);
}
/**
* 处理异常, 一般将实现异常处理逻辑的 Handler 放在 ChannelPipeline 的最后
* 这样确保所有入站消息都总是被处理,无论它们发生在什么位置,下面只是简单的关闭 Channel 并打印异常信息
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();
// 输出到日志中
ExceptionUtil.getStackTrace(cause);
Channel channel = ctx.channel();
if (channel.isActive()) {ctx.close();
}
}
}
NettyClientServiceImpl
NettyClientServiceImpl:客户端封装实现类,它接口就不贴出来了。
package com.topinfo.ci.netty.service.impl;
import com.topinfo.ci.netty.bean.RealDataInfo;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.topinfo.ci.netty.bean.Message;
import com.topinfo.ci.netty.client.NettyClient;
import com.topinfo.ci.netty.client.SyncFuture;
import com.topinfo.ci.netty.service.NettyClientService;
import com.topinfo.ci.netty.utils.AESUtil;
@Service
public class NettyClientServiceImpl implements NettyClientService {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientServiceImpl.class);
// 缓存接口这里是 LoadingCache,LoadingCache 在缓存项不存在时可以自动加载缓存
private static LoadingCache<String, SyncFuture> futureCache = CacheBuilder.newBuilder()
// 设置缓存容器的初始容量为 10
.initialCapacity(100)
// maximumSize 设置缓存大小
.maximumSize(10000)
// 设置并发级别为 20,并发级别是指可以同时写缓存的线程数
.concurrencyLevel(20)
// expireAfterWrite 设置写缓存后 8 秒钟过期
.expireAfterWrite(8, TimeUnit.SECONDS)
// 设置缓存的移除通知
.removalListener(new RemovalListener<Object, Object>() {
@Override
public void onRemoval(RemovalNotification<Object, Object> notification) {LOGGER.debug("LoadingCache: {} was removed, cause is {}",notification.getKey(), notification.getCause());
}
})
//build 方法中可以指定 CacheLoader,在缓存不存在时通过 CacheLoader 的实现自动加载缓存
.build(new CacheLoader<String, SyncFuture>() {
@Override
public SyncFuture load(String key) throws Exception {
// 当获取 key 的缓存不存在时,不需要自动添加
return null;
}
});
@Autowired
private NettyClient nettyClient;
@Autowired
private CacheManager cacheManager;
@Override
public boolean sendMsg(String text, String dataId, String serviceId) {LOGGER.info("发送的内容:{}", text);
//TODO
//nettyClient.sendMsg(json);
return true;
}
@Override
public String sendSyncMsg(String text, String dataId, String serviceId) {SyncFuture<String> syncFuture = new SyncFuture<String>();
// 放入缓存中
futureCache.put(dataId, syncFuture);
// 封装数据
JSONObject object = new JSONObject();
object.put("dataId", dataId);
object.put("text", text);
// 发送同步消息
String result = nettyClient.sendSyncMsg(object.toJSONString(), syncFuture);
return result;
}
@Override
public void ackSyncMsg(String msg) {LOGGER.info("ACK 确认信息: {}",msg);
JSONObject object =JSON.parseObject(msg);
String dataId = object.getString("dataId");
// 从缓存中获取数据
SyncFuture<String> syncFuture = futureCache.getIfPresent(dataId);
// 如果不为 null, 则通知返回
if(syncFuture != null) {syncFuture.setResponse(msg);
}
}
}
TestController
TestController:测试 TestController。
package com.topinfo.ci.netty.controller;
import com.alibaba.fastjson.JSON;
import com.topinfo.ci.netty.bean.CmwSensoralert;
import com.topinfo.ci.netty.bean.Equip;
import com.topinfo.ci.netty.bean.JsonResult;
import com.topinfo.ci.netty.bean.RealDataInfo;
import com.topinfo.ci.netty.mapper.SensorAlertMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.topinfo.ci.netty.service.NettyClientService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private NettyClientService clientService;
@Autowired
private SensorAlertMapper sensorAlertMapper;
@RequestMapping("/sendSyncMsg")
public String sendSyncMsg(String dataId, String text) {
String serviceId = "mmmm";
String result = clientService.sendSyncMsg(text, dataId, serviceId);
return "result:"+result ;
}
}
测试,完美实现了“请求 - 响应”的效果。
服务端代码
NettyServer
package com.topinfo.ju.ccon.netty.server;
import java.net.InetSocketAddress;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@Component
public class NettyServer {private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
/**
*@Fields DELIMITER : 自定义分隔符,服务端和客户端要保持一致
*/
public static final String DELIMITER = "@@";
/**
* @Fields boss : boss 线程组用于处理连接工作, 默认是系统 CPU 个数的两倍,也可以根据实际情况指定
*/
private EventLoopGroup boss = new NioEventLoopGroup();
/**
* @Fields work : work 线程组用于数据处理, 默认是系统 CPU 个数的两倍,也可以根据实际情况指定
*/
private EventLoopGroup work = new NioEventLoopGroup();
/**
* @Fields port : 监听端口
*/
private Integer port = 8888;
@Autowired
private NettyServerHandlerInitializer handlerInitializer;
/**
* @throws InterruptedException
* @Description: 启动 Netty Server
* @Author: 杨攀
* @Since: 2019 年 9 月 12 日下午 4:21:35
*/
@PostConstruct
public void start() throws InterruptedException {ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work)
// 指定 Channel
.channel(NioServerSocketChannel.class)
// 使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
// 服务端可连接队列数, 对应 TCP/IP 协议 listen 函数中 backlog 参数
.option(ChannelOption.SO_BACKLOG, 1024)
// 设置 TCP 长连接, 一般如果两个小时内没有数据的通信时,TCP 会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 将小的数据包包装成更大的帧进行传送,提高网络的负载, 即 TCP 延迟传输
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(handlerInitializer);
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {LOGGER.info("启动 Netty Server...");
}
}
@PreDestroy
public void destory() throws InterruptedException {boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
LOGGER.info("关闭 Netty...");
}
}
NettyServerHandlerInitializer
package com.topinfo.ju.ccon.netty.server;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
/**
*@Fields serverHandler : 服务处理
*/
@Autowired
private NettyServerHandler serverHandler;
@Override
protected void initChannel(Channel ch) throws Exception {
// 通过 socketChannel 去获得对应的管道
ChannelPipeline channelPipeline = ch.pipeline();
/*
* channelPipeline 中会有很多 handler 类(也称之拦截器类)* 获得 pipeline 之后,可以直接.addLast 添加 handler
*/
ByteBuf buf = Unpooled.copiedBuffer(NettyServer.DELIMITER.getBytes());
channelPipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024*1024*2, buf));
//channelPipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
//channelPipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
// 自定义解码器,粘包 / 拆包 / 断包
channelPipeline.addLast(serverHandler);
}
}
NettyServerHandler
package com.topinfo.ju.ccon.netty.server;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.topinfo.ju.ccon.netty.bean.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
@Component
@ChannelHandler.Sharable // 标注一个 channel handler 可以被多个 channel 安全地共享
public class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);
public static AtomicInteger nConnection = new AtomicInteger(0);
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {String txt = msg.toString(CharsetUtil.UTF_8);
LOGGER.info("收到客户端的消息:{}", txt);
ackMessage(ctx, txt);
}
/**
*@Description: 确认消息
*@Author: 杨攀
*@Since: 2019 年 9 月 17 日上午 11:22:27
*@param ctx
*@param message
*/
public void ackMessage(ChannelHandlerContext ctx, String message) {
// 自定义分隔符
String msg = message+NettyServer.DELIMITER;
ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
// 回应客户端
ctx.writeAndFlush(byteBuf);
}
/**
*@Description: 每次来一个新连接就对连接数加一
*@Author: 杨攀
*@Since: 2019 年 9 月 16 日下午 3:04:42
*@param ctx
*@throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {nConnection.incrementAndGet();
LOGGER.info("请求连接...{},当前连接数::{}", ctx.channel().id(),nConnection.get());
}
/**
*@Description: 每次与服务器断开的时候,连接数减一
*@Author: 杨攀
*@Since: 2019 年 9 月 16 日下午 3:06:10
*@param ctx
*@throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {nConnection.decrementAndGet();
LOGGER.info("断开连接... 当前连接数::{}", nConnection.get());
}
/**
*@Description: 连接异常的时候回调
*@Author: 杨攀
*@Since: 2019 年 9 月 16 日下午 3:06:55
*@param ctx
*@param cause
*@throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);
// 打印错误日志
cause.printStackTrace();
Channel channel = ctx.channel();
if(channel.isActive()){ctx.close();
}
}
}
核心代码基本就这些,希望对大家有帮助。