关于android:Android开发神器OkHttp框架源码解析

33次阅读

共计 13691 个字符,预计需要花费 35 分钟才能阅读完成。

前言

HTTP 是咱们替换数据和媒体流的古代利用网络,无效利用 HTTP 能够使咱们节俭带宽和更快地加载数据,Square 公司开源的 OkHttp 网络申请是有效率的 HTTP 客户端。之前的知识面仅限于框架 API 的调用,接触到理论的工作之后深知本人常识的有余,故而深挖框架源码尽力汲取前辈的设计教训。对于此框架的源码解析网上的教程多不胜数,此文名为源码解析,实则是炒冷饭之作,如有谬误和不足之处还望各位看官指出。

拦截器

拦截器是 OkHttp 框架设计的精华所在,拦截器所定义的是 Request 的所通过的责任链而不论 Request 的具体执行过程,并且能够让开发人员自定义本人的拦截器性能并且插入到责任链中

用户自定义的拦截器位于 OkHttpClient.addInterceptor() 增加到 interceptors 责任链中

RealCall.execute()执行的时候调用 RealCall.getResponseWithInterceptorChain()将 来自 OkHttpClient 的 interceptors 以及默认的拦截器一并退出到 RealInterceptorChain 责任链中并调用, 代码并没有对 originalRequest 进行封装, InterceptorChain 和 originalRequest 一并流转到 RealInterceptorChain 类中解决

CustomInterceptor
RetryAndFollowUpInterceptor
BridgeInterceptor
CacheInterceptor
ConnectInterceptor
NetworkInterceptors
CallServiceInterceptor

1.RealInterceptorChain.proceed()
2.EventListener.callStart()也是在 RealCall.execute()嵌入到 Request 调用过程, EventListener.callEnd()位于 StreamAllocation 中调用
3.Request.Builder

url (String/URL/HttpUrl)

header

CacheControl

Tag (Use this API to attach timing, debugging, or other application data to a request so that you may read it in interceptors, event listeners, or callbacks.)

BridgeInterceptor

Bridges from application code to network code. First it builds a network request from a user request. Then it proceeds to call the network. Finally it builds a user response from the network response.

此拦截器是利用码到网络码的桥接。它会将用户申请封装成一个网络申请并且执行申请,同时它还实现从网络响应到用户响应的转化. 最初 Chain.proceed() 办法启动拦截器责任链,RealInterceptorChain 中通过递归调用将网络申请以及响应的工作别离调配到各个拦截器中,而后通过 ResponseBuilder.build()办法将网络响应封装, 而后递归调用责任链模式使得调用以及 Response 解决的过程能够一并写入 BridgeInterceptor 中

public final class RealInterceptorChain implements Interceptor.Chain { public Response proceed(Request request, StreamAllocation streamAllocation, 
 HttpCodec httpCodec, RealConnection connection) throws IOException {if (index >= interceptors.size()) throw new AssertionError();
 calls++;
 ... // Call the next interceptor in the chain.
 RealInterceptorChain next = new RealInterceptorChain(interceptors, 
 streamAllocation, httpCodec,connection, index + 1, request, call, 
 eventListener, connectTimeout, readTimeout,writeTimeout);
 Interceptor interceptor = interceptors.get(index);
 Response response = interceptor.intercept(next);
 ... return response;
 }
}

CallServiceInterceptor;

Interceptor 的逻辑均在 intercept()办法中实现, 在通过 Chain 实体类获取到申请主题之后,通过 BufferedSink 接口将申请转发到 Okio 接口,在拦挡过程中通过 EventListener 接口将拦截器解决状态(次要是 RequestBodyStart 和 RequestBodyEnd 两个状态)发送进来

public final class CallServiceInterceptor implements Interceptor {@Override public Response intercept(Chain chain) throws IOException {Response.Builder responseBuilder = null; if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a"Expect: 100-continue"header on the request, wait for a"HTTP/1.1 100
 // Continue"response before transmitting the request body. If we don't get that, return
 // what we did get (such as a 4xx response) without ever transmitting the request body.
 if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {httpCodec.flushRequest();
 realChain.eventListener().responseHeadersStart(realChain.call());
 responseBuilder = httpCodec.readResponseHeaders(true);
 } if (responseBuilder == null) { // Write the request body if the "Expect: 100-continue" expectation was met.
 realChain.eventListener().requestBodyStart(realChain.call()); long contentLength = request.body().contentLength();
 CountingSink requestBodyOut = new CountingSink(httpCodec.createRequestBody(request, contentLength));
 BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
 request.body().writeTo(bufferedRequestBody);
 bufferedRequestBody.close();
 realChain.eventListener()
 .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
 } else if (!connection.isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
 // from being reused. Otherwise we're still obligated to transmit the request body to
 // leave the connection in a consistent state.
 streamAllocation.noNewStreams();}
 }
 }
}

CacheInterceptor;

public final class CacheInterceptor implements Interceptor {@Override public Response intercept(Chain chain) throws IOException {CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
 Request networkRequest = strategy.networkRequest;
 Response cacheResponse = strategy.cacheResponse; if (cache != null) { /**
 * Track an HTTP response being satisfied with {@code cacheStrategy}.
 * 次要是跟踪 networkRequest 次数以及对应 Cache 的 hitcount
 */
 cache.trackResponse(strategy);
 } if (cacheCandidate != null && cacheResponse == null) {closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
 } // If we're forbidden from using the network and the cache is insufficient, fail.
 if (networkRequest == null && cacheResponse == null) {return new Response.Builder()
 .request(chain.request())
 .protocol(Protocol.HTTP_1_1)
 .code(504)
 .message("Unsatisfiable Request (only-if-cached)")
 .body(Util.EMPTY_RESPONSE)
 .sentRequestAtMillis(-1L)
 .receivedResponseAtMillis(System.currentTimeMillis())
 .build();} // If we don't need the network, we're done.
 if (networkRequest == null) {return cacheResponse.newBuilder()
 .cacheResponse(stripBody(cacheResponse))
 .build();} // 在 chain.proceed()调用下一个拦截器
 Response networkResponse = null; try {networkResponse = chain.proceed(networkRequest);
 } finally { // If we're crashing on I/O or otherwise, don't leak the cache body.
 if (networkResponse == null && cacheCandidate != null) {closeQuietly(cacheCandidate.body());
 }
 } // 解决 response 并返回
 ... return response;
 }
}

OkHttpClient

OkHttpClient 托管着所有 HTTP 调用, 每个 Client 均领有本人的连接池和线程池

实现抽象类 Internal 的办法,这是 Internel 抽象类惟一的实现,办法与 CacheInterceptor 管制 Http 的 Header.Lenient 区域和 StreamAlloction 从连接池中获取连贯无关

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {... synchronized (connectionPool) {... if (result == null) { // Attempt to get a connection from the pool.
 Internal.instance.get(connectionPool, address, this, null); if (connection != null) {
 foundPooledConnection = true;
 result = connection;
 } else {selectedRoute = route;}
 }
 } return result;

RouteDatabase && RouteSeletor

RouteDatabase 是记录连贯失败的连贯门路的黑名单,从而 OkHttp 能够从失败中学习并且偏向于抉择其余可用的门路,RouteSeletor 通过 RouteDatabase.shouldPostpone(route)办法可获知此门路是否近期曾连贯失败,RouteSelector 局部源码如下:

public final class RouteSelector { /**
 * Clients should invoke this method when they encounter a connectivity failure on a connection
 * returned by this route selector.
 * 在 StreamAllocation.streamFailed()中增加了 routeSelector.connectFailed()逻辑
 */
 public void connectFailed(Route failedRoute, IOException failure) {if (failedRoute.proxy().type() != Proxy.Type.DIRECT && address.proxySelector() != null) { // Tell the proxy selector when we fail to connect on a fresh connection.
 address.proxySelector().connectFailed(address.url().uri(), failedRoute.proxy().address(), failure);
 }
 routeDatabase.failed(failedRoute);
 }
}
synchronized void enqueue(AsyncCall call) {if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {runningAsyncCalls.add(call);
 executorService().execute(call);
 } else {readyAsyncCalls.add(call);
 }
 }
 ... /** Used by {@code Call#execute} to signal it is in-flight. */
 synchronized void executed(RealCall call) {runningSyncCalls.add(call);
 }

ExecutorSevice.execute(AsyncCall)执行代码位于 AsyncCall 外部复写的 execute()办法, 办法内定义一些 Callback 回调节点运行逻辑,包含用户被动勾销执行(应用 retryAndFollowUpInterceptor)以及执行申请胜利或者失败时的回调办法

final class AsyncCall extends NamedRunnable {... @Override protected void execute() { boolean signalledCallback = false; try {Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) {
 signalledCallback = true;
 responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
 } else {
 signalledCallback = true;
 responseCallback.onResponse(RealCall.this, response);
 }
 } catch (IOException e) {if (signalledCallback) { // Do not signal the callback twice!
 Platform.get().log(INFO, "Callback failure for" + toLoggableString(), e);
 } else {eventListener.callFailed(RealCall.this, e);
 responseCallback.onFailure(RealCall.this, e);
 }
 } finally {client.dispatcher().finished(this);
 }
 }
 }

惰性初始模式 (Created Lazily) 成员

ExecutorService()

CacheControl

WebSocket

WebSocket 异步非梗塞的 web socket 接口(通过 Enqueue 办法来实现)

OkHttpClient 通过实现 WebSocket.Factory.newWebSocket 接口实现工厂结构, 通常是由 OkHttpClient 来结构

WebSocket 生命周期:

Connecting 状态: 每个 websocket 的初始状态, 此时 Message 可能位于入队状态然而还没有被 Dispatcher 解决

Open 状态: WebSocket 曾经被服务器端承受并且 Socket 位于齐全凋谢状态, 所有 Message 入队之后会即刻被解决

Closing 状态: WebSocket 进入优雅的敞开状态,WebSocket 持续解决已入队的 Message 但回绝新的 Message 入队

Closed 状态: WebSocket 已实现收发 Message 的过程, 进入齐全敞开状态

WebSocket 受到网络等各种因素影响, 可能会断路而提前进入敞开流程

Canceled 状态: 被动 WebSocket 失败连贯为非优雅的过程, 而被动则是优雅短路过程

RealWebSocket

RealWebSocket 治理着 Request 队列内容所占的空间大小以及敞开 Socket 之后留给优雅敞开的工夫,默认为 16M 和 60 秒,在 RealWebSocket.connect()办法中 RealWebSocket 对 OkHttpClient 以及 Request 封装成 Call 的模式,而后通过 Call.enqueue()办法定义调用胜利和失败时的 Callback 代码

public void connect(OkHttpClient client) {client = client.newBuilder()
 .eventListener(EventListener.NONE)
 .protocols(ONLY_HTTP1)
 .build(); final Request request = originalRequest.newBuilder()
 .header("Upgrade", "websocket")
 .header("Connection", "Upgrade")
 .header("Sec-WebSocket-Key", key)
 .header("Sec-WebSocket-Version", "13")
 .build();
 call = Internal.instance.newWebSocketCall(client, request);
 call.enqueue(new Callback() {@Override public void onResponse(Call call, Response response) { try {checkResponse(response);
 } catch (ProtocolException e) {failWebSocket(e, response);
 closeQuietly(response); return;
 } // Promote the HTTP streams into web socket streams.
 StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
 streamAllocation.noNewStreams(); // Prevent connection pooling!
 Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation); // Process all web socket messages.
 try {listener.onOpen(RealWebSocket.this, response);
 String name = "OkHttp WebSocket" + request.url().redact();
 initReaderAndWriter(name, streams);
 streamAllocation.connection().socket().setSoTimeout(0);
 loopReader();} catch (Exception e) {failWebSocket(e, null);
 }
 } @Override public void onFailure(Call call, IOException e) {failWebSocket(e, null);
 }
 });
 }

当 Call 申请被服务端响应的时候就将 HTTP 流导入到 Web Socket 流中,并且调用 WebSocketListener 绝对应的状态办法, WebSocketListener 状态如下:

onOpen()onMessage()onClosing()onClosed()onFailure()

WebSocket -> RealWebSocket

Connection -> RealConnection

Interceptor -> RealInterceptorChain

Call -> RealCall

ResponseBody -> RealResponseBody

Gzip 压缩机制

解决 Gzip 压缩的代码在 BridgeInterceptor 中,默认状况下为 gzip 压缩状态,能够从上面的源码片段中获知。如果 header 中没有 Accept-Encoding,默认主动增加,且标记变量 transparentGzip 为 true

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
 // the transfer stream.
 boolean transparentGzip = false; if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
 transparentGzip = true;
 requestBuilder.header("Accept-Encoding", "gzip");
 }

BridgeInterceptor 解压缩的过程调用了 okio.GzipSource()办法并调用 Okio.buffer()缓存解压过程,源码如下

if (transparentGzip
 && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
 && HttpHeaders.hasBody(networkResponse)) {GzipSource responseBody = new GzipSource(networkResponse.body().source());
 Headers strippedHeaders = networkResponse.headers().newBuilder()
 .removeAll("Content-Encoding")
 .removeAll("Content-Length")
 .build();
 responseBuilder.headers(strippedHeaders); String contentType = networkResponse.header("Content-Type");
 responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
 }

RealCall 构造方法

在 RealCall 构造方法下面,晚期版本的 RealCall 构造方法中将 EventListener.Factory 以及 EventListenerFactory.Create()离开解决导致 RealCall 构造方法非线程平安. 当初版本的 RealCall 的构造函数应用 OkHttpClient.eventListenerFactory().create()

晚期版本如下:

final class RealCall implements Call {RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {... final EventListener.Factory eventListenerFactory = client.eventListenerFactory(); this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; // 重试和跟进拦截器
 this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket); // TODO(jwilson): this is unsafe publication and not threadsafe. 
 // 这是不平安的公布,不是线程平安的。this.eventListener = eventListenerFactory.create(this);
 }

当初 OkHttp 3.11.0 的 RealCall 源代码如下

final class RealCall implements Call { private EventListener eventListener;
 ... private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
 } static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener.
 RealCall call = new RealCall(client, originalRequest, forWebSocket);
 call.eventListener = client.eventListenerFactory().create(call); return call;
 }
}

ConnetionPool

连接池可能复用 http 连贯从而缩小拜访雷同指标主机状况下的网络提早,此类实现治理连贯开闭的策略并应用与连接池一一对应的后盾线程清理过期的连贯。ConnectionPool 提供对 Deque<RealConnection> 进行操作的办法别离为 put、get、connectionBecameIdle 和 evictAll 几个操作。别离对应放入连贯、获取连贯、移除连贯和移除所有连贯操作,这里咱们举例 put 和 get 操作。

public final class ConnectionPool {
 ... private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
 Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true)); /** The maximum number of idle connections for each address. */
 private final int maxIdleConnections; private final long keepAliveDurationNs; private final Runnable cleanupRunnable = new Runnable() { @Override public void run() {while (true) {long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L;
 waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try {ConnectionPool.this.wait(waitMillis, (int) waitNanos);
 } catch (InterruptedException ignored) {}}
 }
 }
 }
 };
 ...
}

cleanUpRunnable 外面是一个 while(true), 一个循环包含:

调用一次 cleanUp 办法进行清理并返回一个 long

如果是 - 1 则退出,否则调用 wait 办法期待这个 long 值的工夫

okhttp 是依据 StreamAllocation 援用计数是否为 0 来实现主动回收连贯的。cleanUpRunnable 遍历每一个 RealConnection,通过援用数目确定哪些是闲暇的,哪些是在应用中,同时找到闲暇工夫最长的 RealConnection。如果闲暇数目超过最大闲暇数或者闲暇工夫超过最大闲暇工夫,则清理掉这个 RealConnection 并返回 0,示意须要立即再次清理

public final class ConnectionPool {... void put(RealConnection connection) {assert (Thread.holdsLock(this)); if (!cleanupRunning) {
 cleanupRunning = true;
 executor.execute(cleanupRunnable);
 }
 connections.add(connection);
 }
 ...
}

咱们在 put 操作前首先要调用 executor.execute(cleanupRunnable)来清理闲置的线程。

RealConnection

RealConnection 是 socket 物理连贯的包装,它外面保护了 List<Reference<StreamAllocation>> 的援用。List 中 StreamAllocation 的数量也就是 socket 被援用的计数,如果计数为 0 的话,阐明此连贯没有被应用就是闲暇的,须要被回收;如果计数不为 0,则示意下层代码依然援用,就不须要敞开连贯。

相干链接
【Android 教程】基于 Okhttp 的高可用网络框架原理解析

正文完
 0