乐趣区

netty 基于 protobuf 协议 实现 websocket 版本的简易客服系统

结构

netty 作为服务端
protobuf 作为序列化数据的协议
websocket 前端通讯

演示

netty 服务端实现
Server.java 启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

/**
 * WebSocket long-lived connection example: starts the Netty server on port 8899
 * and blocks until the server channel is closed.
 */
public class Server {

    /**
     * Boots the server, waits for the listening channel to close, then shuts
     * both event loop groups down.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if binding or waiting on the channel futures fails
     */
    public static void main(String[] args) throws Exception {
        // Parent ("boss") group, passed first to ServerBootstrap.group(...)
        final EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        // Child ("worker") group, passed second to ServerBootstrap.group(...)
        final EventLoopGroup ioGroup = new NioEventLoopGroup();

        try {
            final InetSocketAddress listenAddress = new InetSocketAddress(8899);

            final ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(acceptorGroup, ioGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerChannelInitializer());

            // Block until the port is bound, then until the server channel closes.
            final ChannelFuture bound = bootstrap.bind(listenAddress).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
ServerChannelInitializer.java
import com.example.nettydemo.protobuf.MessageData;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.util.List;

import static io.netty.buffer.Unpooled.wrappedBuffer;

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// HTTP 请求的解码和编码
pipeline.addLast(new HttpServerCodec());
// 把多个消息转换为一个单一的 FullHttpRequest 或是 FullHttpResponse,
// 原因是 HTTP 解码器会在每个 HTTP 消息中生成多个消息对象 HttpRequest/HttpResponse,HttpContent,LastHttpContent
pipeline.addLast(new HttpObjectAggregator(65536));
// 主要用于处理大数据流,比如一个 1G 大小的文件如果你直接传输肯定会撑暴 jvm 内存的; 增加之后就不用考虑这个问题了
pipeline.addLast(new ChunkedWriteHandler());
// WebSocket 数据压缩
pipeline.addLast(new WebSocketServerCompressionHandler());
// 协议包长度限制
pipeline.addLast(new WebSocketServerProtocolHandler(“/ws”, null, true));
// 协议包解码
pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
objs.add(buf);
buf.retain();
}
});
// 协议包编码
pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
@Override
protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
ByteBuf result = null;
if (msg instanceof MessageLite) {
result = wrappedBuffer(((MessageLite) msg).toByteArray());
}
if (msg instanceof MessageLite.Builder) {
result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
}

// ==== 上面代码片段是拷贝自 TCP ProtobufEncoder 源码 ====
// 然后下面再转成 websocket 二进制流,因为客户端不能直接解析 protobuf 编码生成的

WebSocketFrame frame = new BinaryWebSocketFrame(result);
out.add(frame);
}
});

// 协议包解码时指定 Protobuf 字节数实例化为 CommonProtocol 类型
pipeline.addLast(new ProtobufDecoder(MessageData.RequestUser.getDefaultInstance()));

// websocket 定义了传递数据的 6 中 frame 类型
pipeline.addLast(new ServerFrameHandler());

}
}
ServerFrameHandler.java
import com.example.nettydemo.protobuf.MessageData;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.List;

// 处理文本协议数据,处理 TextWebSocketFrame 类型的数据,websocket 专门处理文本的 frame 就是 TextWebSocketFrame
public class ServerFrameHandler extends SimpleChannelInboundHandler<MessageData.RequestUser> {

private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

// 读到客户端的内容并且向客户端去写内容
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageData.RequestUser msg) throws Exception {
// channelGroup.add();

Channel channel = ctx.channel();
System.out.println(msg.getUserName());
System.out.println(msg.getAge());
System.out.println(msg.getPassword());
MessageData.ResponseUser bank = MessageData
.ResponseUser.newBuilder()
.setUserName(“ 你好, 请问有什么可以帮助你!”)
.setAge(18).setPassword(“11111”).build();

channel.writeAndFlush(bank);
}

// 每个 channel 都有一个唯一的 id 值
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 打印出 channel 唯一值,asLongText 方法是 channel 的 id 的全名
// System.out.println(“handlerAdded:”+ctx.channel().id().asLongText());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// System.out.println(“handlerRemoved:” + ctx.channel().id().asLongText());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(“ 异常发生 ”);
ctx.close();
}

}
protobuf 文件的使用
proto 文件
syntax = "proto2";

package com.example.nettydemo.protobuf;

// optimize_for = SPEED speeds up parsing
option optimize_for = SPEED;
option java_package = "com.example.nettydemo.protobuf";
option java_outer_classname = "MessageData";

// Message entity sent by the client
message RequestUser{
optional string user_name = 1;
optional int32 age = 2;
optional string password = 3;
}

// Message entity returned to the client
message ResponseUser{
optional string user_name = 1;
optional int32 age = 2;
optional string password = 3;
}
生成 proto 的 Java 类
批量生成工具:直接找到对应的 bat 或者 sh 文件,在对应的平台执行就可以了。具体用法可以自行百度 protobuf 怎么使用。
Windows 版本
set outPath=../../java
set fileArray=(MessageDataProto ATestProto)

REM Generate Java classes from each .proto file
for %%i in %fileArray% do (
echo generate cli protocol java code: %%i.proto
protoc --java_out=%outPath% ./%%i.proto
)

pause
sh 版本 地址:https://github.com/lmxdawn/ne…
#!/bin/bash

# Output directory for the generated Java sources
outPath=../../java
# Base names (without the .proto extension) of the schema files to compile
fileArray=(MessageDataProto ATestProto)

# Generate Java classes from each .proto file
for i in "${fileArray[@]}"; do
    echo "generate cli protocol java code: ${i}.proto"
    protoc --java_out="$outPath" "./$i.proto"
done
websocket 实现
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket 客户端 </title>
</head>
<body>

<script src="protobuf.min.js"></script>

<script type="text/javascript">
var socket;
// Only set the connection up when the browser supports WebSocket
if (window.WebSocket) {
    // The argument is the address of the server to connect to
    socket = new WebSocket("ws://localhost:8899/ws");

    // Invoked whenever a message from the server arrives
    socket.onmessage = function (event) {
        var ta = document.getElementById("responseText");
        // Decode the protobuf payload and append it to the reply text area
        responseUserDecoder({
            data: event.data,
            success: function (responseUser) {
                var content = " 客服小姐姐: " + responseUser.userName +
                    ", 小姐姐年龄: " + responseUser.age +
                    ", 密码: " + responseUser.password;
                ta.value = ta.value + "\n" + content;
            },
            fail: function (err) {
                console.log(err);
            },
            complete: function () {
                console.log(" 解码全部完成 ")
            }
        })
    }

    // Invoked once the connection has been established
    socket.onopen = function (event) {
        var ta = document.getElementById("responseText");
        ta.value = " 连接开启 ";
    }

    // Invoked when the connection is closed
    socket.onclose = function (event) {
        var ta = document.getElementById("responseText");
        ta.value = ta.value + "\n" + " 连接关闭 ";
    }
} else {
    alert(" 浏览器不支持 WebSocket!");
}
/**
 * Encode the given text as a RequestUser protobuf message and send it to the
 * server over the open WebSocket.
 *
 * @param {string} message text placed in the userName field of the request
 */
function send(message) {
    if (!window.WebSocket) {
        return;
    }
    // Refuse to send unless the connection is open
    if (socket.readyState !== WebSocket.OPEN) {
        alert(" 连接没有开启 ");
        return;
    }
    var data = {
        userName: message,
        age: 18,
        password: "11111"
    };
    requestUserEncoder({
        data: data,
        success: function (buffer) {
            console.log(" 编码成功 ");
            socket.send(buffer);
        },
        fail: function (err) {
            console.log(err);
        },
        complete: function () {
            console.log(" 编码全部完成 ")
        }
    });
}
/**
 * Encode an outgoing message as a RequestUser protobuf.
 *
 * @param {Object} obj
 * @param {Object} obj.data payload with the RequestUser fields
 * @param {Function} [obj.success] called with the encoded bytes on success
 * @param {Function} [obj.fail] called with the error on failure
 * @param {Function} [obj.complete] called after success or fail
 */
function requestUserEncoder(obj) {
    var data = obj.data;
    var success = obj.success; // success callback
    var fail = obj.fail; // failure callback
    var complete = obj.complete; // called after either outcome

    // Report exactly one outcome, then signal completion.
    function settle(error, buffer) {
        if (error) {
            if (typeof fail === "function") {
                fail(error);
            }
        } else if (typeof success === "function") {
            success(buffer);
        }
        if (typeof complete === "function") {
            complete();
        }
    }

    protobuf.load("../proto/MessageDataProto.proto", function (err, root) {
        if (err) {
            settle(err);
            return;
        }
        // Obtain the message type
        var RequestUser = root.lookupType("com.example.nettydemo.protobuf.RequestUser");
        // Verify the payload (it may be incomplete or invalid)
        var errMsg = RequestUser.verify(data);
        if (errMsg) {
            settle(errMsg);
            return;
        }
        // Create the message; use .fromObject instead if conversion is necessary
        var message = RequestUser.create(data);
        // Encode to a Uint8Array (browser) or Buffer (node)
        var buffer = RequestUser.encode(message).finish();
        settle(null, buffer);
    });
}
/**
 * Decode a binary message received from the server into a ResponseUser.
 *
 * @param {Object} obj
 * @param {Blob} obj.data binary payload of the WebSocket message
 * @param {Function} [obj.success] called with the decoded ResponseUser
 * @param {Function} [obj.fail] called with the error on failure
 * @param {Function} [obj.complete] called after success or fail
 */
function responseUserDecoder(obj) {
    var data = obj.data;
    var success = obj.success; // success callback
    var fail = obj.fail; // failure callback
    var complete = obj.complete; // called after either outcome

    // Report exactly one outcome, then signal completion.
    function settle(error, responseUser) {
        if (error) {
            if (typeof fail === "function") {
                fail(error);
            }
        } else if (typeof success === "function") {
            success(responseUser);
        }
        if (typeof complete === "function") {
            complete();
        }
    }

    protobuf.load("../proto/MessageDataProto.proto", function (err, root) {
        if (err) {
            settle(err);
            return;
        }
        // Obtain the message type
        var ResponseUser = root.lookupType("com.example.nettydemo.protobuf.ResponseUser");
        var reader = new FileReader();
        // Register the handlers before starting the read.
        reader.onload = function () {
            var responseUser;
            try {
                responseUser = ResponseUser.decode(new Uint8Array(reader.result));
            } catch (decodeErr) {
                // Malformed payloads must reach the fail/complete callbacks too.
                settle(decodeErr);
                return;
            }
            settle(null, responseUser);
        };
        reader.onerror = function () {
            settle(reader.error || new Error("failed to read the binary message"));
        };
        reader.readAsArrayBuffer(data);
    });
}
</script>

<h1> 欢迎访问客服系统 </h1>

<form onsubmit="return false">

<textarea name="message" style="width: 400px;height: 200px"></textarea>

<input type="button" value=" 发送数据 " onclick="send(this.form.message.value);">

<h3> 回复消息:</h3>

<textarea id="responseText" style="width: 400px;height: 300px;"></textarea>

<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value=" 清空数据 ">
</form>
</body>
</html>
扩展阅读
spring boot 实现的后台管理系统 vue + element-ui 实现的后台管理界面,接入 spring boot API 接口

退出移动版