前言
本文是对OkHttp
开源库的一个具体解析,如果你感觉本人不够理解OkHttp
,想进一步学习一下,置信本文对你会有所帮忙。
本文蕴含了具体的申请流程剖析、各大拦截器解读以及本人的一点反思总结,文章很长,欢送大家一起交换探讨。
应用办法
应用办法非常简略,别离创立一个OkHttpClient
对象,一个Request
对象,而后利用他们创立一个Call
对象,最初调用同步申请execute()
办法或者异步申请enqueue()
办法来拿到Response
。
private final OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url("https://github.com/") .build(); //同步申请 Response response = client.newCall(request).execute(); //todo handle response //异步申请 client.newCall(request).enqueue(new Callback() { @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { //todo handle request failed } @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { //todo handle Response } });
根本对象介绍
正如应用办法中所述,咱们先后构建了 OkHttpClient
对象、Request
对象、Call
对象,那这些对象都是什么意思,有什么作用呢?这个就须要咱们进一步学习理解了。
OkHttpClient
一个申请的配置类,采纳了建造者模式,不便用户配置一些申请参数,如配置callTimeout
,cookie
,interceptor
等等。
open class OkHttpClient internal constructor( builder: Builder) : Cloneable, Call.Factory, WebSocket.Factory { constructor() : this(Builder()) class Builder constructor() { //调度器 internal var dispatcher: Dispatcher = Dispatcher() //连接池 internal var connectionPool: ConnectionPool = ConnectionPool() //整体流程拦截器 internal val interceptors: MutableList<Interceptor> = mutableListOf() //网络流程拦截器 internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() //流程监听器 internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() //连贯失败时是否重连 internal var retryOnConnectionFailure = true //服务器认证设置 internal var authenticator: Authenticator = Authenticator.NONE //是否重定向 internal var followRedirects = true //是否从HTTP重定向到HTTPS internal var followSslRedirects = true //cookie设置 internal var cookieJar: CookieJar = CookieJar.NO_COOKIES //缓存设置 internal var cache: Cache? = null //DNS设置 internal var dns: Dns = Dns.SYSTEM //代理设置 internal var proxy: Proxy? = null //代理选择器设置 internal var proxySelector: ProxySelector? = null //代理服务器认证设置 internal var proxyAuthenticator: Authenticator = Authenticator.NONE //socket配置 internal var socketFactory: SocketFactory = SocketFactory.getDefault() //https socket配置 internal var sslSocketFactoryOrNull: SSLSocketFactory? = null internal var x509TrustManagerOrNull: X509TrustManager? = null internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS //协定 internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS //域名校验 internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT internal var certificateChainCleaner: CertificateChainCleaner? = null //申请超时 internal var callTimeout = 0 //连贯超时 internal var connectTimeout = 10_000 //读取超时 internal var readTimeout = 10_000 //写入超时 internal var writeTimeout = 10_000 internal var pingInterval = 0 internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE internal var routeDatabase: RouteDatabase? = null ···省略代码···
Request
同样是申请参数的配置类,也同样采纳了建造者模式,但相比于OkHttpClient
,Request
就非常简略了,只有四个参数,别离是申请URL
、申请办法
、申请头
、申请体
。
class Request internal constructor( @get:JvmName("url") val url: HttpUrl, @get:JvmName("method") val method: String, @get:JvmName("headers") val headers: Headers, @get:JvmName("body") val body: RequestBody?, internal val tags: Map<Class<*>, Any>) { open class Builder { //申请的URL internal var url: HttpUrl? = null //申请办法,如:GET、POST.. internal var method: String //申请头 internal var headers: Headers.Builder //申请体 internal var body: RequestBody? = null ···省略代码···
Call
申请调用接口,示意这个申请曾经筹备好能够执行,也能够勾销,只能执行一次。
interface Call : Cloneable { /** 返回发动此调用的原始申请 */ fun request(): Request /** * 同步申请,立刻执行。 * * 抛出两种异样: * 1. 申请失败抛出IOException; * 2. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/ @Throws(IOException::class) fun execute(): Response /** * 异步申请,将申请安顿在未来的某个工夫点执行。 * 如果在执行过一回的前提下再次执行抛出IllegalStateException */ fun enqueue(responseCallback: Callback) /** 勾销申请。曾经实现的申请不能被勾销 */ fun cancel() /** 是否已被执行 */ fun isExecuted(): Boolean /** 是否被勾销 */ fun isCanceled(): Boolean /** 一个残缺Call申请流程的超时工夫配置,默认选自[OkHttpClient.Builder.callTimeout] */ fun timeout(): Timeout /** 克隆这个call,创立一个新的雷同的Call */ public override fun clone(): Call /** 利用工厂模式来让 OkHttpClient 来创立 Call对象 */ fun interface Factory { fun newCall(request: Request): Call }}
RealCall
在 OkHttpClient
中,咱们利用 newCall
办法来创立一个 Call
对象,但从源码中能够看出,newCall
办法返回的是一个 RealCall
对象。
OkHttpClient.ktoverride fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall
是Call接口
的具体实现类,是利用端与网络层的连贯桥,展现利用端原始的申请与连贯数据,以及网络层返回的response
及其它数据流。 通过应用办法也可知,创立RealCall
对象后,就要调用同步或异步申请办法,所以它外面还蕴含同步申请 execute()
与异步申请 enqueue()
办法。(前面具体开展剖析)
AsyncCall
异步申请调用,是RealCall
的一个外部类,就是一个Runnable
,被调度器中的线程池所执行。
inner class AsyncCall( //用户传入的响应回调办法 private val responseCallback: Callback ) : Runnable { //同一个域名的申请次数,volatile + AtomicInteger 保障在多线程下及时可见性与原子性 @Volatile var callsPerHost = AtomicInteger(0) private set fun reuseCallsPerHostFrom(other: AsyncCall) { this.callsPerHost = other.callsPerHost }···省略代码··· fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock() var success = false try { //调用线程池执行 executorService.execute(this) success = true } catch (e: RejectedExecutionException) { val ioException = InterruptedIOException("executor rejected") ioException.initCause(e) noMoreExchanges(ioException) //申请失败,调用 Callback.onFailure() 办法 responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { //申请失败,调用调度器finish办法 client.dispatcher.finished(this) // This call is no longer running! } } } override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false timeout.enter() try { //申请胜利,获取到服务器返回的response val response = getResponseWithInterceptorChain() signalledCallback = true //调用 Callback.onResponse() 办法,将 response 传递进来 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e) } else { //申请失败,调用 Callback.onFailure() 办法 responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { //申请出现异常,调用cancel办法来勾销申请 cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) //申请失败,调用 Callback.onFailure() 办法 responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { //申请完结,调用调度器finish办法 client.dispatcher.finished(this) } } } }
Dispatcher
调度器,用来调度Call
对象,同时蕴含线程池与异步申请队列,用来寄存与执行AsyncCall
对象。
class Dispatcher constructor() { @get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { //创立一个缓存线程池,来解决申请调用 executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! } /** 已筹备好的异步申请队列 */ @get:Synchronized private val readyAsyncCalls = ArrayDeque<AsyncCall>() /** 正在运行的异步申请队列, 蕴含勾销然而还未finish的AsyncCall */ private val runningAsyncCalls = ArrayDeque<AsyncCall>() /** 正在运行的同步申请队列, 蕴含勾销然而还未finish的RealCall */ private val runningSyncCalls = ArrayDeque<RealCall>()···省略代码···}
总结一下
对象 | 作用 |
---|---|
Call | 申请调用接口,示意这个申请曾经筹备好能够执行,也能够被勾销,只能执行一次。 |
RealCall | Call 接口的具体实现类,是利用与网络层之间的连贯桥,蕴含OkHttpClient 与Request 信息。 |
AsyncCall | 异步申请调用,其实就是个Runnable ,会被放到线程池中进行解决。 |
Dispatcher | 调度器,用来调度Call 对象,同时蕴含线程池与异步申请队列,用来寄存与执行AsyncCall 对象。 |
Request | 申请类,蕴含url 、method 、headers 、body 。 |
Response | 网络层返回的响应数据。 |
Callback | 响应回调函数接口,蕴含onFailure 、onResponse 两个办法。 |
流程剖析
介绍完了对象,接下来就依据应用办法,具体看一下源码吧。
同步申请
同步申请的应用办法。
client.newCall(request).execute();
newCall
办法就是创立一个RealCall
对象,而后执行其execute()
办法。
RealCall.kt override fun execute(): Response { //CAS判断是否曾经被执行了, 确保只能执行一次,如果曾经执行过,则抛出异样 check(executed.compareAndSet(false, true)) { "Already Executed" } //申请超时开始计时 timeout.enter() //开启申请监听 callStart() try { //调用调度器中的 executed() 办法,调度器只是将 call 退出到了runningSyncCalls队列中 client.dispatcher.executed(this) //调用getResponseWithInterceptorChain 办法拿到 response return getResponseWithInterceptorChain() } finally { //执行结束,调度器将该 call 从 runningSyncCalls队列中移除 client.dispatcher.finished(this) } }
调用调度器executed
办法,就是将以后的RealCall
对象退出到runningSyncCalls
队列中,而后调用getResponseWithInterceptorChain
办法拿到response
。
异步申请
在来看看异步申请。
RealCall.kt override fun enqueue(responseCallback: Callback) { //CAS判断是否曾经被执行了, 确保只能执行一次,如果曾经执行过,则抛出异样 check(executed.compareAndSet(false, true)) { "Already Executed" } //开启申请监听 callStart() //新建一个AsyncCall对象,通过调度器enqueue办法退出到readyAsyncCalls队列中 client.dispatcher.enqueue(AsyncCall(responseCallback)) }
而后调用调度器的enqueue
办法,
Dispatcher.kt internal fun enqueue(call: AsyncCall) { //加锁,保障线程平安 synchronized(this) { //将该申请调用退出到 readyAsyncCalls 队列中 readyAsyncCalls.add(call) // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { //通过域名来查找有没有雷同域名的申请,有则复用。 val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } //执行申请 promoteAndExecute() } private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock() val executableCalls = mutableListOf<AsyncCall>() //判断是否有申请正在执行 val isRunning: Boolean //加锁,保障线程平安 synchronized(this) { //遍历 readyAsyncCalls 队列 val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() //runningAsyncCalls 的数量不能大于最大并发申请数 64 if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. //同域名最大申请数5,同一个域名最多容许5条线程同时执行申请 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. //从 readyAsyncCalls 队列中移除,并退出到 executableCalls 及 runningAsyncCalls 队列中 i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } //通过运行队列中的申请数量来判断是否有申请正在执行 isRunning = runningCallsCount() > 0 } //遍历可执行队列,调用线程池来执行AsyncCall for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning }
调度器的enqueue
办法就是将AsyncCall
退出到readyAsyncCalls
队列中,而后调用promoteAndExecute
办法来执行申请,promoteAndExecute
办法做的其实就是遍历readyAsyncCalls
队列,而后将符合条件的申请用线程池执行,也就是会执行AsyncCall.run()
办法。
AsyncCall 办法的具体代码看根本对象介绍 AsyncCall,这边就不在此展现了,简略来说就是调用getResponseWithInterceptorChain
办法拿到response
,而后通过Callback.onResponse
办法传递进来。反之,如果申请失败,捕捉了异样,就通过Callback.onFailure
将异样信息传递进来。 最终,申请完结,调用调度器finish
办法。
Dispatcher.kt /** 异步申请调用完结办法 */ internal fun finished(call: AsyncCall) { call.callsPerHost.decrementAndGet() finished(runningAsyncCalls, call) } /** 同步申请调用完结办法 */ internal fun finished(call: RealCall) { finished(runningSyncCalls, call) } private fun <T> finished(calls: Deque<T>, call: T) { val idleCallback: Runnable? synchronized(this) { //将以后申请调用从 正在运行队列 中移除 if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!") idleCallback = this.idleCallback } //继续执行残余申请,将call从readyAsyncCalls中取出退出到runningAsyncCalls,而后执行 val isRunning = promoteAndExecute() if (!isRunning && idleCallback != null) { //如果执行完了所有申请,处于闲置状态,调用闲置回调办法 idleCallback.run() } }
获取Response
接着就是看看getResponseWithInterceptorChain
办法是如何拿到response
的。
internal fun getResponseWithInterceptorChain(): Response { //拦截器列表 val interceptors = mutableListOf<Interceptor>() interceptors += client.interceptors interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket) //构建拦截器责任链 val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis ) //如果call申请实现,那就意味着交互实现了,没有更多的货色来替换了 var calledNoMoreExchanges = false try { //执行拦截器责任链来获取 response val response = chain.proceed(originalRequest) //如果被勾销,敞开响应,抛出异样 if (isCanceled()) { response.closeQuietly() throw IOException("Canceled") } return response } catch (e: IOException) { calledNoMoreExchanges = true throw noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { noMoreExchanges(null) } } }
简略概括一下:这里采纳了责任链设计模式
,通过拦截器构建了以RealInterceptorChain
责任链,而后执行proceed
办法来失去response
。
那么,这又波及拦截器是什么?拦截器责任链又是什么?
Interceptor
只申明了一个拦截器办法,在子类中具体实现,还蕴含一个Chain
接口,外围办法是proceed(request)
解决申请来获取response
。
fun interface Interceptor { /** 拦挡办法 */ @Throws(IOException::class) fun intercept(chain: Chain): Response interface Chain { /** 原始申请数据 */ fun request(): Request /** 外围办法,解决申请,获取response */ @Throws(IOException::class) fun proceed(request: Request): Response fun connection(): Connection? fun call(): Call fun connectTimeoutMillis(): Int fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain fun readTimeoutMillis(): Int fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain fun writeTimeoutMillis(): Int fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain }}
RealInterceptorChain
拦截器链就是实现Interceptor.Chain
接口,重点就是复写的proceed
办法。
class RealInterceptorChain( internal val call: RealCall, private val interceptors: List<Interceptor>, private val index: Int, internal val exchange: Exchange?, internal val request: Request, internal val connectTimeoutMillis: Int, internal val readTimeoutMillis: Int, internal val writeTimeoutMillis: Int) : Interceptor.Chain {···省略代码··· private var calls: Int = 0 override fun call(): Call = call override fun request(): Request = request @Throws(IOException::class) override fun proceed(request: Request): Response { check(index < interceptors.size) calls++ if (exchange != null) { check(exchange.finder.sameHostAndPort(request.url)) { "network interceptor ${interceptors[index - 1]} must retain the same host and port" } check(calls == 1) { "network interceptor ${interceptors[index - 1]} must call proceed() exactly once" } } //index+1, 复制创立新的责任链,也就意味着调用责任链中的下一个解决者,也就是下一个拦截器 val next = copy(index = index + 1, request = request) //取出以后拦截器 val interceptor = interceptors[index] //执行以后拦截器的拦挡办法 @Suppress("USELESS_ELVIS") val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null") if (exchange != null) { check(index + 1 >= interceptors.size || next.calls == 1) { "network interceptor $interceptor must call proceed() exactly once" } } check(response.body != null) { "interceptor $interceptor returned a response with no body" } return response }}
链式调用,最终会执行拦截器列表中的每个拦截器,返回Response
。
拦截器
OK,接下来就该看看拦截器列表中的具体拦截器了。
先上各类拦截器的总结,按程序:
client.interceptors
:这是由开发者设置的,会在所有的拦截器解决之前进行最早的拦挡解决,可用于增加一些公共参数,如自定义header
、自定义log
等等。RetryAndFollowUpInterceptor
:这里会对连贯做一些初始化工作,以及申请失败的重试工作,重定向的后续申请工作。跟他的名字一样,就是做重试工作还有一些连贯跟踪工作。BridgeInterceptor
:是客户端与服务器之间的沟通桥梁,负责将用户构建的申请转换为服务器须要的申请,以及将网络申请返回回来的响应转换为用户可用的响应。CacheInterceptor
:这里次要是缓存的相干解决,会依据用户在OkHttpClient
里定义的缓存配置,而后联合申请新建一个缓存策略,由它来判断是应用网络还是缓存来构建response
。ConnectInterceptor
:这里次要就是负责建设连贯,会建设TCP连贯
或者TLS连贯
。client.networkInterceptors
:这里也是开发者本人设置的,所以实质上和第一个拦截器差不多,然而因为地位不同,所以用途也不同。CallServerInterceptor
:这里就是进行网络数据的申请和响应了,也就是理论的网络I/O操作,将申请头与申请体发送给服务器,以及解析服务器返回的response
。
接下来咱们按程序,从上往下,对这些拦截器进行一一解读。
client.interceptors
这是用户本人定义的拦截器,称为利用拦截器,会保留在OkHttpClient
的interceptors: List<Interceptor>
列表中。 他是拦截器责任链中的第一个拦截器,也就是说会第一个执行拦挡办法,咱们能够通过它来增加自定义Header信息
,如:
class HeaderInterceptor implements Interceptor { @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request().newBuilder() .addHeader("device-android", "xxxxxxxxxxx") .addHeader("country-code", "ZH") .build(); return chain.proceed(request); }}//而后在 OkHttpClient 中退出OkHttpClient client = new OkHttpClient.Builder() .connectTimeout(60, TimeUnit.SECONDS) .readTimeout(15, TimeUnit.SECONDS) .writeTimeout(15, TimeUnit.SECONDS) .cookieJar(new MyCookieJar()) .addInterceptor(new HeaderInterceptor())//增加自定义Header拦截器 .build();
RetryAndFollowUpInterceptor
第二个拦截器,从它的名字也可晓得,它负责申请失败的重试工作与重定向的后续申请工作,同时它会对连贯做一些初始化工作。
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain var request = chain.request val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { //这里会新建一个ExchangeFinder,ConnectInterceptor会应用到 call.enterNetworkInterceptorExchange(request, newExchangeFinder) var response: Response var closeActiveExchange = true try { if (call.isCanceled()) { throw IOException("Canceled") } try { response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { //尝试通过路由连贯失败。该申请将不会被发送。 if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } newExchangeFinder = false continue } catch (e: IOException) { //尝试与服务器通信失败。该申请可能已发送。 if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue } // Attach the prior response if it exists. Such responses never have a body. //尝试关联上一个response,留神:body是为null if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() } val exchange = call.interceptorScopedExchange //会依据 responseCode 来判断,构建一个新的request并返回来重试或者重定向 val followUp = followUpRequest(response, exchange) if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response } //如果申请体是一次性的,不须要再次重试 val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() //最大重试次数,不同的浏览器是不同的,比方:Chrome为21,Safari则是16 if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } request = followUp priorResponse = response } finally { call.exitNetworkInterceptorExchange(closeActiveExchange) } } } /** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/ private fun recover( e: IOException, call: RealCall, userRequest: Request, requestSendStarted: Boolean ): Boolean { //客户端禁止重试 if (!client.retryOnConnectionFailure) return false //不能再次发送该申请体 if (requestSendStarted && requestIsOneShot(e, userRequest)) return false //产生的异样是致命的,无奈复原,如:ProtocolException if (!isRecoverable(e, requestSendStarted)) return false //没有更多的路由来尝试重连 if (!call.retryAfterFailure()) return false // 对于失败复原,应用带有新连贯的雷同路由选择器 return true }···省略代码···
BridgeInterceptor
从它的名字能够看出,他的定位是客户端与服务器之间的沟通桥梁,负责将用户构建的申请转换为服务器须要的申请,比方:增加 Content-Type
,增加 Cookie
,增加User-Agent
等等。再将服务器返回的response
做一些解决转换为客户端须要的response
。比方:移除响应头中的Content-Encoding
、Content-Length
等等。
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { //获取原始申请数据 val userRequest = chain.request() val requestBuilder = userRequest.newBuilder() //从新构建申请头,申请体信息 val body = userRequest.body val contentType = body.contentType() requestBuilder.header("Content-Type", contentType.toString()) requestBuilder.header("Content-Length", contentLength.toString()) requestBuilder.header("Transfer-Encoding", "chunked") requestBuilder.header("Host", userRequest.url.toHostHeader()) requestBuilder.header("Connection", "Keep-Alive") ···省略代码··· //增加cookie val cookies = cookieJar.loadForRequest(userRequest.url) if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) } //增加user-agent if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", userAgent) } //从新构建一个Request,而后执行下一个拦截器来解决该申请 val networkResponse = chain.proceed(requestBuilder.build()) cookieJar.receiveHeaders(userRequest.url, networkResponse.headers) //创立一个新的responseBuilder,目标是将原始申请数据构建到response中 val responseBuilder = networkResponse.newBuilder() .request(userRequest) if (transparentGzip && "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) && networkResponse.promisesBody()) { val responseBody = networkResponse.body if (responseBody != null) { val gzipSource = GzipSource(responseBody.source()) val strippedHeaders = networkResponse.headers.newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build() //批改response header信息,移除Content-Encoding,Content-Length信息 responseBuilder.headers(strippedHeaders) val contentType = networkResponse.header("Content-Type") //批改response body信息 responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer())) } } return responseBuilder.build() ···省略代码···
CacheInterceptor
用户能够通过OkHttpClient.cache
来配置缓存,缓存拦截器通过CacheStrategy
来判断是应用网络还是缓存来构建response
。
class CacheInterceptor(internal val cache: Cache?) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val call = chain.call() //通过request从OkHttpClient.cache中获取缓存 val cacheCandidate = cache?.get(chain.request()) val now = System.currentTimeMillis() //创立一个缓存策略,用来确定怎么应用缓存 val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() //为空示意不应用网络,反之,则示意应用网络 val networkRequest = strategy.networkRequest //为空示意不应用缓存,反之,则示意应用缓存 val cacheResponse = strategy.cacheResponse //追踪网络与缓存的应用状况 cache?.trackResponse(strategy) val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE //有缓存但不实用,敞开它 if (cacheCandidate != null && cacheResponse == null) { cacheCandidate.body?.closeQuietly() } //如果网络被禁止,然而缓存又是空的,构建一个code为504的response,并返回 if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build().also { listener.satisfactionFailure(call, it) } } //如果咱们禁用了网络不应用网络,且有缓存,间接依据缓存内容构建并返回response if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } } //为缓存增加监听 if (cacheResponse != null) { listener.cacheConditionalHit(call, cacheResponse) } else if (cache != null) { listener.cacheMiss(call) } var networkResponse: Response? = null try { //责任链往下解决,从服务器返回response 赋值给 networkResponse networkResponse = chain.proceed(networkRequest) } finally { //捕捉I/O或其余异样,申请失败,networkResponse为空,且有缓存的时候,不裸露缓存内容。 if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } } //如果有缓存 if (cacheResponse != null) { //且网络返回response code为304的时候,应用缓存内容新构建一个Response返回。 if (networkResponse?.code == HTTP_NOT_MODIFIED) { val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() networkResponse.body!!.close() // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache!!.trackConditionalCacheHit() cache.update(cacheResponse, response) return response.also { listener.cacheHit(call, it) } } else { //否则敞开缓存响应体 cacheResponse.body?.closeQuietly() } } //构建网络申请的response val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() //如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络申请response存到cache中 if (cache != null) { //依据response的code,header以及CacheControl.noStore来判断是否能够缓存 if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // 将该response存入缓存 val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { listener.cacheMiss(call) } } } //依据申请办法来判断缓存是否无效,只对Get申请进行缓存,其它办法的申请则移除 if (HttpMethod.invalidatesCache(networkRequest.method)) { try { //缓存有效,将该申请缓存从client缓存配置中移除 cache.remove(networkRequest) } catch (_: IOException) { // The cache cannot be written. } } } return response } ···省略代码···
ConnectInterceptor
负责实现与服务器真正建设起连贯,
object ConnectInterceptor : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain //初始化一个exchange对象 val exchange = realChain.call.initExchange(chain) //依据这个exchange对象来复制创立一个新的连贯责任链 val connectedChain = realChain.copy(exchange = exchange) //执行该连贯责任链 return connectedChain.proceed(realChain.request) }}
一扫下来,代码非常简略,拦挡办法里就只有三步。
- 初始化一个
exchange
对象。 - 而后依据这个
exchange
对象来复制创立一个新的连贯责任链。 - 执行该连贯责任链。
那这个exchange
对象又是什么呢?
RealCall.ktinternal fun initExchange(chain: RealInterceptorChain): Exchange { ...省略代码... //这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创立的 val exchangeFinder = this.exchangeFinder!! //返回一个ExchangeCodec(是个编码器,为request编码以及为response解码) val codec = exchangeFinder.find(client, chain) //依据exchangeFinder与codec新构建一个Exchange对象,并返回 val result = Exchange(this, eventListener, exchangeFinder, codec) ...省略代码... return result }
具体看看ExchangeFinder.find()
这一步,
ExchangeFinder.ktfun find( client: OkHttpClient, chain: RealInterceptorChain ): ExchangeCodec { try { //查找合格可用的连贯,返回一个 RealConnection 对象 val resultConnection = findHealthyConnection( connectTimeout = chain.connectTimeoutMillis, readTimeout = chain.readTimeoutMillis, writeTimeout = chain.writeTimeoutMillis, pingIntervalMillis = client.pingIntervalMillis, connectionRetryEnabled = client.retryOnConnectionFailure, doExtensiveHealthChecks = chain.request.method != "GET" ) //依据连贯,创立并返回一个申请响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,别离对应Http1协定与Http2协定 return resultConnection.newCodec(client, chain) } catch (e: RouteException) { trackFailure(e.lastConnectException) throw e } catch (e: IOException) { trackFailure(e) throw RouteException(e) } }
持续往下看findHealthyConnection
办法
ExchangeFinder.kt private fun findHealthyConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean ): RealConnection { while (true) { //重点:查找连贯 val candidate = findConnection( connectTimeout = connectTimeout, readTimeout = readTimeout, writeTimeout = writeTimeout, pingIntervalMillis = pingIntervalMillis, connectionRetryEnabled = connectionRetryEnabled ) //查看该连贯是否合格可用,合格则间接返回该连贯 if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate } //如果该连贯不合格,标记为不可用,从连接池中移除 candidate.noNewExchanges() ...省略代码... } }
简略概括一下就是:通过findConnection
办法来查找连贯,找到连贯后判断是否是合格可用的,合格就间接返回该连贯。
所以外围办法就是findConnection
,咱们持续深刻看看该办法:
private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { if (call.isCanceled()) throw IOException("Canceled") //第一次,尝试重连 call 中的 connection,不须要去从新获取连贯 val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } } //如果 call 中的 connection 还没有开释,就重用它。 if (call.connection != null) { check(toClose == null) return callConnection } //如果 call 中的 connection 曾经被开释,敞开Socket. toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) } //须要一个新的连贯,所以重置一些状态 refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0 //第二次,尝试从连接池中获取一个连贯,不领路由,不带多路复用 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } //连接池中是空的,筹备下次尝试连贯的路由 val routes: List<Route>? val route: Route ...省略代码... //第三次,再次尝试从连接池中获取一个连贯,领路由,不带多路复用 if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } route = localRouteSelection.next() } //第四次,手动创立一个新连贯 val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route()) //第五次,再次尝试从连接池中获取一个连贯,领路由,带多路复用。 //这一步次要是为了校验一下,比方曾经有了一条连贯了,就能够间接复用,而不必应用手动创立的新连贯。 if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { val result = call.connection!! nextRouteToTry = route newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result } synchronized(newConnection) { //将手动创立的新连贯放入连接池 connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) } eventListener.connectionAcquired(call, newConnection) return newConnection }
在代码中能够看出,一共做了5次尝试去失去连贯:
- 第一次,尝试重连 call 中的 connection,不须要去从新获取连贯。
- 第二次,尝试从连接池中获取一个连贯,不领路由,不带多路复用。
- 第三次,再次尝试从连接池中获取一个连贯,领路由,不带多路复用。
- 第四次,手动创立一个新连贯。
- 第五次,再次尝试从连接池中获取一个连贯,领路由,带多路复用。
OK,到了这一步,就算建设起了连贯。
client.networkInterceptors
该拦截器称为网络拦截器,与client.interceptors
一样也是由用户本人定义的,同样是以列表的模式存在OkHttpClient
中。
那这两个拦截器有什么不同呢?
其实他两的不同都是因为他们所处的地位不同所导致的,利用拦截器处于第一个地位,所以无论如何它都会被执行,而且只会执行一次。而网络拦截器处于倒数第二的地位,它不肯定会被执行,而且可能会被执行屡次,比方:在RetryAndFollowUpInterceptor
失败或者CacheInterceptor
间接返回缓存的状况下,咱们的网络拦截器是不会被执行的。
CallServerInterceptor
到了这里,客户端与服务器曾经建设好了连贯,接着就是将申请头与申请体发送给服务器,以及解析服务器返回的response
了。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body var invokeStartEvent = true var responseBuilder: Response.Builder? = null try { //写入申请头 exchange.writeRequestHeaders(request) //如果不是GET申请,并且申请体不为空 if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { //当申请头为"Expect: 100-continue"时,在发送申请体之前须要期待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送申请体。 //POST申请,先发送申请头,在获取到100持续状态后持续发送申请体 if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { //刷新申请,即发送申请头 exchange.flushRequest() //解析响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = true) exchange.responseHeadersStart() invokeStartEvent = false } //写入申请体 if (responseBuilder == null) { if (requestBody.isDuplex()) { //如果申请体是双公体,就先发送申请头,稍后在发送申请体 exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() //写入申请体 requestBody.writeTo(bufferedRequestBody) } else { //如果获取到了"Expect: 100-continue"响应,写入申请体 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } ···省略代码··· //申请完结,发送申请体 exchange.finishRequest() ···省略代码··· try { if (responseBuilder == null) { //读取响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! ···省略代码··· //构建一个response var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code ···省略代码··· return response···省略代码···
简略概括一下:写入发送申请头,而后依据条件是否写入发送申请体,申请完结。解析服务器返回的申请头,而后构建一个新的response
,并返回。 这里CallServerInterceptor
是拦截器责任链中最初一个拦截器了,所以他不会再调用chain.proceed()
办法往下执行,而是将这个构建的response
往上传递给责任链中的每个拦截器。
总结
咱们剖析了申请的流程,包含同步申请与异步申请,还仔细分析了拦截器责任链中的每个拦截器,当初画一个流程图,简略总结一下,你能够对照着流程图,在走一遍流程。
反思
设计模式
- 建造者模式:不论是在
OkHttpClient
、Request
还是Response
中都用到了建造者模式,因为这几个类中都有很多参数,须要供用户抉择须要的参数来构建其想要的实例,所以在开源库中,Build模式
是很常见的。 - 工厂办法模式:帮忙生成简单对象,如:
OkHttpClient.newCall(request Request) 来创立 Call 对象
。 - 责任链模式:这个就用的很绝妙了,将7个拦截器形成拦截器责任链,而后按程序从上往下执行,失去
Response
后,从下往上传回去。
线程平安
在 AsyncCall
类中的 callsPerHost
变量,应用了 Volatile
+ AtomicInteger
来润饰,从而保障在多线程下的线程平安。
inner class AsyncCall( private val responseCallback: Callback ) : Runnable { //同一个域名的申请次数,volatile + AtomicInteger 保障在多线程下及时可见性与原子性 @Volatile var callsPerHost = AtomicInteger(0) private set ...省略代码...
数据结构
为什么readyAsyncCalls
runningAsyncCalls
runningSyncCalls
采纳ArrayDeque
呢?
两个点答复:一、他们都是用来寄存网络申请的,这些申请须要做到先到先得,所以采纳队列。二、依据代码所示,当执行enqueue
时,咱们须要遍历readyAsyncCalls
,将合乎执行条件的Call
退出到runningAsyncCalls
,这绝对比于链表来说,数组的查找效率要更高,所以采纳ArrayDeque
。
结尾
到此,对于OkHttp
的源码解析就介绍啦。
其实学习源码的最好形式,就是本人将代码克隆下来,而后对着应用办法,按流程,一步一步往下走。
其实分享文章的最大目标正是期待着有人指出我的谬误,如果你发现哪里有谬误,请毫无保留的指出即可,虚心求教。 另外,如果你感觉文章不错,对你有所帮忙,请给我点个赞,就当激励,谢谢~Peace~!
视频:
资深架构师逐题详解Android大厂精选高频面试题之OkHttp
Android(安卓)开发零根底从入门到精通之OkHttp
原文: https://juejin.cn/post/7033307467199021086