前言
HTTP是咱们替换数据和媒体流的古代利用网络,无效利用HTTP能够使咱们节俭带宽和更快地加载数据,Square公司开源的OkHttp网络申请是有效率的HTTP客户端。之前的知识面仅限于框架API的调用,接触到理论的工作之后深知本人常识的有余,故而深挖框架源码尽力汲取前辈的设计教训。对于此框架的源码解析网上的教程多不胜数,此文名为源码解析,实则是炒冷饭之作,如有谬误和不足之处还望各位看官指出。
拦截器
拦截器是OkHttp框架设计的精华所在,拦截器所定义的是Request的所通过的责任链而不论Request的具体执行过程,并且能够让开发人员自定义本人的拦截器性能并且插入到责任链中
用户自定义的拦截器位于 OkHttpClient.addInterceptor() 增加到interceptors责任链中
RealCall.execute()执行的时候调用RealCall.getResponseWithInterceptorChain()将 来自 OkHttpClient的interceptors以及默认的拦截器一并退出到RealInterceptorChain责任链中并调用, 代码并没有对originalRequest进行封装, InterceptorChain和originalRequest一并流转到 RealInterceptorChain类中解决
CustomInterceptorRetryAndFollowUpInterceptorBridgeInterceptorCacheInterceptorConnectInterceptorNetworkInterceptorsCallServiceInterceptor
1.RealInterceptorChain.proceed()
2.EventListener.callStart()也是在RealCall.execute()嵌入到Request调用过程, EventListener.callEnd()位于StreamAllocation中调用
3.Request.Builder
url (String/URL/HttpUrl)
header
CacheControl
Tag (Use this API to attach timing, debugging, or other application data to a request so that you may read it in interceptors, event listeners, or callbacks.)
BridgeInterceptor
Bridges from application code to network code. First it builds a network request from a user request. Then it proceeds to call the network. Finally it builds a user response from the network response.
此拦截器是利用码到网络码的桥接。它会将用户申请封装成一个网络申请并且执行申请,同时它还实现从网络响应到用户响应的转化. 最初Chain.proceed() 办法启动拦截器责任链, RealInterceptorChain中通过递归调用将网络申请以及响应的工作别离调配到各个拦截器中, 而后通过ResponseBuilder.build()办法将网络响应封装, 而后递归调用责任链模式使得调用以及Response解决的过程能够一并写入BridgeInterceptor中
public final class RealInterceptorChain implements Interceptor.Chain { public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; ... // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,writeTimeout); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); ... return response; }}
CallServiceInterceptor;
Interceptor的逻辑均在intercept()办法中实现, 在通过Chain实体类获取到申请主题之后,通过BufferedSink接口将申请转发到Okio接口,在拦挡过程中通过EventListener接口将拦截器解决状态(次要是RequestBodyStart和RequestBodyEnd两个状态)发送进来
public final class CallServiceInterceptor implements Interceptor { @Override public Response intercept(Chain chain) throws IOException { Response.Builder responseBuilder = null; if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { httpCodec.flushRequest(); realChain.eventListener().responseHeadersStart(realChain.call()); responseBuilder = httpCodec.readResponseHeaders(true); } if (responseBuilder == null) { // Write the request body if the "Expect: 100-continue" expectation was met. realChain.eventListener().requestBodyStart(realChain.call()); long contentLength = request.body().contentLength(); CountingSink requestBodyOut = new CountingSink(httpCodec.createRequestBody(request, contentLength)); BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); realChain.eventListener() .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount); } else if (!connection.isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. streamAllocation.noNewStreams(); } } }}
CacheInterceptor;
public final class CacheInterceptor implements Interceptor { @Override public Response intercept(Chain chain) throws IOException { CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); Request networkRequest = strategy.networkRequest; Response cacheResponse = strategy.cacheResponse; if (cache != null) { /** * Track an HTTP response being satisfied with {@code cacheStrategy}. * 次要是跟踪networkRequest次数以及对应Cache的hitcount */ cache.trackResponse(strategy); } if (cacheCandidate != null && cacheResponse == null) { closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. } // If we're forbidden from using the network and the cache is insufficient, fail. if (networkRequest == null && cacheResponse == null) { return new Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); } // If we don't need the network, we're done. if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } //在chain.proceed()调用下一个拦截器 Response networkResponse = null; try { networkResponse = chain.proceed(networkRequest); } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (networkResponse == null && cacheCandidate != null) { closeQuietly(cacheCandidate.body()); } } //解决response并返回 ... return response; }}
OkHttpClient
OkHttpClient托管着所有HTTP调用, 每个Client均领有本人的连接池和线程池
实现抽象类Internal的办法,这是Internel抽象类惟一的实现,办法与CacheInterceptor管制Http的Header.Lenient区域和StreamAlloction从连接池中获取连贯无关
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { ... synchronized (connectionPool) { ... if (result == null) { // Attempt to get a connection from the pool. Internal.instance.get(connectionPool, address, this, null); if (connection != null) { foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } return result;
RouteDatabase && RouteSeletor
RouteDatabase是记录连贯失败的连贯门路的黑名单,从而OkHttp能够从失败中学习并且偏向于抉择其余可用的门路,RouteSeletor通过RouteDatabase.shouldPostpone(route)办法可获知此门路是否近期曾连贯失败,RouteSelector局部源码如下:
public final class RouteSelector { /** * Clients should invoke this method when they encounter a connectivity failure on a connection * returned by this route selector. * 在StreamAllocation.streamFailed()中增加了routeSelector.connectFailed()逻辑 */ public void connectFailed(Route failedRoute, IOException failure) { if (failedRoute.proxy().type() != Proxy.Type.DIRECT && address.proxySelector() != null) { // Tell the proxy selector when we fail to connect on a fresh connection. address.proxySelector().connectFailed( address.url().uri(), failedRoute.proxy().address(), failure); } routeDatabase.failed(failedRoute); }}
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } } ... /** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) { runningSyncCalls.add(call); }
ExecutorSevice.execute(AsyncCall)执行代码位于AsyncCall外部复写的execute()办法, 办法内定义一些Callback回调节点运行逻辑,包含用户被动勾销执行(应用retryAndFollowUpInterceptor)以及执行申请胜利或者失败时的回调办法
final class AsyncCall extends NamedRunnable { ... @Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { eventListener.callFailed(RealCall.this, e); responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } }
惰性初始模式(Created Lazily)成员
ExecutorService()
CacheControl
WebSocket
WebSocket 异步非梗塞的web socket接口 (通过Enqueue办法来实现)
OkHttpClient 通过实现 WebSocket.Factory.newWebSocket 接口实现工厂结构, 通常是由 OkHttpClient来结构
WebSocket生命周期:
Connecting状态: 每个websocket的初始状态, 此时Message可能位于入队状态然而还没有被Dispatcher解决
Open状态: WebSocket曾经被服务器端承受并且Socket位于齐全凋谢状态, 所有Message入队之后会即刻被解决
Closing状态: WebSocket进入优雅的敞开状态,WebSocket持续解决已入队的Message但回绝新的Message入队
Closed状态: WebSocket已实现收发Message的过程, 进入齐全敞开状态
WebSocket受到网络等各种因素影响, 可能会断路而提前进入敞开流程
Canceled状态: 被动WebSocket失败连贯为非优雅的过程, 而被动则是优雅短路过程
RealWebSocket
RealWebSocket治理着Request队列内容所占的空间大小以及敞开Socket之后留给优雅敞开的工夫,默认为16M和60秒,在RealWebSocket.connect()办法中RealWebSocket对OkHttpClient以及Request封装成Call的模式,而后通过Call.enqueue()办法定义调用胜利和失败时的Callback代码
public void connect(OkHttpClient client) { client = client.newBuilder() .eventListener(EventListener.NONE) .protocols(ONLY_HTTP1) .build(); final Request request = originalRequest.newBuilder() .header("Upgrade", "websocket") .header("Connection", "Upgrade") .header("Sec-WebSocket-Key", key) .header("Sec-WebSocket-Version", "13") .build(); call = Internal.instance.newWebSocketCall(client, request); call.enqueue(new Callback() { @Override public void onResponse(Call call, Response response) { try { checkResponse(response); } catch (ProtocolException e) { failWebSocket(e, response); closeQuietly(response); return; } // Promote the HTTP streams into web socket streams. StreamAllocation streamAllocation = Internal.instance.streamAllocation(call); streamAllocation.noNewStreams(); // Prevent connection pooling! Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation); // Process all web socket messages. try { listener.onOpen(RealWebSocket.this, response); String name = "OkHttp WebSocket " + request.url().redact(); initReaderAndWriter(name, streams); streamAllocation.connection().socket().setSoTimeout(0); loopReader(); } catch (Exception e) { failWebSocket(e, null); } } @Override public void onFailure(Call call, IOException e) { failWebSocket(e, null); } }); }
当Call申请被服务端响应的时候就将HTTP流导入到Web Socket流中,并且调用WebSocketListener绝对应的状态办法, WebSocketListener状态如下:
onOpen()onMessage()onClosing()onClosed()onFailure()
WebSocket -> RealWebSocket
Connection -> RealConnection
Interceptor -> RealInterceptorChain
Call -> RealCall
ResponseBody -> RealResponseBody
Gzip压缩机制
解决Gzip压缩的代码在BridgeInterceptor中,默认状况下为gzip压缩状态,能够从上面的源码片段中获知。如果header中没有Accept-Encoding,默认主动增加 ,且标记变量transparentGzip为true
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing // the transfer stream. boolean transparentGzip = false; if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true; requestBuilder.header("Accept-Encoding", "gzip"); }
BridgeInterceptor解压缩的过程调用了okio.GzipSource()办法并调用Okio.buffer()缓存解压过程,源码如下
if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) && HttpHeaders.hasBody(networkResponse)) { GzipSource responseBody = new GzipSource(networkResponse.body().source()); Headers strippedHeaders = networkResponse.headers().newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build(); responseBuilder.headers(strippedHeaders); String contentType = networkResponse.header("Content-Type"); responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody))); }
RealCall构造方法
在RealCall构造方法下面,晚期版本的RealCall构造方法中将EventListener.Factory以及EventListenerFactory.Create()离开解决导致RealCall构造方法非线程平安. 当初版本的RealCall的构造函数应用OkHttpClient.eventListenerFactory().create()
晚期版本如下:
final class RealCall implements Call { RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { ... final EventListener.Factory eventListenerFactory = client.eventListenerFactory(); this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; //重试和跟进拦截器 this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket); // TODO(jwilson): this is unsafe publication and not threadsafe. // 这是不平安的公布,不是线程平安的。 this.eventListener = eventListenerFactory.create(this); }
当初 OkHttp 3.11.0 的RealCall源代码如下
final class RealCall implements Call { private EventListener eventListener; ... private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket); } static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener. RealCall call = new RealCall(client, originalRequest, forWebSocket); call.eventListener = client.eventListenerFactory().create(call); return call; }}
ConnetionPool
连接池可能复用http连贯从而缩小拜访雷同指标主机状况下的网络提早,此类实现治理连贯开闭的策略并应用与连接池一一对应的后盾线程清理过期的连贯。ConnectionPool提供对Deque<RealConnection>进行操作的办法别离为put、get、connectionBecameIdle和evictAll几个操作。别离对应放入连贯、获取连贯、移除连贯和移除所有连贯操作,这里咱们举例put和get操作。
public final class ConnectionPool { ... private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true)); /** The maximum number of idle connections for each address. */ private final int maxIdleConnections; private final long keepAliveDurationNs; private final Runnable cleanupRunnable = new Runnable() { @Override public void run() { while (true) { long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try { ConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } } }; ...}
cleanUpRunnable外面是一个while(true),一个循环包含:
调用一次cleanUp办法进行清理并返回一个long
如果是-1则退出,否则调用wait办法期待这个long值的工夫
okhttp是依据StreamAllocation援用计数是否为0来实现主动回收连贯的。cleanUpRunnable遍历每一个RealConnection,通过援用数目确定哪些是闲暇的,哪些是在应用中,同时找到闲暇工夫最长的RealConnection。如果闲暇数目超过最大闲暇数或者闲暇工夫超过最大闲暇工夫,则清理掉这个RealConnection并返回0,示意须要立即再次清理
public final class ConnectionPool { ... void put(RealConnection connection) { assert (Thread.holdsLock(this)); if (!cleanupRunning) { cleanupRunning = true; executor.execute(cleanupRunnable); } connections.add(connection); } ...}
咱们在put操作前首先要调用executor.execute(cleanupRunnable)来清理闲置的线程。
RealConnection
RealConnection是socket物理连贯的包装,它外面保护了List<Reference<StreamAllocation>>的援用。List中StreamAllocation的数量也就是socket被援用的计数,如果计数为0的话,阐明此连贯没有被应用就是闲暇的,须要被回收;如果计数不为0,则示意下层代码依然援用,就不须要敞开连贯。
相干链接
【Android教程】基于Okhttp的高可用网络框架原理解析