一、引言
在现代网络应用开发中,高效的任务调度机制对于提升系统性能和用户体验至关重要。OkHttp 作为一款广泛使用的高性能 HTTP 客户端库,其任务调度模块在处理网络请求的并发、排队和执行等方面发挥着关键作用。本文将深入 OkHttp 源码,详细剖析其任务调度模块的实现原理和工作流程。
二、任务调度模块概述
2.1 核心功能
OkHttp 的任务调度模块主要负责管理和调度 HTTP 请求任务,其核心功能包括:
- 并发控制:限制同时执行的请求数量,避免过多请求耗尽系统资源。
- 任务排队:当并发请求数量达到上限时,将新请求放入队列等待执行。
- 异步执行:支持异步请求,通过线程池管理请求的执行,避免阻塞主线程。
2.2 主要组件
任务调度模块的主要组件包括:
- Dispatcher:调度器,负责任务的分发、排队和执行管理。
- RealCall:表示一个实际的 HTTP 请求调用。
- AsyncCall:RealCall 的异步执行包装类。
- 线程池:用于执行异步请求任务。
2.3 整体架构
OkHttp 任务调度模块的整体架构可以分为以下几个层次:
客户端层:应用代码通过
OkHttpClient
发起 HTTP 请求。调度层:
Dispatcher
类负责接收请求任务,对任务进行调度和管理。执行层:
RealCall
和AsyncCall
类负责具体的请求执行。
三、Dispatcher 类源码分析
3.1 类定义及属性
java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// Dispatcher 类,负责 HTTP 请求任务的调度和管理
public final class Dispatcher {
// 最大并发请求数,默认值为 64
private int maxRequests = 64;
// 每个主机的最大并发请求数,默认值为 5
private int maxRequestsPerHost = 5;
// 空闲线程的存活时间,默认值为 5 分钟
private long keepAliveDurationNs = TimeUnit.MINUTES.toNanos(5);
// 线程池,用于执行异步请求任务
private ExecutorService executorService;
// 准备执行的异步请求队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// 正在执行的异步请求队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// 正在执行的同步请求队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
// 空闲回调函数,当没有正在执行的请求时会被调用
private Runnable idleCallback;
}
在 Dispatcher
类中,定义了一些重要的属性:
maxRequests
和maxRequestsPerHost
用于控制并发请求数量,防止资源过度占用。keepAliveDurationNs
用于设置空闲线程的存活时间,避免频繁创建和销毁线程。executorService
是线程池,用于执行异步请求任务。readyAsyncCalls
、runningAsyncCalls
和runningSyncCalls
分别用于存储准备执行的异步请求、正在执行的异步请求和正在执行的同步请求。idleCallback
是一个回调函数,当没有正在执行的请求时会被调用。
3.2 构造函数
java
/**
* 默认构造函数,使用默认参数初始化 Dispatcher
*/
public Dispatcher() {
this(64, 5, 5, TimeUnit.MINUTES);
}
/**
* 自定义参数的构造函数,允许用户指定最大并发请求数、每个主机的最大并发请求数、
* 空闲线程的存活时间和时间单位
*
* @param maxRequests 最大并发请求数
* @param maxRequestsPerHost 每个主机的最大并发请求数
* @param keepAliveDuration 空闲线程的存活时间
* @param timeUnit 时间单位
*/
public Dispatcher(int maxRequests, int maxRequestsPerHost, long keepAliveDuration, TimeUnit timeUnit) {
this.maxRequests = maxRequests;
this.maxRequestsPerHost = maxRequestsPerHost;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
}
构造函数提供了两种初始化方式,一种是使用默认参数,另一种是允许用户自定义参数。
3.3 线程池的获取与创建
java
/**
* 获取线程池,如果线程池未初始化,则进行初始化
*
* @return 线程池实例
*/
public synchronized ExecutorService executorService() {
// 检查线程池是否已经初始化
if (executorService == null) {
// 创建一个线程池,核心线程数为 0,最大线程数为 Integer.MAX_VALUE
// 使用 SynchronousQueue 作为任务队列,空闲线程存活时间为 60 秒
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
executorService()
方法用于获取线程池。如果线程池未初始化,会创建一个新的线程池。这里使用 SynchronousQueue
作为任务队列,意味着线程池会立即为新任务创建线程,直到达到最大线程数。
3.4 异步请求的调度
3.4.1 enqueue 方法
java
/**
* 异步执行请求的方法
*
* @param call 异步请求对象
*/
synchronized void enqueue(AsyncCall call) {
// 检查当前正在执行的异步请求数量和每个主机的并发请求数量是否满足条件
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// 如果满足条件,将请求添加到正在执行的异步请求队列中
runningAsyncCalls.add(call);
// 使用线程池执行请求
executorService().execute(call);
} else {
// 如果不满足条件,将请求添加到准备执行的异步请求队列中
readyAsyncCalls.add(call);
}
}
/**
* 计算指定异步请求对应的主机的正在执行的请求数量
*
* @param call 异步请求
* @return 该主机的正在执行的请求数量
*/
private int runningCallsForHost(AsyncCall call) {
int result = 0;
// 遍历正在执行的异步请求队列
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) {
// 如果主机相同,则计数器加 1
result++;
}
}
return result;
}
enqueue
方法用于异步执行请求。它会检查当前正在执行的异步请求数量和每个主机的并发请求数量是否满足条件。如果满足条件,将请求添加到 runningAsyncCalls
队列中,并使用线程池执行该请求;如果不满足条件,将请求添加到 readyAsyncCalls
队列中等待执行。runningCallsForHost
方法用于计算指定异步请求对应的主机的正在执行的请求数量。
3.4.2 finished 方法
java
/**
* 异步请求执行完成的回调方法
*
* @param call 完成的异步请求对象
*/
void finished(AsyncCall call) {
// 调用内部的 finished 方法处理请求完成逻辑
finished(runningAsyncCalls, call, true);
}
/**
* 处理请求完成的通用方法
*
* @param calls 正在执行的请求队列
* @param call 完成的请求
* @param promoteCalls 是否尝试从准备队列中取出请求执行
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
// 同步操作,确保线程安全
synchronized (this) {
// 从正在执行的请求队列中移除完成的请求
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
// 计算当前正在执行的请求数量
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
// 如果需要尝试从准备队列中取出请求执行
if (promoteCalls) {
// 调用 promoteCalls 方法尝试从准备队列中取出请求执行
promoteCalls();
}
// 如果没有正在执行的请求,且设置了空闲回调,则执行回调
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
/**
* 尝试从准备队列中取出请求执行
*/
private void promoteCalls() {
// 如果正在执行的异步请求数量已经达到最大并发请求数,直接返回
if (runningAsyncCalls.size() >= maxRequests) return;
// 如果准备执行的异步请求队列为空,直接返回
if (readyAsyncCalls.isEmpty()) return;
// 遍历准备执行的异步请求队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
// 检查该请求对应的主机的并发请求数量是否满足条件
if (runningCallsForHost(call) < maxRequestsPerHost) {
// 从准备队列中移除该请求
i.remove();
// 将请求添加到正在执行的异步请求队列中
runningAsyncCalls.add(call);
// 使用线程池执行该请求
executorService().execute(call);
}
// 如果正在执行的异步请求数量已经达到最大并发请求数,停止遍历
if (runningAsyncCalls.size() >= maxRequests) return;
}
}
/**
* 计算当前正在执行的请求数量
*
* @return 正在执行的请求数量
*/
private int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
finished
方法用于处理异步请求执行完成的情况。它会从 runningAsyncCalls
队列中移除完成的请求,并调用 promoteCalls
方法尝试从 readyAsyncCalls
队列中取出请求执行。promoteCalls
方法会遍历 readyAsyncCalls
队列,检查每个请求对应的主机的并发请求数量是否满足条件,如果满足条件,则将请求从 readyAsyncCalls
队列中移除,添加到 runningAsyncCalls
队列中,并使用线程池执行该请求。
3.5 同步请求的调度
3.5.1 executed 方法
java
/**
* 同步执行请求的方法
*
* @param call 同步请求对象
*/
synchronized void executed(RealCall call) {
// 将请求添加到正在执行的同步请求队列中
runningSyncCalls.add(call);
}
/**
* 同步请求执行完成的回调方法
*
* @param call 完成的同步请求对象
*/
void finished(RealCall call) {
// 调用内部的 finished 方法处理请求完成逻辑
finished(runningSyncCalls, call, false);
}
executed
方法用于同步执行请求,它会将请求添加到 runningSyncCalls
队列中。finished
方法用于处理同步请求执行完成的情况,它会从 runningSyncCalls
队列中移除完成的请求。
3.6 其他方法
java
/**
* 设置空闲回调函数
*
* @param idleCallback 空闲回调函数
*/
public void setIdleCallback(Runnable idleCallback) {
this.idleCallback = idleCallback;
}
/**
* 获取最大并发请求数
*
* @return 最大并发请求数
*/
public int getMaxRequests() {
return maxRequests;
}
/**
* 设置最大并发请求数
*
* @param maxRequests 最大并发请求数
*/
public void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("maxRequests < 1: " + maxRequests);
}
synchronized (this) {
this.maxRequests = maxRequests;
}
promoteCalls();
}
/**
* 获取每个主机的最大并发请求数
*
* @return 每个主机的最大并发请求数
*/
public int getMaxRequestsPerHost() {
return maxRequestsPerHost;
}
/**
* 设置每个主机的最大并发请求数
*
* @param maxRequestsPerHost 每个主机的最大并发请求数
*/
public void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("maxRequestsPerHost < 1: " + maxRequestsPerHost);
}
synchronized (this) {
this.maxRequestsPerHost = maxRequestsPerHost;
}
promoteCalls();
}
/**
* 获取准备执行的异步请求队列
*
* @return 准备执行的异步请求队列
*/
public synchronized List<Call> readyCalls() {
List<Call> result = new ArrayList<>();
for (AsyncCall asyncCall : readyAsyncCalls) {
result.add(asyncCall.get());
}
return result;
}
/**
* 获取正在执行的异步请求队列
*
* @return 正在执行的异步请求队列
*/
public synchronized List<Call> runningCalls() {
List<Call> result = new ArrayList<>();
result.addAll(runningSyncCalls);
for (AsyncCall asyncCall : runningAsyncCalls) {
result.add(asyncCall.get());
}
return result;
}
/**
* 获取准备执行的异步请求数量
*
* @return 准备执行的异步请求数量
*/
public synchronized int readyCallsCount() {
return readyAsyncCalls.size();
}
/**
* 获取正在执行的请求数量
*
* @return 正在执行的请求数量
*/
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
这些方法提供了对 Dispatcher
类的一些属性的访问和修改功能,例如设置最大并发请求数、每个主机的最大并发请求数,获取准备执行的请求队列、正在执行的请求队列等。
四、RealCall 类源码分析
4.1 类定义及属性
java
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.internal.NamedRunnable;
import okhttp3.internal.cache.CacheInterceptor;
import okhttp3.internal.connection.ConnectInterceptor;
import okhttp3.internal.connection.StreamAllocation;
import okhttp3.internal.http.BridgeInterceptor;
import okhttp3.internal.http.CallServerInterceptor;
import okhttp3.internal.http.RealInterceptorChain;
import okhttp3.internal.http.RetryAndFollowUpInterceptor;
import okhttp3.internal.platform.Platform;
import java.io.IOException;
import java.util.List;
// RealCall 类,表示一个实际的 HTTP 请求调用
final class RealCall implements Call {
// OkHttpClient 实例,用于获取配置信息
final OkHttpClient client;
// 重试和重定向拦截器
final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;
// 原始请求对象
final Request originalRequest;
// 标记该请求是否已经被执行
boolean executed;
// 用于 WebSocket 请求的标记
final boolean forWebSocket;
// 事件监听器,用于监听请求的各个阶段事件
private EventListener eventListener;
}
RealCall
类表示一个实际的 HTTP 请求调用。它包含了 OkHttpClient
实例、原始请求对象、重试和重定向拦截器等重要属性。executed
标记用于确保一个请求只能被执行一次。forWebSocket
标记用于区分是否是 WebSocket 请求。eventListener
用于监听请求的各个阶段事件。
4.2 构造函数
java
/**
* 构造函数,初始化 RealCall 实例
*
* @param client OkHttpClient 实例
* @param originalRequest 原始请求对象
* @param forWebSocket 是否为 WebSocket 请求
*/
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
// 创建重试和重定向拦截器
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
构造函数用于初始化 RealCall
实例,同时创建重试和重定向拦截器。
4.3 同步请求执行方法
java
@Override
public Response execute() throws IOException {
// 同步操作,确保该请求只被执行一次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// 标记请求开始
captureCallStackTrace();
// 记录请求开始时间
eventListener.callStart(this);
try {
// 调用 Dispatcher 的 executed 方法,将该请求添加到正在执行的同步请求队列中
client.dispatcher().executed(this);
// 调用 getResponseWithInterceptorChain 方法获取响应
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
// 记录请求失败事件
eventListener.callFailed(this, e);
throw e;
} finally {
// 调用 Dispatcher 的 finished 方法,处理请求完成逻辑
client.dispatcher().finished(this);
}
}
/**
* 捕获请求调用的堆栈信息
*/
private void captureCallStackTrace() {
Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}
/**
* 通过拦截器链获取响应
*
* @return 响应对象
* @throws IOException 如果请求过程中出现 I/O 异常
*/
Response getResponseWithInterceptorChain() throws IOException {
// 创建拦截器列表
List<Interceptor> interceptors = new ArrayList<>();
// 添加用户自定义的拦截器
interceptors.addAll(client.interceptors());
// 添加重试和重定向拦截器
interceptors.add(retryAndFollowUpInterceptor);
// 添加桥接拦截器,将用户请求转换为符合 HTTP 协议的请求
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 添加缓存拦截器,处理缓存逻辑
interceptors.add(new CacheInterceptor(client.internalCache()));
// 添加连接拦截器,建立与服务器的连接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// 如果不是 WebSocket 请求,添加用户自定义的网络拦截器
interceptors.addAll(client.networkInterceptors());
}
// 添加调用服务器拦截器,负责与服务器进行实际的交互
interceptors.add(new CallServerInterceptor(forWebSocket));
// 创建拦截器链
RealInterceptorChain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 调用拦截器链的 proceed 方法开始处理请求
return chain.proceed(originalRequest);
}
execute
方法用于同步执行请求。首先,它会检查该请求是否已经被执行,如果已经执行则抛出异常。然后,调用 captureCallStackTrace
方法捕获请求调用的堆栈信息,记录请求开始时间。接着,将该请求添加到正在执行的同步请求队列中,并调用 getResponseWithInterceptorChain
方法通过拦截器链获取响应。如果请求过程中出现异常,会记录请求失败事件。最后,调用 finished
方法处理请求完成逻辑。
getResponseWithInterceptorChain
方法会创建一个拦截器列表,依次添加用户自定义的拦截器、重试和重定向拦截器、桥接拦截器、缓存拦截器、连接拦截器、网络拦截器和调用服务器拦截器。然后创建一个 RealInterceptorChain
实例,并调用其 proceed
方法开始处理请求。
4.4 异步请求执行方法
java
@Override
public void enqueue(Callback responseCallback) {
// 同步操作,确保该请求只被执行一次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// 标记请求开始
captureCallStackTrace();
// 记录请求开始时间
eventListener.callStart(this);
// 创建 AsyncCall 实例,用于异步执行请求
AsyncCall asyncCall = new AsyncCall(responseCallback);
// 调用 Dispatcher 的 enqueue 方法,将异步请求添加到调度队列中
client.dispatcher().enqueue(asyncCall);
}
// AsyncCall 类,继承自 NamedRunnable,用于异步执行请求
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
/**
* 构造函数,初始化 AsyncCall 实例
*
* @param responseCallback 响应回调函数
*/
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
/**
* 获取请求的主机地址
*
* @return 请求的主机地址
*/
String host() {
return originalRequest.url().host();
}
/**
* 获取原始请求对象
*
* @return 原始请求对象
*/
Request request() {
return originalRequest;
}
/**
* 获取 RealCall 实例
*
* @return RealCall 实例
*/
RealCall get() {
return RealCall.this;
}
@Override
protected void execute() {
boolean signalledCallback = false;
try {
// 调用 getResponseWithInterceptorChain 方法获取响应
Response response = getResponseWithInterceptorChain();
// 检查请求是否被取消
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
// 调用回调函数的 onFailure 方法
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// 调用回调函数的 onResponse 方法
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
// 记录请求失败事件
eventListener.callFailed(RealCall.this, e);
// 调用回调函数的 onFailure 方法
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// 调用 Dispatcher 的 finished 方法,处理请求完成逻辑
client.dispatcher().finished(this);
}
}
}
enqueue
方法用于异步执行请求。首先,它会检查该请求是否已经被执行,如果已经执行则抛出异常。然后,调用 captureCallStackTrace
方法捕获请求调用的堆栈信息,记录请求开始时间。接着,创建一个 AsyncCall
实例,并将其添加到调度队列中。
AsyncCall
类继承自 NamedRunnable
,用于异步执行请求。在 execute
方法中,它会调用 getResponseWithInterceptorChain
方法获取响应,并根据请求是否被取消调用回调函数的 onFailure
或 onResponse
方法。如果请求过程中出现异常,会记录请求失败事件。最后,调用 finished
方法处理请求完成逻辑。
五、线程池的使用和管理
5.1 线程池的创建和配置
java
// 在 Dispatcher 类中
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
这里使用了 ThreadPoolExecutor
来创建线程池,具体配置如下:
- 核心线程数:0,表示线程池在没有任务时不会保留任何线程。
- 最大线程数:
Integer.MAX_VALUE
,表示线程池可以创建的最大线程数没有限制。 - 空闲线程存活时间:60 秒,当线程空闲超过 60 秒时,会被销毁。
- 任务队列:
SynchronousQueue
,这是一个没有容量的阻塞队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。这意味着线程池会立即为新任务创建线程,直到达到最大线程数。 - 线程工厂:
Util.threadFactory("OkHttp Dispatcher", false)
,用于创建线程,线程名称为OkHttp Dispatcher
。
5.2 线程池的任务执行和管理
在 Dispatcher
类的 enqueue
方法中,当满足条件时,会将异步请求添加到正在
接着继续分析
六、任务调度的并发控制原理
6.1 并发控制的整体逻辑
OkHttp 的任务调度模块通过 Dispatcher
类实现了精细的并发控制,主要体现在对最大并发请求数(maxRequests
)和每个主机的最大并发请求数(maxRequestsPerHost
)的限制上。这两个参数的设置可以避免过多的请求同时发起,从而防止系统资源被过度占用,确保应用的稳定性和性能。
6.2 最大并发请求数的控制
在 Dispatcher
类的 enqueue
方法中,会检查当前正在执行的异步请求数量是否超过 maxRequests
:
java
synchronized void enqueue(AsyncCall call) {
// 检查当前正在执行的异步请求数量和每个主机的并发请求数量是否满足条件
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// 如果满足条件,将请求添加到正在执行的异步请求队列中
runningAsyncCalls.add(call);
// 使用线程池执行请求
executorService().execute(call);
} else {
// 如果不满足条件,将请求添加到准备执行的异步请求队列中
readyAsyncCalls.add(call);
}
}
当 runningAsyncCalls
的大小小于 maxRequests
时,说明当前并发请求数量未达到上限,新的请求可以被立即执行;否则,新请求会被放入 readyAsyncCalls
队列中等待。
6.3 每个主机的最大并发请求数的控制
runningCallsForHost
方法用于计算指定异步请求对应的主机的正在执行的请求数量:
java
private int runningCallsForHost(AsyncCall call) {
int result = 0;
// 遍历正在执行的异步请求队列
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) {
// 如果主机相同,则计数器加 1
result++;
}
}
return result;
}
在 enqueue
方法中,除了检查最大并发请求数,还会调用 runningCallsForHost
方法检查当前主机的并发请求数量是否超过 maxRequestsPerHost
。如果超过,则新请求同样会被放入 readyAsyncCalls
队列中等待。
6.4 并发控制的动态调整
Dispatcher
类提供了 setMaxRequests
和 setMaxRequestsPerHost
方法,允许开发者动态调整最大并发请求数和每个主机的最大并发请求数:
java
public void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("maxRequests < 1: " + maxRequests);
}
synchronized (this) {
this.maxRequests = maxRequests;
}
promoteCalls();
}
public void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("maxRequestsPerHost < 1: " + maxRequestsPerHost);
}
synchronized (this) {
this.maxRequestsPerHost = maxRequestsPerHost;
}
promoteCalls();
}
在调整这些参数后,会调用 promoteCalls
方法,尝试从 readyAsyncCalls
队列中取出满足条件的请求执行,以确保并发控制的动态调整能够及时生效。
七、任务排队和优先级处理
7.1 任务排队机制
当新的请求不满足并发条件时,会被添加到 readyAsyncCalls
队列中等待执行。readyAsyncCalls
是一个 ArrayDeque
,它是一个双端队列,具有先进先出(FIFO)的特性。这意味着先进入队列的请求会先被处理,保证了请求的公平性。
7.2 任务优先级处理
OkHttp 本身并没有直接提供任务优先级处理的机制,所有请求默认按照 FIFO 的顺序执行。不过,开发者可以通过自定义拦截器或者对请求进行包装,来实现简单的优先级处理。例如,可以为每个请求添加一个优先级标记,在 Dispatcher
类的 promoteCalls
方法中,根据优先级从 readyAsyncCalls
队列中选择请求执行。
以下是一个简单的示例,展示如何实现基于优先级的任务调度:
java
// 自定义一个带有优先级的 AsyncCall 类
class PriorityAsyncCall extends AsyncCall {
private final int priority;
public PriorityAsyncCall(Callback responseCallback, int priority) {
super(responseCallback);
this.priority = priority;
}
public int getPriority() {
return priority;
}
}
// 修改 Dispatcher 类的 promoteCalls 方法
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return;
if (readyAsyncCalls.isEmpty()) return;
// 对准备执行的队列按照优先级排序
List<PriorityAsyncCall> sortedCalls = new ArrayList<>();
for (AsyncCall call : readyAsyncCalls) {
if (call instanceof PriorityAsyncCall) {
sortedCalls.add((PriorityAsyncCall) call);
}
}
sortedCalls.sort((c1, c2) -> Integer.compare(c2.getPriority(), c1.getPriority()));
for (PriorityAsyncCall call : sortedCalls) {
if (runningCallsForHost(call) < maxRequestsPerHost) {
readyAsyncCalls.remove(call);
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return;
}
}
在这个示例中,我们自定义了一个 PriorityAsyncCall
类,为每个请求添加了一个优先级标记。然后修改了 Dispatcher
类的 promoteCalls
方法,在从 readyAsyncCalls
队列中选择请求时,按照优先级进行排序,优先执行优先级高的请求。
八、任务调度与拦截器链的协同工作
8.1 拦截器链的作用
OkHttp 的拦截器链是一个强大的机制,它允许开发者在请求发送和响应返回的过程中插入自定义的逻辑。拦截器链由多个拦截器组成,每个拦截器都可以对请求进行修改、记录日志、处理缓存等操作。
8.2 任务调度与拦截器链的结合点
在 RealCall
类的 execute
和 enqueue
方法中,都会调用 getResponseWithInterceptorChain
方法来获取响应。这个方法会创建一个拦截器链,并依次调用每个拦截器的 intercept
方法:
java
Response getResponseWithInterceptorChain() throws IOException {
// 创建拦截器列表
List<Interceptor> interceptors = new ArrayList<>();
// 添加用户自定义的拦截器
interceptors.addAll(client.interceptors());
// 添加重试和重定向拦截器
interceptors.add(retryAndFollowUpInterceptor);
// 添加桥接拦截器,将用户请求转换为符合 HTTP 协议的请求
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 添加缓存拦截器,处理缓存逻辑
interceptors.add(new CacheInterceptor(client.internalCache()));
// 添加连接拦截器,建立与服务器的连接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// 如果不是 WebSocket 请求,添加用户自定义的网络拦截器
interceptors.addAll(client.networkInterceptors());
}
// 添加调用服务器拦截器,负责与服务器进行实际的交互
interceptors.add(new CallServerInterceptor(forWebSocket));
// 创建拦截器链
RealInterceptorChain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 调用拦截器链的 proceed 方法开始处理请求
return chain.proceed(originalRequest);
}
在任务调度的过程中,拦截器链会在请求执行的各个阶段发挥作用。例如,缓存拦截器可以在请求发送之前检查缓存,如果缓存命中,则直接返回缓存的响应,避免了不必要的网络请求;重试和重定向拦截器可以在请求失败或者需要重定向时,自动进行重试或者重定向操作。
8.3 拦截器对任务调度的影响
拦截器的执行时间和逻辑会影响任务调度的性能。如果某个拦截器的执行时间过长,会导致整个请求的处理时间增加,从而影响后续请求的执行。因此,在编写拦截器时,需要注意优化拦截器的逻辑,避免在拦截器中进行耗时的操作。
九、异常处理和重试机制
9.1 异常处理
在 RealCall
类的 execute
和 enqueue
方法中,都会对请求过程中可能出现的异常进行捕获和处理。例如,在 execute
方法中:
java
@Override
public Response execute() throws IOException {
// 同步操作,确保该请求只被执行一次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// 标记请求开始
captureCallStackTrace();
// 记录请求开始时间
eventListener.callStart(this);
try {
// 调用 Dispatcher 的 executed 方法,将该请求添加到正在执行的同步请求队列中
client.dispatcher().executed(this);
// 调用 getResponseWithInterceptorChain 方法获取响应
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
// 记录请求失败事件
eventListener.callFailed(this, e);
throw e;
} finally {
// 调用 Dispatcher 的 finished 方法,处理请求完成逻辑
client.dispatcher().finished(this);
}
}
如果在请求过程中出现 IOException
,会记录请求失败事件,并将异常抛出。在 AsyncCall
类的 execute
方法中,也会对异常进行类似的处理:
java
@Override
protected void execute() {
boolean signalledCallback = false;
try {
// 调用 getResponseWithInterceptorChain 方法获取响应
Response response = getResponseWithInterceptorChain();
// 检查请求是否被取消
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
// 调用回调函数的 onFailure 方法
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// 调用回调函数的 onResponse 方法
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
// 记录请求失败事件
eventListener.callFailed(RealCall.this, e);
// 调用回调函数的 onFailure 方法
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// 调用 Dispatcher 的 finished 方法,处理请求完成逻辑
client.dispatcher().finished(this);
}
}
9.2 重试机制
OkHttp 的重试机制主要由 RetryAndFollowUpInterceptor
类实现。该拦截器会在请求失败或者需要重定向时,自动进行重试或者重定向操作。在 RetryAndFollowUpInterceptor
类的 intercept
方法中,会对响应的状态码进行检查,如果需要重试或者重定向,会重新发起请求:
java
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// 处理路由异常,尝试重试
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// 处理其他 I/O 异常,尝试重试
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) {
throw e;
}
releaseConnection = false;
continue;
} finally {
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// 处理重定向
Request followUp = followUpRequest(response, streamAllocation.route());
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
在这个方法中,会捕获 RouteException
和其他 IOException
,并调用 recover
方法尝试重试。同时,会检查响应的状态码,如果需要重定向,会生成一个新的请求并重新发起。
十、任务调度模块的性能优化和注意事项
10.1 性能优化建议
- 合理配置线程池:线程池的配置对任务调度的性能有很大影响。可以根据应用的实际情况,调整线程池的核心线程数、最大线程数和空闲线程存活时间,避免过多的线程创建和销毁带来的性能开销。
- 优化拦截器逻辑:拦截器的执行时间会影响请求的处理速度。在编写拦截器时,需要注意优化拦截器的逻辑,避免在拦截器中进行耗时的操作。
- 控制并发请求数量:合理设置
maxRequests
和maxRequestsPerHost
参数,避免过多的并发请求对服务器和系统资源造成过大的压力。
10.2 注意事项
- 线程安全问题:
Dispatcher
类中的部分方法使用了synchronized
关键字进行同步,以确保线程安全。在使用和扩展任务调度模块时,需要注意线程安全问题,避免出现数据不一致的情况。 - 异常处理:在请求过程中可能会出现各种异常,需要在代码中进行充分的异常处理,避免应用崩溃。同时,要注意对异常信息的记录和分析,以便及时发现和解决问题。
- 资源管理:在请求完成后,需要及时释放相关的资源,如网络连接、缓存等,避免资源泄漏。
十一、总结
OkHttp 的任务调度模块通过 Dispatcher
类实现了高效的任务调度和并发控制。它利用线程池来执行异步请求,通过队列来管理请求的排队和执行顺序,同时结合拦截器链实现了请求的预处理、缓存处理和重试重定向等功能。在使用 OkHttp 的任务调度模块时,开发者需要深入理解其原理和机制,合理配置参数,优化代码逻辑,以确保应用的性能和稳定性。随着网络技术的不断发展,OkHttp 的任务调度模块也可能会不断优化和改进,为开发者提供更强大、更灵活的功能。