[TOC]

Based on 'com.squareup.okhttp3:okhttp:3.12.0'

## Overall design

![](https://img.kancloud.cn/11/be/11be920fc9ce5321e0bdd800d2405032_1000x1600.png)

## Thread pool

### Personal understanding

1. A synchronous request is added directly to the running synchronous queue (runningSyncCalls) and executed right away.
2. For an asynchronous request, the Dispatcher checks whether the running asynchronous queue (runningAsyncCalls) holds fewer than 64 calls and whether fewer than 5 of them target the same host.
3. If not, the call is added to the waiting asynchronous queue (readyAsyncCalls).
4. If so, the call is added to the running asynchronous queue (runningAsyncCalls) and starts executing.
5. When the request ends, the method in the finally block is called.
6. The call removes itself from the running asynchronous queue (runningAsyncCalls), and waiting calls are taken from readyAsyncCalls and executed.

## Interceptors and the chain of responsibility

~~~
// RealCall.AsyncCall
@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    // Core: run the request through the interceptor chain
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}

// RealCall
Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());
  interceptors.add(retryAndFollowUpInterceptor);
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  Interceptor.Chain chain = new RealInterceptorChain(
      interceptors, null, null, null, 0, originalRequest);
  return chain.proceed(originalRequest);
}

// RealInterceptorChain
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    Connection connection) throws IOException {
  // ...
  // Core: build the chain for the next index, then hand it to the current interceptor
  RealInterceptorChain next = new RealInterceptorChain(
      interceptors, streamAllocation, httpCodec, connection, index + 1, request);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);
  // ...
  return response;
}

// Any concrete Interceptor
@Override public Response intercept(Chain chain) throws IOException {
  // ...
  chain.proceed(originalRequest); // hands over to the next interceptor
  // ...
~~~

![](https://img.kancloud.cn/36/2f/362f0e13c05487158a8e3275f0abcc36_432x613.png)

## RetryAndFollowUpInterceptor: redirects

When a response carries a 3xx status code, a redirect is needed, and this interceptor automatically issues a new request to the redirect target. It follows at most 20 follow-up requests.

~~~
public final class RetryAndFollowUpInterceptor implements Interceptor {
  // Maximum number of follow-up requests
  private static final int MAX_FOLLOW_UPS = 20;

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    // The three arguments are: (1) the shared connection pool, (2) the Address of the route,
    // (3) a stack trace object
    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response = null;
      boolean releaseConnection = true;
      // Run the next interceptor, i.e. BridgeInterceptor
      response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
      releaseConnection = false;
      // ...
      // Core: work out whether a follow-up request is needed
      Request followUp = followUpRequest(response);

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        // No follow-up needed: return the response
        return response;
      }

      // A follow-up is needed: close the body of the current response
      closeQuietly(response.body());

      // Enforce the follow-up limit
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(followUp.url()), callStackTrace);
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }
}
~~~

## BridgeInterceptor: adding request headers

BridgeInterceptor is relatively simple.

**Sending the request**

* Fills in the headers *Content-Type*, *Content-Length*, *Transfer-Encoding*, *Host*, *Connection*, *Accept-Encoding*, and *User-Agent* where they apply and the caller has not set them.
* If the caller set neither *Accept-Encoding* nor *Range*, it adds *Accept-Encoding: gzip* and takes responsibility for decompressing the response.
* Loads the *Cookie* values from the CookieJar.

**Handling the response**

* First saves the *Cookie* values.
* If the interceptor asked for gzip itself and the server returned gzip-compressed content, it decompresses the body, removes the Content-Encoding and Content-Length headers, and builds a new response to return.
* Otherwise it returns the response as is.

~~~
public final class BridgeInterceptor implements Interceptor {
  private final CookieJar cookieJar;

  public BridgeInterceptor(CookieJar cookieJar) {
    this.cookieJar = cookieJar;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

  /** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}.
   */
  private String cookieHeader(List<Cookie> cookies) {
    StringBuilder cookieHeader = new StringBuilder();
    for (int i = 0, size = cookies.size(); i < size; i++) {
      if (i > 0) {
        cookieHeader.append("; ");
      }
      Cookie cookie = cookies.get(i);
      cookieHeader.append(cookie.name()).append('=').append(cookie.value());
    }
    return cookieHeader.toString();
  }
}
~~~

## CacheInterceptor: cache handling

### Notes

1. The built-in cache currently supports GET only; other request methods need a custom implementation.
2. The server has to cooperate: caching is controlled through the relevant response headers.
3. A Cache must be configured when the OkHttpClient is created.

### Flow

1. If a cache is configured, look up the cached response (it may be null).
2. Compute the cache strategy.
3. Record the result in the cache's statistics.
4. If the network is forbidden (the request is only-if-cached) and the cache is unusable, return a 504 response directly.
5. If the cache is valid, return the cached response without using the network.
6. If the cache is not valid, run the next interceptor.
7. If a cached response exists, decide from the network result whether to use the cache or the network response.
8. Store the response in the local cache.

### CacheStrategy in detail

The cache strategy class. Whether its outputs networkRequest and cacheResponse are null determines the strategy, as follows:

| networkRequest | cacheResponse | Result |
| --- | --- | --- |
| null | null | only-if-cached: the network is forbidden and the cache is missing or expired, so a 504 error is always returned |
| null | non-null | No network request; the cached response is returned directly |
| non-null | null | The cache is missing or expired; go straight to the network |
| non-null | non-null | The cached response carries ETag/Last-Modified; a conditional request is sent, so the network is still needed |

~~~
public Factory(long nowMillis, Request request, Response cacheResponse) {
  this.nowMillis = nowMillis;
  this.request = request;
  this.cacheResponse = cacheResponse;

  if (cacheResponse != null) {
    this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
    this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
    Headers headers = cacheResponse.headers();
    // Read the relevant header values from the cached response
    for (int i = 0, size = headers.size(); i < size; i++) {
      String fieldName = headers.name(i);
      String value = headers.value(i);
      if ("Date".equalsIgnoreCase(fieldName)) {
        servedDate = HttpDate.parse(value);
        servedDateString = value;
      } else if ("Expires".equalsIgnoreCase(fieldName)) {
        expires = HttpDate.parse(value);
      } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
        lastModified = HttpDate.parse(value);
        lastModifiedString = value;
      } else if ("ETag".equalsIgnoreCase(fieldName)) {
        etag = value;
      } else if ("Age".equalsIgnoreCase(fieldName)) {
        ageSeconds = HttpHeaders.parseSeconds(value, -1);
      }
    }
  }
}

/**
 * Returns a strategy to satisfy {@code
 * request} using a cached response {@code response}.
 */
public CacheStrategy get() {
  // Compute the candidate strategy
  CacheStrategy candidate = getCandidate();

  // The candidate needs the network, but the request's Cache-Control says only-if-cached
  if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
    // We're forbidden from using the network and the cache is insufficient.
    return new CacheStrategy(null, null);
  }

  return candidate;
}

/** Returns a strategy to use assuming the request can use the network. */
private CacheStrategy getCandidate() {
  // No cached response: return a network-only strategy.
  if (cacheResponse == null) {
    return new CacheStrategy(request, null);
  }

  // Drop the cached response if it's missing a required handshake.
  if (request.isHttps() && cacheResponse.handshake() == null) {
    return new CacheStrategy(request, null);
  }

  // The response is not cacheable.
  // If this response shouldn't have been stored, it should never be used
  // as a response source. This check should be redundant as long as the
  // persistence store is well-behaved and the rules are constant.
  if (!isCacheable(cacheResponse, request)) {
    return new CacheStrategy(request, null);
  }

  // Cache-Control from the request headers
  CacheControl requestCaching = request.cacheControl();
  // The request says no-cache, or already carries its own conditions: go to the network
  if (requestCaching.noCache() || hasConditions(request)) {
    return new CacheStrategy(request, null);
  }

  // Age of the cached response
  long ageMillis = cacheResponseAge();
  // How long the cached response stays fresh
  long freshMillis = computeFreshnessLifetime();

  // If the request sets max-age, use the shorter of the two lifetimes
  if (requestCaching.maxAgeSeconds() != -1) {
    freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
  }

  long minFreshMillis = 0;
  // If the request sets min-fresh, use it as the minimum remaining freshness
  if (requestCaching.minFreshSeconds() != -1) {
    minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
  }

  // How stale a response the request is willing to accept
  long maxStaleMillis = 0;
  // Cache-Control from the cached response
  CacheControl responseCaching = cacheResponse.cacheControl();
  // Only honored if the server did not say must-revalidate and the request sets max-stale
  if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
    maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
  }

  // The cached response can be served when
  // age + minFresh < freshness lifetime + maxStale
  if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
    Response.Builder builder = cacheResponse.newBuilder();
    if (ageMillis + minFreshMillis >= freshMillis) {
      builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
    }
    long oneDayMillis = 24 * 60 * 60 * 1000L;
    if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
      builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
    }
    // Serve from the cache
    return new CacheStrategy(null, builder.build());
  }

  // Find a condition to add to the request. If the condition is satisfied, the response body
  // will not be transmitted.
  String conditionName;
  String conditionValue;
  if (etag != null) {
    conditionName = "If-None-Match";
    conditionValue = etag;
  } else if (lastModified != null) {
    conditionName = "If-Modified-Since";
    conditionValue = lastModifiedString;
  } else if (servedDate != null) {
    conditionName = "If-Modified-Since";
    conditionValue = servedDateString;
  } else {
    return new CacheStrategy(request, null); // No condition! Make a regular request.
  }

  Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
  Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);

  Request conditionalRequest = request.newBuilder()
      .headers(conditionalRequestHeaders.build())
      .build();
  // Return a strategy with a conditional request plus the cached response
  return new CacheStrategy(conditionalRequest, cacheResponse);
}
~~~

As the analysis above shows, OkHttp's cache strategy is essentially a long series of if/else checks, and those checks follow the rules laid down in the HTTP caching RFC.

### CacheInterceptor in detail

~~~
@Override public Response intercept(Chain chain) throws IOException {
  // If a cache is configured, look up the candidate; it may be null
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();

  // Compute the cache strategy
  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  // The request from the strategy
  Request networkRequest = strategy.networkRequest;
  // The response from the strategy
  Response cacheResponse = strategy.cacheResponse;

  // Record statistics if a cache is configured
  if (cache != null) {
    cache.trackResponse(strategy);
  }

  // A candidate was found but the strategy decided not to use it
  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  }

  // If we're forbidden from using the network and the cache is insufficient, fail.
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // The cache is valid, so the network is not used.
  // If we don't need the network, we're done.
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }

  // The cache is not sufficient: run the next interceptor
  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }

  // A cached response exists: choose which response to use.
  // If we have a cache response too, then we're doing a conditional get.
  if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
      closeQuietly(cacheResponse.body());
    }
  }

  // Use the network response
  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) {
    // Store it in the local cache
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }

    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}
~~~

## ConnectInterceptor: connections

### ConnectInterceptor in detail

~~~
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}
~~~

### RealConnection in detail

RealConnection is the implementation of Connection and represents a socket link to the server: holding a RealConnection means a communication channel to the server already exists. Since this class owns the channel, it is also where the connection is established. The TCP three-way handshake is triggered here, when the raw socket is connected. Before looking at the handshake, here are its fields, to get a rough picture of the class.

~~~
  private final ConnectionPool connectionPool;
  private final Route route;

  // The fields below are initialized by connect() and never reassigned.

  /** The low-level TCP socket. */
  private Socket rawSocket;

  /**
   * The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
   * {@link #rawSocket} itself if this connection does not use SSL.
   */
  private Socket socket;
  // TLS handshake
  private Handshake handshake;
  // Protocol in use
  private Protocol protocol;
  // HTTP/2 connection
  private Http2Connection http2Connection;
  // As the names suggest, the input and output streams used to talk to the server
  private BufferedSource source;
  private BufferedSink sink;

  // The fields below track connection state and are guarded by connectionPool.

  /** If true, no new streams can be created on this connection. Once true this is always true. */
  public boolean noNewStreams;

  // Success count
  public int successCount;

  /**
   * The maximum number of concurrent streams that can be carried by this connection. If {@code
   * allocations.size() < allocationLimit} then new streams can be created on this connection.
   */
  public int allocationLimit = 1;
~~~

#### connect()

~~~
public void connect(
    int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
  if (protocol != null) throw new IllegalStateException("already connected");

  // Select the connection specs for this route
  RouteException routeException = null;
  List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
  ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

  if (route.address().sslSocketFactory() == null) {
    if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication not enabled for client"));
    }
    String host = route.address().url().host();
    if (!Platform.get().isCleartextTrafficPermitted(host)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication to " + host + " not permitted by network security policy"));
    }
  }

  // Start connecting
  while (true) {
    try {
      // If the route requires a tunnel, build the tunnel first; this is the less common case
      if (route.requiresTunnel()) {
        connectTunnel(connectTimeout, readTimeout, writeTimeout);
      } else {
        // The usual path, which is simply a socket connection
        connectSocket(connectTimeout, readTimeout);
      }
      // Establish the protocol (this is where HTTPS is set up)
      establishProtocol(connectionSpecSelector);
      break;
    } catch (IOException e) {
      closeQuietly(socket);
      closeQuietly(rawSocket);
      socket = null;
      rawSocket = null;
      source = null;
      sink = null;
      handshake = null;
      protocol = null;
      http2Connection = null;

      if (routeException == null) {
        routeException = new RouteException(e);
      } else {
        routeException.addConnectException(e);
      }

      if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
        throw routeException;
      }
    }
  }

  if (http2Connection != null) {
    synchronized (connectionPool) {
      allocationLimit = http2Connection.maxConcurrentStreams();
    }
  }
}
~~~

#### connectSocket()

~~~
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
  Proxy proxy = route.proxy();
  Address address = route.address();

  // Choose the socket by proxy type: direct and HTTP proxy routes use the socket factory,
  // any other proxy type uses new Socket(proxy)
  rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
      ? address.socketFactory().createSocket()
      : new Socket(proxy);

  rawSocket.setSoTimeout(readTimeout);
  try {
    // Connect the socket. It goes through Platform to support different platforms;
    // underneath it is socket.connect(address, connectTimeout)
    Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
  } catch (ConnectException e) {
    ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
    ce.initCause(e);
    throw ce;
  }
  // Obtain the input and output streams
  source = Okio.buffer(Okio.source(rawSocket));
  sink = Okio.buffer(Okio.sink(rawSocket));
}
~~~

### ConnectionPool in detail

Manages HTTP and HTTP/2 connections in order to reduce network latency. Requests to the same address share the same connection. This class is what implements connection reuse.

~~~
  /**
   * Background threads are used to cleanup expired connections. There will be at most a single
   * thread running per connection pool. The thread pool executor permits the pool itself to be
   * garbage collected.
   */
  // The executor that cleans up expired connections; each connection pool runs at most one
  // cleanup thread, and the executor does not prevent the pool from being garbage collected
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;

  // The cleanup task
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

  // Double-ended queue of connections
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  // Route database
  final RouteDatabase routeDatabase = new RouteDatabase();
  // Flag showing whether the cleanup task is running
  boolean cleanupRunning;
~~~

#### The cleanup method

~~~
long cleanup(long now) {
  int inUseConnectionCount = 0;
  int idleConnectionCount = 0;
  RealConnection longestIdleConnection = null;
  long longestIdleDurationNs = Long.MIN_VALUE;

  // Find either a connection to evict, or the time that the next eviction is due.
  synchronized (this) {
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
      RealConnection connection = i.next();

      // If the connection is in use, keep searching.
      if (pruneAndGetAllocationCount(connection, now) > 0) {
        inUseConnectionCount++;
        continue;
      }

      // Count the idle connections
      idleConnectionCount++;

      // If the connection is ready to be evicted, we're done.
      long idleDurationNs = now - connection.idleAtNanos;
      if (idleDurationNs > longestIdleDurationNs) {
        // Track the connection that has been idle longest, and for how long
        longestIdleDurationNs = idleDurationNs;
        longestIdleConnection = connection;
      }
    }

    if (longestIdleDurationNs >= this.keepAliveDurationNs
        || idleConnectionCount > this.maxIdleConnections) {
      // We've found a connection to evict. Remove it from the list, then close it below (outside
      // of the synchronized block).
      connections.remove(longestIdleConnection);
    } else if (idleConnectionCount > 0) {
      // A connection will be ready to evict soon.
      // Nothing to evict yet: return how long until the longest-idle connection expires
      return keepAliveDurationNs - longestIdleDurationNs;
    } else if (inUseConnectionCount > 0) {
      // All connections are in use. It'll be at least the keep alive duration 'til we run again.
      return keepAliveDurationNs;
    } else {
      // No connections, idle or in use. Cleanup is finished.
      cleanupRunning = false;
      return -1;
    }
  }

  // Close the socket of the evicted connection
  closeQuietly(longestIdleConnection.socket());

  // Cleanup again immediately.
  // Reached only after evicting the longest-idle connection; another pass is needed right away
  return 0;
}
~~~

### StreamAllocation in detail

For a given "request", StreamAllocation finds the matching "connection" and "stream".

## CallServerInterceptor: making the request

### CallServerInterceptor

~~~
@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  HttpCodec httpCodec = realChain.httpStream();
  StreamAllocation streamAllocation = realChain.streamAllocation();
  RealConnection connection = (RealConnection) realChain.connection();
  Request request = realChain.request();

  long sentRequestMillis = System.currentTimeMillis();
  // Write the request headers
  httpCodec.writeRequestHeaders(request);

  Response.Builder responseBuilder = null;
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
    // Continue" response before transmitting the request body.
    // If we don't get that, return what we did get (such as a 4xx response) without ever
    // transmitting the request body.
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      httpCodec.flushRequest();
      responseBuilder = httpCodec.readResponseHeaders(true);
    }

    // Write the request body
    if (responseBuilder == null) {
      // Write the request body if the "Expect: 100-continue" expectation was met.
      Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
    } else if (!connection.isMultiplexed()) {
      // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
      // being reused. Otherwise we're still obligated to transmit the request body to leave the
      // connection in a consistent state.
      streamAllocation.noNewStreams();
    }
  }

  httpCodec.finishRequest();

  // Read the response headers
  if (responseBuilder == null) {
    responseBuilder = httpCodec.readResponseHeaders(false);
  }

  Response response = responseBuilder
      .request(request)
      .handshake(streamAllocation.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  // Read the response body
  int code = response.code();
  if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
    response = response.newBuilder()
        .body(httpCodec.openResponseBody(response))
        .build();
  }

  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
  }

  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
        "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }

  return response;
}
~~~

### HttpCodec

In OkHttp, HttpCodec manages network reads and writes. It can be thought of as a codec: as its own doc comment says, it encodes HTTP requests and decodes HTTP responses. It has two implementations, Http1Codec and Http2Codec, for HTTP/1.1 and HTTP/2 respectively.

~~~
/** Encodes HTTP requests and decodes HTTP responses. */
public interface HttpCodec {
  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;

  // Returns the sink through which the request body is written
  Sink createRequestBody(Request request, long contentLength);

  // Writes the request headers
  void writeRequestHeaders(Request request) throws IOException;

  // Equivalent to flush: pushes the request to the underlying socket
  void flushRequest() throws IOException;

  // Equivalent to flush: pushes the request to the underlying socket and sends nothing more
  void finishRequest() throws IOException;

  // Reads the response headers
  Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;

  // Opens the response body
  ResponseBody openResponseBody(Response response) throws IOException;

  // Cancels the stream
  void cancel();
}
~~~

### Http1Codec

~~~
public final class Http1Codec implements HttpCodec {
  final OkHttpClient client;
  final StreamAllocation streamAllocation;
  final BufferedSource source;
  final BufferedSink sink;
  int state = STATE_IDLE;
  private long headerLimit = HEADER_LIMIT;
}
~~~

### Http2Codec

HTTP/2 lets a single "connection" carry multiple requests, which makes it fundamentally different from HTTP/1.x. That is why Http1Codec holds a source and a sink while Http2Codec does not: in HTTP/1.x one connection serves one request at a time, whereas in HTTP/2 many requests can run over one TCP connection. OkHttp therefore uses an Http2Connection to represent a connection and an Http2Stream to represent the stream of a single request.

## References

[OKHttp源码解析](https://www.jianshu.com/p/82f74db14a18)
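
## Appendix: a minimal chain-of-responsibility sketch

The hand-off between RealInterceptorChain.proceed() and Interceptor.intercept() described in the interceptor section can be hard to follow from excerpts alone. The sketch below reproduces only that mechanism (a chain object that holds the list plus an index, and creates a copy with index + 1 for each step). It is not OkHttp code: the class `ChainSketch`, its nested `Interceptor` and `Chain` types, and the use of plain strings in place of Request and Response are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ChainSketch {
  /** Hypothetical, simplified stand-in for okhttp3.Interceptor (strings replace Request/Response). */
  interface Interceptor {
    String intercept(Chain chain);
  }

  /** Simplified stand-in for RealInterceptorChain: the shared list plus the current index. */
  static final class Chain {
    private final List<Interceptor> interceptors;
    private final int index;
    private final String request;

    Chain(List<Interceptor> interceptors, int index, String request) {
      this.interceptors = interceptors;
      this.index = index;
      this.request = request;
    }

    String request() {
      return request;
    }

    String proceed(String request) {
      if (index >= interceptors.size()) {
        throw new IllegalStateException("the last interceptor must not call proceed()");
      }
      // Same idea as the excerpt: build the chain for index + 1, then run the interceptor at index.
      Chain next = new Chain(interceptors, index + 1, request);
      return interceptors.get(index).intercept(next);
    }
  }

  static String run() {
    List<Interceptor> interceptors = new ArrayList<>();
    // Each wrapper edits the request on the way in and decorates the response on the way out.
    interceptors.add(chain -> "[A " + chain.proceed(chain.request() + "+a") + " A]");
    interceptors.add(chain -> "[B " + chain.proceed(chain.request() + "+b") + " B]");
    // The terminal interceptor plays the role of CallServerInterceptor: it never calls proceed().
    interceptors.add(chain -> "response(" + chain.request() + ")");
    return new Chain(interceptors, 0, "req").proceed("req");
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints "[A [B response(req+a+b) B] A]"
  }
}
```

The nesting in the output shows why the order of the list matters: an interceptor added earlier sees the request first and the response last, which is how RetryAndFollowUpInterceptor can wrap every later stage in its retry loop.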