apache http client连接池实现原理

发布于:2025-05-23 ⋅ 阅读:(15) ⋅ 点赞:(0)

在java开发中我们经常会涉及到http 请求接口,一般有几种方式:

  • java自带的 HttpURLConnection
  • okHttpClient
  • apache http client

一般我们使用apache http client会比较多点,在代码中会进行如下调用方式:

 private static class HttpClientPool {
        private static CloseableHttpClient httpClient;

        private static PoolingHttpClientConnectionManager HTTP_CLIENT_POOL = new PoolingHttpClientConnectionManager();

        static {
            //连接池最大连接数:300
            HTTP_CLIENT_POOL.setMaxTotal(300);
            // 每个路由的最大连接数
            HTTP_CLIENT_POOL.setDefaultMaxPerRoute(50);
            if (httpClient == null) {
                httpClient = HttpClients.custom()
                        .setConnectionManager(HTTP_CLIENT_POOL)
                        .setDefaultRequestConfig(RequestConfig.custom()
                                .setConnectTimeout(10000)
                                .setSocketTimeout(10000).build())
                        .setDefaultSocketConfig(SocketConfig.custom()
                                .setSoTimeout(10000)
                                .build())
                        .build();

            }
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    httpClient.close();
                    HTTP_CLIENT_POOL.close();
                } catch (Exception e) {
                    LoggerFactory.getLogger(com.ipinyou.mip.base.utils.HttpClientHolder.class).error("关闭httpClient 连接池失败!", e);
                }
            }));
        }


        public static CloseableHttpClient getHttpClient() {
            return httpClient;
        }
    }

    private static HttpResponseValue httpCallWithHttpClient(String url, HttpMethodEnum method, Map<String, String> formData, String jsonData, Map<String, String> headers) {
        HttpResponseValue responseValue = new HttpResponseValue();
        if (url == null || StringUtils.isBlank(url)) {
            log.info("url is empty,return ");
            return responseValue;
        }
        long start = System.currentTimeMillis();
        HttpPost post = null;
        HttpGet get = null;
        try {
            // 设置请求超时时间为 10 分钟
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectTimeout(HTTP_CONNECT_TIMEOUT_MS) // 连接超时时间
                    .setSocketTimeout(HTTP_CONNECT_TIMEOUT_MS) // 数据传输超时时间
                    .build();
            if (method == HttpMethodEnum.POST) {
                post = new HttpPost(url);
                post.setConfig(requestConfig);
                if (headers != null && !headers.isEmpty()) {
                    for (Map.Entry<String, String> kv : headers.entrySet()) {
                        post.addHeader(kv.getKey(), kv.getValue());
                    }
                }

                if (formData != null && !formData.isEmpty()) {
                    List<NameValuePair> nvps = new ArrayList<>();
                    formData.forEach((k, v) -> nvps.add(new BasicNameValuePair(k, v)));
                    post.setEntity(new UrlEncodedFormEntity(nvps, DEFAULT_UTF8_ENCODING));
                } else if (StringUtils.isNotBlank(jsonData)) {
                    StringEntity entity = new StringEntity(jsonData, ContentType.APPLICATION_JSON);
                    entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, CONTENT_TYPE_APPLICATION_JSON));
                    post.setEntity(entity);
                }
            }
            if (method == HttpMethodEnum.GET) {
                get = new HttpGet();
                get.setConfig(requestConfig);
            }
            CloseableHttpClient client = HttpClientPool.getHttpClient();
            //entity编码指定urf-8
            CloseableHttpResponse response = null;
            if (method == HttpMethodEnum.GET) {
                response = client.execute(get);
            } else {
                response = client.execute(post);
            }
            HttpEntity entity = response.getEntity();
            if (entity != null) {
                responseValue.content = EntityUtils.toString(entity, DEFAULT_UTF8_ENCODING);
            }
            response.close();
            responseValue.code = 200;
        } catch (Exception e) {
            log.info("httpCallWithHttpClient failed: {} ", e);
        } finally {
            log.info("httpCallWithHttpClient url=[{}],cost={} ms,", url, (System.currentTimeMillis() - start));
        }
        return responseValue;
    }

今天我们来研究下,apache http client的源码。
HttpClient是其内部一个抽象接口:
在这里插入图片描述
这里我们以InternalHttpClient为例来研究(CloseableHttpClient是一个抽象类):

    public CloseableHttpResponse execute(
            final HttpHost target,
            final HttpRequest request,
            final HttpContext context) throws IOException, ClientProtocolException {
        return doExecute(target, request, context);
    }
    protected CloseableHttpResponse doExecute(
            final HttpHost target,
            final HttpRequest request,
            final HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(request, "HTTP request");
        HttpExecutionAware execAware = null;
        if (request instanceof HttpExecutionAware) {
            execAware = (HttpExecutionAware) request;
        }
        try {
            final HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target);
            final HttpClientContext localcontext = HttpClientContext.adapt(
                    context != null ? context : new BasicHttpContext());
            RequestConfig config = null;
            if (request instanceof Configurable) {
                config = ((Configurable) request).getConfig();
            }
            if (config == null) {
                final HttpParams params = request.getParams();
                if (params instanceof HttpParamsNames) {
                    if (!((HttpParamsNames) params).getNames().isEmpty()) {
                        config = HttpClientParamConfig.getRequestConfig(params, this.defaultConfig);
                    }
                } else {
                    config = HttpClientParamConfig.getRequestConfig(params, this.defaultConfig);
                }
            }
            if (config != null) {
                localcontext.setRequestConfig(config);
            }
            setupContext(localcontext);
            final HttpRoute route = determineRoute(target, wrapper, localcontext);
            return this.execChain.execute(route, wrapper, localcontext, execAware);
        } catch (final HttpException httpException) {
            throw new ClientProtocolException(httpException);
        }
    }

实际执行发送请求是在InternalHttpClient中,发送时会根据determineRoute来确定本次请求的路径,请求中的HttpPost、HttpGet都是实现了HttpUriRequest接口,在请求前,会通过HttpUriRequest获取到请求的地址信息,将其封装到HttpHost中,主要包含如下信息:

public final class HttpHost implements java.lang.Cloneable, java.io.Serializable {
  public static final java.lang.String DEFAULT_SCHEME_NAME = "http";
  protected final java.lang.String hostname;
  protected final java.lang.String lcHostname;
  protected final int port;
  protected final java.lang.String schemeName;
  protected final java.net.InetAddress address;
  @Override
  public boolean equals(final Object obj) {
      if (this == obj) {
          return true;
      }
      if (obj instanceof HttpHost) {
          final HttpHost that = (HttpHost) obj;
          return this.lcHostname.equals(that.lcHostname)
              && this.port == that.port
              && this.schemeName.equals(that.schemeName)
              && (this.address==null ? that.address== null : this.address.equals(that.address));
      } else {
          return false;
      }
  }
  @Override
  public int hashCode() {
      int hash = LangUtils.HASH_SEED;
      hash = LangUtils.hashCode(hash, this.lcHostname);
      hash = LangUtils.hashCode(hash, this.port);
      hash = LangUtils.hashCode(hash, this.schemeName);
      if (address!=null) {
          hash = LangUtils.hashCode(hash, address);
      }
      return hash;
  }
  }

可以看到,主要包含了请求地址host,端口,协议(http、https),尤其需要注意其重写了equalshashCode方法,可以看到,判断两个HttpHost 是否一样,主要是看协议(http、https)、地址、端口号
然后根据通过routePlanner.determineRouteHttpHost和一些其他信息封装到HttpRoute,表示一个请求的路由信息:

public final class HttpRoute implements RouteInfo, Cloneable {
private final HttpHost targetHost;
    private final InetAddress localAddress;
    private final List<HttpHost> proxyChain;
    private final TunnelType tunnelled;
    private final LayerType layered;
    private final boolean secure;
     @Override
    public final boolean equals(final Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj instanceof HttpRoute) {
            final HttpRoute that = (HttpRoute) obj;
            return
                // Do the cheapest tests first
                (this.secure    == that.secure) &&
                (this.tunnelled == that.tunnelled) &&
                (this.layered   == that.layered) &&
                LangUtils.equals(this.targetHost, that.targetHost) &&
                LangUtils.equals(this.localAddress, that.localAddress) &&
                LangUtils.equals(this.proxyChain, that.proxyChain);
        } else {
            return false;
        }
    }
    @Override
    public final int hashCode() {
        int hash = LangUtils.HASH_SEED;
        hash = LangUtils.hashCode(hash, this.targetHost);
        hash = LangUtils.hashCode(hash, this.localAddress);
        if (this.proxyChain != null) {
            for (final HttpHost element : this.proxyChain) {
                hash = LangUtils.hashCode(hash, element);
            }
        }
        hash = LangUtils.hashCode(hash, this.secure);
        hash = LangUtils.hashCode(hash, this.tunnelled);
        hash = LangUtils.hashCode(hash, this.layered);
        return hash;
    }
}

需要注意的是,HttpRoute 重写了equalshashCode方法,也就是说,一般常规情况下,两个HttpRoute 是否相等,主要就是协议、地址、端口号,也就是说,我们经常设置的连接池的setDefaultMaxPerRoute这里设置的是协议、地址、端口号 为分类

通过HttpRoute,apache http client将会找到服务端并建立连接。

加下来通过ClientExecChain进行请求的发送:

public interface ClientExecChain {
  CloseableHttpResponse execute(
          HttpRoute route,
          HttpRequestWrapper request,
          HttpClientContext clientContext,
          HttpExecutionAware execAware) throws IOException, HttpException;

}

其主要实现类为MainClientExec:

public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        Args.notNull(route, "HTTP route");
        Args.notNull(request, "HTTP request");
        Args.notNull(context, "HTTP context");

        AuthState targetAuthState = context.getTargetAuthState();
        if (targetAuthState == null) {
            targetAuthState = new AuthState();
            context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
        }
        AuthState proxyAuthState = context.getProxyAuthState();
        if (proxyAuthState == null) {
            proxyAuthState = new AuthState();
            context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
        }

        if (request instanceof HttpEntityEnclosingRequest) {
            RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
        }

        Object userToken = context.getUserToken();

        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        if (execAware != null) {
            if (execAware.isAborted()) {
                connRequest.cancel();
                throw new RequestAbortedException("Request aborted");
            } else {
                execAware.setCancellable(connRequest);
            }
        }

        final RequestConfig config = context.getRequestConfig();

        final HttpClientConnection managedConn;
        try {
            final int timeout = config.getConnectionRequestTimeout();
            managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch(final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RequestAbortedException("Request aborted", interrupted);
        } catch(final ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause == null) {
                cause = ex;
            }
            throw new RequestAbortedException("Request execution failed", cause);
        }

        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);

        if (config.isStaleConnectionCheckEnabled()) {
            // validate connection
            if (managedConn.isOpen()) {
                this.log.debug("Stale connection check");
                if (managedConn.isStale()) {
                    this.log.debug("Stale connection detected");
                    managedConn.close();
                }
            }
        }

        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            if (execAware != null) {
                execAware.setCancellable(connHolder);
            }

            HttpResponse response;
            for (int execCount = 1;; execCount++) {

                if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
                    throw new NonRepeatableRequestException("Cannot retry request " +
                            "with a non-repeatable request entity.");
                }

                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                if (!managedConn.isOpen()) {
                    this.log.debug("Opening connection " + route);
                    try {
                        establishRoute(proxyAuthState, managedConn, route, request, context);
                    } catch (final TunnelRefusedException ex) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug(ex.getMessage());
                        }
                        response = ex.getResponse();
                        break;
                    }
                }
                final int timeout = config.getSocketTimeout();
                if (timeout >= 0) {
                    managedConn.setSocketTimeout(timeout);
                }

                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                if (this.log.isDebugEnabled()) {
                    this.log.debug("Executing request " + request.getRequestLine());
                }

                if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Target auth state: " + targetAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, targetAuthState, context);
                }
                if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Proxy auth state: " + proxyAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, proxyAuthState, context);
                }

                response = requestExecutor.execute(request, managedConn, context);

                // The connection is in or can be brought to a re-usable state.
                if (reuseStrategy.keepAlive(response, context)) {
                    // Set the idle duration of this connection
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    connHolder.markReusable();
                } else {
                    connHolder.markNonReusable();
                }

                if (needAuthentication(
                        targetAuthState, proxyAuthState, route, response, context)) {
                    // Make sure the response body is fully consumed, if present
                    final HttpEntity entity = response.getEntity();
                    if (connHolder.isReusable()) {
                        EntityUtils.consume(entity);
                    } else {
                        managedConn.close();
                        if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
                                && proxyAuthState.getAuthScheme() != null
                                && proxyAuthState.getAuthScheme().isConnectionBased()) {
                            this.log.debug("Resetting proxy auth state");
                            proxyAuthState.reset();
                        }
                        if (targetAuthState.getState() == AuthProtocolState.SUCCESS
                                && targetAuthState.getAuthScheme() != null
                                && targetAuthState.getAuthScheme().isConnectionBased()) {
                            this.log.debug("Resetting target auth state");
                            targetAuthState.reset();
                        }
                    }
                    // discard previous auth headers
                    final HttpRequest original = request.getOriginal();
                    if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
                        request.removeHeaders(AUTH.WWW_AUTH_RESP);
                    }
                    if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
                        request.removeHeaders(AUTH.PROXY_AUTH_RESP);
                    }
                } else {
                    break;
                }
            }

            if (userToken == null) {
                userToken = userTokenHandler.getUserToken(context);
                context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
            }
            if (userToken != null) {
                connHolder.setState(userToken);
            }

            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // connection not needed and (assumed to be) in re-usable state
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            } else {
                return new HttpResponseProxy(response, connHolder);
            }
        } catch (final ConnectionShutdownException ex) {
            final InterruptedIOException ioex = new InterruptedIOException(
                    "Connection has been shut down");
            ioex.initCause(ex);
            throw ioex;
        } catch (final HttpException ex) {
            connHolder.abortConnection();
            throw ex;
        } catch (final IOException ex) {
            connHolder.abortConnection();
            throw ex;
        } catch (final RuntimeException ex) {
            connHolder.abortConnection();
            throw ex;
        }
    }

实际发送的这个方法比较长,我们分几段看。

第一步是根据上述的HttpRoute和服务端建立TCP连接

通过connManager.requestConnection(route, userToken);建立连接,在HttpClientBuilder中,设置connManagerPoolingHttpClientConnectionManager

public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        Args.notNull(route, "HTTP route");
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection request: " + format(route, state) + formatStats(route));
        }
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {
            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                return leaseConnection(future, timeout, tunit);
            }
        };
    }

上面这一连串下来,起始就是要通过中连接池拿连接,建立连接的实际调用在其父类AbstractConnPool中:

private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

                boolean success = false;
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

这里面当连接不够时,如果没超过maxPerRoutemaxTotal就会在创建一个链接,最终通过ManagedHttpClientConnectionFactory创建:

    public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {
        final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
        CharsetDecoder chardecoder = null;
        CharsetEncoder charencoder = null;
        final Charset charset = cconfig.getCharset();
        final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?
                cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;
        final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?
                cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;
        if (charset != null) {
            chardecoder = charset.newDecoder();
            chardecoder.onMalformedInput(malformedInputAction);
            chardecoder.onUnmappableCharacter(unmappableInputAction);
            charencoder = charset.newEncoder();
            charencoder.onMalformedInput(malformedInputAction);
            charencoder.onUnmappableCharacter(unmappableInputAction);
        }
        final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
        return new LoggingManagedHttpClientConnection(
                id,
                log,
                headerlog,
                wirelog,
                cconfig.getBufferSize(),
                cconfig.getFragmentSizeHint(),
                chardecoder,
                charencoder,
                cconfig.getMessageConstraints(),
                incomingContentStrategy,
                outgoingContentStrategy,
                requestWriterFactory,
                responseParserFactory);
    }

到这里我们获取到了一个HttpClientConnection,但是这个时候并没有建立真正的连接。加下来通过:

 this.connManager.connect(
                    managedConn,
                    route,
                    timeout > 0 ? timeout : 0,
                    context);
public void  connect(
            final HttpClientConnection managedConn,
            final HttpRoute route,
            final int connectTimeout,
            final HttpContext context) throws IOException {
        Args.notNull(managedConn, "Managed Connection");
        Args.notNull(route, "HTTP route");
        final ManagedHttpClientConnection conn;
        synchronized (managedConn) {
            final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
            conn = entry.getConnection();
        }
        final HttpHost host;
        if (route.getProxyHost() != null) {
            host = route.getProxyHost();
        } else {
            host = route.getTargetHost();
        }
        final InetSocketAddress localAddress = route.getLocalSocketAddress();
        SocketConfig socketConfig = this.configData.getSocketConfig(host);
        if (socketConfig == null) {
            socketConfig = this.configData.getDefaultSocketConfig();
        }
        if (socketConfig == null) {
            socketConfig = SocketConfig.DEFAULT;
        }
        this.connectionOperator.connect(
                conn, host, localAddress, connectTimeout, socketConfig, context);
    } 
    public void connect(
            final ManagedHttpClientConnection conn,
            final HttpHost host,
            final InetSocketAddress localAddress,
            final int connectTimeout,
            final SocketConfig socketConfig,
            final HttpContext context) throws IOException {
        final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);
        final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
        if (sf == null) {
            throw new UnsupportedSchemeException(host.getSchemeName() +
                    " protocol is not supported");
        }
        final InetAddress[] addresses = host.getAddress() != null ?
                new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName());
        final int port = this.schemePortResolver.resolve(host);
        for (int i = 0; i < addresses.length; i++) {
            final InetAddress address = addresses[i];
            final boolean last = i == addresses.length - 1;

            Socket sock = sf.createSocket(context);
            sock.setSoTimeout(socketConfig.getSoTimeout());
            sock.setReuseAddress(socketConfig.isSoReuseAddress());
            sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
            sock.setKeepAlive(socketConfig.isSoKeepAlive());
            if (socketConfig.getRcvBufSize() > 0) {
                sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
            }
            if (socketConfig.getSndBufSize() > 0) {
                sock.setSendBufferSize(socketConfig.getSndBufSize());
            }

            final int linger = socketConfig.getSoLinger();
            if (linger >= 0) {
                sock.setSoLinger(true, linger);
            }
            conn.bind(sock);

            final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connecting to " + remoteAddress);
            }
            try {
                sock = sf.connectSocket(
                        connectTimeout, sock, host, remoteAddress, localAddress, context);
                conn.bind(sock);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection established " + conn);
                }
                return;
            } catch (final SocketTimeoutException ex) {
                if (last) {
                    throw new ConnectTimeoutException(ex, host, addresses);
                }
            } catch (final ConnectException ex) {
                if (last) {
                    final String msg = ex.getMessage();
                    if ("Connection timed out".equals(msg)) {
                        throw new ConnectTimeoutException(ex, host, addresses);
                    } else {
                        throw new HttpHostConnectException(ex, host, addresses);
                    }
                }
            } catch (final NoRouteToHostException ex) {
                if (last) {
                    throw ex;
                }
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connect to " + remoteAddress + " timed out. " +
                        "Connection will be retried using another IP address");
            }
        }
    }                   

建立真正的连接。以http协议为例,这里通过PlainConnectionSocketFactory创建一个普通的Socket,然后绑定到到连接上:

public Socket createSocket(final HttpContext context) throws IOException {
        return new Socket();
    }

    @Override
    public Socket connectSocket(
            final int connectTimeout,
            final Socket socket,
            final HttpHost host,
            final InetSocketAddress remoteAddress,
            final InetSocketAddress localAddress,
            final HttpContext context) throws IOException {
        final Socket sock = socket != null ? socket : createSocket(context);
        if (localAddress != null) {
            sock.bind(localAddress);
        }
        try {
            sock.connect(remoteAddress, connectTimeout);
        } catch (final IOException ex) {
            try {
                sock.close();
            } catch (final IOException ignore) {
            }
            throw ex;
        }
        return sock;
    }

到这一步,连接就真正建立起来了。

第二步发送请求信息

请求的发送,是通过HttpRequestExecutor进行发送的:

protected HttpResponse doSendRequest(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws IOException, HttpException {
        Args.notNull(request, "HTTP request");
        Args.notNull(conn, "Client connection");
        Args.notNull(context, "HTTP context");

        HttpResponse response = null;

        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
        context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.FALSE);

        conn.sendRequestHeader(request);
        if (request instanceof HttpEntityEnclosingRequest) {
            // Check for expect-continue handshake. We have to flush the
            // headers and wait for an 100-continue response to handle it.
            // If we get a different response, we must not send the entity.
            boolean sendentity = true;
            final ProtocolVersion ver =
                request.getRequestLine().getProtocolVersion();
            if (((HttpEntityEnclosingRequest) request).expectContinue() &&
                !ver.lessEquals(HttpVersion.HTTP_1_0)) {

                conn.flush();
                // As suggested by RFC 2616 section 8.2.3, we don't wait for a
                // 100-continue response forever. On timeout, send the entity.
                if (conn.isResponseAvailable(this.waitForContinue)) {
                    response = conn.receiveResponseHeader();
                    if (canResponseHaveBody(request, response)) {
                        conn.receiveResponseEntity(response);
                    }
                    final int status = response.getStatusLine().getStatusCode();
                    if (status < 200) {
                        if (status != HttpStatus.SC_CONTINUE) {
                            throw new ProtocolException(
                                    "Unexpected response: " + response.getStatusLine());
                        }
                        // discard 100-continue
                        response = null;
                    } else {
                        sendentity = false;
                    }
                }
            }
            if (sendentity) {
                conn.sendRequestEntity((HttpEntityEnclosingRequest) request);
            }
        }
        conn.flush();
        context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.TRUE);
        return response;
    }

首先是写入请求的header:

public void write(final T message) throws IOException, HttpException {
        Args.notNull(message, "HTTP message");
        writeHeadLine(message);
        for (final HeaderIterator it = message.headerIterator(); it.hasNext(); ) {
            final Header header = it.nextHeader();
            this.sessionBuffer.writeLine
                (lineFormatter.formatHeader(this.lineBuf, header));
        }
        this.lineBuf.clear();
        this.sessionBuffer.writeLine(this.lineBuf);
    }
    protected void doFormatHeader(final CharArrayBuffer buffer,
                                  final Header header) {
        final String name = header.getName();
        final String value = header.getValue();

        int len = name.length() + 2;
        if (value != null) {
            len += value.length();
        }
        buffer.ensureCapacity(len);

        buffer.append(name);
        buffer.append(": ");
        if (value != null) {
            buffer.append(value);
        }
    }

可以看到,http请求中,header每个key-value分行写入,并且按照Key: Value的格式。

写入完header之后,接下来就会写入请求体:

public void sendRequestEntity(final HttpEntityEnclosingRequest request)
            throws HttpException, IOException {
        Args.notNull(request, "HTTP request");
        assertOpen();
        if (request.getEntity() == null) {
            return;
        }
        this.entityserializer.serialize(
                this.outbuffer,
                request,
                request.getEntity());
    }
public void serialize(
            final SessionOutputBuffer outbuffer,
            final HttpMessage message,
            final HttpEntity entity) throws HttpException, IOException {
        Args.notNull(outbuffer, "Session output buffer");
        Args.notNull(message, "HTTP message");
        Args.notNull(entity, "HTTP entity");
        final OutputStream outstream = doSerialize(outbuffer, message);
        entity.writeTo(outstream);
        outstream.close();
    }    
protected OutputStream doSerialize(
            final SessionOutputBuffer outbuffer,
            final HttpMessage message) throws HttpException, IOException {
        final long len = this.lenStrategy.determineLength(message);
        if (len == ContentLengthStrategy.CHUNKED) {
            return new ChunkedOutputStream(outbuffer);
        } else if (len == ContentLengthStrategy.IDENTITY) {
        	// 默认走这里
            return new IdentityOutputStream(outbuffer);
        } else {
            return new ContentLengthOutputStream(outbuffer, len);
        }
    }    
   

如果是我们常见的UrlEncodedFormEntity,则是拼接成 key1=value1&key2=value2的格式,获取其byte数组写入到流中。

第三步获取响应

在发送完请求之后,会通过doReceiveResponse获取响应:

protected HttpResponse doReceiveResponse(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws HttpException, IOException {
        Args.notNull(request, "HTTP request");
        Args.notNull(conn, "Client connection");
        Args.notNull(context, "HTTP context");
        HttpResponse response = null;
        int statusCode = 0;

        while (response == null || statusCode < HttpStatus.SC_OK) {

            response = conn.receiveResponseHeader();
            if (canResponseHaveBody(request, response)) {
                conn.receiveResponseEntity(response);
            }
            statusCode = response.getStatusLine().getStatusCode();

        } // while intermediate response

        return response;
    }

获取响应也是先获取响应的header,然后获取响应体:

public HttpResponse receiveResponseHeader()
            throws HttpException, IOException {
        assertOpen();
        final HttpResponse response = this.responseParser.parse();
        if (response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK) {
            this.metrics.incrementResponseCount();
        }
        return response;
    }
public T parse() throws IOException, HttpException {
        final int st = this.state;
        switch (st) {
        case HEAD_LINE:
            try {
                this.message = parseHead(this.sessionBuffer);
            } catch (final ParseException px) {
                throw new ProtocolException(px.getMessage(), px);
            }
            this.state = HEADERS;
            //$FALL-THROUGH$
        case HEADERS:
            final Header[] headers = AbstractMessageParser.parseHeaders(
                    this.sessionBuffer,
                    this.messageConstraints.getMaxHeaderCount(),
                    this.messageConstraints.getMaxLineLength(),
                    this.lineParser,
                    this.headerLines);
            this.message.setHeaders(headers);
            final T result = this.message;
            this.message = null;
            this.headerLines.clear();
            this.state = HEAD_LINE;
            return result;
        default:
            throw new IllegalStateException("Inconsistent parser state");
        }
    }    

AbstractHttpClientConnection实现了receiveResponseHeader

    public HttpResponse receiveResponseHeader()
            throws HttpException, IOException {
        assertOpen();
        final HttpResponse response = this.responseParser.parse();
        if (response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK) {
            this.metrics.incrementResponseCount();
        }
        return response;
    }

而这里的responseParser其实现为DefaultHttpResponseParser 这个里面会对响应进行HTTP协议头的解析,先解析出响应的状态行的信息,
最常见的是HTTP-Version Status-Code Reason-Phrase格式。
组成部分:

  1. HTTP-Version:这是HTTP协议的版本,如HTTP/1.1或HTTP/2。
  2. Status-Code:这是一个三位数字的代码,用来表示请求的结果状态。例如,200表示成功,404表示未找到,500表示服务器内部错误等。
  3. Reason-Phrase:这是一个简短的文本描述,用来解释状态码的含义。例如,对于状态码200,原因短语可能是"OK"。
    基础实现为BasicStatusLine:
public interface StatusLine {
    ProtocolVersion getProtocolVersion();
    int getStatusCode();
    String getReasonPhrase();
}
public class BasicStatusLine implements StatusLine, Cloneable, Serializable {
    private final ProtocolVersion protoVersion;
    private final int statusCode;
    private final String reasonPhrase;
}

可以看到,上面先从连接的input流中读取了StausLine,然后紧接着按行读取请求头,按照“ : ”分割每个header。

响应的请求头处理完之后,会判断是否有响应内容需要处理:

    protected boolean canResponseHaveBody(final HttpRequest request,
                                          final HttpResponse response) {
        if ("HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
            return false;
        }
        final int status = response.getStatusLine().getStatusCode();
        return status >= HttpStatus.SC_OK
            && status != HttpStatus.SC_NO_CONTENT
            && status != HttpStatus.SC_NOT_MODIFIED
            && status != HttpStatus.SC_RESET_CONTENT;
    }

判断是否有响应内容读取,主要判断请求方法和响应码。如果有响应内容需要读取,则通过EntityDeserializer读取连接的InputBuffer:

    public void receiveResponseEntity(final HttpResponse response)
            throws HttpException, IOException {
        Args.notNull(response, "HTTP response");
        assertOpen();
        final HttpEntity entity = this.entitydeserializer.deserialize(this.inbuffer, response);
        response.setEntity(entity);
    }
protected BasicHttpEntity doDeserialize(
            final SessionInputBuffer inbuffer,
            final HttpMessage message) throws HttpException, IOException {
        final BasicHttpEntity entity = new BasicHttpEntity();

        final long len = this.lenStrategy.determineLength(message);
        if (len == ContentLengthStrategy.CHUNKED) {
            entity.setChunked(true);
            entity.setContentLength(-1);
            entity.setContent(new ChunkedInputStream(inbuffer));
        } else if (len == ContentLengthStrategy.IDENTITY) {
        // 默认走这里
            entity.setChunked(false);
            entity.setContentLength(-1);
            entity.setContent(new IdentityInputStream(inbuffer));
        } else {
            entity.setChunked(false);
            entity.setContentLength(len);
            entity.setContent(new ContentLengthInputStream(inbuffer, len));
        }

        final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
        if (contentTypeHeader != null) {
            entity.setContentType(contentTypeHeader);
        }
        final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
        if (contentEncodingHeader != null) {
            entity.setContentEncoding(contentEncodingHeader);
        }
        return entity;
    }

这里默认读取响应流的策略是IDENTITY.这里我们也可以看到,默认返回BasicHttpEntitycontent就是一个流。
得到响应流之后,我们一般会从HttpEntity响应中得到具体的响应内容,然后在关闭流:

CloseableHttpResponse response = client.execute(httpPost);
                HttpEntity entity = response.getEntity();
                if (entity != null) {
                    result = EntityUtils.toString(entity, DEFAULT_UTF8_ENCODING);
                }
                response.close();

EntityUtils.toString则将HttpEntity总input流读取到一个String中去。

private static String toString(
            final HttpEntity entity,
            final ContentType contentType) throws IOException {
        final InputStream inStream = entity.getContent();
        if (inStream == null) {
            return null;
        }
        try {
            Args.check(entity.getContentLength() <= Integer.MAX_VALUE,
                    "HTTP entity too large to be buffered in memory");
            int capacity = (int)entity.getContentLength();
            if (capacity < 0) {
                capacity = DEFAULT_BUFFER_SIZE;
            }
            Charset charset = null;
            if (contentType != null) {
                charset = contentType.getCharset();
                if (charset == null) {
                    final ContentType defaultContentType = ContentType.getByMimeType(contentType.getMimeType());
                    charset = defaultContentType != null ? defaultContentType.getCharset() : null;
                }
            }
            if (charset == null) {
                charset = HTTP.DEF_CONTENT_CHARSET;
            }
            final Reader reader = new InputStreamReader(inStream, charset);
            final CharArrayBuffer buffer = new CharArrayBuffer(capacity);
            final char[] tmp = new char[1024];
            int l;
            while((l = reader.read(tmp)) != -1) {
                buffer.append(tmp, 0, l);
            }
            return buffer.toString();
        } finally {
            inStream.close();
        }
    }

可以看到,读取完之后就把HttpEntity中的流关闭了,也就是这个流一直到我们实际读取内容之后才关闭。接下来,就是关闭response,

public void close() throws IOException {
        if (this.connHolder != null) {
            this.connHolder.close();
        }
    }
    public void close() throws IOException {
        releaseConnection(false);
    }
    private void releaseConnection(final boolean reusable) {
        if (this.released.compareAndSet(false, true)) {
            synchronized (this.managedConn) {
                if (reusable) {
                    this.manager.releaseConnection(this.managedConn,
                            this.state, this.validDuration, this.tunit);
                } else {
                    try {
                        this.managedConn.close();
                        log.debug("Connection discarded");
                    } catch (final IOException ex) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug(ex.getMessage(), ex);
                        }
                    } finally {
                        this.manager.releaseConnection(
                                this.managedConn, null, 0, TimeUnit.MILLISECONDS);
                    }
                }
            }
        }
    }

如果是可以复用的,则会将连接归还到连接池中。注意,这里的连接时在http client层封装的,底层的网络Socket是一次性的,也会被关闭。

这样就完成了一次完整的http调用