共计 9767 个字符,预计需要花费 25 分钟才能阅读完成。
1. 协定的作用
TCP/IP 中音讯传输基于流的形式,没有边界
协定的目标就是划定音讯的边界,制订通信单方要独特恪守的通信规定
2. Redis 协定
如果咱们要向 Redis 服务器发送一条 set name Nyima 的指令,须要恪守如下协定
// 该指令一共有 3 局部,每条指令之后都要增加回车与换行符
*3\r\n
// 第一个指令的长度是 3
$3\r\n
// 第一个指令是 set 指令
set\r\n
// 上面的指令以此类推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n
客户端代码如下
public class RedisClient {static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();
try {ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 打印日志
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 回车与换行符
final byte[] LINE = {'\r','\n'};
// 取得 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 连贯建设后,向 Redis 中发送一条指令,留神增加回车与换行
// set name Nyima
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$5".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("Nyima".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
});
}
})
.connect(new InetSocketAddress("localhost", 6379));
channelFuture.sync();
// 敞开 channel
channelFuture.channel().close().sync();} catch (InterruptedException e) {e.printStackTrace();
} finally {
// 敞开 group
group.shutdownGracefully();}
}
}
控制台打印后果
1600 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x28c994f1, L:/127.0.0.1:60792 - R:localhost/127.0.0.1:6379] WRITE: 34B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5..Nyima|
|00000020| 0d 0a |.. |
+--------+-------------------------------------------------+----------------+
Redis 中查问执行后果
3. HTTP 协定
HTTP 协定在申请行申请头中都有很多的内容,本人实现较为艰难,能够应用 HttpServerCodec 作为 服务器端的解码器与编码器,来解决 HTTP 申请
// HttpServerCodec 中既有申请的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
// Codec(CodeCombine) 个别代表该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec
服务器代码
public class HttpServer {static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();
new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 作为服务器,应用 HttpServerCodec 作为编码器与解码器
ch.pipeline().addLast(new HttpServerCodec());
// 服务器只解决 HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
// 取得申请 uri
log.debug(msg.uri());
// 取得残缺响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,防止浏览器始终接管响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
}
})
.bind(8080);
}
}
服务器负责解决申请并响应浏览器。所以 只须要解决 HTTP 申请 即可
// 服务器只解决 HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
取得申请后,须要返回响应给浏览器。须要创立响应对象 DefaultFullHttpResponse,设置 HTTP 版本号及状态码,为防止浏览器取得响应后,因为取得 CONTENT_LENGTH 而始终空转,须要增加 CONTENT_LENGTH 字段,表明响应体中数据的具体长度
// 取得残缺响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,防止浏览器始终接管响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
运行后果
浏览器
控制台
// 申请内容
1714 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] READ: 688B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 50 72 61 67 6d 61 3a |p-alive..Pragma:|
....
// 响应内容
1716 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] WRITE: 61B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 43 6f 6e 74 65 6e 74 2d 4c 65 6e 67 74 68 3a |.Content-Length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e |, World!</h1> |
+--------+-------------------------------------------------+----------------+
4. 自定义协定
组成因素
- 魔数:用来在第一工夫断定接管的数据是否为有效数据包
- 版本号:能够反对协定的降级
-
序列化算法
:音讯注释到底采纳哪种序列化反序列化形式
- 如:json、protobuf、hessian、jdk
- 指令类型:是登录、注册、单聊、群聊… 跟业务相干
- 申请序号:为了双工通信,提供异步能力
- 注释长度
- 音讯注释
编码器与解码器
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 设置魔数 4 个字节
out.writeBytes(new byte[]{'N','Y','I','M'});
// 设置版本号 1 个字节
out.writeByte(1);
// 设置序列化形式 1 个字节
out.writeByte(1);
// 设置指令类型 1 个字节
out.writeByte(msg.getMessageType());
// 设置申请序号 4 个字节
out.writeInt(msg.getSequenceId());
// 为了补齐为 16 个字节,填充 1 个字节的数据
out.writeByte(0xff);
// 取得序列化后的 msg
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 取得并设置注释长度 长度用 4 个字节标识
out.writeInt(bytes.length);
// 设置音讯注释
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 获取魔数
int magic = in.readInt();
// 获取版本号
byte version = in.readByte();
// 取得序列化形式
byte seqType = in.readByte();
// 取得指令类型
byte messageType = in.readByte();
// 取得申请序号
int sequenceId = in.readInt();
// 移除补齐字节
in.readByte();
// 取得注释长度
int length = in.readInt();
// 取得注释
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
// 将信息放入 List 中,传递给下一个 handler
out.add(message);
// 打印取得的信息注释
System.out.println("=========== 魔数 ===========");
System.out.println(magic);
System.out.println("=========== 版本号 ===========");
System.out.println(version);
System.out.println("=========== 序列化办法 ===========");
System.out.println(seqType);
System.out.println("=========== 指令类型 ===========");
System.out.println(messageType);
System.out.println("=========== 申请序号 ===========");
System.out.println(sequenceId);
System.out.println("=========== 注释长度 ===========");
System.out.println(length);
System.out.println("=========== 注释 ===========");
System.out.println(message);
}
}
- 编码器与解码器办法源于 父类 ByteToMessageCodec,通过该类能够自定义编码器与解码器,泛型类型为被编码与被解码的类。此处应用了自定义类 Message,代表音讯
public class MessageCodec extends ByteToMessageCodec<Message>
- 编码器 负责将附加信息与注释信息写入到 ByteBuf 中 ,其中附加信息 总字节数最好为 2n,有余须要补齐。注释内容如果为对象,须要通过序列化将其放入到 ByteBuf 中
- 解码器 负责将 ByteBuf 中的信息取出,并放入 List 中,该 List 用于将信息传递给下一个 handler
编写测试类
public class TestCodec {static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel();
// 增加解码器,防止粘包半包问题
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
channel.pipeline().addLast(new MessageCodec());
LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");
// 测试编码与解码
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, user, byteBuf);
channel.writeInbound(byteBuf);
}
}
- 测试类中用到了 LengthFieldBasedFrameDecoder,防止粘包半包问题
- 通过 MessageCodec 的 encode 办法将附加信息与注释写入到 ByteBuf 中,通过 channel 执行入站操作。入站时会调用 decode 办法进行解码
运行后果
@Sharable 注解
为了 进步 handler 的复用率,能够将 handler 创立为 handler 对象,而后在不同的 channel 中应用该 handler 对象进行解决操作
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的 channel 中应用同一个 handler 对象,进步复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
然而 并不是所有的 handler 都能通过这种办法来进步复用率的,例如 LengthFieldBasedFrameDecoder。如果多个 channel 中应用同一个 LengthFieldBasedFrameDecoder 对象,则可能产生如下问题
- channel1 中收到了一个半包,LengthFieldBasedFrameDecoder 发现不是一条残缺的数据,则没有持续向下流传
- 此时 channel2 中也收到了一个半包,因 为两个 channel 应用了同一个 LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个残缺的数据包 。LengthFieldBasedFrameDecoder 让该数据包持续向下流传, 最终引发谬误
为了进步 handler 的复用率,同时又避免出现一些并发问题,Netty 中原生的 handler 中用 @Sharable 注解来表明,该 handler 是否在多个 channel 中共享。
只有带有该注解,能力通过对象的形式被共享,否则无奈被共享
自定义编解码器是否应用 @Sharable 注解
这须要依据自定义的 handler 的解决逻辑进行剖析
咱们的 MessageCodec 自身接管的是 LengthFieldBasedFrameDecoder 解决之后的数据,那么数据必定是残缺的,按剖析来说是能够增加 @Sharable 注解的
然而理论状况咱们并 不能 增加该注解,会抛出异样信息 ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared
-
因为 MessageCodec 继承自 ByteToMessageCodec,ByteToMessageCodec 类的注解如下
这就意味着 ByteToMessageCodec 不能被多个 channel 所共享的
- 起因:因为该类的指标是:将 ByteBuf 转化为 Message,意味着传进该 handler 的数据还未被解决过 。所以传过来的 ByteBuf 可能并不是残缺的数据,如果共享则会呈现问题
如果想要共享,须要怎么办呢?
继承 MessageToMessageDecoder 即可。该类的指标是:将曾经被解决的残缺数据再次被解决 。传过来的 Message 如果是被解决过的残缺数据,那么被共享也就不会呈现问题了,也就能够应用 @Sharable 注解了。实现形式与 ByteToMessageCodec 相似
@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {...}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {...}
}
如果本文对您有帮忙,欢送
关注
和点赞
`,您的反对是我保持创作的能源。转载请注明出处!