共计 11176 个字符,预计需要花费 28 分钟才能阅读完成。
一、背景
笔者在一次保护根底公共组件的过程中,不小心批改了类的包门路。蹩脚的是,这个类被各业务在 facade 中进行了援用、传递。侥幸的是,同一个类,在提供者和消费者的包门路不统一,没有引起各业务报错。
怀揣着好奇,对于 Dubbo 的编解码做了几次的 Debug 学习,在此分享一些学习教训。
1.1 RPC 的爱与恨
Dubbo 作为 Java 语言的 RPC 框架,劣势之一在于屏蔽了调用细节,可能像调用本地办法一样调用近程服务,不用为数据格式抓耳饶腮。正是这一个性,也引入来了一些问题。
比方引入 facade 包后呈现 jar 包抵触、服务无奈启动,更新 facade 包后某个类找不到等等问题。引入 jar 包,导致生产方和提供方在某种程度上有了肯定耦合。
正是这种耦合,在提供者批改了 Facade 包类的门路后,习惯性认为会引发报错,而实际上并没有。最后认为很奇怪,认真思考后才认为理当这样,调用方在依照约定的格局和协定根底上,即可与提供方实现通信。并不应该关注提供方自身上下文信息。(认为类的门路属于上下文信息)接下来揭秘 Dubbo 的编码解码过程。
二、Dubbo 编解码
Dubbo 默认用的 netty 作为通信框架,所有剖析都是以 netty 作为前提。波及的源码均为 Dubbo – 2.7.x 版本。在理论过程中,一个服务很有可能既是消费者,也是提供者。为了简化梳理流程,假设都是纯正的消费者、提供者。
2.1 In Dubbo
借用 Dubbo 官网文档的一张图,文档内,定义了通信和序列化层,并没有定义 ” 编解码 ” 含意,在此对 ” 编解码 ” 做简略解释。
编解码 = dubbo 外部编解码链路 + 序列化层
本文旨在梳理从 Java 对象到二进制流,以及二进制流到 Java 对象两种数据格式之间的互相转换。在此目标上,为了便于了解,附加通信层内容,以 encode,decode 为入口,梳理 dubbo 解决链路。又因 Dubbo 外部定义为 Encoder,Decoder,故在此定义为 ” 编解码 ”。
无论是序列化层,还是通信层,都是 Dubbo 高效、稳固运行的基石,理解底层实现逻辑,可能帮忙咱们更好的学习和应用 Dubbo 框架。
2.2 入口
消费者口在 NettyClient#doOpen 办法发动连贯,初始化 BootStrap 时,会在 Netty 的 pipeline 里增加不同类型的 ChannelHandler,其中就有编解码器。
同理,提供者在 NettyServer#doOpen 办法提供服务,初始化 ServerBootstrap 时,会增加编解码器。(adapter.getDecoder()- 解码器,adapater.getEncoder()– 编码器)。
NettyClient
/**
* Init bootstrap
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {bootstrap = new Bootstrap();
// ...
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ...
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
// ...
}
});
}
NettyServer
/**
* Init and start netty server
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();
// ...
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ...
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// ...
}
2.3 生产端链路
消费者在发送音讯时编码,接管响应时解码。
发送音讯
ChannelInboundHandler
...
NettyCodecAdapter#getEncoder()
->NettyCodecAdapter$InternalEncoder#encode
->DubboCountCodec#encode
->DubboCodec#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeRequest
DubboCountCodec 类理论援用的是 DubboCodec,因 DubboCodec 继承于 ExchangeCodec,并未重写 encode 办法,所以理论代码跳转会间接进入 ExchangeCodec#encode 办法
接管响应
NettyCodecAdapter#getDecoder()
->NettyCodecAdapter$InternalDecoder#decode
->DubboCountCodec#decode
->DubboCodec#decode
->ExchangeCodec#decode
->DubboCodec#decodeBody
...
MultiMessageHandler#received
->HeartbeatHadnler#received
->AllChannelHandler#received
...
ChannelEventRunnable#run
->DecodeHandler#received
->DecodeHandler#decode
->DecodeableRpcResult#decode
解码链路绝对简单,过程中做了两次解码,在一次 DubboCodec#decodeBody 内,并未理论解码 channel 的数据,而是构建成 DecodeableRpcResult 对象,而后在业务解决的 Handler 里通过异步线程进行理论解码。
2.4 提供端链路
提供者在接管音讯时解码,回复响应时编码。
接管音讯
NettyCodecAdapter#getDecoder()
->NettyCodecAdapter$InternalDecoder#decode
->DubboCountCodec#decode
->DubboCodec#decode
->ExchangeCodec#decode
->DubboCodec#decodeBody
...
MultiMessageHandler#received
->HeartbeatHadnler#received
->AllChannelHandler#received
...
ChannelEventRunnable#run
->DecodeHandler#received
->DecodeHandler#decode
->DecodeableRpcInvocation#decode
提供端解码链路与生产端的相似,区别在于理论解码对象不一样,DecodeableRpcResult 替换成 DecodeableRpcInvocation。体现了 Dubbo 代码里的良好设计,形象解决链路,屏蔽解决细节,流程清晰可复用。
回复响应
NettyCodecAdapter#getEncoder()
->NettyCodecAdapter$InternalEncoder#encode
->DubboCountCodec#encode
->DubboCodec#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeResponse
与生产方发送音讯链路统一,区别在于最初一步辨别 Request 和 Response,进行不同内容编码
2.5 Dubbo 协定头
Dubbo 反对多种通信协议,如 dubbo 协定,http,rmi,webservice 等等。默认为 Dubbo 协定。作为通信协议,有肯定的协定格局和约定,而这些信息是业务不关注的。是 Dubbo 框架在编码过程中,进行增加和解析。
dubbo 采纳定长音讯头 + 不定长音讯体进行数据传输。以下是音讯头的格局定义
2byte:magic,相似 java 字节码文件里的魔数,用来标识是否是 dubbo 协定的数据包。
1byte: 音讯标记位,5 位序列化 id,1 位心跳还是失常申请,1 位双向还是单向,1 位申请还是响应;
1byte: 响应状态,具体类型见 com.alibaba.dubbo.remoting.exchange.Response;
8byte: 音讯 ID,每一个申请的惟一辨认 id;
4byte: 音讯体 body 长度。
以生产端发送音讯为例,设置音讯头内容的代码见 ExchangeCodec#encodeRequest。
音讯编码
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {Serialization serialization = getSerialization(channel);
// header.
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) {header[2] |= FLAG_TWOWAY;
}
if (req.isEvent()) {header[2] |= FLAG_EVENT;
}
// set request id.
Bytes.long2bytes(req.getId(), header, 4);
// encode request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {encodeEventData(channel, out, req.getData());
} else {encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {((Cleanable) out).cleanup();}
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
// body length
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
三、Hessian2
前节梳理了编解码的流程,本节认真看一看对象序列化的细节内容。
咱们晓得,Dubbo 反对多种序列化格局,hessian2,json,jdk 序列化等。hessian2 是阿里对于 hessian 进行了批改,也是 dubbo 默认的序列化框架。在此以生产端发送音讯序列化对象,接管响应反序列化为案例,看看 hessian2 的解决细节,同时解答前言问题。
3.1 序列化
前文提到,申请编码方法在 ExchangeCodec#encodeRequest,其中对象数据的序列化为 DubboCodec#encodeRequestData
DubboCodec
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(version);
// https://github.com/apache/dubbo/issues/6138
String serviceName = inv.getAttachment(INTERFACE_KEY);
if (serviceName == null) {serviceName = inv.getAttachment(PATH_KEY);
}
out.writeUTF(serviceName);
out.writeUTF(inv.getAttachment(VERSION_KEY));
out.writeUTF(inv.getMethodName());
out.writeUTF(inv.getParameterTypesDesc());
Object[] args = inv.getArguments();
if (args != null) {for (int i = 0; i < args.length; i++) {out.writeObject(encodeInvocationArgument(channel, inv, i));
}
}
out.writeAttachments(inv.getObjectAttachments());
}
咱们晓得,在 dubbo 调用过程中,是以 Invocation 作为上下文环境存储。这里先写入了版本号,服务名,办法名,办法参数,返回值等信息。随后循环参数列表,对每个参数进行序列化。在此,out 对象即是具体序列化框架对象,默认为 Hessian2ObjectOutput。这个 out 对象作为参数传递进来。
那么是在哪里确认理论序列化对象呢?
从头查看编码的调用链路,ExchangeCodec#encodeRequest 内有如下代码:
ExchangeCodec
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {Serialization serialization = getSerialization(channel);
// ...
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {encodeEventData(channel, out, req.getData());
} else {encodeRequestData(channel, out, req.getData(), req.getVersion());
}
// ...
}
out 对象来自于 serialization 对象,顺着往下看。在 CodecSupport 类有如下代码:
CodecSupport
public static Serialization getSerialization(URL url) {return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
}
能够看到,这里通过 URL 信息,基于 Dubbo 的 SPI 抉择 Serialization 对象,默认为 hessian2。再看看 serialization.serialize(channel.getUrl(),bos) 办法:
Hessian2Serialization
@Override
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {return new Hessian2ObjectOutput(out);
}
至此,找到了理论序列化对象,参数序列化逻辑较为简单,不做赘述,简述如下: 写入申请参数类型 → 写入参数字段名 → 迭代字段列表,字段序列化 。
3.2 反序列化
绝对于序列化而言,反序列化会多一些束缚。序列化对象时,不须要关怀接收者的理论数据格式。反序列化则不然,须要保障原始数据和对象匹配。(这里的原始数据可能是二进制流,也可能是 json)。
生产端解码链路中有提到,产生了两次解码,第一次未理论解码业务数据,而是转换成 DecodeableRpcResult。具体代码如下:
DubboCodec
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response...
try {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
result.decode();} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
} catch (Throwable t) {// ...}
return res;
} else {
// decode request...
return req;
}
}
关键点
1)对于解码申请还是解码响应做了辨别,对于生产端而言,就是解码响应。对于提供端而言,即是解码申请。
2)为什么会呈现两次解码?具体见这行:
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
decode\_in\_io\_thread\_key – 是否在 io 线程内进行解码,默认是 false,防止在 io 线程内解决业务逻辑,这也是合乎 netty 的举荐做法。所以才有了异步的解码过程。
那看看解码业务对象的代码,还记得在哪儿吗?DecodeableRpcResult#decode
DecodeableRpcResult
@Override
public Object decode(Channel channel, InputStream input) throws IOException {ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
// ...
case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
handleValue(in);
handleAttachment(in);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
// ...
default:
throw new IOException("Unknown result flag, expect'0''1' '2' '3' '4' '5', but received: " + flag);
}
// ...
return this;
}
private void handleValue(ObjectInput in) throws IOException {
try {Type[] returnTypes;
if (invocation instanceof RpcInvocation) {returnTypes = ((RpcInvocation) invocation).getReturnTypes();} else {returnTypes = RpcUtils.getReturnTypes(invocation);
}
Object value = null;
if (ArrayUtils.isEmpty(returnTypes)) {
// This almost never happens?
value = in.readObject();} else if (returnTypes.length == 1) {value = in.readObject((Class<?>) returnTypes[0]);
} else {value = in.readObject((Class<?>) returnTypes[0], returnTypes[1]);
}
setValue(value);
} catch (ClassNotFoundException e) {rethrow(e);
}
}
这里呈现了 ObjectInput,那底层的序列化框架抉择逻辑是怎么样的呢?如何放弃与生产端的序列化框架统一?
每一个序列化框架有一个 id 见 org.apache.dubbo.common.serialize.Constants;
1、申请时,序列化框架是依据 Url 信息进行抉择,默认是 hessian2
2、传输时,会将序列化框架标识写入协定头,具体见 ExchangeCodec#encodeRequest#218
3、提供收到生产端的申请时,会依据这个 id 应用对应的序列化框架。
此次理论持有对象为 Hessian2ObjectInput,因为 readObject 反序列化逻辑解决较为简单,流程如下:
四、常见问题
问题 1:提供端批改了 Facade 里的类门路,生产端反序列化为什么没报错?
答:反序列化时,生产端找不到提供端方返回的类门路时,会 catch 异样,以本地的返回类型为准做解决
问题 2:编码序列化时,没有为什么写入返回值?
答:因为在 Java 中,返回值不作为标识办法的信息之一
问题 3:反序列化流程图中,A 与 B 何时会呈现不统一的状况?A 的信息从何处读取?
答:当提供端批改了类门路时,A 与 B 会呈现不一样;A 的信息来源于,发动申请时,Request 对象里存储的 Invocation 上下文,是本地 jar 包里的返回值类型。
问题 4:提供者增删返回字段,消费者会报错吗?
答:不会,反序列化时,取两者字段交加。
问题 5:提供端批改对象的父类信息,生产端会报错吗?
答:不会,传输中只携带了父类的字段信息,没有携带父类类信息。实例化时,以本地类做实例化,不关联提供方理论代码的父类门路。
问题 6:反序列化过程中,如果返回对象子类和父类存在同名字段,且子类有值,父类无值,会产生什么?
答:在 dubbo – 3.0.x 版本,在会呈现返回字段为空的状况。起因在于编码侧迭代传输字段汇合时(生产端可能编码,提供端也可能编码),父类的字段信息在子类前面。解码侧拿到字段汇合迭代解码时,通过字段 key 拿到反序列化器,此时子类和父类同名,那么第一次反射会设置子类值,第二次反射会设置父类值进行笼罩。
在 dubbo – 2.7.x 版本中,该问题已解决。解决方案也比较简单,在编码侧传输时,通过 Collections.reverse(fields) 反转字段程序。
JavaSerializer
public JavaSerializer(Class cl, ClassLoader loader) {introspectWriteReplace(cl, loader);
// ...
List fields = new ArrayList();
fields.addAll(primitiveFields);
fields.addAll(compoundFields);
Collections.reverse(fields);
// ...
}
五、写在最初
编解码过程简单艰涩,数据类型多种多样。笔者遇到和理解的究竟无限,以最常见、最简略的数据类型梳理编解码的流程。如有谬误疏漏之处,还请见谅。
作者:vivo 互联网服务器团队 -Sun wen