前言

本文是对OkHttp开源库的一个具体解析,如果你感觉本人不够理解OkHttp,想进一步学习一下,置信本文对你会有所帮忙。

本文蕴含了具体的申请流程剖析、各大拦截器解读以及本人的一点反思总结,文章很长,欢送大家一起交换探讨。

应用办法

应用办法非常简略,别离创立一个OkHttpClient对象,一个Request对象,而后利用他们创立一个Call对象,最初调用同步申请execute()办法或者异步申请enqueue()办法来拿到Response

private final OkHttpClient client = new OkHttpClient();    Request request = new Request.Builder()      .url("https://github.com/")      .build();    //同步申请    Response response = client.newCall(request).execute();    //todo handle response    //异步申请    client.newCall(request).enqueue(new Callback() {      @Override      public void onFailure(@NotNull Call call, @NotNull IOException e) {      //todo handle request failed      }      @Override      public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {          //todo handle Response      }    });

根本对象介绍

正如应用办法中所述,咱们先后构建了 OkHttpClient对象、Request对象、Call对象,那这些对象都是什么意思,有什么作用呢?这个就须要咱们进一步学习理解了。

OkHttpClient

一个申请的配置类,采纳了建造者模式,不便用户配置一些申请参数,如配置callTimeoutcookieinterceptor等等。

open class OkHttpClient internal constructor(  builder: Builder) : Cloneable, Call.Factory, WebSocket.Factory {  constructor() : this(Builder())  class Builder constructor() {    //调度器    internal var dispatcher: Dispatcher = Dispatcher()    //连接池    internal var connectionPool: ConnectionPool = ConnectionPool()    //整体流程拦截器    internal val interceptors: MutableList<Interceptor> = mutableListOf()    //网络流程拦截器    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()    //流程监听器    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()    //连贯失败时是否重连    internal var retryOnConnectionFailure = true    //服务器认证设置    internal var authenticator: Authenticator = Authenticator.NONE    //是否重定向    internal var followRedirects = true    //是否从HTTP重定向到HTTPS    internal var followSslRedirects = true    //cookie设置    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES    //缓存设置    internal var cache: Cache? = null    //DNS设置    internal var dns: Dns = Dns.SYSTEM    //代理设置    internal var proxy: Proxy? = null    //代理选择器设置    internal var proxySelector: ProxySelector? = null    //代理服务器认证设置    internal var proxyAuthenticator: Authenticator = Authenticator.NONE    //socket配置    internal var socketFactory: SocketFactory = SocketFactory.getDefault()    //https socket配置    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null    internal var x509TrustManagerOrNull: X509TrustManager? = null    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS    //协定    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS    //域名校验    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT    internal var certificateChainCleaner: CertificateChainCleaner? = null    //申请超时    internal var callTimeout = 0    //连贯超时    internal var connectTimeout = 10_000    //读取超时    internal var readTimeout = 10_000    //写入超时    internal var writeTimeout = 10_000    internal var pingInterval = 0    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE    internal var routeDatabase: RouteDatabase? = null    ···省略代码···

Request

同样是申请参数的配置类,也同样采纳了建造者模式,但相比于OkHttpClientRequest就非常简略了,只有四个参数,别离是申请URL申请办法申请头申请体

class Request internal constructor(  @get:JvmName("url") val url: HttpUrl,  @get:JvmName("method") val method: String,  @get:JvmName("headers") val headers: Headers,  @get:JvmName("body") val body: RequestBody?,  internal val tags: Map<Class<*>, Any>) {  open class Builder {    //申请的URL    internal var url: HttpUrl? = null    //申请办法,如:GET、POST..    internal var method: String    //申请头    internal var headers: Headers.Builder    //申请体    internal var body: RequestBody? = null  ···省略代码···

Call

申请调用接口,示意这个申请曾经筹备好能够执行,也能够勾销只能执行一次

interface Call : Cloneable {  /** 返回发动此调用的原始申请 */  fun request(): Request  /**   * 同步申请,立刻执行。   *    * 抛出两种异样:   * 1. 申请失败抛出IOException;   * 2. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/  @Throws(IOException::class)  fun execute(): Response  /**   * 异步申请,将申请安顿在未来的某个工夫点执行。   * 如果在执行过一回的前提下再次执行抛出IllegalStateException */  fun enqueue(responseCallback: Callback)  /** 勾销申请。曾经实现的申请不能被勾销 */  fun cancel()  /** 是否已被执行  */  fun isExecuted(): Boolean  /** 是否被勾销   */  fun isCanceled(): Boolean  /** 一个残缺Call申请流程的超时工夫配置,默认选自[OkHttpClient.Builder.callTimeout] */  fun timeout(): Timeout  /** 克隆这个call,创立一个新的雷同的Call */  public override fun clone(): Call  /** 利用工厂模式来让 OkHttpClient 来创立 Call对象 */  fun interface Factory {    fun newCall(request: Request): Call  }}

RealCall

OkHttpClient 中,咱们利用 newCall 办法来创立一个 Call 对象,但从源码中能够看出,newCall 办法返回的是一个 RealCall 对象。

OkHttpClient.ktoverride fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

RealCallCall接口的具体实现类,是利用端与网络层的连贯桥,展现利用端原始的申请与连贯数据,以及网络层返回的response及其它数据流。 通过应用办法也可知,创立RealCall对象后,就要调用同步或异步申请办法,所以它外面还蕴含同步申请 execute()异步申请 enqueue()办法。(前面具体开展剖析)

AsyncCall

异步申请调用,是RealCall的一个外部类,就是一个Runnable,被调度器中的线程池所执行。

inner class AsyncCall(    //用户传入的响应回调办法    private val responseCallback: Callback  ) : Runnable {    //同一个域名的申请次数,volatile + AtomicInteger 保障在多线程下及时可见性与原子性    @Volatile var callsPerHost = AtomicInteger(0)      private set    fun reuseCallsPerHostFrom(other: AsyncCall) {      this.callsPerHost = other.callsPerHost    }···省略代码···    fun executeOn(executorService: ExecutorService) {      client.dispatcher.assertThreadDoesntHoldLock()      var success = false      try {        //调用线程池执行        executorService.execute(this)        success = true      } catch (e: RejectedExecutionException) {        val ioException = InterruptedIOException("executor rejected")        ioException.initCause(e)        noMoreExchanges(ioException)        //申请失败,调用 Callback.onFailure() 办法        responseCallback.onFailure(this@RealCall, ioException)      } finally {        if (!success) {          //申请失败,调用调度器finish办法          client.dispatcher.finished(this) // This call is no longer running!        }      }    }    override fun run() {      threadName("OkHttp ${redactedUrl()}") {        var signalledCallback = false        timeout.enter()        try {          //申请胜利,获取到服务器返回的response          val response = getResponseWithInterceptorChain()          signalledCallback = true          //调用 Callback.onResponse() 办法,将 response 传递进来          responseCallback.onResponse(this@RealCall, response)        } catch (e: IOException) {          if (signalledCallback) {            // Do not signal the callback twice!            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)          } else {            //申请失败,调用 Callback.onFailure() 办法            responseCallback.onFailure(this@RealCall, e)          }        } catch (t: Throwable) {          //申请出现异常,调用cancel办法来勾销申请          cancel()          if (!signalledCallback) {            val canceledException = IOException("canceled due to $t")            canceledException.addSuppressed(t)            //申请失败,调用 Callback.onFailure() 办法            responseCallback.onFailure(this@RealCall, canceledException)          }          throw t        } finally {          //申请完结,调用调度器finish办法          client.dispatcher.finished(this)        }      }    }  }

Dispatcher

调度器,用来调度Call对象,同时蕴含线程池与异步申请队列,用来寄存与执行AsyncCall对象。

class Dispatcher constructor() {  @get:Synchronized  @get:JvmName("executorService") val executorService: ExecutorService    get() {      if (executorServiceOrNull == null) {        //创立一个缓存线程池,来解决申请调用        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))      }      return executorServiceOrNull!!    }  /** 已筹备好的异步申请队列 */  @get:Synchronized  private val readyAsyncCalls = ArrayDeque<AsyncCall>()  /** 正在运行的异步申请队列, 蕴含勾销然而还未finish的AsyncCall */  private val runningAsyncCalls = ArrayDeque<AsyncCall>()  /** 正在运行的同步申请队列, 蕴含勾销然而还未finish的RealCall */  private val runningSyncCalls = ArrayDeque<RealCall>()···省略代码···}

总结一下

对象作用
Call申请调用接口,示意这个申请曾经筹备好能够执行,也能够被勾销,只能执行一次。
RealCallCall接口的具体实现类,是利用与网络层之间的连贯桥,蕴含OkHttpClientRequest信息。
AsyncCall异步申请调用,其实就是个Runnable,会被放到线程池中进行解决。
Dispatcher调度器,用来调度Call对象,同时蕴含线程池与异步申请队列,用来寄存与执行AsyncCall对象。
Request申请类,蕴含urlmethodheadersbody
Response网络层返回的响应数据。
Callback响应回调函数接口,蕴含onFailureonResponse 两个办法。

流程剖析

介绍完了对象,接下来就依据应用办法,具体看一下源码吧。

同步申请

同步申请的应用办法。

client.newCall(request).execute();

newCall办法就是创立一个RealCall对象,而后执行其execute()办法。

  RealCall.kt    override fun execute(): Response {    //CAS判断是否曾经被执行了, 确保只能执行一次,如果曾经执行过,则抛出异样    check(executed.compareAndSet(false, true)) { "Already Executed" }    //申请超时开始计时    timeout.enter()    //开启申请监听    callStart()    try {      //调用调度器中的 executed() 办法,调度器只是将 call 退出到了runningSyncCalls队列中      client.dispatcher.executed(this)      //调用getResponseWithInterceptorChain 办法拿到 response      return getResponseWithInterceptorChain()    } finally {      //执行结束,调度器将该 call 从 runningSyncCalls队列中移除      client.dispatcher.finished(this)    }  }

调用调度器executed办法,就是将以后的RealCall对象退出到runningSyncCalls队列中,而后调用getResponseWithInterceptorChain办法拿到response

异步申请

在来看看异步申请。

  RealCall.kt  override fun enqueue(responseCallback: Callback) {    //CAS判断是否曾经被执行了, 确保只能执行一次,如果曾经执行过,则抛出异样    check(executed.compareAndSet(false, true)) { "Already Executed" }    //开启申请监听    callStart()    //新建一个AsyncCall对象,通过调度器enqueue办法退出到readyAsyncCalls队列中    client.dispatcher.enqueue(AsyncCall(responseCallback))  }

而后调用调度器的enqueue办法,

  Dispatcher.kt    internal fun enqueue(call: AsyncCall) {    //加锁,保障线程平安    synchronized(this) {      //将该申请调用退出到 readyAsyncCalls 队列中      readyAsyncCalls.add(call)      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to      // the same host.      if (!call.call.forWebSocket) {        //通过域名来查找有没有雷同域名的申请,有则复用。        val existingCall = findExistingCallWithHost(call.host)        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)      }    }    //执行申请    promoteAndExecute()  }  private fun promoteAndExecute(): Boolean {    this.assertThreadDoesntHoldLock()    val executableCalls = mutableListOf<AsyncCall>()    //判断是否有申请正在执行    val isRunning: Boolean    //加锁,保障线程平安    synchronized(this) {      //遍历 readyAsyncCalls 队列      val i = readyAsyncCalls.iterator()      while (i.hasNext()) {        val asyncCall = i.next()        //runningAsyncCalls 的数量不能大于最大并发申请数 64        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.        //同域名最大申请数5,同一个域名最多容许5条线程同时执行申请        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.        //从 readyAsyncCalls 队列中移除,并退出到 executableCalls 及 runningAsyncCalls 队列中        i.remove()        asyncCall.callsPerHost.incrementAndGet()        executableCalls.add(asyncCall)        runningAsyncCalls.add(asyncCall)      }      //通过运行队列中的申请数量来判断是否有申请正在执行      isRunning = runningCallsCount() > 0    }    //遍历可执行队列,调用线程池来执行AsyncCall    for (i in 0 until executableCalls.size) {      val asyncCall = executableCalls[i]      asyncCall.executeOn(executorService)    }    return isRunning  }

调度器的enqueue办法就是将AsyncCall退出到readyAsyncCalls队列中,而后调用promoteAndExecute办法来执行申请,promoteAndExecute办法做的其实就是遍历readyAsyncCalls队列,而后将符合条件的申请用线程池执行,也就是会执行AsyncCall.run()办法。

AsyncCall 办法的具体代码看根本对象介绍 AsyncCall,这边就不在此展现了,简略来说就是调用getResponseWithInterceptorChain办法拿到response,而后通过Callback.onResponse办法传递进来。反之,如果申请失败,捕捉了异样,就通过Callback.onFailure将异样信息传递进来。 最终,申请完结,调用调度器finish办法。

  Dispatcher.kt  /** 异步申请调用完结办法 */  internal fun finished(call: AsyncCall) {    call.callsPerHost.decrementAndGet()    finished(runningAsyncCalls, call)  }  /** 同步申请调用完结办法 */  internal fun finished(call: RealCall) {    finished(runningSyncCalls, call)  }  private fun <T> finished(calls: Deque<T>, call: T) {    val idleCallback: Runnable?    synchronized(this) {      //将以后申请调用从 正在运行队列 中移除      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")      idleCallback = this.idleCallback    }    //继续执行残余申请,将call从readyAsyncCalls中取出退出到runningAsyncCalls,而后执行    val isRunning = promoteAndExecute()    if (!isRunning && idleCallback != null) {      //如果执行完了所有申请,处于闲置状态,调用闲置回调办法      idleCallback.run()    }  }

获取Response

接着就是看看getResponseWithInterceptorChain办法是如何拿到response的。

  internal fun getResponseWithInterceptorChain(): Response {    //拦截器列表    val interceptors = mutableListOf<Interceptor>()    interceptors += client.interceptors    interceptors += RetryAndFollowUpInterceptor(client)    interceptors += BridgeInterceptor(client.cookieJar)    interceptors += CacheInterceptor(client.cache)    interceptors += ConnectInterceptor    if (!forWebSocket) {      interceptors += client.networkInterceptors    }    interceptors += CallServerInterceptor(forWebSocket)    //构建拦截器责任链    val chain = RealInterceptorChain(        call = this,        interceptors = interceptors,        index = 0,        exchange = null,        request = originalRequest,        connectTimeoutMillis = client.connectTimeoutMillis,        readTimeoutMillis = client.readTimeoutMillis,        writeTimeoutMillis = client.writeTimeoutMillis    )    //如果call申请实现,那就意味着交互实现了,没有更多的货色来替换了    var calledNoMoreExchanges = false    try {      //执行拦截器责任链来获取 response      val response = chain.proceed(originalRequest)      //如果被勾销,敞开响应,抛出异样      if (isCanceled()) {        response.closeQuietly()        throw IOException("Canceled")      }      return response    } catch (e: IOException) {      calledNoMoreExchanges = true      throw noMoreExchanges(e) as Throwable    } finally {      if (!calledNoMoreExchanges) {        noMoreExchanges(null)      }    }  }

简略概括一下:这里采纳了责任链设计模式,通过拦截器构建了以RealInterceptorChain责任链,而后执行proceed办法来失去response

那么,这又波及拦截器是什么?拦截器责任链又是什么?

Interceptor

只申明了一个拦截器办法,在子类中具体实现,还蕴含一个Chain接口,外围办法是proceed(request)解决申请来获取response

fun interface Interceptor {  /** 拦挡办法 */  @Throws(IOException::class)  fun intercept(chain: Chain): Response  interface Chain {    /** 原始申请数据 */    fun request(): Request    /** 外围办法,解决申请,获取response */    @Throws(IOException::class)    fun proceed(request: Request): Response        fun connection(): Connection?    fun call(): Call    fun connectTimeoutMillis(): Int    fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain    fun readTimeoutMillis(): Int    fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain    fun writeTimeoutMillis(): Int    fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain  }}

RealInterceptorChain

拦截器链就是实现Interceptor.Chain接口,重点就是复写的proceed办法。

class RealInterceptorChain(  internal val call: RealCall,  private val interceptors: List<Interceptor>,  private val index: Int,  internal val exchange: Exchange?,  internal val request: Request,  internal val connectTimeoutMillis: Int,  internal val readTimeoutMillis: Int,  internal val writeTimeoutMillis: Int) : Interceptor.Chain {···省略代码···  private var calls: Int = 0  override fun call(): Call = call  override fun request(): Request = request  @Throws(IOException::class)  override fun proceed(request: Request): Response {    check(index < interceptors.size)    calls++    if (exchange != null) {      check(exchange.finder.sameHostAndPort(request.url)) {        "network interceptor ${interceptors[index - 1]} must retain the same host and port"      }      check(calls == 1) {        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"      }    }    //index+1, 复制创立新的责任链,也就意味着调用责任链中的下一个解决者,也就是下一个拦截器    val next = copy(index = index + 1, request = request)    //取出以后拦截器    val interceptor = interceptors[index]    //执行以后拦截器的拦挡办法    @Suppress("USELESS_ELVIS")    val response = interceptor.intercept(next) ?: throw NullPointerException(        "interceptor $interceptor returned null")    if (exchange != null) {      check(index + 1 >= interceptors.size || next.calls == 1) {        "network interceptor $interceptor must call proceed() exactly once"      }    }    check(response.body != null) { "interceptor $interceptor returned a response with no body" }    return response  }}

链式调用,最终会执行拦截器列表中的每个拦截器,返回Response

拦截器

OK,接下来就该看看拦截器列表中的具体拦截器了。

先上各类拦截器的总结,按程序:

  1. client.interceptors:这是由开发者设置的,会在所有的拦截器解决之前进行最早的拦挡解决,可用于增加一些公共参数,如自定义header自定义log等等。
  2. RetryAndFollowUpInterceptor:这里会对连贯做一些初始化工作,以及申请失败的重试工作,重定向的后续申请工作。跟他的名字一样,就是做重试工作还有一些连贯跟踪工作。
  3. BridgeInterceptor:是客户端与服务器之间的沟通桥梁,负责将用户构建的申请转换为服务器须要的申请,以及将网络申请返回回来的响应转换为用户可用的响应。
  4. CacheInterceptor:这里次要是缓存的相干解决,会依据用户在OkHttpClient里定义的缓存配置,而后联合申请新建一个缓存策略,由它来判断是应用网络还是缓存来构建response
  5. ConnectInterceptor:这里次要就是负责建设连贯,会建设TCP连贯或者TLS连贯
  6. client.networkInterceptors:这里也是开发者本人设置的,所以实质上和第一个拦截器差不多,然而因为地位不同,所以用途也不同。
  7. CallServerInterceptor:这里就是进行网络数据的申请和响应了,也就是理论的网络I/O操作,将申请头与申请体发送给服务器,以及解析服务器返回的response

接下来咱们按程序,从上往下,对这些拦截器进行一一解读。

client.interceptors

这是用户本人定义的拦截器,称为利用拦截器,会保留在OkHttpClientinterceptors: List<Interceptor>列表中。 他是拦截器责任链中的第一个拦截器,也就是说会第一个执行拦挡办法,咱们能够通过它来增加自定义Header信息,如:

class HeaderInterceptor implements Interceptor {    @Override    public Response intercept(Chain chain) throws IOException {        Request request = chain.request().newBuilder()                .addHeader("device-android", "xxxxxxxxxxx")                .addHeader("country-code", "ZH")                .build();        return chain.proceed(request);    }}//而后在 OkHttpClient 中退出OkHttpClient client = new OkHttpClient.Builder()    .connectTimeout(60, TimeUnit.SECONDS)    .readTimeout(15, TimeUnit.SECONDS)    .writeTimeout(15, TimeUnit.SECONDS)    .cookieJar(new MyCookieJar())    .addInterceptor(new HeaderInterceptor())//增加自定义Header拦截器    .build();

RetryAndFollowUpInterceptor

第二个拦截器,从它的名字也可晓得,它负责申请失败的重试工作与重定向的后续申请工作,同时它会对连贯做一些初始化工作。

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {  @Throws(IOException::class)  override fun intercept(chain: Interceptor.Chain): Response {    val realChain = chain as RealInterceptorChain    var request = chain.request    val call = realChain.call    var followUpCount = 0    var priorResponse: Response? = null    var newExchangeFinder = true    var recoveredFailures = listOf<IOException>()    while (true) {      //这里会新建一个ExchangeFinder,ConnectInterceptor会应用到      call.enterNetworkInterceptorExchange(request, newExchangeFinder)      var response: Response      var closeActiveExchange = true      try {        if (call.isCanceled()) {          throw IOException("Canceled")        }        try {          response = realChain.proceed(request)          newExchangeFinder = true        } catch (e: RouteException) {          //尝试通过路由连贯失败。该申请将不会被发送。          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {            throw e.firstConnectException.withSuppressed(recoveredFailures)          } else {            recoveredFailures += e.firstConnectException          }          newExchangeFinder = false          continue        } catch (e: IOException) {          //尝试与服务器通信失败。该申请可能已发送。          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {            throw e.withSuppressed(recoveredFailures)          } else {            recoveredFailures += e          }          newExchangeFinder = false          continue        }        // Attach the prior response if it exists. Such responses never have a body.        //尝试关联上一个response,留神:body是为null        if (priorResponse != null) {          response = response.newBuilder()              .priorResponse(priorResponse.newBuilder()                  .body(null)                  .build())              .build()        }        val exchange = call.interceptorScopedExchange        //会依据 responseCode 来判断,构建一个新的request并返回来重试或者重定向        val followUp = followUpRequest(response, exchange)        if (followUp == null) {          if (exchange != null && exchange.isDuplex) {            call.timeoutEarlyExit()          }          closeActiveExchange = false          return response        }        //如果申请体是一次性的,不须要再次重试        val followUpBody = followUp.body        if (followUpBody != null && followUpBody.isOneShot()) {          closeActiveExchange = false          return response        }        response.body?.closeQuietly()        //最大重试次数,不同的浏览器是不同的,比方:Chrome为21,Safari则是16        if (++followUpCount > MAX_FOLLOW_UPS) {          throw ProtocolException("Too many follow-up requests: $followUpCount")        }        request = followUp        priorResponse = response      } finally {        call.exitNetworkInterceptorExchange(closeActiveExchange)      }    }  }  /** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/  private fun recover(    e: IOException,    call: RealCall,    userRequest: Request,    requestSendStarted: Boolean  ): Boolean {    //客户端禁止重试    if (!client.retryOnConnectionFailure) return false    //不能再次发送该申请体    if (requestSendStarted && requestIsOneShot(e, userRequest)) return false    //产生的异样是致命的,无奈复原,如:ProtocolException    if (!isRecoverable(e, requestSendStarted)) return false    //没有更多的路由来尝试重连    if (!call.retryAfterFailure()) return false    // 对于失败复原,应用带有新连贯的雷同路由选择器    return true  }···省略代码··· 

BridgeInterceptor

从它的名字能够看出,他的定位是客户端与服务器之间的沟通桥梁,负责将用户构建的申请转换为服务器须要的申请,比方:增加 Content-Type,增加 Cookie,增加User-Agent等等。再将服务器返回的response做一些解决转换为客户端须要的response。比方:移除响应头中的Content-EncodingContent-Length等等。

class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {  @Throws(IOException::class)  override fun intercept(chain: Interceptor.Chain): Response {    //获取原始申请数据    val userRequest = chain.request()    val requestBuilder = userRequest.newBuilder()    //从新构建申请头,申请体信息    val body = userRequest.body    val contentType = body.contentType()    requestBuilder.header("Content-Type", contentType.toString())    requestBuilder.header("Content-Length", contentLength.toString())    requestBuilder.header("Transfer-Encoding", "chunked")    requestBuilder.header("Host", userRequest.url.toHostHeader())    requestBuilder.header("Connection", "Keep-Alive")   ···省略代码···       //增加cookie    val cookies = cookieJar.loadForRequest(userRequest.url)    if (cookies.isNotEmpty()) {      requestBuilder.header("Cookie", cookieHeader(cookies))    }    //增加user-agent    if (userRequest.header("User-Agent") == null) {      requestBuilder.header("User-Agent", userAgent)    }    //从新构建一个Request,而后执行下一个拦截器来解决该申请    val networkResponse = chain.proceed(requestBuilder.build())    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)    //创立一个新的responseBuilder,目标是将原始申请数据构建到response中    val responseBuilder = networkResponse.newBuilder()        .request(userRequest)    if (transparentGzip &&        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&        networkResponse.promisesBody()) {      val responseBody = networkResponse.body      if (responseBody != null) {        val gzipSource = GzipSource(responseBody.source())        val strippedHeaders = networkResponse.headers.newBuilder()            .removeAll("Content-Encoding")            .removeAll("Content-Length")            .build()        //批改response header信息,移除Content-Encoding,Content-Length信息        responseBuilder.headers(strippedHeaders)        val contentType = networkResponse.header("Content-Type")        //批改response body信息        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))      }    }        return responseBuilder.build()   ···省略代码···

CacheInterceptor

用户能够通过OkHttpClient.cache来配置缓存,缓存拦截器通过CacheStrategy来判断是应用网络还是缓存来构建response

class CacheInterceptor(internal val cache: Cache?) : Interceptor {  @Throws(IOException::class)  override fun intercept(chain: Interceptor.Chain): Response {    val call = chain.call()    //通过request从OkHttpClient.cache中获取缓存    val cacheCandidate = cache?.get(chain.request())    val now = System.currentTimeMillis()    //创立一个缓存策略,用来确定怎么应用缓存    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()    //为空示意不应用网络,反之,则示意应用网络    val networkRequest = strategy.networkRequest    //为空示意不应用缓存,反之,则示意应用缓存    val cacheResponse = strategy.cacheResponse    //追踪网络与缓存的应用状况    cache?.trackResponse(strategy)    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE    //有缓存但不实用,敞开它    if (cacheCandidate != null && cacheResponse == null) {      cacheCandidate.body?.closeQuietly()    }    //如果网络被禁止,然而缓存又是空的,构建一个code为504的response,并返回    if (networkRequest == null && cacheResponse == null) {      return Response.Builder()          .request(chain.request())          .protocol(Protocol.HTTP_1_1)          .code(HTTP_GATEWAY_TIMEOUT)          .message("Unsatisfiable Request (only-if-cached)")          .body(EMPTY_RESPONSE)          .sentRequestAtMillis(-1L)          .receivedResponseAtMillis(System.currentTimeMillis())          .build().also {            listener.satisfactionFailure(call, it)          }    }    //如果咱们禁用了网络不应用网络,且有缓存,间接依据缓存内容构建并返回response    if (networkRequest == null) {      return cacheResponse!!.newBuilder()          .cacheResponse(stripBody(cacheResponse))          .build().also {            listener.cacheHit(call, it)          }    }    //为缓存增加监听    if (cacheResponse != null) {      listener.cacheConditionalHit(call, cacheResponse)    } else if (cache != null) {      listener.cacheMiss(call)    }    var networkResponse: Response? = null    try {      //责任链往下解决,从服务器返回response 赋值给 networkResponse      networkResponse = chain.proceed(networkRequest)    } finally {      //捕捉I/O或其余异样,申请失败,networkResponse为空,且有缓存的时候,不裸露缓存内容。      if (networkResponse == null && cacheCandidate != null) {        cacheCandidate.body?.closeQuietly()      }    }    //如果有缓存    if (cacheResponse != null) {      //且网络返回response code为304的时候,应用缓存内容新构建一个Response返回。      if (networkResponse?.code == HTTP_NOT_MODIFIED) {        val response = cacheResponse.newBuilder()            .headers(combine(cacheResponse.headers, networkResponse.headers))            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)            .cacheResponse(stripBody(cacheResponse))            .networkResponse(stripBody(networkResponse))            .build()        networkResponse.body!!.close()        // Update the cache after combining headers but before stripping the        // Content-Encoding header (as performed by initContentStream()).        cache!!.trackConditionalCacheHit()        cache.update(cacheResponse, response)        return response.also {          listener.cacheHit(call, it)        }      } else {        //否则敞开缓存响应体        cacheResponse.body?.closeQuietly()      }    }    //构建网络申请的response    val response = networkResponse!!.newBuilder()        .cacheResponse(stripBody(cacheResponse))        .networkResponse(stripBody(networkResponse))        .build()    //如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络申请response存到cache中    if (cache != null) {      //依据response的code,header以及CacheControl.noStore来判断是否能够缓存      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {        // 将该response存入缓存        val cacheRequest = cache.put(response)        return cacheWritingResponse(cacheRequest, response).also {          if (cacheResponse != null) {            listener.cacheMiss(call)          }        }      }      //依据申请办法来判断缓存是否无效,只对Get申请进行缓存,其它办法的申请则移除      if (HttpMethod.invalidatesCache(networkRequest.method)) {        try {          //缓存有效,将该申请缓存从client缓存配置中移除          cache.remove(networkRequest)        } catch (_: IOException) {          // The cache cannot be written.        }      }    }    return response  }  ···省略代码···  

ConnectInterceptor

负责实现与服务器真正建设起连贯,

object ConnectInterceptor : Interceptor {  @Throws(IOException::class)  override fun intercept(chain: Interceptor.Chain): Response {    val realChain = chain as RealInterceptorChain    //初始化一个exchange对象    val exchange = realChain.call.initExchange(chain)    //依据这个exchange对象来复制创立一个新的连贯责任链    val connectedChain = realChain.copy(exchange = exchange)    //执行该连贯责任链    return connectedChain.proceed(realChain.request)  }}

一扫下来,代码非常简略,拦挡办法里就只有三步。

  1. 初始化一个exchange对象。
  2. 而后依据这个exchange对象来复制创立一个新的连贯责任链。
  3. 执行该连贯责任链。

那这个exchange对象又是什么呢?

RealCall.ktinternal fun initExchange(chain: RealInterceptorChain): Exchange {    ...省略代码...    //这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创立的    val exchangeFinder = this.exchangeFinder!!    //返回一个ExchangeCodec(是个编码器,为request编码以及为response解码)    val codec = exchangeFinder.find(client, chain)    //依据exchangeFinder与codec新构建一个Exchange对象,并返回    val result = Exchange(this, eventListener, exchangeFinder, codec)  ...省略代码...    return result  }

具体看看ExchangeFinder.find()这一步,

ExchangeFinder.ktfun find(    client: OkHttpClient,    chain: RealInterceptorChain  ): ExchangeCodec {    try {      //查找合格可用的连贯,返回一个 RealConnection 对象      val resultConnection = findHealthyConnection(          connectTimeout = chain.connectTimeoutMillis,          readTimeout = chain.readTimeoutMillis,          writeTimeout = chain.writeTimeoutMillis,          pingIntervalMillis = client.pingIntervalMillis,          connectionRetryEnabled = client.retryOnConnectionFailure,          doExtensiveHealthChecks = chain.request.method != "GET"      )      //依据连贯,创立并返回一个申请响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,别离对应Http1协定与Http2协定      return resultConnection.newCodec(client, chain)    } catch (e: RouteException) {      trackFailure(e.lastConnectException)      throw e    } catch (e: IOException) {      trackFailure(e)      throw RouteException(e)    }  }

持续往下看findHealthyConnection办法

ExchangeFinder.kt  private fun findHealthyConnection(    connectTimeout: Int,    readTimeout: Int,    writeTimeout: Int,    pingIntervalMillis: Int,    connectionRetryEnabled: Boolean,    doExtensiveHealthChecks: Boolean  ): RealConnection {    while (true) {      //重点:查找连贯      val candidate = findConnection(          connectTimeout = connectTimeout,          readTimeout = readTimeout,          writeTimeout = writeTimeout,          pingIntervalMillis = pingIntervalMillis,          connectionRetryEnabled = connectionRetryEnabled      )      //查看该连贯是否合格可用,合格则间接返回该连贯      if (candidate.isHealthy(doExtensiveHealthChecks)) {        return candidate      }      //如果该连贯不合格,标记为不可用,从连接池中移除      candidate.noNewExchanges()    ...省略代码...    }  }

简略概括一下就是:通过findConnection办法来查找连贯,找到连贯后判断是否是合格可用的,合格就间接返回该连贯。

所以外围办法就是findConnection,咱们持续深刻看看该办法:

private fun findConnection(    connectTimeout: Int,     readTimeout: Int,    writeTimeout: Int,    pingIntervalMillis: Int,    connectionRetryEnabled: Boolean  ): RealConnection {    if (call.isCanceled()) throw IOException("Canceled")    //第一次,尝试重连 call 中的 connection,不须要去从新获取连贯    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!    if (callConnection != null) {      var toClose: Socket? = null      synchronized(callConnection) {        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {          toClose = call.releaseConnectionNoEvents()        }      }      //如果 call 中的 connection 还没有开释,就重用它。      if (call.connection != null) {        check(toClose == null)        return callConnection      }      //如果 call 中的 connection 曾经被开释,敞开Socket.      toClose?.closeQuietly()      eventListener.connectionReleased(call, callConnection)    }    //须要一个新的连贯,所以重置一些状态    refusedStreamCount = 0    connectionShutdownCount = 0    otherFailureCount = 0    //第二次,尝试从连接池中获取一个连贯,不领路由,不带多路复用    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {      val result = call.connection!!      eventListener.connectionAcquired(call, result)      return result    }    //连接池中是空的,筹备下次尝试连贯的路由    val routes: List<Route>?    val route: Route        ...省略代码...      //第三次,再次尝试从连接池中获取一个连贯,领路由,不带多路复用      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {        val result = call.connection!!        eventListener.connectionAcquired(call, result)        return result      }      route = localRouteSelection.next()    }    //第四次,手动创立一个新连贯    val newConnection = RealConnection(connectionPool, route)    call.connectionToCancel = newConnection    try {      newConnection.connect(          connectTimeout,          readTimeout,          writeTimeout,          pingIntervalMillis,          connectionRetryEnabled,          call,          eventListener      )    } finally {      call.connectionToCancel = null    }    call.client.routeDatabase.connected(newConnection.route())    //第五次,再次尝试从连接池中获取一个连贯,领路由,带多路复用。    //这一步次要是为了校验一下,比方曾经有了一条连贯了,就能够间接复用,而不必应用手动创立的新连贯。    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {      val result = call.connection!!      nextRouteToTry = route      newConnection.socket().closeQuietly()      eventListener.connectionAcquired(call, result)      return result    }    synchronized(newConnection) {      //将手动创立的新连贯放入连接池      connectionPool.put(newConnection)      call.acquireConnectionNoEvents(newConnection)    }    eventListener.connectionAcquired(call, newConnection)    return newConnection  }

在代码中能够看出,一共做了5次尝试去失去连贯:

  1. 第一次,尝试重连 call 中的 connection,不须要去从新获取连贯。
  2. 第二次,尝试从连接池中获取一个连贯,不领路由,不带多路复用。
  3. 第三次,再次尝试从连接池中获取一个连贯,领路由,不带多路复用。
  4. 第四次,手动创立一个新连贯。
  5. 第五次,再次尝试从连接池中获取一个连贯,领路由,带多路复用。

OK,到了这一步,就算建设起了连贯。

client.networkInterceptors

该拦截器称为网络拦截器,与client.interceptors一样也是由用户本人定义的,同样是以列表的模式存在OkHttpClient中。

那这两个拦截器有什么不同呢?

其实他两的不同都是因为他们所处的地位不同所导致的,利用拦截器处于第一个地位,所以无论如何它都会被执行,而且只会执行一次。而网络拦截器处于倒数第二的地位,它不肯定会被执行,而且可能会被执行屡次,比方:在RetryAndFollowUpInterceptor失败或者CacheInterceptor间接返回缓存的状况下,咱们的网络拦截器是不会被执行的。

CallServerInterceptor

到了这里,客户端与服务器曾经建设好了连贯,接着就是将申请头与申请体发送给服务器,以及解析服务器返回的response了。

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {  @Throws(IOException::class)  override fun intercept(chain: Interceptor.Chain): Response {    val realChain = chain as RealInterceptorChain    val exchange = realChain.exchange!!    val request = realChain.request    val requestBody = request.body    var invokeStartEvent = true    var responseBuilder: Response.Builder? = null    try {      //写入申请头      exchange.writeRequestHeaders(request)      //如果不是GET申请,并且申请体不为空      if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {        //当申请头为"Expect: 100-continue"时,在发送申请体之前须要期待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送申请体。        //POST申请,先发送申请头,在获取到100持续状态后持续发送申请体        if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {          //刷新申请,即发送申请头          exchange.flushRequest()          //解析响应头          responseBuilder = exchange.readResponseHeaders(expectContinue = true)          exchange.responseHeadersStart()          invokeStartEvent = false        }        //写入申请体        if (responseBuilder == null) {          if (requestBody.isDuplex()) {            //如果申请体是双公体,就先发送申请头,稍后在发送申请体            exchange.flushRequest()            val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()            //写入申请体            requestBody.writeTo(bufferedRequestBody)          } else {            //如果获取到了"Expect: 100-continue"响应,写入申请体            val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()            requestBody.writeTo(bufferedRequestBody)            bufferedRequestBody.close()          }       ···省略代码···        //申请完结,发送申请体        exchange.finishRequest()    ···省略代码···    try {      if (responseBuilder == null) {        //读取响应头        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!        ···省略代码···      //构建一个response      var response = responseBuilder          .request(request)          .handshake(exchange.connection.handshake())          .sentRequestAtMillis(sentRequestMillis)          .receivedResponseAtMillis(System.currentTimeMillis())          .build()      var code = response.code      ···省略代码···      return response···省略代码···

简略概括一下:写入发送申请头,而后依据条件是否写入发送申请体,申请完结。解析服务器返回的申请头,而后构建一个新的response,并返回。 这里CallServerInterceptor是拦截器责任链中最初一个拦截器了,所以他不会再调用chain.proceed()办法往下执行,而是将这个构建的response往上传递给责任链中的每个拦截器。

总结

咱们剖析了申请的流程,包含同步申请与异步申请,还仔细分析了拦截器责任链中的每个拦截器,当初画一个流程图,简略总结一下,你能够对照着流程图,在走一遍流程。

反思

设计模式

  1. 建造者模式:不论是在OkHttpClientRequest还是Response中都用到了建造者模式,因为这几个类中都有很多参数,须要供用户抉择须要的参数来构建其想要的实例,所以在开源库中,Build模式是很常见的。
  2. 工厂办法模式:帮忙生成简单对象,如: OkHttpClient.newCall(request Request) 来创立 Call 对象
  3. 责任链模式:这个就用的很绝妙了,将7个拦截器形成拦截器责任链,而后按程序从上往下执行,失去Response后,从下往上传回去。

线程平安

AsyncCall 类中的 callsPerHost 变量,应用了 Volatile + AtomicInteger来润饰,从而保障在多线程下的线程平安。

inner class AsyncCall(    private val responseCallback: Callback  ) : Runnable {    //同一个域名的申请次数,volatile + AtomicInteger 保障在多线程下及时可见性与原子性    @Volatile var callsPerHost = AtomicInteger(0)      private set    ...省略代码...

数据结构

为什么readyAsyncCalls runningAsyncCalls runningSyncCalls采纳ArrayDeque呢?

两个点答复、他们都是用来寄存网络申请的,这些申请须要做到先到先得,所以采纳队列。、依据代码所示,当执行enqueue时,咱们须要遍历readyAsyncCalls,将合乎执行条件的Call退出到runningAsyncCalls,这绝对比于链表来说,数组的查找效率要更高,所以采纳ArrayDeque

结尾

到此,对于OkHttp的源码解析就介绍啦。

其实学习源码的最好形式,就是本人将代码克隆下来,而后对着应用办法,按流程,一步一步往下走。

其实分享文章的最大目标正是期待着有人指出我的谬误,如果你发现哪里有谬误,请毫无保留的指出即可,虚心求教。 另外,如果你感觉文章不错,对你有所帮忙,请给我点个赞,就当激励,谢谢~Peace~!
视频:
资深架构师逐题详解Android大厂精选高频面试题之OkHttp
Android(安卓)开发零根底从入门到精通之OkHttp
原文: https://juejin.cn/post/7033307467199021086