ESA Stack(Elastic Service Architecture) 是 OPPO 云计算中心孵化的技术品牌,致力于微服务相干技术栈,帮忙用户疾速构建高性能,高可用的云原生微服务。产品蕴含高性能 Web 服务框架、RPC 框架、服务治理框架、注册核心、配置核心、调用链追踪零碎,Service Mesh、Serverless 等各类产品及钻研方向。
以后局部产品曾经对外开源:
开源主站:https://www.esastack.io
Github: https://github.com/esastack
RestClient 我的项目地址:https://github.com/esastack/e…
RestClient 文档地址:https://www.esastack.io/esa-r…
ESA RestClient
ESA RestClient 是一个基于 Netty 的全链路异步事件驱动的高性能轻量级的 HTTP 客户端。
以下简称 RestClient
1、Quick Start
Step1:增加依赖
<dependency>
<groupId>io.esastack</groupId>
<artifactId>restclient</artifactId>
<version>1.0.0</version>
</dependency>
Step2: 构建 RestClient 并发送申请解决响应
final RestClient client = RestClient.ofDefault(); // 疾速创立 RestClient,各项配置均为默认配置。// 如果用户想自定义一些配置,则能够应用 RestClient.create()来进行自定义配置。client.post("http://127.0.0.1:8081/")
.entity("Hello Server") // 设置申请体
.execute() // 执行申请逻辑
.thenAccept((response)-> { // 异步解决响应
try {System.out.println(response.bodyToEntity(String.class)); // 调用 response.bodyToEntity(Class TargetClass)来 Decode 响应,//TargetClass 为冀望的响应类型
} catch (Exception e) {e.printStackTrace();
}
});
2、性能个性
- Http1/H2/H2cUpgrade/Https
- Encode 与 EncodeAdvice
- Decode 与 DecodeAdvice
- RestInterceptor
- 大文件发送
- 申请级别读超时
- 申请级别重试
- 申请级别重定向
- 100-expect-continue
- Multipart
- Metrics
- more …
2.1 Encode 与 EncodeAdvice
2.1.1 Encode
RestClient 会主动依据用户的 Headers 与 Entity 等抉择适合的 Encoder 进行 Encode。其内置了上面这些 Encoder:
-
Json
jackson(默认)
fastjson
gson
- ProtoBuf
- File
- String
- byte[]
除此之外 RestClient 也反对用户自定义 Encoder。
2.1.1.1 应用 Json Encoder
指定 contentType 为 MediaType.APPLICATION_JSON,将主动应用 Json Encoder 来对 Entity 来进行 Encode。示例如下:
final RestClient client = RestCient.ofDefault();
client.post("localhost:8080/path")
.contentType(MediaTpe.APPLICATION_JSON)
.entity(new Person("Bob","male"))
.execute();
2.1.1.2 应用 ProtoBuf Encoder
指定 contentType 为 ProtoBufCodec.PROTO_BUF,且 Entity 类型为 com.google.protobuf.Message 的子类时,将主动应用 ProtoBuf Encoder 来对 Entity 来进行 Encode。示例如下:
final RestClient client = RestClient.ofDefault();
client.post("localhost:8080/path")
.contentType(ProtoBufCodec.PROTO_BUF)
.entity(message)
.execute();
2.1.1.3 应用 File Encoder
当 Entity 类型为 File 时,将主动应用 File Encoder 来对 Entity 来进行 Encode。示例如下:
final RestClient client = RestClient.ofDefault();
client.post("localhost:8080/path")
.entity(new File("tem"))
.execute();
2.1.1.4 自定义 Encoder
当 RestClient 内置的 Encoder 无奈满足用户需要时,用户能够自定义 Encoder,示例如下:
public class StringEncoder implements ByteEncoder {
@Override
public RequestContent<byte[]> doEncode(EncodeContext<byte[]> ctx) {if (MediaType.TEXT_PLAIN.equals(ctx.contentType())) {if (ctx.entity() != null) {return RequestContent.of(((String) ctx.entity()).getBytes(StandardCharsets.UTF_8));
} else {return RequestContent.of("null");
}
}
// 该 Encoder 无奈 Encode 这种类型,将 Encode 工作交给下一个 Encoder
return ctx.next();}
}
用户能够将自定义的 Encoder 间接绑定到申请或者 Client 上,同时也反对用户通过 SPI 的形式加载 Encoder,具体参见文档:《RestClient 配置 Encoder》
2.1.1.5 Encode 执行机会
见申请解决残缺流程中的 Encoder。
2.1.2 EncodeAdvice
用户能够通过 EncodeAdvice 在 Encode 前后插入业务逻辑,来对要 Encode 的 Entity 或者 Encode 后的 RequestContent 进行批改替换等操作。
2.1.2.1 示例
public class EncodeAdviceImpl implements EncodeAdvice {
@Override
public RequestContent<?> aroundEncode(EncodeAdviceContext ctx) throws Exception {
//...before encode
RequestContent<?> requestContent = ctx.next();
//...after encode
return requestContent;
}
}
用户能够将自定义的 EncodeAdvice 间接绑定到 Client 上,同时也反对用户通过 SPI 的形式加载 EncodeAdvice,具体参见文档:《RestClient 配置 EncodeAdvice》
2.1.2.2 执行机会
见申请解决残缺流程中的 EncodeAdvice。
2.2 Decode 与 DecodeAdvice
2.2.1 Decode
RestClient 会主动依据用户的 Headers 与 冀望 Entity 类型 等抉择适合的 Decoder 进行 Decode。RestClient 内置了上面这些 Decoder:
-
Json
jackson(默认)
fastjson
gson
- ProtoBuf
- String
- byte[]
除此之外 RestClient 也反对用户自定义解码器。
2.2.1.1 应用 Json Decoder
当 Response 的 contentType 为 MediaType.APPLICATION_JSON,将主动应用 Json Decoder 来来进行 Decode。
final RestClient client = RestClient.ofDefault();
client.get("localhost:8080/path")
.execute()
.thenAccept((response)-> {
try {// 当 MediaType.APPLICATION_JSON.equals(response.contentType()) 时将主动应用 Json Decoder
System.out.println(response.bodyToEntity(Person.class));
} catch (Exception e) {e.printStackTrace();
}
});
2.2.1.2 应用 ProtoBuf Decoder
当 Response 的 contentType 为 ProtoBufCodec.PROTO_BUF,且 response.bodyToEntity()传入的类型为 com.google.protobuf.Message 的子类时,将主动应用 ProtoBuf Decoder 来进行 Decode。
final RestClient client = RestClient.ofDefault();
client.get("localhost:8080/path")
.execute()
.thenAccept((response)-> {
try {// 当 ProtoBufCodec.PROTO_BUF.equals(response.contentType()),且 Person 为 Message 的子类时,将主动应用 ProtoBuf Decoder
System.out.println(response.bodyToEntity(Person.class));
} catch (Exception e) {e.printStackTrace();
}
});
2.2.1.3 自定义 Decoder
当 RestClient 内置的 Decoder 无奈满足用户需要时,用户能够自定义 Decoder,示例如下:
public class StringDecoder implements ByteDecoder {
@Override
public Object doDecode(DecodeContext<byte[]> ctx) {if (String.class.isAssignableFrom(ctx.targetType())) {return new String(ctx.content().value());
}
return ctx.next();}
}
用户能够将自定义的 Decoder 间接绑定到申请或者 Client 上,同时也反对用户通过 SPI 的形式加载 Decoder,具体参见文档:《RestClient 配置 Decoder》
2.2.1.4 执行机会
见申请解决残缺流程中的 Decoder。
2.2.2 DecodeAdvice
用户能够通过 DecodeAdvice 在 Decode 前后进行来插入业务逻辑,来对要解码的 ResponseContent 或者 Decode 后的对象 进行批改替换等操作。
2.2.2.1 示例
public class DecodeAdviceImpl implements DecodeAdvice {
@Override
public Object aroundDecode(DecodeAdviceContext ctx) throws Exception {
//...before decode
Object decoded = ctx.next();
//...after decode
return decoded;
}
}
用户能够将自定义的 DecodeAdvice 间接绑定到 Client 上,同时也反对用户通过 SPI 的形式加载 DecodeAdvice,具体参见文档:《RestClient 配置 DecodeAdvice》
2.2.2.2 执行机会
见申请解决残缺流程中的 DecodeAdvice。
2.3 RestInterceptor
用户能够应用 RestInterceptor 在申请发送前和响应接管起初插入业务逻辑。RestClient 反对通过 builder 配置和 SPI 加载两种形式配置 RestInterceptor。
2.3.1 Builder 配置
在结构 RestClient 时传入自定义的 RestInterceptor 实例,如:
final RestClient client = RestClient.create()
.addInterceptor((request, next) -> {System.out.println("Interceptor");
return next.proceed(request);
}).build();
2.3.2 SPI
2.3.2.1 一般 SPI
RestClient 反对通过 SPI 的形式加载 RestInterceptor 接口的实现类,应用时只须要依照 SPI 的加载规定将自定义的 RestInterceptor 放入指定的目录下即可。
2.3.2.2 RestInterceptorFactory
如果用户自定义的 RestInterceptor 对于不同 RestClient 的配置有不同的实现,则用户能够实现 RestInterceptorFactory 接口,并依照 SPI 的加载规定将自定义的 RestInterceptorFactory 放入指定的目录下即可。
public interface RestInterceptorFactory {Collection<RestInterceptor> interceptors(RestClientOptions clientOptions);
}
在 RestClient 构建时将调用 RestInterceptorFactory.interceptors(RestClientOptions clientOptions),该办法返回的所有 RestInterceptor 都将退出到构建好的 RestClient 中。
2.3.2.3 执行机会
见申请解决残缺流程中的 RestInterceptor。
2.4 大文件发送
当文件较小时,可通过间接将文件内容写入申请 body 来发送文件。然而当文件内容过大时,间接写入会有 OOM 危险。
为了解决这个问题,RestClient 借助底层的 Netty 应用 NIO 以零拷贝的形式发送文件,防止了 OOM 的同时又缩小了数据的屡次拷贝。
用户只须要简略的接口调用便可应用该性能:
final RestClient client = RestClient.ofDefault();
final String entity = client.post("http://127.0.0.1:8081/")
.entity(new File("bigFile"))
.execute();
2.5 读超时
RestClient 反对申请级别的读超时,同时也反对 Client 级别的读超时。默认读超时为 6000L。
2.5.1 Client 级别读超时
Client 级别的读超时将对该 Client 下的所有申请失效,具体配置形式如下:
final RestClient client = RestClient.create()
.readTimeout(3000L)
.build();
2.5.2 Request 级别读超时
当 Request 设置了读超时,其数据将笼罩 Client 设置的读超时,具体配置形式如下:
final RestClient client = RestClient.ofDefault();
client.get("http://127.0.0.1:8081/")
.readTimeout(3000L)
.execute()
.thenAccept((response)-> {
try {System.out.println(response.bodyToEntity(String.class));
} catch (Exception e) {e.printStackTrace();
}
});
2.6 重试
RestClient 反对申请级别的重试,同时也反对 Client 级别的重试。
默认状况下,RestClient 仅会对所有抛出连贯异样的申请进行重试(避免服务端的服务为非幂等),其中:最大重试次数为 3(不包含原始申请),重试间隔时间为 0。应用时,能够通过自定义 RetryOptions 参数更改重试次数、重试条件、重试间隔时间等。
2.6.1 Client 级别重试
Client 级别的重试将对该 Client 下的所有 Request 失效,应用时,能够通过自定义 RetryOptions 参数更改重试次数、重试条件、重试间隔时间等。具体配置形式如下
final RestClient client = RestClient.create()
.retryOptions(RetryOptions.options()
.maxRetries(3)
// 设置每次重试的间隔时间
.intervalMs(retryCount->(retryCount+1)* 3000L)
// 判断是否要重试
.predicate((request, response, ctx, cause) -> cause != null)
.build())
.connectionPoolSize(2048)
.build();
2.6.2 Request 级别重试
当 Request 设置了重试次数,其数据将笼罩 Client 设置的重试次数,具体配置形式如下:
final RestClient client = RestClient.ofDefault();
client.get("http://127.0.0.1:8081/")
.maxRetries(3)
.execute()
.thenAccept((response)-> {
try {System.out.println(response.bodyToEntity(String.class));
} catch (Exception e) {e.printStackTrace();
}
});
2.7 重定向
默认状况下,RestClient 会对响应状态码为 301,302,303,307,308 的申请重定向,其中:最大重定向次数为 5(不蕴含原始申请)。应用时,能够通过 maxRedirects 更新重定向次数或者禁用(maxRedirects=0)重定向性能。
2.7.1 Client 设置重定向
Client 级别的重定向将对该 Client 下的所有 Request 失效,具体配置形式如下:
final RestClient client = RestClient.create()
.maxRedirects(3)
.build();
2.7.2 Request 设置重定向笼罩 Client 的设置
当 Request 设置了重定向次数,其数据将笼罩 Client 设置的重定向次数,具体配置形式如下:
final RestClient client = RestClient.ofDefault();
client.get("http://127.0.0.1:8081/")
.maxRedirects(3)
.execute()
.thenAccept((response)-> {
try {System.out.println(response.bodyToEntity(String.class));
} catch (Exception e) {e.printStackTrace();
}
});
2.8 其它性能
如果用户想对 RestClient 的性能有进一步理解,能够参考:《RestClient 性能文档》。
3 性能体现
3.1 测试场景
服务端为一个 Echo 服务器,客户端别离应用 RestClient、Apache HttpAsyncClient、OK Httpclient 均应用 POST 申请,申请体内容为固定字符串: OK,响应体内容也为固定字符串:OK。
3.2 机器配置
3.3 JVM 参数
-Xms1024m -Xmx1024m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70
3.4 客户端版本
3.5 测试方法
如何测试异步客户端的性能?这是咱们在性能测试前要面对的第一个问题,上面是咱们对于该问题的一些思考:
是否 for 循环发动同步申请,同时应用多线程来达到框架的申请解决极限,并将该极限视为客户端的最佳 TPS?
一般来说,用户既然抉择异步客户端,必定大部分工夫都会应用异步的形式去发动申请,应用同步形式进行测试的后果并不能代表异步时客户端的性能。因而对于异步客户端的用户而言,同步形式测试的最终后果并没有很大的参考价值。
因而这种形式并不适宜用来对异步客户端进行性能测试。
是否应用单线程 for 循环异步发动申请,间接将这个时候的 TPS 视为客户端的最佳 TPS?
异步客户端异步发动申请时,发动申请的办法返回的十分快 (因为申请执行的过程次要在 IO 线程池中进行)。只管只应用单线程,如果始终 for 循环异步发动申请,申请发动的速度也会比 IO 线程池解决申请的速度快得多,这会导致大量申请在程序中的某个中央(如获取连贯) 沉积,从而导致程序报错,或性能低下。
因而这种形式也并不适宜用来对异步客户端进行性能测试。
那么应该如何测试异步客户端的性能呢?
异步客户端专为异步而生,用户既然抉择异步客户端,必定大部分工夫都会应用异步的形式发动申请,因而对于异步客户端而言,应用异步的形式去测试其性能是一种更加适合的形式。问题的关键在于在测试过程中,如何防止过快地发动异步申请导致发动申请的速度超过框架的解决能力。
次要问题确定了,那么答案根本也就确定了。要防止过快地发动异步申请,咱们能够 想方法调整异步申请发动的速度,对于调整异步申请的发动速度,咱们能够尝试用以下两种形式:
- for 循环周期性地发送肯定次数的异步申请后,sleep 一会儿,而后再持续发动异步申请。咱们能够通过 管制 sleep 的工夫 和 管制多少个申请距离进行 sleep 两个变量来管制异步申请的发动速率。
- for 循环周期性地发送肯定次数的异步申请后,发送一个同步申请,而后再持续发动异步申请。用同步申请去代替 sleep 的工夫,该同步申请执行完恰好阐明了申请队列中的申请都曾经排队完结。其实原理是雷同的,但这样管制的变量更少一些,仅须要管制发动多少个异步申请后发动一次同步申请(即:一个周期内异步申请次数与同步申请次数的比例)。
下面两种办法都能够管制异步申请的发动速率,最终咱们抉择应用第二种办法来管制异步申请的发动速率,因为第二种形式须要管制的变量更少,这样咱们的测试过程也会更加简略。
因而最终咱们的测试方法为:
应用异步与同步交替的形式来发动申请,一直调整一个周期内异步申请与同步申请的比例,在每个比例下调整客户端的各项配置,使其达到最佳的 TPS,记录每个比例下,框架的最佳 TPS,直到找到减少 异步申请与同步申请的比例 时,框架的 TPS 不再回升,甚至降落的拐点,该拐点即为框架的性能极限点。
3.6 测试后果
上图中,横坐标为异步申请与同步申请的比例,纵坐标为 TPS,通过上图咱们能够看出:
- RestClient:随着异步与同步申请比例增大而先增大后减小,异步与同步申请比例为 800 时,TPS 最佳,为 111217.98。
- Apache HttpAsyncClient:随着异步与同步申请比例增大而先增大后减小,异步与同步申请比例为 800 时,TPS 最佳,为 83962.54。
- OK Httpclient:随着异步与同步申请比例增大而先增大后减小,异步与同步申请比例为 300 时,TPS 最佳,为 70501.59。
3.7 论断
RestClient 在下面场景中最佳 TPS 较 Apache HttpAsyncClient 的最佳 TPS 高 32%,较 OK Httpclient 的最佳 TPS 高 57%。
4 架构设计
4.1 设计准则
- 高性能:继续不懈谋求的指标 & 外围竞争力。
- 高扩展性:凋谢扩大点,满足业务多样化的需要。
- 全链路异步:基于 CompletableStage 提供欠缺的异步解决能力。
4.2 结构设计
上图为 RestClient 的结构图,咱们由上到下顺次介绍一下各个局部的含意:
4.2.1RestInterceptorChain
RestInterceptorChain 为 RestInterceptor 的汇合,用户调用申请时,将顺次通过 RestInterceptorChain 中的所有 RestInterceptor。用户能够通过实现 RestInterceptor 中的 getOrder()办法来指定其在 RestInterceptorChain 中的排序。
4.2.2EncodeAdviceChain
EncodeAdviceChain 为 EncodeAdvice 的汇合,在 Encode 前,将顺次通过 EncodeAdviceChain 中的所有 EncodeAdvice。用户能够通过实现 EncodeAdvice 中的 getOrder()办法来指定其在 EncodeAdviceChain 中的排序。
4.2.3 EncoderChain
EncoderChain 为 Encoder 的汇合,在 Encode 时,将顺次通过 EncoderChain 中的所有 Encoder,直到某个 Encoder 间接返回 Encode 的后果(即:其能够 Encode 该申请)。用户能够通过实现 Encoder 中的 getOrder()办法来指定其在 EncoderChain 中的排序。
4.2.4DecodeAdviceChain
DecodeAdviceChain 为 DecodeAdvice 的汇合,在 Decode 前,将顺次通过 DecodeAdviceChain 中的所有 DecodeAdvice。用户能够通过实现 DecodeAdvice 中的 getOrder()办法来指定其在 DecodeAdviceChain 中的排序。
4.2.5DecoderChain
DecoderChain 为 Decoder 的汇合,在 Decode 时,将顺次通过 DecoderChain 中的所有 Decoder,直到某个 Decoder 间接返回 Decode 的后果(即:其能够 Decode 该响应)。用户能够通过实现 Decoder 中的 getOrder()办法来指定其在 DecoderChain 中的排序。
4.2.6NettyTransceiver
NettyTransceiver 是 RestClient 与其底层框架 Neety 之间连贯的桥梁,在介绍其之前,须要一些准备常识,咱们先来简略介绍一下这些准备常识:
4.2.6.1Channel & ChannelPool &ChannelPools
Channel:Channel 是 Netty 网络操作抽象类,它聚合了一组性能,包含但不限于网络的读、写,客户端发动连贯、被动敞开连贯,链路敞开,取得通信单方的网络地址等。它也蕴含了 Netty 框架相干的一些性能,包含获取该 Channel 的 EventLoop, 获取缓冲分配器 ByteBufAllocator 和 pipeline 等。
ChannelPool:ChannelPool 用于缓存 Channel,它容许获取和开释 Channel,并充当这些 Channel 的池,从而达到复用 Channel 的目标。在 RestClient 中,每一个 Server host 对应一个 ChannelPool。
ChannelPools:ChannelPools 用于缓存 ChannelPool。在 RestClient 中,当一个 Server host 长时间没有被拜访时,其所对应的 ChannelPool 将会被视作缓存过期,从而被回收资源。
4.2.6.2EventLoop & EventLoopGroup
EventLoop:EventLoop 在 Netty 中被用来运行工作来解决在 Channel 的生命周期内产生的事件。在 RestClient 中,一个 EventLoop 对应了一个线程。
EventLoopGroup:EventLoopGroup 为一组 EventLoop,其保障将多个工作尽可能地平均地调配在多个 EventLoop 上。
4.2.6.3Epoll
Epoll 是 Linux 内核的可扩大 I / O 事件告诉机制,蕴含上面这三个零碎调用。
int epoll_create(int size);
在内核中创立 epoll 实例并返回一个 epoll 文件描述符(对应上图中 EpollEventLoop 中的 epollFD)。在最后的实现中,调用者通过 size 参数告知内核须要监听的文件描述符数量。如果监听的文件描述符数量超过 size, 则内核会主动扩容。而当初 size 曾经没有这种语义了,然而调用者调用时 size 仍然必须大于 0,以保障后向兼容性。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
向 epfd 对应的内核 epoll 实例增加、批改或删除对 fd 上事件 event 的监听。op 能够为 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 别离对应的是增加新的事件,批改文件描述符上监听的事件类型,从实例上删除一个事件。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
当 timeout 为 0 时,epoll_wait 永远会立刻返回。而 timeout 为 -1 时,epoll_wait 会始终阻塞直到任一已注册的事件变为就绪。当 timeout 为一正整数时,epoll 会阻塞直到计时 timeout 毫秒完毕或已注册的事件变为就绪。因为内核调度提早,阻塞的工夫可能会稍微超过 timeout 毫秒。
Epoll 运作流程:
1、过程先通过调用 epoll_create 来创立一个 epoll 文件描述符(对应上图中 EpollEventLoop 中的 epollFD)。epoll 通过 mmap 开拓一块共享空间,该共享空间中蕴含一个红黑树和一个链表(对应上图 epollFD 中对应的 Shared space)。
2、过程调用 epoll 的 epoll_ctl add,把新来的链接的文件描述符放入红黑树中。
3、当红黑树中的 fd 有数据到了,就把它放入一个链表中并保护该数据可写还是可读。
下层用户空间 (通过 epoll_wait) 从链表中取出所有 fd,而后对其进行读写数据。
4.2.6.4NettyTransceiver 初始化
当 RestClient 刚实现初始化时,NettyTransceiver 也刚实现初始化,其初始化次要蕴含上面两局部:
- 初始化 ChannelPools,刚初始化的 ChannelPools 为空,其外部不含有任何 ChannelPool。
- 初始化 EpoolEventLoopGroup,EpoolEventLoopGroup 蕴含多个 EpoolEventLoop。每个 EpoolEventLoop 都蕴含上面这三个局部:
executor:真正执行工作的线程。
taskQueue:工作队列,用户要执行的工作将被退出 到该队列中,而后再被 executor 执行。
epollFD:epoll 的文件描述符,在 EpoolEventLoop 创立时,调用 epoll_create 来创立一个 epoll 的共享空间,其对应的文件描述符就是 epollFD。
4.2.6.5NettyTransceiver 发送申请
当第一次发送申请时:NettyTransceiver 将会为该 Server host 创立一个 ChannelPool(如上图中的 ChannelPool1),并缓存到 channelPools 中(默认 10 分钟内该 Server host 没有申请则视为缓存过期,其对应的 ChannelPool 将被从 channelPools 中删除)。ChannelPool 在初始化时,次要蕴含上面两局部:
- 初始化 channelDeque,用于缓存 channel,获取 channel 就是从 channelDeque 中拿出一个 channel。
- 在 EventLoopGroup 中选定一个 EventLoop 作为 executor,该 executor 用来执行获取连贯等操作。之所以 ChannelPool 须要一个固定的 executor 来执行获取连贯等操作,是为了避免出现多个线程同时获取连贯的状况,从而不须要对获取连贯的操作进行加锁。
ChannelPool 初始化实现后,则将由 executor 从 ChannelPool 中获取 channel,首次获取时,因为 ChannelPool 中还没有 channel,则将初始化第一个 channel,channel 的初始化步骤次要蕴含上面几步:
- 创立连贯,将连贯封装为 channel。
- 将 channel 对应的连贯通过 epoll_ctl add 办法退出到 EpollEventLoopGroup 中的一个 EpollEventLoop 的 epollFD 对应的共享空间的红黑树中。
- 将 channel 放到对应 ChannelPool 的 channelDeque 中。
初始化 channel 实现后,executor 则将初始化好的 channel 返回,由绑定该 channel 的 EpollEventLoop(即初始化 channel 第二步中所选定的 EpollEventLoop)继续执行发送申请数据的工作。
4.2.6.6NettyTransceiver 接管响应
4.2.7.1 线程模型的进一步优化
下面的线程模型为咱们以后版本的线程模型,也是 Netty 自带连接池的线程模型。然而这种线程模型的性能肯定是最高的吗?
这个问题的答案应该是否定的,因为只管 ChannelPool 中指定一个 EventLoop 作为 executor 来执行获取 Channel 的操作能够使得 获取 Channel 的过程无多线程争抢,然而却引入了上面这两个问题:
- 获取 Channel 到 Channel.write()之间大概率会进行一次 EventLoop 切换 (有可能会将 获取 Channel 与 Channel.write() 调配到同一个 EventLoop,如果调配到同一个 EventLoop,则不须要进行 EventLoop 切换,所以这里说大概率会切换), 这次切换是有肯定的性能老本的。
- EventLoopGroup 中的 EventLoop 任务分配不平均。因为 channelPool 中获取连贯的那个 EventLoop 在获取连贯的同时还要解决数据的收发,比其余 EventLoop 多做一些工作,该 EventLoop 也成为了性能瓶颈点。在咱们理论测试当中,也确实发现有一个 EventLoop 的线程 CPU 利用率较其它 EventLoop 更高一些。
那么更优越的线程模型是怎么的呢?通过下面的剖析,咱们感觉它应该要满足上面两点:
- 获取 Channel 到 Channel.write() 之间无线程切换。
- 每个 EventLoop 的任务分配平均。
基于咱们的需要,咱们能够得出最佳的构造模型与线程模型应该为上面这种:
优化后的构造模型:
如上图所示:一个 ChannelPool 由多个 ChildChannelPool 形成(个数 = IO 线程个数),一个 ChildChannelPool 与一个 EventLoopGroup 绑定,该 EventLoopGroup 仅含有一个 EventLoop (即一个 ChildChannelPool 对应一个 EventLoop)。
优化后的线程模型:
如上图所示:先在业务线程中执行一些操作并获取
ChannelPool,及选取一个 ChildChannelPool (选取的实现相似于 EventLoopGroup.next()实现,其保障了 ChildChannelPool 的平均调配),而后通过 ChildChannelPool 来获取 Channel (该过程在 ChildChannelPool 对应的 EventLoop 中执行),而后调用 Channel.write()
(该过程也在 ChildChannelPool 对应的 EventLoop 中执行)。
上述过程奇妙的达成了咱们一开始所须要的高性能线程模型的两点:
- 获取 Channel 到 Channel.write() 之间无线程切换 —— 因为 ChildChannelPool 中的 EventLoopGroup 仅有一个 EventLoop,其创立的 Channel 也只能绑定该 EventLoop,因而获取 Channel 与 Channel.write()都只能在该 EventLoop 种执行,从而没有了线程切换。
- 每 个 EventLoop 任务分配平均 —— 因为 ChildChannelPool 是被平均地从 ChannelPool 中获取的(该过程与 EventLoopGroup.next() 的过程相似),而一个 ChildChannelPool 刚好对应了一个 EventLoop,从而使得申请工作被平均调配。
实际中咱们也通过一个 Demo 进行了测试:发现采纳下面这种线程模型与构造模型,使得 RestClient 的性能在以后版本的根底上又晋升了 20% 左右。预计下个版本中 RestClient 将会提供下面这种线程模型与构造模型。
其它性能优化的一些设计
5、Netty
RestClient 基于 Netty 编写,Netty 自带的一些高性能个性天然是 RestClient 高性能的基石,Netty 常见个性均在 RestClient 中有所使用:
- Epoll
- Channel & ChannelPool
- EventLoop & EventLoopGroup
- ByteBuf & PooledByteBufAllocator
- Future & Promise
- FastThreadLocal &InternalThreadLocalMap
- …
其中:Epoll、Channel & ChannelPool、EventLoop & EventLoopGroup 咱们在该篇文档的结构设计局部曾经有过解说,这里不再对其做过多解释,上面咱们次要来看看其它几个局部:
5.1 ByteBuf & PooledByteBufAllocator
Netty 应用了即易于应用又具备良好性能的 ByteBuf 来代替 ByteBuffer。这里不对 ByteBuf 进行具体的介绍,次要简略介绍 RestClient 中如何利用 ByteBuf 来进步性能以取得更好地用户体验:
- 发送申请时,应用 PooledByteBufAllocator 来调配 ByteBuf,其池化了 ByteBuf 的实例以进步性能并最大限度地缩小内存碎片。
- 接管响应时,应用 CompositeByteBuf,它提供了一个将多个缓冲区示意为单个合并缓冲区的虚构示意,缩小了当响应分批次到来时聚合响应产生的不必要地数据拷贝。
5.2 Future & Promise
Future & Promise 为 Netty 异步的基石,这里不对 Future & Promise 进行具体的介绍,次要介绍 RestClient 中对于 Future & Promise 一些相干的技术上取舍。
RestClient 利用 Future & Promise 来实现数据包收发时的异步,并在面向用户时将 Future & Promise 转化成 CompletionStage。由此实现了从数据包收发 到用户编解码的整个申请链路的异步化。
5.3WhyCompletionStage,Not Future & Promise?
CompletionStage 是 Java8 新增的一个接口,用于异步执行中的阶段解决,其大量用在 Lambda 表达式计算过程中,目前只有 CompletableFuture 一个实现类。
比起 Netty 的 Future & Promise,Java 开发者更加相熟 CompletionStage,且 CompletionStage 的接口性能也更加弱小,用户能够借其更加灵便地实现业务逻辑。
5.4 Why CompletionStage,Not CompletableFuture?
之所以应用 CompletionStage
而不应用 CompletableFuture。是因为 CompletionStage 是接口,而 CompletableFuture 为 CompletionStage 的实现,应用 CompletionStage 更合乎面向接口编程的准则。同时用户也能够应用 CompletionStage.toCompletableFuture()来将 CompletionStage 转化为 CompletableFuture。
5.5How To Combine Future & Promise With CompletionStage?
在用户调用申请发送时,咱们构建了一个 CompletionStage,并在执行 Netty 解决申请与响应逻辑返回的 Future 中减少 Listener,在该 Listener 中完结 CompletionStage。通过这样实现了将 Future & Promise 与 CompletionStage 联合,从而实现整个申请链路的异步化。
对这块感兴趣的用户能够查看 io.esastack.httpclient.core.netty.HttpTransceiverImpl 中的 handle()办法,该办法中实现了 Future 到 CompletionStage 的转化。
5.6 FastThreadLocal&InternalThreadLocalMap
FastThreadLocal 通过将 ThreadLocal 中应用哈希构造的 ThreadLocalMap 改为了间接应用数组构造的 InternalThreadLocalMap。ThreadLocal 与 FastThreadLocal 结构图大抵如下:
ThreadLocal 结构图
FastThreadLocal 结构图
如上图所示,比起 ThreadLocalMap,InternalThreadLocalMap 间接依据 index 来获取值、设置值的做法更加简略,且间接应用数组的复杂度更低(尽管 ThreadLocalMap 也是数组构造,然而其在数组的存取操作外还封装了大量 hash 计算以及避免 hash 碰撞的相干操作)。因而 FastThreadLocal 取得了更高的性能。
RestClient 中均应用 FastThreadLocal 代替 ThreadLocal 以获取更高的性能。
5.7 Encode & Decode
与大多数 Http 客户端框架不同,RestClient 不仅反对将 Java 对象 Encode 成 byte[],还反对将 Java 对象 Encode 成其余底层 Netty 反对的对象,如:File、MultipartBody 等,将来还将会反对 ChunkInput 用来反对将申请体比拟大的申请分块发送。
之所以这么设计,是因为如果咱们仅仅反对将 Java 对象 Encode 成 byte[],那么当 Encode 后的 byte[]数据过大时,将会呈现 OutOfMemoryException。用户在发送大文件或者自身申请体较大的申请时,都会呈现这个问题。
为了解决这个问题,RestClient 通过让用户能够将 Java 对象 Encode 成 File 或者 ChunkInput 来解决这一类问题。当用户将 Java 对象 Encode 成 File 时,RestClient 将会借助底层的 Netty 应用 NIO 以零拷贝的形式发送文件,防止了 OOM 的同时又缩小了数据的屡次拷贝。
同理当用户将 Java 对象 Encode 成 ChunkInput 时,RestClient 将会分块发送数据,防止数据一次性全副加载进内存,从而防止 OOM 的状况。(PS:ChunkInput 以后版本暂不反对,但已留出扩大点,将在下一版本反对)
Decode 时也做了同样的优化,因为原理雷同这里就不再开展解说了。
6、结语
只管 RestClient 次要只波及发动申请这个简略的性能,然而“麻雀虽小,五脏俱全”,它思考到了性能优化的方方面面,同时在 接口设计、代码整洁、功能完善 几个方面上也尽量做到了毫不妥协。
它还是一个年老的我的项目,欢送各路技术爱好者们退出,一起探讨学习与提高。
作者简介
Without OPPO 云计算中心云原生组后端工程师
深度参加 ESA Restlight、ESA RestClient、ServiceKeeper 等多个高性能开源框架我的项目
获取更多精彩内容,请扫码关注 [OPPO 数智技术] 公众号