乐趣区

netty搭建web聊天室(1)

之前一直在搞前端的东西,都快忘了自己是个 java 开发。其实还有好多 java 方面的东西没搞过,突然了解到 netty,觉得有必要学一学。
介绍
Netty 是由 JBOSS 提供的一个 java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用 Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty 相当于简化和流线化了网络应用的编程开发过程,例如:基于 TCP 和 UDP 的 socket 服务开发。
一些 IO 概念

NIO(non-blocking IO)非阻塞
BIO(blocking IO)阻塞

以上两种又可分为同步和异步,即同步阻塞,同步非阻塞,异步阻塞,异步非阻塞。

阻塞:数据没来,啥都不做,直到数据来了,才进行下一步的处理。
非阻塞:数据没来,进程就不停的去检测数据,直到数据来。

至于这块的详细概念,大家可以自行百度学习。总之,netty 处理 io 很高效,不需要你担心。
netty 结构

可以看出它支持的网络传输协议,以及容器支持,安全支持,io.

工作流程:所有客户端的连接交给住主线程去管理,响应客户端的消息交给从线程去处理,整个线程池由 netty 负责。
搭建服务
创建 maven 工程引入最新的依赖
<project xmlns=”http://maven.apache.org/POM/4.0.0″ xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”
xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>
<modelVersion>4.0.0</modelVersion>
<groupId>com.mike</groupId>
<artifactId>netty</artifactId>
<version>0.0.1-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
</dependencies>
</project>
创建消息处理器
package netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
*
*/
public class ChatHandler extends SimpleChannelInboundHandler{

public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**
* 每当从服务端收到新的客户端连接时,客户端的 Channel 存入 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(“[SERVER] – ” + incoming.remoteAddress() + ” 加入 \n”);
}
channels.add(ctx.channel());
}

/**
* 每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(“[SERVER] – ” + incoming.remoteAddress() + ” 离开 \n”);
}
channels.remove(ctx.channel());
}

/**
* 会话建立时
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {// (5)
Channel incoming = ctx.channel();
System.out.println(“ChatClient:”+incoming.remoteAddress()+” 在线 ”);
}

/**
* 会话结束时
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {// (6)
Channel incoming = ctx.channel();
System.out.println(“ChatClient:”+incoming.remoteAddress()+” 掉线 ”);
}

/**
* 出现异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// (7)
Channel incoming = ctx.channel();
System.out.println(“ChatClient:”+incoming.remoteAddress()+” 异常 ”);
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}

/**
* 读取客户端发送的消息,并将信息转发给其他客户端的 Channel。
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object request) throws Exception {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,HttpResponseStatus.OK , Unpooled.wrappedBuffer(“Hello netty”
.getBytes()));
response.headers().set(“Content-Type”, “text/plain”);
response.headers().set(“Content-Length”, response.content().readableBytes());
response.headers().set(“connection”, HttpHeaderValues.KEEP_ALIVE);
ctx.writeAndFlush(response);

}

}
这里面其实只需要重写 channelRead0 方法就可以了,其他是它的生命周期的方法,可以用来做日至记录。我们在读取消息后,往 channel 里写入了一个 http 的 response。
初始化我们的消息处理器
package netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

/**
* 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等。
*/
public class ChatServerInitializer extends ChannelInitializer<SocketChannel>{

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(“HttpResponseEncoder”,new HttpResponseEncoder());
pipeline.addLast(“HttpRequestDecoder”,new HttpRequestDecoder());
pipeline.addLast(“chathandler”, new ChatHandler());

System.out.println(“ChatClient:”+ch.remoteAddress() +” 连接上 ”);

}

}
这个 pipeline 可以理解为 netty 的拦截器,每个消息进来,经过各个拦截器的处理。我们需要响应 http 消息,所以加入了响应编码以及请求解码,最后加上了我们的自定义处理器。这里面有很多处理器,netty 以及帮你定义好的。
服务启动类
package netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* The class ChatServer
*/
public class ChatServer {
private int port;

public ChatServer(int port) {
this.port = port;
}

public void run() throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChatServerInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

System.out.println(“ChatServer 启动了 ”);

// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync(); // (7)

// 等待服务器 socket 关闭。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();

} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();

System.out.println(“ChatServer 关闭了 ”);
}
}

public static void main(String[] args) throws Exception {
new ChatServer(8090).run();

}
}
这个启动类就是按照上面那个结构图来的,添加两个线程组,设置 channel,添加消息处理器,配置一些选项 option。
测试
启动程序,浏览器访问 http://localhost:8090
可以在浏览器看到我们返回的消息,但是控制台却显示连接了多个客户端,其实是因为浏览器发送了无关的请求道服务端,由于我们没有做路由,所以所有请求都是 200。
可以看到,发送了两次请求。现在我们换 postman 测试。
这次只有一个客户端连接,当我们关闭 postman: 客户端显示掉线,整个会话过程结束。
总结
我们完成了服务端的简单搭建,模拟了聊天会话场景。后面再接着完善。
别忘了关注我 mike 啥都想搞
还有其他后端技术分享在我的公众号。

退出移动版