乐趣区

一个群聊的 Netty 例子

翻了翻《Netty 实战》,看了些概念,还是觉得似是而非,于是写了些代码,通道之类的都建得挺好,但是 channelRead0 一直收不到消息。后来东摸西摸,发现 client 创建 channel 后马上发消息是不对的:connect() 是异步的,此时 channel 还没有完全建好,导致服务端收不到消息。改成通道 active 后由 server 先给 client 发送消息,client 收到消息后才开始向 server 发消息,此时 server 就能收到了。
实现 Server handler
@Slf4j
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 处理 handler 添加,全局保存 channel
log.info(“add a handler”);
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(“[SERVER] – ” + incoming.remoteAddress() + ” 加入 \n”);
}
channels.add(ctx.channel());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 处理 handler 删除,全局删除 channel
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(“[SERVER] – ” + incoming.remoteAddress() + ” 离开 \n”);
}
channels.remove(ctx.channel());
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel incoming = ctx.channel();

log.info(“ 收到消息:” + msg);
for (Channel channel : channels) {
if (channel != incoming) {
// 群发给其他用户
channel.writeAndFlush(“[” + incoming.remoteAddress() + “]” + msg + “\n”);
} else {
// 回应当前用户
channel.writeAndFlush(“[ 响应]” + msg + “\n”);
}
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 通道激活,通知 client,可以准备通信了
Channel incoming = ctx.channel();
log.info(“ChatClient:” + incoming.remoteAddress() + “ 在线 ”);
incoming.writeAndFlush(“welcome” + “\n”);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 通道失效,client 掉线
Channel incoming = ctx.channel();
log.info(“ChatClient:” + incoming.remoteAddress() + “ 掉线 ”);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
Channel incoming = ctx.channel();

// 当出现异常就关闭连接
log.info(“ChatClient:” + incoming.remoteAddress() + “ 异常 ”);
cause.printStackTrace();
ctx.close();
}
}
服务端通过引导绑定端口
// Server bootstrap: accepts TCP connections on port 18169 and gives every accepted
// channel a line-delimited String pipeline ending in ChatServerHandler.
ServerBootstrap serverBootstrap = new ServerBootstrap();

NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

serverBootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(
        new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Split the inbound byte stream into frames at line delimiters (max 8192 bytes per frame).
                pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast("handler", new ChatServerHandler());
                log.info("ChatClient:{} 连接上 ", ch.remoteAddress());
            }
        }
    ).option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .bind(18169)
    // bind() is asynchronous; wait for it so a failure (e.g. port already in use)
    // is rethrown here instead of being silently dropped.
    .syncUninterruptibly();
实现 client handler
@Slf4j
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info(msg);
Channel ch = ctx.channel();

// 收到 welcome 后,给 server 发送消息
if(msg.startsWith(“welcome”)){
ch.writeAndFlush(“hello world!” + “\r\n”);
ch.writeAndFlush(“wish you happy!” + “\r\n”);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
cause.printStackTrace();
// 出现异常时关闭连接
ctx.close();
}
}
通过引导创建 client channel
// Client bootstrap: connects to the chat server on 127.0.0.1:18169 with a
// line-delimited String pipeline ending in ChatClientHandler.
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();

bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .handler(
        new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Frame the inbound stream at line delimiters so messages that arrive
                // merged or split across TCP segments are separated correctly.
                pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast("handler", new ChatClientHandler());
                // log.info("ChatServer:" + ch.remoteAddress() + " 连接上 ");
            }
        }
    ).option(ChannelOption.SO_KEEPALIVE, true);

// connect() is asynchronous: the channel it returns is not yet connected. Wait for
// completion so that `channel` is usable from here on and a connection failure is
// rethrown instead of being silently ignored.
Channel channel = bootstrap.connect("127.0.0.1", 18169).syncUninterruptibly().channel();

}

退出移动版