共计 30165 个字符,预计需要花费 76 分钟才能阅读完成。
前言
本文是对 OkHttp
开源库的一个具体解析,如果你感觉本人不够理解OkHttp
,想进一步学习一下,置信本文对你会有所帮忙。
本文蕴含了具体的申请流程剖析、各大拦截器解读以及本人的一点反思总结,文章很长,欢送大家一起交换探讨。
应用办法
应用办法非常简略,别离创立一个 OkHttpClient
对象,一个 Request
对象,而后利用他们创立一个 Call
对象,最初调用同步申请 execute()
办法或者异步申请 enqueue()
办法来拿到Response
。
private final OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://github.com/")
.build();
// 同步申请
Response response = client.newCall(request).execute();
//todo handle response
// 异步申请
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {//todo handle request failed}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {//todo handle Response}
});
根本对象介绍
正如应用办法中所述,咱们先后构建了 OkHttpClient
对象、Request
对象、Call
对象,那这些对象都是什么意思,有什么作用呢?这个就须要咱们进一步学习理解了。
OkHttpClient
一个申请的配置类,采纳了 建造者模式 ,不便用户配置一些申请参数,如配置callTimeout
,cookie
,interceptor
等等。
open class OkHttpClient internal constructor(builder: Builder) : Cloneable, Call.Factory, WebSocket.Factory {constructor() : this(Builder())
class Builder constructor() {
// 调度器
internal var dispatcher: Dispatcher = Dispatcher()
// 连接池
internal var connectionPool: ConnectionPool = ConnectionPool()
// 整体流程拦截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
// 网络流程拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
// 流程监听器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
// 连贯失败时是否重连
internal var retryOnConnectionFailure = true
// 服务器认证设置
internal var authenticator: Authenticator = Authenticator.NONE
// 是否重定向
internal var followRedirects = true
// 是否从 HTTP 重定向到 HTTPS
internal var followSslRedirects = true
//cookie 设置
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
// 缓存设置
internal var cache: Cache? = null
//DNS 设置
internal var dns: Dns = Dns.SYSTEM
// 代理设置
internal var proxy: Proxy? = null
// 代理选择器设置
internal var proxySelector: ProxySelector? = null
// 代理服务器认证设置
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
//socket 配置
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
//https socket 配置
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
// 协定
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
// 域名校验
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
// 申请超时
internal var callTimeout = 0
// 连贯超时
internal var connectTimeout = 10_000
// 读取超时
internal var readTimeout = 10_000
// 写入超时
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
···省略代码···
Request
同样是申请参数的配置类,也同样采纳了 建造者模式 ,但相比于OkHttpClient
,Request
就非常简略了,只有四个参数,别离是 申请 URL
、申请办法
、 申请头
、 申请体
。
class Request internal constructor(@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
internal val tags: Map<Class<*>, Any>
) {
open class Builder {
// 申请的 URL
internal var url: HttpUrl? = null
// 申请办法,如:GET、POST..
internal var method: String
// 申请头
internal var headers: Headers.Builder
// 申请体
internal var body: RequestBody? = null
···省略代码···
Call
申请调用接口,示意这个申请曾经筹备好 能够执行 ,也 能够勾销 , 只能执行一次。
interface Call : Cloneable {
/** 返回发动此调用的原始申请 */
fun request(): Request
/**
* 同步申请,立刻执行。*
* 抛出两种异样:* 1. 申请失败抛出 IOException;
* 2. 如果在执行过一回的前提下再次执行抛出 IllegalStateException;*/
@Throws(IOException::class)
fun execute(): Response
/**
* 异步申请,将申请安顿在未来的某个工夫点执行。* 如果在执行过一回的前提下再次执行抛出 IllegalStateException */
fun enqueue(responseCallback: Callback)
/** 勾销申请。曾经实现的申请不能被勾销 */
fun cancel()
/** 是否已被执行 */
fun isExecuted(): Boolean
/** 是否被勾销 */
fun isCanceled(): Boolean
/** 一个残缺 Call 申请流程的超时工夫配置,默认选自[OkHttpClient.Builder.callTimeout] */
fun timeout(): Timeout
/** 克隆这个 call,创立一个新的雷同的 Call */
public override fun clone(): Call
/** 利用工厂模式来让 OkHttpClient 来创立 Call 对象 */
fun interface Factory {fun newCall(request: Request): Call
}
}
RealCall
在 OkHttpClient
中,咱们利用 newCall
办法来创立一个 Call
对象,但从源码中能够看出,newCall
办法返回的是一个 RealCall
对象。
OkHttpClient.kt
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall
是 Call 接口
的具体实现类,是利用端与网络层的连贯桥,展现利用端原始的申请与连贯数据,以及网络层返回的 response
及其它数据流。通过应用办法也可知,创立 RealCall
对象后,就要调用同步或异步申请办法,所以它外面还蕴含 同步申请 execute()
与 异步申请 enqueue()
办法。(前面具体开展剖析)
AsyncCall
异步申请调用,是 RealCall
的一个外部类,就是一个Runnable
,被调度器中的线程池所执行。
inner class AsyncCall(
// 用户传入的响应回调办法
private val responseCallback: Callback
) : Runnable {
// 同一个域名的申请次数,volatile + AtomicInteger 保障在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {this.callsPerHost = other.callsPerHost}
···省略代码···
fun executeOn(executorService: ExecutorService) {client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 调用线程池执行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
// 申请失败,调用 Callback.onFailure() 办法
responseCallback.onFailure(this@RealCall, ioException)
} finally {if (!success) {
// 申请失败,调用调度器 finish 办法
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
// 申请胜利,获取到服务器返回的 response
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 调用 Callback.onResponse() 办法,将 response 传递进来
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {// 申请失败,调用 Callback.onFailure() 办法
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
// 申请出现异常,调用 cancel 办法来勾销申请
cancel()
if (!signalledCallback) {val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 申请失败,调用 Callback.onFailure() 办法
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// 申请完结,调用调度器 finish 办法
client.dispatcher.finished(this)
}
}
}
}
Dispatcher
调度器,用来调度 Call
对象,同时蕴含线程池与异步申请队列,用来寄存与执行 AsyncCall
对象。
class Dispatcher constructor() {
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {if (executorServiceOrNull == null) {
// 创立一个缓存线程池,来解决申请调用
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** 已筹备好的异步申请队列 */
@get:Synchronized
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在运行的异步申请队列, 蕴含勾销然而还未 finish 的 AsyncCall */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在运行的同步申请队列, 蕴含勾销然而还未 finish 的 RealCall */
private val runningSyncCalls = ArrayDeque<RealCall>()
···省略代码···
}
总结一下
对象 | 作用 |
---|---|
Call |
申请调用接口,示意这个申请曾经筹备好能够执行,也能够被勾销,只能执行一次。 |
RealCall |
Call 接口的具体实现类,是利用与网络层之间的连贯桥,蕴含 OkHttpClient 与Request 信息。 |
AsyncCall |
异步申请调用,其实就是个Runnable ,会被放到线程池中进行解决。 |
Dispatcher |
调度器,用来调度 Call 对象,同时蕴含线程池与异步申请队列,用来寄存与执行 AsyncCall 对象。 |
Request |
申请类,蕴含url 、method 、headers 、body 。 |
Response |
网络层返回的响应数据。 |
Callback |
响应回调函数接口,蕴含onFailure 、onResponse 两个办法。 |
流程剖析
介绍完了对象,接下来就依据应用办法,具体看一下源码吧。
同步申请
同步申请的应用办法。
client.newCall(request).execute();
newCall
办法就是创立一个 RealCall
对象,而后执行其 execute()
办法。
RealCall.kt
override fun execute(): Response {
//CAS 判断是否曾经被执行了, 确保只能执行一次,如果曾经执行过,则抛出异样
check(executed.compareAndSet(false, true)) {"Already Executed"}
// 申请超时开始计时
timeout.enter()
// 开启申请监听
callStart()
try {// 调用调度器中的 executed() 办法,调度器只是将 call 退出到了 runningSyncCalls 队列中
client.dispatcher.executed(this)
// 调用 getResponseWithInterceptorChain 办法拿到 response
return getResponseWithInterceptorChain()} finally {
// 执行结束,调度器将该 call 从 runningSyncCalls 队列中移除
client.dispatcher.finished(this)
}
}
调用调度器 executed
办法,就是将以后的 RealCall
对象退出到 runningSyncCalls
队列中,而后调用 getResponseWithInterceptorChain
办法拿到response
。
异步申请
在来看看异步申请。
RealCall.kt
override fun enqueue(responseCallback: Callback) {
//CAS 判断是否曾经被执行了, 确保只能执行一次,如果曾经执行过,则抛出异样
check(executed.compareAndSet(false, true)) {"Already Executed"}
// 开启申请监听
callStart()
// 新建一个 AsyncCall 对象,通过调度器 enqueue 办法退出到 readyAsyncCalls 队列中
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
而后调用调度器的 enqueue
办法,
Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
// 加锁,保障线程平安
synchronized(this) {
// 将该申请调用退出到 readyAsyncCalls 队列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
// 通过域名来查找有没有雷同域名的申请,有则复用。val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 执行申请
promoteAndExecute()}
private fun promoteAndExecute(): Boolean {this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
// 判断是否有申请正在执行
val isRunning: Boolean
// 加锁,保障线程平安
synchronized(this) {
// 遍历 readyAsyncCalls 队列
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {val asyncCall = i.next()
//runningAsyncCalls 的数量不能大于最大并发申请数 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
// 同域名最大申请数 5,同一个域名最多容许 5 条线程同时执行申请
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
// 从 readyAsyncCalls 队列中移除,并退出到 executableCalls 及 runningAsyncCalls 队列中
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
// 通过运行队列中的申请数量来判断是否有申请正在执行
isRunning = runningCallsCount() > 0}
// 遍历可执行队列,调用线程池来执行 AsyncCall
for (i in 0 until executableCalls.size) {val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
调度器的 enqueue
办法就是将 AsyncCall
退出到 readyAsyncCalls
队列中,而后调用 promoteAndExecute
办法来执行申请,promoteAndExecute
办法做的其实就是遍历 readyAsyncCalls
队列,而后将符合条件的申请用线程池执行,也就是会执行 AsyncCall.run()
办法。
AsyncCall 办法的具体代码看根本对象介绍 AsyncCall,这边就不在此展现了,简略来说就是调用 getResponseWithInterceptorChain
办法拿到 response
,而后通过Callback.onResponse
办法传递进来。反之,如果申请失败,捕捉了异样,就通过 Callback.onFailure
将异样信息传递进来。最终,申请完结,调用调度器 finish
办法。
Dispatcher.kt
/** 异步申请调用完结办法 */
internal fun finished(call: AsyncCall) {call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** 同步申请调用完结办法 */
internal fun finished(call: RealCall) {finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
// 将以后申请调用从 正在运行队列 中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
// 继续执行残余申请,将 call 从 readyAsyncCalls 中取出退出到 runningAsyncCalls,而后执行
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
// 如果执行完了所有申请,处于闲置状态,调用闲置回调办法
idleCallback.run()}
}
获取 Response
接着就是看看 getResponseWithInterceptorChain
办法是如何拿到 response
的。
internal fun getResponseWithInterceptorChain(): Response {
// 拦截器列表
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {interceptors += client.networkInterceptors}
interceptors += CallServerInterceptor(forWebSocket)
// 构建拦截器责任链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
// 如果 call 申请实现,那就意味着交互实现了,没有更多的货色来替换了
var calledNoMoreExchanges = false
try {
// 执行拦截器责任链来获取 response
val response = chain.proceed(originalRequest)
// 如果被勾销,敞开响应,抛出异样
if (isCanceled()) {response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {if (!calledNoMoreExchanges) {noMoreExchanges(null)
}
}
}
简略概括一下:这里采纳了 责任链设计模式
,通过拦截器构建了以RealInterceptorChain
责任链,而后执行 proceed
办法来失去response
。
那么,这又波及 拦截器 是什么?拦截器责任链 又是什么?
Interceptor
只申明了一个拦截器办法,在子类中具体实现,还蕴含一个 Chain
接口,外围办法是 proceed(request)
解决申请来获取response
。
fun interface Interceptor {
/** 拦挡办法 */
@Throws(IOException::class)
fun intercept(chain: Chain): Response
interface Chain {
/** 原始申请数据 */
fun request(): Request
/** 外围办法,解决申请,获取 response */
@Throws(IOException::class)
fun proceed(request: Request): Response
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}
RealInterceptorChain
拦截器链就是实现 Interceptor.Chain
接口,重点就是复写的 proceed
办法。
class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
···省略代码···
private var calls: Int = 0
override fun call(): Call = call
override fun request(): Request = request
@Throws(IOException::class)
override fun proceed(request: Request): Response {check(index < interceptors.size)
calls++
if (exchange != null) {check(exchange.finder.sameHostAndPort(request.url)) {"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"}
}
//index+1, 复制创立新的责任链,也就意味着调用责任链中的下一个解决者,也就是下一个拦截器
val next = copy(index = index + 1, request = request)
// 取出以后拦截器
val interceptor = interceptors[index]
// 执行以后拦截器的拦挡办法
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")
if (exchange != null) {check(index + 1 >= interceptors.size || next.calls == 1) {"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) {"interceptor $interceptor returned a response with no body"}
return response
}
}
链式调用,最终会执行拦截器列表中的每个拦截器,返回Response
。
拦截器
OK,接下来就该看看拦截器列表中的具体拦截器了。
先上各类拦截器的总结,按程序:
client.interceptors
:这是由开发者设置的,会在所有的拦截器解决之前进行 最早 的拦挡解决,可用于增加一些公共参数,如自定义 header
、自定义 log
等等。RetryAndFollowUpInterceptor
:这里会对连贯做一些初始化工作,以及申请失败的重试工作,重定向的后续申请工作。跟他的名字一样,就是做重试工作还有一些连贯跟踪工作。BridgeInterceptor
:是客户端与服务器之间的沟通桥梁,负责将用户构建的申请转换为服务器须要的申请,以及将网络申请返回回来的响应转换为用户可用的响应。CacheInterceptor
:这里次要是缓存的相干解决,会依据用户在OkHttpClient
里定义的缓存配置,而后联合申请新建一个缓存策略,由它来判断是应用网络还是缓存来构建response
。ConnectInterceptor
:这里次要就是负责建设连贯,会建设TCP 连贯
或者TLS 连贯
。client.networkInterceptors
:这里也是开发者本人设置的,所以实质上和第一个拦截器差不多,然而因为地位不同,所以用途也不同。CallServerInterceptor
:这里就是进行网络数据的申请和响应了,也就是理论的网络 I / O 操作,将申请头与申请体发送给服务器,以及解析服务器返回的response
。
接下来咱们按程序,从上往下,对这些拦截器进行一一解读。
client.interceptors
这是用户本人定义的拦截器,称为 利用拦截器 ,会保留在OkHttpClient
的interceptors: List<Interceptor>
列表中。他是拦截器责任链中的 第一个拦截器 ,也就是说会第一个执行拦挡办法,咱们能够通过它来增加 自定义 Header 信息
,如:
class HeaderInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {Request request = chain.request().newBuilder()
.addHeader("device-android", "xxxxxxxxxxx")
.addHeader("country-code", "ZH")
.build();
return chain.proceed(request);
}
}
// 而后在 OkHttpClient 中退出
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
.writeTimeout(15, TimeUnit.SECONDS)
.cookieJar(new MyCookieJar())
.addInterceptor(new HeaderInterceptor())// 增加自定义 Header 拦截器
.build();
RetryAndFollowUpInterceptor
第二个拦截器,从它的名字也可晓得,它负责申请失败的重试工作与重定向的后续申请工作,同时它会对连贯做一些初始化工作。
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
// 这里会新建一个 ExchangeFinder,ConnectInterceptor 会应用到
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {if (call.isCanceled()) {throw IOException("Canceled")
}
try {response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
// 尝试通过路由连贯失败。该申请将不会被发送。if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {recoveredFailures += e.firstConnectException}
newExchangeFinder = false
continue
} catch (e: IOException) {
// 尝试与服务器通信失败。该申请可能已发送。if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {throw e.withSuppressed(recoveredFailures)
} else {recoveredFailures += e}
newExchangeFinder = false
continue
}
// Attach the prior response if it exists. Such responses never have a body.
// 尝试关联上一个 response,留神:body 是为 null
if (priorResponse != null) {response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()}
val exchange = call.interceptorScopedExchange
// 会依据 responseCode 来判断,构建一个新的 request 并返回来重试或者重定向
val followUp = followUpRequest(response, exchange)
if (followUp == null) {if (exchange != null && exchange.isDuplex) {call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
// 如果申请体是一次性的,不须要再次重试
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
// 最大重试次数,不同的浏览器是不同的,比方:Chrome 为 21,Safari 则是 16
if (++followUpCount > MAX_FOLLOW_UPS) {throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
/** 判断是否要进行重连,false-> 不尝试重连;true-> 尝试重连。*/
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
// 客户端禁止重试
if (!client.retryOnConnectionFailure) return false
// 不能再次发送该申请体
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// 产生的异样是致命的,无奈复原,如:ProtocolException
if (!isRecoverable(e, requestSendStarted)) return false
// 没有更多的路由来尝试重连
if (!call.retryAfterFailure()) return false
// 对于失败复原,应用带有新连贯的雷同路由选择器
return true
}
···省略代码···
BridgeInterceptor
从它的名字能够看出,他的定位是客户端与服务器之间的沟通桥梁,负责将用户构建的申请转换为服务器须要的申请,比方:增加 Content-Type
,增加 Cookie
,增加 User-Agent
等等。再将服务器返回的 response
做一些解决转换为客户端须要的 response
。比方:移除响应头中的Content-Encoding
、Content-Length
等等。
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// 获取原始申请数据
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
// 从新构建申请头,申请体信息
val body = userRequest.body
val contentType = body.contentType()
requestBuilder.header("Content-Type", contentType.toString())
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.header("Host", userRequest.url.toHostHeader())
requestBuilder.header("Connection", "Keep-Alive")
···省略代码···
// 增加 cookie
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {requestBuilder.header("Cookie", cookieHeader(cookies))
}
// 增加 user-agent
if (userRequest.header("User-Agent") == null) {requestBuilder.header("User-Agent", userAgent)
}
// 从新构建一个 Request,而后执行下一个拦截器来解决该申请
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
// 创立一个新的 responseBuilder,目标是将原始申请数据构建到 response 中
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
// 批改 response header 信息,移除 Content-Encoding,Content-Length 信息
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
// 批改 response body 信息
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
···省略代码···
CacheInterceptor
用户能够通过 OkHttpClient.cache
来配置缓存,缓存拦截器通过 CacheStrategy
来判断是应用网络还是缓存来构建response
。
class CacheInterceptor(internal val cache: Cache?) : Interceptor {@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {val call = chain.call()
// 通过 request 从 OkHttpClient.cache 中获取缓存
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// 创立一个缓存策略,用来确定怎么应用缓存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// 为空示意不应用网络,反之,则示意应用网络
val networkRequest = strategy.networkRequest
// 为空示意不应用缓存,反之,则示意应用缓存
val cacheResponse = strategy.cacheResponse
// 追踪网络与缓存的应用状况
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
// 有缓存但不实用,敞开它
if (cacheCandidate != null && cacheResponse == null) {cacheCandidate.body?.closeQuietly()
}
// 如果网络被禁止,然而缓存又是空的,构建一个 code 为 504 的 response,并返回
if (networkRequest == null && cacheResponse == null) {return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {listener.satisfactionFailure(call, it)
}
}
// 如果咱们禁用了网络不应用网络,且有缓存,间接依据缓存内容构建并返回 response
if (networkRequest == null) {return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {listener.cacheHit(call, it)
}
}
// 为缓存增加监听
if (cacheResponse != null) {listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
// 责任链往下解决,从服务器返回 response 赋值给 networkResponse
networkResponse = chain.proceed(networkRequest)
} finally {
// 捕捉 I / O 或其余异样,申请失败,networkResponse 为空,且有缓存的时候,不裸露缓存内容。if (networkResponse == null && cacheCandidate != null) {cacheCandidate.body?.closeQuietly()
}
}
// 如果有缓存
if (cacheResponse != null) {
// 且网络返回 response code 为 304 的时候,应用缓存内容新构建一个 Response 返回。if (networkResponse?.code == HTTP_NOT_MODIFIED) {val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {listener.cacheHit(call, it)
}
} else {
// 否则敞开缓存响应体
cacheResponse.body?.closeQuietly()}
}
// 构建网络申请的 response
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
// 如果 cache 不为 null,即用户在 OkHttpClient 中配置了缓存,则将上一步新构建的网络申请 response 存到 cache 中
if (cache != null) {
// 依据 response 的 code,header 以及 CacheControl.noStore 来判断是否能够缓存
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 将该 response 存入缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {if (cacheResponse != null) {listener.cacheMiss(call)
}
}
}
// 依据申请办法来判断缓存是否无效,只对 Get 申请进行缓存,其它办法的申请则移除
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
// 缓存有效,将该申请缓存从 client 缓存配置中移除
cache.remove(networkRequest)
} catch (_: IOException) {// The cache cannot be written.}
}
}
return response
}
···省略代码···
ConnectInterceptor
负责实现与服务器真正建设起连贯,
object ConnectInterceptor : Interceptor {@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// 初始化一个 exchange 对象
val exchange = realChain.call.initExchange(chain)
// 依据这个 exchange 对象来复制创立一个新的连贯责任链
val connectedChain = realChain.copy(exchange = exchange)
// 执行该连贯责任链
return connectedChain.proceed(realChain.request)
}
}
一扫下来,代码非常简略,拦挡办法里就只有三步。
- 初始化一个
exchange
对象。 - 而后依据这个
exchange
对象来复制创立一个新的连贯责任链。 - 执行该连贯责任链。
那这个 exchange
对象又是什么呢?
RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {
... 省略代码...
// 这里的 exchangeFinder 就是在 RetryAndFollowUpInterceptor 中创立的
val exchangeFinder = this.exchangeFinder!!
// 返回一个 ExchangeCodec(是个编码器,为 request 编码以及为 response 解码)val codec = exchangeFinder.find(client, chain)
// 依据 exchangeFinder 与 codec 新构建一个 Exchange 对象,并返回
val result = Exchange(this, eventListener, exchangeFinder, codec)
... 省略代码...
return result
}
具体看看 ExchangeFinder.find()
这一步,
ExchangeFinder.kt
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
// 查找合格可用的连贯,返回一个 RealConnection 对象
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
// 依据连贯,创立并返回一个申请响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,别离对应 Http1 协定与 Http2 协定
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {trackFailure(e)
throw RouteException(e)
}
}
持续往下看 findHealthyConnection
办法
ExchangeFinder.kt
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {while (true) {
// 重点:查找连贯
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// 查看该连贯是否合格可用,合格则间接返回该连贯
if (candidate.isHealthy(doExtensiveHealthChecks)) {return candidate}
// 如果该连贯不合格,标记为不可用,从连接池中移除
candidate.noNewExchanges()
... 省略代码...
}
}
简略概括一下就是:通过 findConnection
办法来查找连贯,找到连贯后判断是否是合格可用的,合格就间接返回该连贯。
所以外围办法就是findConnection
,咱们持续深刻看看该办法:
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {if (call.isCanceled()) throw IOException("Canceled")
// 第一次,尝试重连 call 中的 connection,不须要去从新获取连贯
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {toClose = call.releaseConnectionNoEvents()
}
}
// 如果 call 中的 connection 还没有开释,就重用它。if (call.connection != null) {check(toClose == null)
return callConnection
}
// 如果 call 中的 connection 曾经被开释,敞开 Socket.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
// 须要一个新的连贯,所以重置一些状态
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// 第二次,尝试从连接池中获取一个连贯,不领路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// 连接池中是空的,筹备下次尝试连贯的路由
val routes: List<Route>?
val route: Route
... 省略代码...
// 第三次,再次尝试从连接池中获取一个连贯,领路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()}
// 第四次,手动创立一个新连贯
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {call.connectionToCancel = null}
call.client.routeDatabase.connected(newConnection.route())
// 第五次,再次尝试从连接池中获取一个连贯,领路由,带多路复用。// 这一步次要是为了校验一下,比方曾经有了一条连贯了,就能够间接复用,而不必应用手动创立的新连贯。if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
// 将手动创立的新连贯放入连接池
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
在代码中能够看出,一共做了 5 次尝试去失去连贯:
- 第一次,尝试重连 call 中的 connection,不须要去从新获取连贯。
- 第二次,尝试从连接池中获取一个连贯,不领路由,不带多路复用。
- 第三次,再次尝试从连接池中获取一个连贯,领路由,不带多路复用。
- 第四次,手动创立一个新连贯。
- 第五次,再次尝试从连接池中获取一个连贯,领路由,带多路复用。
OK,到了这一步,就算建设起了连贯。
client.networkInterceptors
该拦截器称为 网络拦截器 ,与client.interceptors
一样也是由用户本人定义的,同样是以列表的模式存在 OkHttpClient
中。
那这两个拦截器有什么不同呢?
其实他两的不同都是因为他们所处的地位不同所导致的,利用拦截器处于第一个地位,所以无论如何它 都会被执行,而且只会执行一次 。而网络拦截器处于倒数第二的地位,它 不肯定会被执行,而且可能会被执行屡次 ,比方:在RetryAndFollowUpInterceptor
失败或者 CacheInterceptor
间接返回缓存的状况下,咱们的网络拦截器是不会被执行的。
CallServerInterceptor
到了这里,客户端与服务器曾经建设好了连贯,接着就是将申请头与申请体发送给服务器,以及解析服务器返回的 response
了。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
try {
// 写入申请头
exchange.writeRequestHeaders(request)
// 如果不是 GET 申请,并且申请体不为空
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 当申请头为 "Expect: 100-continue" 时,在发送申请体之前须要期待服务器返回 "HTTP/1.1 100 Continue" 的 response,如果没有等到该 response,就不发送申请体。//POST 申请,先发送申请头,在获取到 100 持续状态后持续发送申请体
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
// 刷新申请,即发送申请头
exchange.flushRequest()
// 解析响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
// 写入申请体
if (responseBuilder == null) {if (requestBody.isDuplex()) {
// 如果申请体是双公体,就先发送申请头,稍后在发送申请体
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
// 写入申请体
requestBody.writeTo(bufferedRequestBody)
} else {
// 如果获取到了 "Expect: 100-continue" 响应,写入申请体
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()}
···省略代码···
// 申请完结,发送申请体
exchange.finishRequest()
···省略代码···
try {if (responseBuilder == null) {
// 读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
···省略代码···
// 构建一个 response
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
···省略代码···
return response
···省略代码···
简略概括一下:写入发送申请头,而后依据条件是否写入发送申请体,申请完结。解析服务器返回的申请头,而后构建一个新的 response
,并返回。这里CallServerInterceptor
是拦截器责任链中最初一个拦截器了,所以他不会再调用 chain.proceed()
办法往下执行,而是将这个构建的 response
往上传递给责任链中的每个拦截器。
总结
咱们剖析了申请的流程,包含同步申请与异步申请,还仔细分析了拦截器责任链中的每个拦截器,当初画一个流程图,简略总结一下,你能够对照着流程图,在走一遍流程。
反思
设计模式
- 建造者模式 :不论是在
OkHttpClient
、Request
还是Response
中都用到了建造者模式,因为这几个类中都有很多参数,须要供用户抉择须要的参数来构建其想要的实例,所以在开源库中,Build 模式
是很常见的。 - 工厂办法模式:帮忙生成简单对象,如:
OkHttpClient.newCall(request Request) 来创立 Call 对象
。 - 责任链模式 :这个就用的很绝妙了,将 7 个拦截器形成拦截器责任链,而后按程序从上往下执行,失去
Response
后,从下往上传回去。
线程平安
在 AsyncCall
类中的 callsPerHost
变量,应用了 Volatile
+ AtomicInteger
来润饰,从而保障在多线程下的线程平安。
inner class AsyncCall(private val responseCallback: Callback) : Runnable {
// 同一个域名的申请次数,volatile + AtomicInteger 保障在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set
... 省略代码...
数据结构
为什么
readyAsyncCalls
runningAsyncCalls
runningSyncCalls
采纳ArrayDeque
呢?
两个点答复 : 一、他们都是用来寄存网络申请的,这些申请须要做到先到先得,所以采纳队列。二 、依据代码所示,当执行enqueue
时,咱们须要遍历 readyAsyncCalls
,将合乎执行条件的Call
退出到runningAsyncCalls
,这绝对比于链表来说,数组的查找效率要更高,所以采纳ArrayDeque
。
结尾
到此,对于 OkHttp
的源码解析就介绍啦。
其实学习源码的最好形式,就是本人将代码克隆下来,而后对着应用办法,按流程,一步一步往下走。
其实分享文章的最大目标正是期待着有人指出我的谬误,如果你发现哪里有谬误,请毫无保留的指出即可,虚心求教。另外,如果你感觉文章不错,对你有所帮忙,请给我点个赞,就当激励,谢谢~Peace~!
视频:
资深架构师逐题详解 Android 大厂精选高频面试题之 OkHttp
Android(安卓)开发零根底从入门到精通之 OkHttp
原文:https://juejin.cn/post/7033307467199021086