OkHttp主流程分析

OkHttp介绍

https://github.com/square/okhttp

An HTTP & HTTP/2 client for Android and Java applications.

OkHttp是支持HTTP、HTTP2的网络请求库。这里解析的是OkHttp 3.11.0版本的代码。

使用方式:

  1. 创建client

    1
    OkHttpClient client = new OkHttpClient();
  2. 创建网络请求

    1
    2
    3
    Request request = new Request.Builder()
    .url(ENDPOINT)
    .build();
  3. 发送请求

    1
    2
    3
    4
    5
    // 同步方式
    Response response = client.newCall(request).execute()

    // 异步方式
    client.newCall(request).enqueue(callback)

OkHttpClient

1
2
3
public OkHttpClient() {
this(new Builder());
}

OkHttpClient使用Builder模式创建,负责管理全局需要的配置。

Request的创建

Request的创建也使用了Builder模式,负责单个请求的配置信息。

1
2
3
4
5
6
7
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tags = Util.immutableMap(builder.tags);
}

发起请求

client.newCall(request)负责构建一个RealCall,RealCall里存储了一个Request和OkHttpClient,以及请求的事件回调EventListener等信息。

1
2
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);

RealCall实现了OkHttp3.Call接口,实现了请求的发起、执行、取消、状态判断等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Call extends Cloneable {

Request request();

Response execute() throws IOException;

void enqueue(Callback responseCallback);

void cancel();

boolean isExecuted();

boolean isCanceled();

Call clone();

interface Factory {
Call newCall(Request request);
}
}

Dispathcer

无论同步和异步请求均使用到了Dispathcer,负责进行请求的管理,例如取消请求、返回请求和数量等处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 最大请求数
private int maxRequests = 64;

// 同Host最大请求数
private int maxRequestsPerHost = 5;

// 异步请求处于准备的队列(超过上述请求限制则的异步请求会放入此队列)
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

// 异步请求正在执行的队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

// 同步请求队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

对于异步请求队列,Dispatcher会开启一个线程池进行处理。

1
2
3
4
5
6
7
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

同步请求

execute()方法用于发送同步请求,等待请求响应返回一个Response。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
// 核心代码下面两行利用Dispatcher执行
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}

异步请求

enqueue()则是发送异步请求,每个请求是一个AsyncCall,将请求加入队列,在Dispatcher的线程池进行处理。

1
2
3
4
5
6
7
8
9
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

AsyncCall持有一个Callback,在execute处理请求,回调给调用方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}

getResponseWithInterceptorChain()

无论同步还是异步请求,最终都由getResponseWithInterceptorChain方法返回响应Response。这里用到了责任链模式,通过一系列拦截器对请求进行加工处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());

return chain.proceed(originalRequest);
}

此处借用Piasy的图:

  1. 在配置 OkHttpClient 时设置的 interceptors;
  2. 负责失败重试以及重定向的 RetryAndFollowUpInterceptor;
  3. 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应的 BridgeInterceptor;
  4. 负责读取缓存直接返回、更新缓存的 CacheInterceptor;
  5. 负责和服务器建立连接的 ConnectInterceptor;
  6. 配置 OkHttpClient 时设置的 networkInterceptors;
  7. 负责向服务器发送请求数据、从服务器读取响应数据的 CallServerInterceptor。

至此OkHttp请求发送和响应解析完毕。

OkHttp拦截器

拦截器的调用顺序:

  1. AppInterceptor 自定义拦截器
  2. RetryAndFollowUpInterceptor 重试
  3. BridgeInterceptor 处理header、cookie和连接封装
  4. CacheInteceptor 缓存处理
  5. ConnectInterceptor 建立连接
  6. NetworkInterceptor 网络拦截器
  7. CallServerInterceptor 数据交换

RetryAndFollowUpInterceptor

RetryAndFollowUpInterceptor负责创建StreamAllocation和失败重连。最大重连次数是20次。

1
private static final int MAX_FOLLOW_UPS = 20;

具体看看intercept的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 创建StreamAllocation
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

while (true) {
try {
// 正常处理请求
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// RouteException,判断失败重连
} catch (IOException e) {
// IOException,判断失败重连
} finally {
// 释放连接
}

// 负责响应码检测
Request followUp;
try {
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}

recover负责检测异常。

1
2
3
4
5
6
7
8
9
10
11
12
private boolean recover(IOException e, boolean routeException, Request userRequest) {
streamAllocation.streamFailed(e);
// 判断 OkHttpClient 是否支持失败重连的机制
if (!client.retryOnConnectionFailure()) return false;
// 在该方法中传入的 routeException值 为 true
if (!routeException && userRequest.body() instanceof UnrepeatableRequestBody) return false;
// isRecoverable检测该异常是否可恢复
if (!isRecoverable(e, routeException)) return false;
// 是否有更多的路由路线
if (!streamAllocation.hasMoreRoutes()) return false;
return true;
}

BridgeInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Response networkResponse = chain.proceed(requestBuilder.build());

// 处理Header,省略部分代码

// 处理Cookie
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

// 收到结果之后,是否保存cookie
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
// 处理Gzip
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

CacheInteceptor

Http的缓存主要通过Cache-control控制。

强制缓存与对比缓存

强制缓存:

  • private: 客户端可以缓存
  • public: 客户端和代理服务器都可缓存
  • max-age=xxx: 缓存失效时间
  • no-cache: 使用对比缓存
  • no-store: 所有内容都不会缓存,强制缓存,对比缓存都不会触发

对比缓存:

  • Last-Modified:服务器在响应请求时,告诉浏览器资源的最后修改时间
  • If-Modified-Since:通过此字段通知服务器上次请求时,服务器返回的资源最后修改时间
  • Etag:服务器响应请求时,告诉浏览器当前资源在服务器的唯一标识

对于强制缓存,服务器通知浏览器一个缓存时间,在缓存时间内,下次请求,直接用缓存,不在时间内,执行比较缓存策略。
对于比较缓存,将缓存信息中的Etag和Last-Modified通过请求发送给服务器,由服务器校验,返回304状态码时,浏览器直接使用缓存。

缓存配置:

1
2
3
4
5
6
7
// 缓存目录
File file = new File(Environment.getExternalStorageDirectory(), "a_cache");
// 缓存大小
int cacheSize = 10 * 1024 * 1024;
client = new OkHttpClient.Builder()
.cache(new Cache(file, cacheSize)) // 配置缓存
.build();

CacheInteceptor会根据缓存策略获取缓存进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// 查询缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
cache.trackResponse(strategy);
}

// 没缓存则不需要cacheCandidate
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body());
}

// 根据策略,不使用网络,缓存又没有返回504
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// 无需网络请求,有缓存直接返回
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

Response networkResponse = null;
try {
// 继续后续处理
networkResponse = chain.proceed(networkRequest);
} finally {
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

// 存在缓存,304则使用缓存并返回结果
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}

// 读取网络结果
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

// 对比缓存
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

ConnectInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// 获取Request
Request request = realChain.request();

// 获取StreamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();

boolean doExtensiveHealthChecks = !request.method().equals("GET");

// 建立连接
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);

// 返回连接
RealConnection connection = streamAllocation.connection();

// 继续传递处理
return realChain.proceed(request, streamAllocation, httpCodec, connection);

核心是newStream的处理。

1
2
3
4
// 找到可用连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);

HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

看看findHealthyConnection内部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 开启循环一直寻找可用请求
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
}

// 如果是新请求直接返回,这里加了个连接池锁处理。
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}

// 通过socket判断连接是否健康可用
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}

return candidate;

看看findConnection的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");

// 尝试用一下现在的连接,判断一下,是否有可用的连接
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// 获取已有的连接
result = this.connection;
releasedConnection = null;
}

// 无报告需要则连接置空
if (!reportedAcquired) {
releasedConnection = null;
}

if (result == null) {
// 尝试从连接池获取请求
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);

if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// 如果已经找到连接则返回
return result;
}

// 如果需要路由,寻找一条可用链路
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}

synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");

if (newRouteSelection) {
// 由于上面我们获取了一个线路,无论是新建的,或者已有的。
// 我们通过这个线路,继续在连接池中寻找是否有可用的连接。
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}

if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}

// 如果在连接池找不到则创建
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}

// 如果在第二轮找到连接则返回
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}

// 真正的请求处理,做TCP握手请求
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());

Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;

// 缓存连接到连接池
Internal.instance.put(connectionPool, result);

// 复用连接
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);

eventListener.connectionAcquired(call, result);
return result;
}

连接池ConnectionPool

管理http和http/2的链接,以便减少网络请求延迟。同一个address将共享同一个connection。该类实现了复用连接的目标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final class ConnectionPool {
// 创建了一个线程池处理
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

// 使用双向队列保存RealConnection
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();

// 最多可容乃5个空闲的连接,存活期是5分钟
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}

RealConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final class RealConnection extends Http2Connection.Listener implements Connection {
private final ConnectionPool connectionPool;
private final Route route;
private Socket rawSocket;
private Socket socket;
private Handshake handshake;
private Protocol protocol;
private Http2Connection http2Connection;
private BufferedSource source;
private BufferedSink sink;

// 如果为true则一直为true,连接无法创建新的流
public boolean noNewStreams;

public int successCount;

// 连接最大并发数量
public int allocationLimit = 1;

// 连接引用列表
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();

}

连接的复用

具体的连接池复用逻辑在ConnectionPool的get方法里:

1
2
3
4
5
6
7
8
9
10
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public boolean isEligible(Address address, @Nullable Route route) {
// 无需新的数据流则返回
if (allocations.size() >= allocationLimit || noNewStreams) return false;

// http和ssl协议配置要相同
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

// 域名要匹配
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}

// 1. http1.x协议下当前socket没有其他流正在读写时才可以复用,http2对流数量没有限制
if (http2Connection == null) return false;

// 2. 只有在没有代理时才能复用
if (route == null) return false;
if (route.proxy().type() != Proxy.Type.DIRECT) return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
if (!this.route.socketAddress() .equals(route.socketAddress())) return false;

// 3. 只有ip地址相同才能复用
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;

// 4. 对不受信任证书处理方式相同才能复用
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}

return true; // 返回true则代表可复用
}

即遵循上述规则的连接可进行复用。

连接的清理

开启一个cleanup线程池持续进行清理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;

synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();

if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
// 统计空闲连接数量
idleConnectionCount++;

if (idleDurationNs > longestIdleDurationNs) {
// 找出空闲时间最长的连接以及对应的空闲时间
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}

if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// 在符合清理条件下,清理空闲时间最长的连接
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// 不符合清理条件,则返回下次需要执行清理的等待时间,也就是此连接即将到期的时间
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// 没有空闲的连接,则隔keepAliveDuration(分钟)之后再次执行
return keepAliveDurationNs;
} else {
// 清理结束,返回-1
cleanupRunning = false;
return -1;
}
}
// 关闭socket资源
closeQuietly(longestIdleConnection.socket());

// 这里是在清理一个空闲时间最长的连接以后会执行到这里,需要立即再次执行清理
return 0;
}

具体清理的逻辑如下:

  1. 清理任务遵循最长空闲优先清理规则;
  2. 清理完毕所有连接,清理任务才结束;
  3. 下一次put的时候,如果已经停止的清理任务则会被再次触发。

CallServerInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@Override public Response intercept(Chain chain) throws IOException {

// 1.获取几个前面拦截方法创建的,重要类
HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
long sentRequestMillis = System.currentTimeMillis();

// 2. 先向sink(OutputStream)中写头信息
httpCodec.writeRequestHeaders(request);

Response.Builder responseBuilder = null;

// 3.判断是否有请求实体的请求,用method判断
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.

//4. 如果头部添加了"100-continue", 相对于一次见到的握手操作,只有拿到服务的结果再继续
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}

// 5. 当前面的"100-continue",需要握手,但又握手失败,这个时候responseBuilder不是空的
// Write the request body, unless an "Expect: 100-continue" expectation failed.
if (responseBuilder == null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

// 回调RequestBody的writeTo,写相应的数据
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
}

// 6. 这里也是调用了一次 sink.flush()
httpCodec.finishRequest();

// 7. 读取头部信息,状态码,信息等
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}

// 8. 构建Response, 写入原请求,握手情况,请求时间,得到的结果时间
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

int code = response.code();

// 9. 通过状态码判断以及是否webSocket判断,是否返回一个空的body
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// 读取Body信息
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

// 10. 如果设置了连接close,断开连接
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

// 11. HTTP 204(no content) 代表响应报文中包含若干首部和一个状态行,但是没有实体的主体内容。
// HTTP 205(reset content) 表示响应执行成功,重置页面(Form表单),方便用户下次输入
// 这里做了同样的处理,就是抛出协议异常。
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}

一句话总结是通过建立好的连接进行数据交换。

AppInterceptor和NetworkInterceptor的区别

  • AppInterceptor适用于在请求前统一添加一些公共参数,例如在添加自定义Header。或者是用于请求和响应的日志打印。

  • NetwrokInterceptor在这一层拦截器中可以获取到最终发送请求的request,也可以获取到真正发生网络请求回来的response响应,从而修改对应的请求或者响应数据。

比如业务接口需要自定义AppInterceptor增加Header实现接口鉴权。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class HeaderInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
FamilyAlbumConfig config = FamilyAlbumCore.getInstance().getConfig();
Request requestOrigin = chain.request();
Headers headersOrigin = requestOrigin.headers();
Headers.Builder builder = headersOrigin.newBuilder()
.add("Accept-Encoding", "identity")
.add("x-huawei-channelSrc", Params.xhuaweichannedSrc)
.add("x-MM-Source", Params.xmmSource)
.add("Content-Type", "application/json")
.add("x-UserAgent", Params.xUserAgent)
.add("x-SvcType", Params.xSvcType)
.add("x-DeviceInfo", Params.xDeviceInfo)
.add("Authorization", "WSSE realm=\"SDP\", profile=\"UsernameToken\", type=\"Appkey\"")
.set("X-WSSE", getXWSSE(config.isDebug() ? Params.DEBUG_APPKEY : Params.RELEASE_APPKEY,
config.isDebug() ? Params.DEBUG_APPSECRET : Params.RELEASE_APPSECRET));


... 省略部分代码

Headers headers = builder.build();
Request request = requestOrigin.newBuilder().headers(headers).build();
return chain.proceed(request);
}

参考文章:
https://www.cnblogs.com/chenqf/p/6386163.html
https://www.jianshu.com/p/fe43449682d6
https://www.cnblogs.com/chenqf/p/6386163.html
https://juejin.im/post/5afb89dcf265da0ba26727c7

0%