OkHttp(三)
前两篇文章,讲述了 OkHttp 的根底的应用与申请的调度状况,而明天就让咱们来看看 OkHttp 的精华之一 - 责任链模式。
责任链模式
后面的文章中咱们看到,当理论进行网络申请时,无论是同步申请还是异步申请都会应用getResponseWithInterceptorChain() 这个办法,所以咱们先从这个办法开始钻研。
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
// 增加各种拦截器 这个前面逐个介绍
val interceptors = mutableListOf<Interceptor>()
// 自定义的一个拦截器
interceptors += client.interceptors
// 零碎内置的拦截器
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {interceptors += client.networkInterceptors}
interceptors += CallServerInterceptor(forWebSocket)
// 创立责任链
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
// 执行责任链
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {if (!calledNoMoreExchanges) {transmitter.noMoreExchanges(null)
}
}
}
咱们能够看到,该办法中将拦截器逐个增加汇合中,并创立了一个责任链,用 chain.proceed()办法来执行申请。
OkHttp 采纳 责任链的模式 来使每个性能离开,每个 Interceptor 自行实现本人的工作,并且将不属于本人的工作交给下一个,简化了各自的责任和逻辑。
接下来看看 proceed 的办法
override fun proceed(request: Request): Response {return proceed(request, transmitter, exchange)
}
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {if (index >= interceptors.size) throw AssertionError()
calls++
...
// 获取下一个拦截器,链中的拦截器汇合 index+1
// Call the next interceptor in the chain.
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
// 执行以后的拦截器 - 如果在配置 okhttpClient,时没有设置 intercept 默认是先执行:retryAndFollowUpInterceptor 拦截器 `
val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")
...
return response
}
在该办法中咱们能够看到递归调用了下一个拦截器,当所有拦截器调用结束后,返回咱们所得的 Response。每个拦截器都重写了 intercept()办法,用以执行申请。
责任链的一个执行过程如下图
接下来让咱们剖析默认责任链的一个作用,并作出一些源码剖析。
RetryAndFollowUpInterceptor
其创立过程是在 构建 newCall 对象时
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
...
@Override public Call newCall(Request request) {return RealCall.newRealCall(this, request, false /* for web socket */);
}
简略看一下应用的过程
首先创立了 transmitter 对象,他封装了网络申请相干的信息:连接池,地址信息,网络申请,事件回调,负责网络连接的连贯、敞开,开释等操作。
var request = chain.request()
val realChain = chain as RealInterceptorChain
val transmitter = realChain.transmitter()
而后则进入了网络连接的循环
// 计数器 屡次相应的次数是由限度的,不同浏览器举荐的次数不同,还特别强调了 HTTP 1.0 协定举荐 5 次。var followUpCount = 0
var priorResponse: Response? = null
while (true) {
// 筹备连贯
transmitter.prepareToConnect(request)
if (transmitter.isCanceled) {throw IOException("Canceled")
}
var response: Response
var success = false
try {
// 失去最终的后果
response = realChain.proceed(request, transmitter, null)
success = true
} catch (e: RouteException) {
// The attempt to connect via a route failed. The request will not have been sent.
// 连贯地址的异样,判断是否能可能复原,也就是是否要重试
if (!recover(e.lastConnectException, transmitter, false, request)) {throw e.firstConnectException}
continue
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
// 连贯服务器的异样 判断网络申请是否曾经开始
val requestSendStarted = e !is ConnectionShutdownException
// 同上
if (!recover(e, transmitter, requestSendStarted, request)) throw e
continue
} finally {
// The network call threw an exception. Release any resources.
// 开释资源
if (!success) {transmitter.exchangeDoneDueToException()
}
}
// Attach the prior response if it exists. Such responses never have a body.
// 如果不为空保留到 Response 中
if (priorResponse != null) {response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()}
val exchange = response.exchange
val route = exchange?.connection()?.route()
// 判断返回后果 response,是否须要持续欠缺申请,例如证书验证等等
val followUp = followUpRequest(response, route)
// 如果不须要持续欠缺网络申请,返回 response
if (followUp == null) {if (exchange != null && exchange.isDuplex) {transmitter.timeoutEarlyExit()
}
return response
}
// 如果 body 内容只能发送一次 间接放回
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {return response}
response.body?.closeQuietly()
if (transmitter.hasExchange()) {exchange?.detachWithViolence()
}
// 如果曾经超过最大的网络申请追加数,开释连贯,抛出协定异样
if (++followUpCount > MAX_FOLLOW_UPS) {throw ProtocolException("Too many follow-up requests: $followUpCount")
}
// 更新下一次的网络申请对象
request = followUp
// 保留上一次的申请后果
priorResponse = response
}
而后就是重试阶段 recover()的源码了
/**
* Report and attempt to recover from a failure to communicate with a server. Returns true if
* `e` is recoverable, or false if the failure is permanent. Requests with a body can only
* be recovered if the body is buffered or if the failure occurred before the request has been
* sent.
*/
private fun recover(
e: IOException,
transmitter: Transmitter,
requestSendStarted: Boolean,
userRequest: Request
): Boolean {
// The application layer has forbidden retries.
// 设置了不须要重试
if (!client.retryOnConnectionFailure) return false
// We can't send the request body again.
// body 内容只能发送一次
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// This exception is fatal.
// 判断异样类型,是否要持续尝试,// 不会重试的类型:协定异样、Socketet 异样并且网络状况还没开始,ssl 认证异样
if (!isRecoverable(e, requestSendStarted)) return false
// No more routes to attempt.
// 曾经没有其余可用的路由地址了
if (!transmitter.canRetry()) return false
// For failure recovery, use the same route selector with a new connection.
// 其余放回 true
return true
}
咱们略微屡一下下面源码的流程:
- 首先应用了 transmitter 对象(重要),用以提供相应的网络连接相干的货色
-
而后开始连贯,而后又有着几种状况
- 连贯胜利,且无后续操作(如认证等),间接放回
- 连贯胜利,且有后续操作,则进入下一次循环
- 连贯失败,RouteException 和 IOException 异样,利用 recover()判断是否重试,不要重试则开释资源,要重试则 continue;
- 连贯胜利,然而重试的次数超过限度,则有问题(能够本人创立拦截器来批改重试次数)。
BridgeIntecepter
这个拦截器的性能较为的简略,申请之前对响应头做了一些查看,并增加一些头,而后在申请之后对响应做一些解决(gzip 解压 or 设置 cookie)。
还是让咱们看一下源码。
override fun intercept(chain: Interceptor.Chain): Response {val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
// 如果咱们有 RequestBody, 会写一些 header 信息,如内容长度和内容类型等
val body = userRequest.body
if (body != null) {...}
// 对一些必要的属性进行补充
if (userRequest.header("Host") == null) {requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {requestBuilder.header("Connection", "Keep-Alive")
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
// 默认的编码格局 gzip
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
// 把之前的 cookie 存在 header 里
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {requestBuilder.header("User-Agent", userAgent)
}
// 失去 Response
val networkResponse = chain.proceed(requestBuilder.build())
// 保留新的 cookie
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 如果应用的 gzip 编码,并且返回的 response 有 body 信息,对做相应的解决
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()}
CacheIntecepter
在看这个拦截器的源码之前,咱们还得关注一件事件,OkHttp 的缓存是怎么缓存的呢?
OkHttp 中的Cache 类,采纳了 DiskLruCache,外部应用最近起码应用算法,优先淘汰最近工夫内起码次应用的缓存对象,它只有硬存缓存,并没有内存缓存,这是他缓存机制的一大缺点,当然咱们能够通过自定义缓存机制来解决这一问题。
在 OkHttp 中还存在一个 缓存策略 CacheStrategy
CacheStrategy 的外部工厂类 Factory 中有一个 getCandidate 办法,会依据理论的申请生成对应的 CacheStrategy 类返回,是个典型的简略工厂模式。其外部保护一个 request 和 response,通过指定 request 和 response 来通知 CacheInterceptor 是应用缓存还是应用网络申请,亦或两者同时应用。
理解完之后,咱们来看源码:
override fun intercept(chain: Interceptor.Chain): Response {
1. 如果设置缓存并且以后 request 有缓存,则从缓存 Cache 中获取以后申请 request 的缓存 response
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// 2. 传入的申请 request 和获取的缓存 response 通过缓存策略对象 CacheStragy 的工厂类 get 办法依据一些规定获取缓存策略 CacheStrategy
//(这里的规定依据申请的 request 和缓存的 Response 的 header 头部信息生成的,比方是否有 noCache 标记位,是否是 immutable 不可变,缓存是否过期等等)val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// 3. 生成的 CacheStrategy 有 2 个变量,networkRequest 和 cacheRequest,如果 networkRequest 为 Null 示意不进行网络申请,如果 cacheResponse 为 null,则示意没有无效缓存
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
// 4. 缓存不可用,敞开
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()}
// If we're forbidden from using the network and the cache is insufficient, fail.
// 5. 如果 networkRequest 和 cacheResponse 都为 Null, 则示意不申请网络且缓存为 null,返回 504,申请失败
if (networkRequest == null && cacheResponse == null) {return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()}
// If we don't need the network, we're done.
// 6. 如果不申请网络,但存在缓存,则不申请网络,间接返回缓存,完结,不执行下一个拦截器
if (networkRequest == null) {return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()}
// 7. 否则,申请网络,并调用下一个拦截器链,将申请转发到下一个拦截器
var networkResponse: Response? = null
try {networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {cacheCandidate.body?.closeQuietly()
}
}
//8. 申请网络,并且网络申请返回 HTTP_NOT_MODIFIED,阐明缓存无效,则合并网络响应和缓存后果,同时更新缓存
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {if (networkResponse?.code == HTTP_NOT_MODIFIED) {val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
// 清空之前的缓冲
.cacheResponse(stripBody(cacheResponse))
// 清空申请到的内容,因为内容没有扭转
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response
} else {cacheResponse.body?.closeQuietly()
}
}
//9. 若没有缓存,则写入缓存
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
}
// 如果申请的办法不须要缓存,移除缓存,例如 post,put
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {cache.remove(networkRequest)
} catch (_: IOException) {// The cache cannot be written.}
}
}
return response
}
让咱们简略梳理一下缓存流程
- 从以后的 Request 中获取缓存,看是否有缓存
- 非网络申请时,需联合是否有缓存进行判断,如果有缓存,间接返回;如果没有缓存,放回 504
- 是网络申请时,如果放回 304,则做一个小的修补即可;否则依据缓存策略来判断是否要更新缓存(个别要)。
ConnectIntecepter(外围)
获取连贯这个过程较为简单,尽力来梳理这个过程。
首先咱们间接来看这个类的源码,不难发现这个类的源码较为简单,次要外围是 transmitter 的办法。
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
val transmitter = realChain.transmitter()
// We need the network to satisfy this request. Possibly for validating a conditional GET.
val doExtensiveHealthChecks = request.method != "GET"
// 利用重试的责任链生成的 transmitter 类 来获取连贯
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
}
而后咱们来看看这个类,transmitter
/** Returns a new exchange to carry a new request and response. */
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
...// 做一些查看
// 获取连贯 调配一个 Connection 和 HttpCodec,为最终的申请做筹备
val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
...
}
fun find(
client: OkHttpClient,
chain: Interceptor.Chain,
doExtensiveHealthChecks: Boolean
): ExchangeCodec {val connectTimeout = chain.connectTimeoutMillis()
val readTimeout = chain.readTimeoutMillis()
val writeTimeout = chain.writeTimeoutMillis()
val pingIntervalMillis = client.pingIntervalMillis
val connectionRetryEnabled = client.retryOnConnectionFailure
try {
// 获取连贯
val resultConnection = findHealthyConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled,
doExtensiveHealthChecks = doExtensiveHealthChecks
)
// 设置编码,有 Http1codec 和 Http2codec 两种形式 后者能够复用连贯
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {trackFailure()
throw e
} catch (e: IOException) {trackFailure()
throw RouteException(e)
}
// 获取连贯
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {while (true) {
// 查找新连贯
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// If this is a brand new connection, we can skip the extensive health checks.
// 如果是新连贯 则间接应用
synchronized(connectionPool) {if (candidate.successCount == 0) {return candidate}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
// 判断连接池中连贯是否可用,如果不可用,则开释该连贯并从连接池中移除,并持续寻找可用连贯
if (!candidate.isHealthy(doExtensiveHealthChecks)) {candidate.noNewExchanges()
continue
}
return candidate
}
}
接着就是正式获取连贯这一步了,咱们从正文中能够看到,首先从曾经存在的 Connection 来选取连贯,而后从连接池中寻找,最初才是新建连贯。
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {if (transmitter.isCanceled) throw IOException("Canceled")
hasStreamFailure = false // This is a fresh attempt.
·
// 对现有连贯做一个备份
releasedConnection = transmitter.connection
toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
// 失去要敞开的连贯的 socket
transmitter.releaseConnectionNoEvents()} else {null}
// 如果能够应用 则应用
if (transmitter.connection != null) {
// We had an already-allocated connection and it's good.
result = transmitter.connection
releasedConnection = null
}
// 如果没有能够用的连贯,从连接池中查找
if (result == null) {
// Attempt to get a connection from the pool.
// 以 URL 为 key 查找
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true
result = transmitter.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
} else if (retryCurrentRoute()) {selectedRoute = transmitter.connection!!.route() // 应用路由地址,能够是代理地址
}
}
}
// 敞开之前的 socket
toClose?.closeQuietly()
... // 如果下面找到,间接返回
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result!!
}
// If we need a route selection, make one. This is a blocking operation.
var newRouteSelection = false
// 抉择一个不空的路由
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
newRouteSelection = true
routeSelection = routeSelector.next()}
var routes: List<Route>? = null
synchronized(connectionPool) {if (transmitter.isCanceled) throw IOException("Canceled")
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection!!.routes
// 依据 IP 地址和 Route 从连接池进行第二次查找
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, false)) {
foundPooledConnection = true
result = transmitter.connection
}
}
if (!foundPooledConnection) {if (selectedRoute == null) {selectedRoute = routeSelection!!.next()
}
// 如果没有找到,再应用下一个路由汇合
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {eventListener.connectionAcquired(call, result!!)
return result!!
}
// 到这里还没找到连贯,那就去创立这个连贯
// Do TCP + TLS handshakes. This is a blocking operation.
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
connectionPool.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
// 如果 result 连贯是 http2.0 连贯,http2.0 反对一个连贯同时发动多个申请,这里做去重判断,避免创立多个
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result!!.noNewExchanges = true
socket = result!!.socket()
result = transmitter.connection
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute
} else {connectionPool.put(result!!)
transmitter.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
在这个源码中,呈现了几个新的类,路由 route 类,地址 address 类,咱们简略的来看看这两个类,
Address:封装了所有的能够拜访的地址信息,在这个类中还增加了代理和 dns 的相干信息(在 OkHttpClient 中设置好)proxySelector 能够为一个 URI 设置多个代理,如果地址连贯失败还回调 connectFailed;proxy 设置独自的全局代理,他的优先级高于 proxySelecttor;dns 用法和 proxySelecttor 相似,能够返回多个地址。
private Address createAddress(HttpUrl url) {
SSLSocketFactory sslSocketFactory = null;
HostnameVerifier hostnameVerifier = null;
CertificatePinner certificatePinner = null;
if (url.isHttps()) {sslSocketFactory = client.sslSocketFactory();
hostnameVerifier = client.hostnameVerifier();
certificatePinner = client.certificatePinner();}
return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
}
Route 路由:对地址 Adress 的一个封装类
RouteSelector 路由选择器: 在 OKhttp 中其实其作用也就是返回一个可用的 Route 对象
咱们来大略梳理一下流程
- 首先对以后的流进行一个初步判断,满足则复用
- 不满足则,对连接池进行第一次的查找,此次查找中,route 类为空
connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)
查找失去间接复用
- 查找不到则应用路由进行查找,查找设置的代理和 DNS 是否能找到相干的代理,如果找到则复用
connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, false)
- 上述路线都查找不到,间接新建一个连贯,放入连接池中,并把解析的 host 等信息保留到 Connection 中,不便下次复用。其中还要多做一步判断,如果是 HTTP2 同时发动的申请,要进行一个去重的操作。
下图是一个简要的连贯步骤。
CallServerInterceptor
- 首先取得后面 Intecepter 获取的信息
- 而后利用编码器写入 header 信息
exchange.writeRequestHeaders(request)
- 判断是否要发送申请体,有申请体时,但冀望返回状态码是 100 时,则不发送。否则利用流封装后发送。
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a"Expect: 100-continue"header on the request, wait for a"HTTP/1.1 100
// Continue"response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {exchange.flushRequest()
responseHeadersStarted = true
exchange.responseHeadersStart()
responseBuilder = exchange.readResponseHeaders(true)
}
if (responseBuilder == null) {if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()}
} else {exchange.noRequestBody()
if (!exchange.connection()!!.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()}
}
} else {exchange.noRequestBody()
}
// 创立 response,把握手信息,和 request 等信息保留进去
@Override public Response intercept(Chain chain) throws IOException {
...
// 写入 request 完结
httpCodec.finishRequest();
if (responseBuilder == null) {realChain.eventListener().responseHeadersStart(realChain.call());
// 读取相应 response 的 header 信息
responseBuilder = httpCodec.readResponseHeaders(false);
}
// 创立 response,把握手信息,和 request 等信息保留进去
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
// 开始判断申请码
int code = response.code();
if (code == 100) {
// 如果是 100,间接读取 header
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
// 握手
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();}
...
// 判断申请码
if (forWebSocket && code == 101) {
// 客户端须要转换协定,这里须要设置一个空的 response
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();} else {
// 读取网络的 body
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();}
// 如果 header 申请敞开连贯
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
// 敞开这个链接
streamAllocation.noNewStreams();}
// 非凡 code 判断
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException("HTTP" + code + "had non-zero Content-Length:" + response.body().contentLength());
}
return response;
}
如果想要理解具体的读取和写入流程,以我当初应用的 Http 2.0 为例:
连贯:Http2Connection;
流:Http2Stream;
编解码器:Http2Codec;
读操作:Http2Reader;
写操作:Http2Writer;
他们之间的关系:
1、Http2Connection 调用 Http2Reader 和 Http2Writer 来进行读写;
2、Http2Stream 调用 Http2Connection 进行读写;
3、Http2Codec 调用 Http2Connection 和 Http2Stream 进行操作;
总结
咱们分三个阶段来简要介绍了 OkHttp 这个框架,因为当初程度无限,所以会存在疏漏。当前有些有新的发现,则再对其进行补充。