乐趣区

OkHttp源码分析二OkHttpClientRequestCallDispatcher详解

文章首发在个人博客 https://www.nullobject.cn,公众号 NullObject 同步更新。

这篇文章主要介绍 OkHttpClient、Request、Call、Dispatcher、Response 等类

文章基于 OkHttp3.14.3 版本

0. 说明

上一篇 OkHttp 源码分析 (一) 请求和响应过程简单分析中我们简单分析了 OkHttp 从请求到响应的过程,这篇就来深入学习下其中涉及到的比较关键的类:

  • OkHttpClient: OkHttp 客户端参数配置、Call 工厂
  • Request: 请求链接、方法、参数配置
  • Call 实现类 Realcall: 请求任务
  • Dispatcher: 任务调度器

1. OkHttpClient

OkHttpClient类主要应用了外观模式和建造者模式两种设计模式来设计,结合外观模式的思想,将许多对应 OkHttp 中各个功能模块的对象包含到类中,并为这些功能对象的配置提供了共同的对外接口。同时使用建造者模式,提供一个 Builder 类为这些众多的功能模块提供链式的配置方式,使得繁杂的功能模块配置变得简洁。首先,创建一个 OkHttpClient.builder 对象,接着按需设置 builder 各个参数:

OkHttpClient.Builder builder = new OkHttpClient.Builder();

1.1 超时、失败重连

OkHttpClient 中超时时间参数有以下几种:

  • callTimeout: 设置完整的请求过程超时时间。该参数计算的是整个请求过程的时间:从解析 DNS、与 Server 建立连接、发送请求、Server 响应处理到读取请求结果。同时,如果请求中包含重定向和失败重连,这两个过程执行时间也包含在 callTimeOut 计时时间内。取值范围:0~Integer.MAX_VALUE,其中 0 是默认值,表示不设置超时时间;
// 设置整个请求过程最大超时时间为 60s
builder.callTimeout(Duratino.ofSeconds(60));
  • connectTimeout: 设置连接建立的超时时间。该参数计算的是建立与服务端之间 tcp socket 连接的时间。取值范围:0~Integer.MAX_VALUE,其中 0 表示不设置超时时间,默认值为10s
// 设置连接建立超时时间
builder.connectTimeout(Duration.ofSeconds(10));
  • readTimeout: 设置连接的 IO 读操作超时时间。该参数应用于请求中的 TCP socket 和各个 IO 读操作,包括对 Source 和 Response 的读操作。其中 0 表示不设置超时时间,默认值为10s
// 设置读超时时间
builder.readTimeout(Duration.ofSeconds(10));
  • writeTimeout: 设置连接的 IO 写操作超时时间。该参数应用于请求中的各个 IO 写操作,其中 0 表示不设置超时时间,默认值为10s
// 设置写超时时间
builder.writeTimeout(Duration.ofSeconds(10));
  • retryOnConnectionFailure: 是否允许 OkHttp 自动执行失败重连,默认为true。当设置为 true 时,okhttp 会在以下几种可能的请求失败的情况下恢复连接并重新请求:1.IP 地址不可达;2. 过久的池化连接;3. 代理服务器不可达。
builder.retryOnConnectionFailure(true);

1.2 拦截器

OkHttpClient 支持添加多个 HTTP/HTTPS 请求拦截器和 WebSocket 拦截器:

  • addInterceptor: 添加自定义的 HTTP/HTTPS 请求拦截器:
builder.addInterceptor(chain -> chain.proceed(chain.request()));
  • addNetworkInterceptor: 添加自定义的 WebSocket 请求拦截器,该方法添加的拦截器只在请求为 websocket 的情况下有效:
builder.addNetworkInterceptor(chain -> chain.proceed(chain.request()));

1.3 缓存和 Cookie

OkHttp 支持自定义缓存的路径和大小,以及 Cookie 的缓存处理:

  • cache: 设置缓存的路径和缓存空间大小,用于读取和写入已缓存的响应信息 Response
// 设置缓存文件,用于将 HTTP/HTTPS 响应缓存到文件系统从而达到重用的目的以节省时间和网络带宽
builder.cache(new Cache(new File("cache_path"), 1024 * 1024))
  • cookieJar: 设置 Cookie 处理器,用于从 HTTP 响应中接收 Cookie,并且可以将 Cookie 提供给即将发起的请求。该参数默认值为CookieJar.NO_COOKIES,即不处理 Cookie。
// 将 cookie 缓存到内存中
builder.cookieJar(new CookieJar() {private final HashMap<String, List<Cookie>> cookieStore = new HashMap<>();
    @Override
    public void saveFromResponse(final HttpUrl url, final List<Cookie> cookies) {cookieStore.put(url.host(), cookies);
    }
    @Override
    public List<Cookie> loadForRequest(final HttpUrl url) {List<Cookie> cookies = cookieStore.get(url.host());
        return null != cookies ? cookies : new ArrayList<Cookie>();}
})

1.4 域名解析、重定向

  • dns: 设置域名解析服务。默认情况下,OkHttp 内部使用系统提供的域名解析服务。也可以通过该方法设置自定义的域名解析规则,比如屏蔽某些域名请求、或强制解析到固定的 IP 下。
// 自定义 dns 解析,屏蔽百度用域名解析并使用系统提供的 DNS 解析服务解析其他域名
builder.dns(hostname -> {
    // 屏蔽百度链接
    if (hostname.contains("baidu.com")) {List<InetAddress> addresses = new ArrayList<>();
        addresses.add(InetAddress.getByAddress(new byte[]{(byte) 127, (byte) 0, (byte) 0, (byte) 1}));
        return addresses;
    }
    return Dns.SYSTEM.lookup(hostname);
})
  • followRedirects: 设置是否允许请求重定向,默认为 true 允许;
  • followSslRedirects: 设置是否允许 HTTP 与 HTTPS 请求之间互相重定向,默认为 true 允许。区别于HttpURLConnection d 呃这个选项默认是不允许的。
builder.followRedirects(true)
             .followSslRedirects(true);

1.5 ping 心跳机制和协议设置

  • pingInterval: 设置 ping 信号发送时间间隔,该选项一般用于维持 Websocket/Http2 长连接,发送心跳包。默认值为 0 表示禁用心跳机制。
builder.pingInterval(Duration.ofSeconds(59));
  • protocols: 设置 OkHttpClient 使用的协议,默认为 HTTP/ 2 和 HTTP/1.1
builder.protocols(Util.immutableList(Protocol.HTTP_2, Protocol.HTTP_1_1));

1.6 配置连接池、请求调度器

  • connectionPool: 手动配置连接池。当前 OkHttp 版本的连接池默认为容纳最大 5 个连接数,并在连接空闲超时 5 分钟后将其从池中移除。
  • connectionPool: 自定义连接池,当前 OkHttp 版本默认的连接池最大容纳 5 个连接数,并在连接空闲超过 5 分钟后将其回收,从连接池中移除。
// 手动配置连接池,其中 ConnectionPool 第一个参数表示池内容纳的最大连接
builder.connectionPool(new ConnectionPool(5, 5, TimeUnit.MINUTES));
  • dispatcher: 自定义请求任务调度器,多数情况下使用默认的请求调度器即可。当需要手动配置执行请求任务的线程池时可以通过此选项设置实现。
builder.dispatcher(new Dispatcher());
// 手动配置执行请求任务的线程池
builder.dispatcher(new Dispatcher(Executors.newFixedThreadPool(64)));

1.7 为请求设置“上帝视角”:事件监听器

OkHttp 中的EventListener,对于每次请求,犹如 ” 上帝视角 ” 般的存在:如果为 OkHttpClient 设置了 EventListener,则一个请求从发起到结束的所有步骤都会被 EventListener“看”到,请求的完整生命周期事件都会通过 EventListener 对应的接口回调给上层,因此,在开发 debug 阶段,或想要了解一个请求需要经历哪些流程时,也可以通过设置 EventListener 来获取相应信息。

  • eventListenerFactory: 为每一个 Call 指定单独的事件监听器。
// EventListener.NONE 不监听任何事件
builder.eventListenerFactory(call -> EventListener.NONE);
  • eventListener: 为 OkHttpClient 设置统一的事件监听器,该选项对同一个 OkHttpClient 实例的所有 Call 都生效;实际上该选项内部也调用了 eventListenerFactory 方法,为每一个 call 设置了相同的事件监听器:
public Builder eventListener(EventListener eventListener) {if (eventListener == null) throw new NullPointerException("eventListener == null");
  this.eventListenerFactory = EventListener.factory(eventListener);
  return this;
}

1.8 代理设置

可以通过以下三个选项设置请求代理:

  • proxySelector: 设置代理选择策略
builder.proxySelector(ProxySelector.getDefault());
  • proxyAuthenticator: 设置代理身份验证
builder.proxyAuthenticator(Authenticator.NONE);
  • proxy: 设置使用指定代理。该选项优先级高于 proxySelector
builder.proxy(Proxy.NO_PROXY);

1.9 socket 相关

  • socketFactory: 设置自定义的用于创建 socket 连接的 socket 工厂对象,OkHttp 默认使用无参的 SocketFactory.createSocket()重载方法来创建未连接状态的 socket。可以通过该参数设置实现将创建的 socket 绑定到特定的地址。
builder.socketFactory(SocketFactory.getDefault());

1.10 HTTPS 相关

  • certificatePinner: 设置固定证书
// 设置默认固定证书
builder.certificatePinner(CertificatePinner.DEFAULT);
  • connectionSpecs: 设置 TLS 连接规格,主要包含 TLS 版本和密码套件,参考 https://www.jianshu.com/p/f7972c30fc52
builder.connectionSpecs(Util.immutableList(ConnectionSpec.MODERN_TLS, ConnectionSpec.CLEARTEXT));
  • sslSocketFactory: 设置用于创建 SSLSocket 连接的工厂对象和 X509 信任管理:
builder.sslSocketFactory((SSLSocketFactory) SSLSocketFactory.getDefault(), Util.platformTrustManager());

最后,生成 OkHttpClient 对象:

OkHttpClient client = builder.build();

接下来看看 Request 类。

2. Request

Request 封装了请求的内容,包括链接、请求体参数、请求头等,以及请求 tag,比较简单。Request 也是通过 Builder 构建:

// Step 2. 构建一个 Request 用于封装请求地址、请求类型、参数等信息
Request request = new Request.Builder().get()
                                       .url("https://www.baidu.com")
                                       .build();

创建好 OkHttpClientRequest之后,就可以生成请求任务,发起请求了,接下来看 Call 和其实现类RealCall

3. Call 和 RealCall

当前版本的 OkHttp 中,接口 Call 只有 RealCall 这一个实现类。Call 表示一个已经准备好,可以执行的请求任务,Call 执行时可以取消,但一个 Call 只能被执行一次。Call 除了封装分别用于执行 同步 异步 请求的 execute()enqueue(callback) 两个接口外,还封装了其他几个请求相关的接口:

  • Request request(); 获取发起本次请求任务的请求对象 Request;
  • void cancel(); 取消正在执行的本次请求,已经结束的请求不能取消;
  • boolean isExecuted(); 判断请求任务是否已经执行过了,避免同个请求任务调用一次以上;
  • boolean isCanceled(); 判断请求是否已取消;
  • Timeout timeout(); 获取请求任务的超时时间对象,该参数对应于 OkHttpClient.Builder.callTimeOut 方法设置的超时参数;
  • Call clone(); 克隆本次请求为一个新的请求,如果同一个请求任务需要重复执行,可以通过该方法克隆出一个新的 Call 来实现。

接下来看看 RealCall 类源码。

我们在发起 Http 请求时会通过 OkHttpClient 和 Request 对象来创建 Call:

// 创建一个新的请求任务 Call
Call call = httpClient.newCall(request);

而 httpClient.newCall()方法内部通过 RealCall 的静态方法 newCall 创建并返回一个 RealCall 对象,所以在执行 同步 / 异步 请求时实际调用的是 RealCall 中的实现方法:

// OkHttpClient.newCall 
@Override 
public Call newCall(Request request) {return RealCall.newRealCall(this, request, false /* for web socket */);
}

打开 RealCall 源码可以看到,RealCall 内部持有本次请求的 Request 对象和 OkHttpClient 对象,同时还发现,RealCall 还声明了一个 Transmitter 类型的对象并随着 RealCall 的创建而创建。Transmitter 作为 OkHttp 的应用层和网络层的连接,负责对外暴露 OkHttp 中的高级应用程序层原语,包括连接、请求、响应和流等。结合 RealCall 源码可以发现,RealCall 负责的是请求发起和执行,Transmitter 则负责请求任务的状态、超时时间、生命周期事件的更新以及请求任务背后的连接、连接池的维护管理等。

/*RealCall.java: 通过 transmitter 控制超时时间的计算、生命周期步骤更新、取消请求等 */
@Override public Response execute() throws IOException {
  // ... 忽略其他代码
  transmitter.timeoutEnter();
  transmitter.callStart();
  // ... 忽略其他代码
}

@Override public void enqueue(Callback responseCallback) {
 // ... 忽略其他代码
 transmitter.callStart();
 // ... 忽略其他代码
}

@Override public void cancel() {transmitter.cancel();
}

@Override public Timeout timeout() {return transmitter.timeout();
}

再看看 clone()方法实现:

@SuppressWarnings("CloneDoesntCallSuperClone") // We are a final type & this saves clearing state.
@Override public RealCall clone() {return RealCall.newRealCall(client, originalRequest, forWebSocket);
}

可以看到,clone 内部创建了一个新的 Call,相当于调用了(RealCall)client.newCall(request),因此可以用这个克隆的 Call 继续发起请求。

RealCall 还封装了个内部类 AsyncCall 用于执行异步请求,AsyncCall 声明了 CallBack 变量用于回调通知异步请求结果,以及一个线程安全的 AtomicInteger 类型变量 callsPerHost 用于计量同一主机的请求数。通过上一篇的分析知道,AsyncCall 是一个 Runnable 并且最终通过 AsyncCall.execute() 方法执行网络请求。那 RealCall 内部是怎样执行到这个这个方法的呢?我们之前是通过快速跳转实现的方式找到了这个方法的,而 AsyncCall 封装的异步请求任务是在 RealCall.enqueue 执行时被添加到 Dispatch 中的请求队列:

// RealCall.enqueue()
@Override public void enqueue(Callback responseCallback) {
 // ... 忽略无关代码
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

那么只要搞清楚 Dispatcher.enqueue()背后的队列中的任务何时何地执行的,上面的问题就有答案了。

4. Dispatcher

Dispatcher是 OkHttp 中的请求任务调度器,内部维护了一个线程池和相关的请求队列用于实现高并发的异步请求:

public final class Dispatcher {
  // 最大并发请求数,默认为 64 个
  private int maxRequests = 64;
  // 相同服务器主机的最大并发请求数,默认为 5 个
  private int maxRequestsPerHost = 5;
  // 空闲回调,如果设置,则当该调度器空闲时 (正在执行的任务数变为 0) 时回调通知 idleCallback.run()
  private @Nullable Runnable idleCallback;
  // 执行异步任务的线程池
  private @Nullable ExecutorService executorService;
  // 存放已经准备就绪可以执行,但尚未执行的异步请求任务的双向队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // 存放正在执行的异步请求任务,包括已经取消但未完全结束的请求的双向队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  // 存放正在执行的同步请求任务,包括已经取消但未完全结束的请求的双向队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  //... 其他代码
}

其中线程池对象通过懒加载方式创建:

public synchronized ExecutorService executorService() {if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                                             new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

可以看到,executorService 实际创建的是一个无边界、核心线程数为 0 的线程池。其中池内线程空闲等待时长为 60s,超过空闲时间自动结束;工作队列 workQueue 为SynchronousQueue 类型的队列,该队列是一个同步队列,保证并发的任务顺序执行,且该类型队列内部不存储值,只作传递,由于 executorService 创建的线程数无限制,不会有队列等待,所以使用 SynchronousQueue(参考 Executors.newCachedThread()创建缓存线程池);

参考 https://www.jianshu.com/p/074dff0f4ecb:

SynchronousQueue 每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量为 0,严格说并不是一种容器,由于队列没有容量,因此不能调用 peek 等操作,因此只有移除元素才有元素,显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入者 (生产者) 传递给移除者(消费者), 这在多任务队列中最快的处理任务方式。对于高频请求场景,无疑是最合适的。

在 OKHttp 中,创建了一个阀值是 Integer.MAX_VALUE 的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活 60 秒。所以也就说如果收到 20 个并发请求,线程池会创建 20 个线程,当完成后的 60 秒后会自动关闭所有 20 个线程。他这样设计成不设上限的线程,以保证 I / O 任务中高阻塞低占用的过程,不会长时间卡在阻塞上。

接着第 3 节 RealCall 分析最后的问题,来看看 Dispatcher.enqueue()方法的实现:

// Dispatcher.enqueue()
void enqueue(AsyncCall call) {synchronized (this) {
    // 将新的异步请求任务添加到 readyAsyncCalls 队列
    readyAsyncCalls.add(call);
        // 修改 call,使其共享 runningAsyncCalls 或 readyAsyncCalls 中现有的请求相同主机的 call.callsPerHost 变量
    if (!call.get().forWebSocket) {AsyncCall existingCall = findExistingCallWithHost(call.host());
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
    }
  }
  // 执行 AsyncCall 请求调度策略,发起 call 请求
  promoteAndExecute();}

对于新请求入列的异步请求任务 call,首先将其添加到 readyAsyncCalls 队列,以表示这个 call 准备就绪,可以执行请求;接着修改这个 call 的callsPerHost 属性为与先前添加的相同主机请求任务的 call 共享,从而实现对相同主机的请求计数,对相同主机的最大并发请求数进行限制。接着调用 promoteAndExecute() 方法,将 readyAsyncCalls 队列中的任务提升到 runningAsyncCalls,并执行请求:

private boolean promoteAndExecute() {assert (!Thread.holdsLock(this));
  // 创建可执行任务列表,用于筛选出 readyAsyncCalls 中可执行的任务
  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    // 遍历筛选 readyAsyncCalls
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext();) {AsyncCall asyncCall = i.next();
            // 如果 runningAsyncCalls 中的请求任务数超过最大并发请求数限制 maxRequests 则任务继续放在 readyAsyncCalls 中等待执行
      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      // 如果与 asyncCall 相同主机的请求数超过最大并发同主机请求数则,则不执行该请求
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
            // 从 readyAsyncCalls 中移除符合条件的请求任务
      i.remove();
      // 将 asyncCall 关联的主机请求数增 1
      asyncCall.callsPerHost().incrementAndGet();
      // 加入可执行请求任务列表
      executableCalls.add(asyncCall);
      // 加入 runningAsyncCalls 任务队列
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;}
    // 遍历执行请求任务
  for (int i = 0, size = executableCalls.size(); i < size; i++) {AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
  }

  return isRunning;
}

可以看到,Dispatcher 请求调度器最终是在 promoteAndExecute()方法中实现最大并发请求数量和最大并发同主机请求数量限制的。在方法的最后遍历执行请求任务,调用了每一个 AsyncCall 的 executeOn()方法并将当前 Dispatcher 的线程池作为参数传入,在看看这个 AsyncCall.executeOn(executorService)方法的实现:

// RealCall.AsyncCall.executeOn()
void executeOn(ExecutorService executorService) {assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
    // 通过 executorService 线程池执行请求任务
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    // 发生异常时关闭连接
    transmitter.noMoreExchanges(ioException);
    responseCallback.onFailure(RealCall.this, ioException);
  } finally {
    // 最后如果请求任务执行失败则结束任务,从 runningAsyncCalls 中将任务移除
    if (!success) {client.dispatcher().finished(this); // This call is no longer running!
    }
  }
}

AsyncCall.executeOn()方法内部调用线程池的 execute 方法执行本次任务,由此触发 AsyncCall 父类的 run 方法,并执行到 AsyncCall 的 execute()方法,完成了本次请求!因此,AsyncCall.execute()调用过程大致如下:

最后,调用 Dispatcher.finished(call)方法结束本次请求:

@Override protected void execute() {
  // ... 忽略无关代码
  try {// ... 忽略无关代码} catch (IOException e) {// ... 忽略无关代码} finally {
    // 结束本次请求
    client.dispatcher().finished(this);
  }
}
// Dispatcher.finished(call)
void finished(AsyncCall call) {
  // 将 call 同主机并发请求数减 1
  call.callsPerHost().decrementAndGet();
  // 结束本次请求任务,主动将 call 从 runningAsyncCalls 队列移除
  finished(runningAsyncCalls, call);
}

以上从 Dispatcher.enqueue()开始到 Dispatcher.finished(call)结束就是 Dispatcher 调度异步请求的过程。Dispatcher 对同步请求的调度执行就简单多了,单线程任务,直接从 RealCall.execute 跟进分析即可。不管是同步还是异步请求,最终都是通过 RealCall 的 getResponseWithInterceptorChain()方法完成请求和获取响应结果的,那 getResponseWithInterceptorChain()方法内部是如何通过拦截器链完成请求的呢?下一篇就来分析分析 OkHttp 中的拦截器 Interceptor。

5. The End :)

欢迎关注个人公众号:

退出移动版