Netty实现同步请求响应的同步通信机制

111次阅读

共计 18275 个字符,预计需要花费 46 分钟才能阅读完成。

需求

实现基于 Netty 的“请求 - 响应”同步通信机制。

设计思路

Netty 提供了异步 IO 和同步 IO 的统一实现,但是我们的需求其实和 IO 的同步异步并无关系。我们的关键是要实现请求 - 响应这种典型的一问一答交互方式。要实现这个需求,需要解决两个问题:

请求和响应的正确匹配。

客户端发送数据后,服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢,(即一个请求对应一个自己的响应)?
解决思路:通过客户端唯一的 RequestId,服务端返回的响应中需要包含该 RequestId,这样客户端就可以通过 RequestId 来正确匹配请求响应。

请求线程和响应线程的通信。

请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty 客户端在接受到响应之后,怎么通知请求线程结果。
解决思路:客户端线程在发送请求后,进入等待,服务器返回响应后,根据 RequestId 来唤醒客户端的请求线程,并把结果返回给请求线程。

解决方案

利用 Java 中的 CountDownLatch 类来实现同步 Future。
具体过程是:客户端发送请求后将 < 请求 ID,Future> 的键值对保存到一个缓存中,这时候用 Future 等待结果,挂住请求线程;当 Netty 客户端收到服务端的响应后,响应线程根据请求 ID 从缓存中取出 Future,然后设置响应结果到 Future 中。这个时候利用 CountDownLatch 的通知机制,通知请求线程。请求线程从 Future 中拿到响应结果,然后做业务处理。
缓存使用 google 的 guava

具体代码

首先引入依赖

        <!-- guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>28.0-jre</version>
        </dependency>

        <!-- Netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.39.Final</version>
        </dependency>

SyncFuture

SyncFuture:同步的 Future。这个是核心,通过这个工具类来实现线程等待。

package com.topinfo.ci.netty.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SyncFuture<T> implements Future<T> {

    // 因为请求和响应是一一对应的,因此初始化 CountDownLatch 值为 1。private CountDownLatch latch = new CountDownLatch(1);
    
    // 需要响应线程设置的响应结果
    private T response;
    
    // Futrue 的请求时间,用于计算 Future 是否超时
    private long beginTime = System.currentTimeMillis();

    public SyncFuture() {}

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {return false;}

    @Override
    public boolean isCancelled() {return false;}

    @Override
    public boolean isDone() {if (response != null) {return true;}
        return false;
    }

    // 获取响应结果,直到有结果才返回。@Override
    public T get() throws InterruptedException {latch.await();
        return this.response;
    }

    // 获取响应结果,直到有结果或者超过指定时间就返回。@Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException {if (latch.await(timeout, unit)) {return this.response;}
        return null;
    }

    // 用于设置响应结果,并且做 countDown 操作,通知请求线程
    public void setResponse(T response) {
        this.response = response;
        latch.countDown();}

    public long getBeginTime() {return beginTime;}
}

客户端代码

NettyClient

NettyClient:有消息同步的和异步的方法,具体内容如下:

package com.topinfo.ci.netty.client;

import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;


/**
 *@Description: Netty 客户端
 *@Author: 杨攀
 *@Since:2019 年 9 月 26 日下午 8:54:59  
 */
@Component
public class NettyClient {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);
    
    private EventLoopGroup group = new NioEventLoopGroup();

    /** 
     *@Fields DELIMITER : 自定义分隔符,服务端和客户端要保持一致
     */ 
    public static final String DELIMITER = "@@";
    
    /**
     * @Fields hostIp : 服务端 ip
     */
    private String hostIp = "192.168.90.96";

    /**
     * @Fields port : 服务端端口
     */
    private int port= 8888;

    /**
     * @Fields socketChannel : 通道
     */
    private SocketChannel socketChannel;

    
    /** 
     *@Fields clientHandlerInitilizer : 初始化
     */ 
    @Autowired
    private NettyClientHandlerInitilizer clientHandlerInitilizer;
    
    /**
     * @Description: 启动客户端
     * @Author: 杨攀
     * @Since: 2019 年 9 月 12 日下午 4:43:21
     */
    @SuppressWarnings("unchecked")
    @PostConstruct
    public void start() {Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
            // 指定 Channel
            .channel(NioSocketChannel.class)
            // 服务端地址
            .remoteAddress(hostIp, port)
            
            // 将小的数据包包装成更大的帧进行传送,提高网络的负载, 即 TCP 延迟传输
            .option(ChannelOption.SO_KEEPALIVE, true)
            // 将小的数据包包装成更大的帧进行传送,提高网络的负载, 即 TCP 延迟传输
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(clientHandlerInitilizer);
        
        // 连接
        ChannelFuture channelFuture = bootstrap.connect();
         // 客户端断线重连逻辑
        channelFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {LOGGER.info("连接 Netty 服务端成功...");
                }else {LOGGER.info("连接 Netty 服务端失败,进行断线重连...");
                    final EventLoop loop =future.channel().eventLoop();
                    loop.schedule(new Runnable() {
                        @Override
                        public void run() {LOGGER.info("连接正在重试...");
                            start();}
                    }, 20, TimeUnit.SECONDS);
                }
            }

             
        });
        
        socketChannel = (SocketChannel) channelFuture.channel();}

    
    /**
     *@Description: 消息发送
     *@Author: 杨攀
     *@Since: 2019 年 9 月 12 日下午 5:08:47
     *@param message
     */
    public void sendMsg(String  message) {String msg = message.concat(NettyClient.DELIMITER);

        ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
        ChannelFuture future = socketChannel.writeAndFlush(byteBuf);

        future.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {System.out.println("=========== 发送成功");
                }else {System.out.println("------------------ 发送失败");
                }
            }
        });
    }

    

    /**
     *@Description: 发送同步消息
     *@Author: 杨攀
     *@Since: 2019 年 9 月 12 日下午 5:08:47
     *@param message
     */
    public String sendSyncMsg(String  message, SyncFuture<String> syncFuture) {
        
        String result = "";
        
        String msg = message.concat(NettyClient.DELIMITER);

        ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
        
        try {ChannelFuture future = socketChannel.writeAndFlush(byteBuf);
            future.addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {System.out.println("=========== 发送成功");
                    }else {System.out.println("------------------ 发送失败");
                    }
                }
            });
            
            // 等待 8 秒
            result = syncFuture.get(8, TimeUnit.SECONDS);
            
        } catch (InterruptedException e) {e.printStackTrace();
        }
        
        return result;
    }

    public String getHostIp() {return hostIp;}


    public void setHostIp(String hostIp) {this.hostIp = hostIp;}


    public int getPort() {return port;}


    public void setPort(int port) {this.port = port;}
    
    
}

NettyClientHandlerInitilizer

NettyClientHandlerInitilizer:初始化

package com.topinfo.ci.netty.client;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

@Component
public class NettyClientHandlerInitilizer extends ChannelInitializer<Channel> {

    /** 
     *@Fields clientHandler : 客户端处理 
     */ 
    @Autowired
    private NettyClientHandler clientHandler;
    
    @Override
    protected void initChannel(Channel ch) throws Exception {
        
        // 通过 socketChannel 去获得对应的管道
                ChannelPipeline channelPipeline = ch.pipeline();
                
                /*
                 * channelPipeline 中会有很多 handler 类(也称之拦截器类)* 获得 pipeline 之后,可以直接.addLast 添加 handler
                 */
                ByteBuf buf = Unpooled.copiedBuffer(NettyClient.DELIMITER.getBytes());
                channelPipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024*1024*2, buf));
                //channelPipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
                //channelPipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
                channelPipeline.addLast(clientHandler);
        
    }

}

NettyClientHandler

NettyClientHandler:客户端处理类,实现了接收

package com.topinfo.ci.netty.client;

import java.util.concurrent.TimeUnit;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.topinfo.ci.netty.service.NettyClientService;
import com.topinfo.ci.netty.utils.ExceptionUtil;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;

@Component
@ChannelHandler.Sharable // 标注一个 channel handler 可以被多个 channel 安全地共享
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientHandler.class);

    @Autowired
    private NettyClientService service;

    @Autowired
    private NettyClient nettyClient;

    /**
     * @Description: 服务端发生消息给客户端,会触发该方法进行接收消息
     * @Author: 杨攀
     * @Since: 2019 年 9 月 12 日下午 5:03:31
     * @param ctx
     * @param byteBuf
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {String msg = byteBuf.toString(CharsetUtil.UTF_8);

        LOGGER.info("客户端收到消息:{}", msg);

        //service.ackMsg(msg);
        service.ackSyncMsg(msg); // 同步消息返回
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {LOGGER.info("请求连接成功...");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {LOGGER.info("连接被断开...");

        // 使用过程中断线重连
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {

            @Override
            public void run() {
                // 重连
                nettyClient.start();}
        }, 20, TimeUnit.SECONDS);
        super.channelInactive(ctx);
    }

    /**
     * 处理异常, 一般将实现异常处理逻辑的 Handler 放在 ChannelPipeline 的最后
     * 这样确保所有入站消息都总是被处理,无论它们发生在什么位置,下面只是简单的关闭 Channel 并打印异常信息
     * 
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();

        // 输出到日志中
        ExceptionUtil.getStackTrace(cause);

        Channel channel = ctx.channel();
        if (channel.isActive()) {ctx.close();
        }
    }

}

NettyClientServiceImpl

NettyClientServiceImpl:客户端封装实现类,它接口就不贴出来了。

package com.topinfo.ci.netty.service.impl;

import com.topinfo.ci.netty.bean.RealDataInfo;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.topinfo.ci.netty.bean.Message;
import com.topinfo.ci.netty.client.NettyClient;
import com.topinfo.ci.netty.client.SyncFuture;
import com.topinfo.ci.netty.service.NettyClientService;
import com.topinfo.ci.netty.utils.AESUtil;

@Service
public class NettyClientServiceImpl implements NettyClientService {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientServiceImpl.class);
    

    // 缓存接口这里是 LoadingCache,LoadingCache 在缓存项不存在时可以自动加载缓存
    private static LoadingCache<String, SyncFuture> futureCache = CacheBuilder.newBuilder()
            // 设置缓存容器的初始容量为 10
            .initialCapacity(100)
            // maximumSize 设置缓存大小
            .maximumSize(10000)
            // 设置并发级别为 20,并发级别是指可以同时写缓存的线程数
            .concurrencyLevel(20)
            // expireAfterWrite 设置写缓存后 8 秒钟过期
            .expireAfterWrite(8, TimeUnit.SECONDS)
            // 设置缓存的移除通知
            .removalListener(new RemovalListener<Object, Object>() {
                @Override
                public void onRemoval(RemovalNotification<Object, Object> notification) {LOGGER.debug("LoadingCache: {} was removed, cause is {}",notification.getKey(), notification.getCause());
                }
            })
            //build 方法中可以指定 CacheLoader,在缓存不存在时通过 CacheLoader 的实现自动加载缓存
            .build(new CacheLoader<String, SyncFuture>() {
                @Override
                public SyncFuture load(String key) throws Exception {
                    // 当获取 key 的缓存不存在时,不需要自动添加
                    return null;
                }
            });
    
    
    @Autowired
    private NettyClient nettyClient;
    
    @Autowired
    private CacheManager cacheManager;
 
    @Override
    public boolean sendMsg(String text, String dataId, String serviceId) {LOGGER.info("发送的内容:{}", text);

        //TODO        
        //nettyClient.sendMsg(json);
        return true;
    }
 

    @Override
    public String sendSyncMsg(String text, String dataId, String serviceId) {SyncFuture<String> syncFuture = new SyncFuture<String>();
        // 放入缓存中
        futureCache.put(dataId, syncFuture);
        
        // 封装数据
        JSONObject object = new JSONObject();
        object.put("dataId", dataId);
        object.put("text", text);
        
        // 发送同步消息
        String result = nettyClient.sendSyncMsg(object.toJSONString(), syncFuture);
        
        return result;
    }

    @Override
    public void ackSyncMsg(String msg) {LOGGER.info("ACK 确认信息: {}",msg);
        
        JSONObject object =JSON.parseObject(msg);
        String dataId = object.getString("dataId");
        
        // 从缓存中获取数据
        SyncFuture<String> syncFuture = futureCache.getIfPresent(dataId);
        
        // 如果不为 null, 则通知返回
        if(syncFuture != null) {syncFuture.setResponse(msg);
        }
    }

}

TestController

TestController:测试 TestController。

package com.topinfo.ci.netty.controller;

import com.alibaba.fastjson.JSON;
import com.topinfo.ci.netty.bean.CmwSensoralert;
import com.topinfo.ci.netty.bean.Equip;
import com.topinfo.ci.netty.bean.JsonResult;
import com.topinfo.ci.netty.bean.RealDataInfo;
import com.topinfo.ci.netty.mapper.SensorAlertMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.topinfo.ci.netty.service.NettyClientService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    private NettyClientService clientService;
    @Autowired
    private SensorAlertMapper sensorAlertMapper;

    @RequestMapping("/sendSyncMsg")
    public String sendSyncMsg(String dataId, String text) {
        
        String serviceId = "mmmm";
        
        String result = clientService.sendSyncMsg(text, dataId, serviceId);
        
        return "result:"+result ;
    }
     
}

测试,完美实现了“请求 - 响应”的效果。

服务端代码

NettyServer

package com.topinfo.ju.ccon.netty.server;

import java.net.InetSocketAddress;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

@Component
public class NettyServer {private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
    
    /** 
     *@Fields DELIMITER : 自定义分隔符,服务端和客户端要保持一致
     */ 
    public static final String DELIMITER = "@@";
    
    /**
     * @Fields boss : boss 线程组用于处理连接工作, 默认是系统 CPU 个数的两倍,也可以根据实际情况指定
     */
    private EventLoopGroup boss = new NioEventLoopGroup();

    /**
     * @Fields work : work 线程组用于数据处理, 默认是系统 CPU 个数的两倍,也可以根据实际情况指定
     */
    private EventLoopGroup work = new NioEventLoopGroup();

    /**
     * @Fields port : 监听端口
     */
    private Integer port = 8888;
    
    @Autowired
    private NettyServerHandlerInitializer handlerInitializer;

    /**
     * @throws InterruptedException 
     * @Description: 启动 Netty Server
     * @Author: 杨攀
     * @Since: 2019 年 9 月 12 日下午 4:21:35
     */
    @PostConstruct
    public void start() throws InterruptedException {ServerBootstrap bootstrap = new ServerBootstrap();

        bootstrap.group(boss, work)
                // 指定 Channel
                .channel(NioServerSocketChannel.class)
                // 使用指定的端口设置套接字地址
                .localAddress(new InetSocketAddress(port))

                // 服务端可连接队列数, 对应 TCP/IP 协议 listen 函数中 backlog 参数
                .option(ChannelOption.SO_BACKLOG, 1024)

                // 设置 TCP 长连接, 一般如果两个小时内没有数据的通信时,TCP 会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                // 将小的数据包包装成更大的帧进行传送,提高网络的负载, 即 TCP 延迟传输
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(handlerInitializer);

        ChannelFuture future = bootstrap.bind().sync();
        
        if (future.isSuccess()) {LOGGER.info("启动 Netty Server...");
        }
    }
    
    @PreDestroy
    public void destory() throws InterruptedException {boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        LOGGER.info("关闭 Netty...");
    }
}

NettyServerHandlerInitializer

package com.topinfo.ju.ccon.netty.server;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    /** 
     *@Fields serverHandler : 服务处理
     */ 
    @Autowired
    private NettyServerHandler serverHandler;
    
    
    @Override
    protected void initChannel(Channel ch) throws Exception {

        // 通过 socketChannel 去获得对应的管道
        ChannelPipeline channelPipeline = ch.pipeline();
        
        /*
         * channelPipeline 中会有很多 handler 类(也称之拦截器类)* 获得 pipeline 之后,可以直接.addLast 添加 handler
         */
        ByteBuf buf = Unpooled.copiedBuffer(NettyServer.DELIMITER.getBytes());
        channelPipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024*1024*2, buf));
        //channelPipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
        //channelPipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
        
        // 自定义解码器,粘包 / 拆包 / 断包
        
        channelPipeline.addLast(serverHandler);
    }

}

NettyServerHandler

package com.topinfo.ju.ccon.netty.server;


import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.topinfo.ju.ccon.netty.bean.Message;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

@Component
@ChannelHandler.Sharable // 标注一个 channel handler 可以被多个 channel 安全地共享
public class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);
    
    public static AtomicInteger nConnection = new AtomicInteger(0);
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {String txt = msg.toString(CharsetUtil.UTF_8);
        
        LOGGER.info("收到客户端的消息:{}", txt);
    
        
        
        ackMessage(ctx, txt);
    }

    /**
     *@Description: 确认消息 
     *@Author: 杨攀
     *@Since: 2019 年 9 月 17 日上午 11:22:27
     *@param ctx
     *@param message
     */
    public void ackMessage(ChannelHandlerContext ctx, String message) {
        
        // 自定义分隔符
        String msg = message+NettyServer.DELIMITER;
        
        ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
        
        // 回应客户端
        ctx.writeAndFlush(byteBuf);
    }
    
    

    /**
     *@Description: 每次来一个新连接就对连接数加一
     *@Author: 杨攀
     *@Since: 2019 年 9 月 16 日下午 3:04:42
     *@param ctx
     *@throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {nConnection.incrementAndGet();
        
        LOGGER.info("请求连接...{},当前连接数::{}",  ctx.channel().id(),nConnection.get());
    }
    
    /**
     *@Description: 每次与服务器断开的时候,连接数减一
     *@Author: 杨攀
     *@Since: 2019 年 9 月 16 日下午 3:06:10
     *@param ctx
     *@throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {nConnection.decrementAndGet();
        LOGGER.info("断开连接... 当前连接数::{}",  nConnection.get());
    }
    
    
    /**
     *@Description: 连接异常的时候回调 
     *@Author: 杨攀
     *@Since: 2019 年 9 月 16 日下午 3:06:55
     *@param ctx
     *@param cause
     *@throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);
        
        // 打印错误日志
        cause.printStackTrace();
        
        Channel channel = ctx.channel();
        
        if(channel.isActive()){ctx.close();
        }
        
    }
    
}

核心代码基本就这些,希望对大家有帮助。

正文完
 0