OkHttp源码解析

发表于 5年以前  | 总阅读数:2196 次

概述

OkHttp是一个适用于AndroidJava应用程序的HTTP + HTTP/2框架。

使用示例

                //创建OkHttpClient.Builder
        OkHttpClient.Builder builder = new OkHttpClient.Builder();

                //创建OkHttpClient
        OkHttpClient okHttpClient = builder.build();

                //创建Request
        Request request = new Request.Builder().build();

                //创建Call
        Call call = okHttpClient.newCall(request);

                //发起同步请求
        try {
            Response response = call.execute();
        } catch (IOException e) {
            e.printStackTrace();
        }

                //发起异步请求
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });

一步一步来看,首先第一步是获取一个OkHttpClient.Builder对象,然后第二步通过这个builder对象build()出来了一个OkHttp对象,不用说,这是简单的建造者(Builder)设计模式,看一眼OkHttpClient.Builder的构造方法:

 public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }

这里初始化了一堆东西,我们重点注意一下第2行的dispatcher对象,对应的Dispathcher类时OkHttp中的核心类,我们后面会重点详细解析,大家先有个印象即可。

第三步是通过Request request = new Request.Builder().build()初始化了一个Request对象,也是建造者模式,这个Request对象主要是描述要发起请求的详细信息。

第四步通过Call call = okHttpClient.newCall(request)创建了一个Call的对象,来看看这个newCall()方法:

 @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

再继续深入到RealCall.newRealCall()中:

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
  }

到这里我们就明白了,我们获取到的Call对象实际上是一个RealCall对象,看看CallRealCall的声明:

Call

public interface Call extends Cloneable

RealCall

final class RealCall implements Call 

明白了,Call是一个接口,而RealCall实现了这个接口,所以返回new RealCall()call对象当然是没问题的。

然后,如果我们想发起同步网络请求,则执行:

 Response response = call.execute();

如果想发起异步网络请求,则执行:

 call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });

我们先来分析同步的情况。

RealCall.excute()

我们上面提到,这个call实际上是一个RealCall对象,那么我们看看这个RealCall.excute()方法的源码:

   @Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        transmitter.timeoutEnter();
        transmitter.callStart();
        try {
          client.dispatcher().executed(this);
          return getResponseWithInterceptorChain();
        } finally {
          client.dispatcher().finished(this);
        }
  }

可以看到,这里首先使用了一个synchronized锁判断了executed标志位的值,如果executedtrue,则抛出异常,异常信息为"Already Executed",否则将executed置为true,继续执行下面的逻辑。所以,这个executed就是用来标记当前RealCallexcute()方法是否已经被执行过,后面到异步请求enqueue()的代码中我们会发现同样使用了这个executed标志位做了相同逻辑的判断,所以我们可以得出一个Call对象只能被执行一次(不管是同步请求还是异步请求)的结论。

==那么可能有同学会有疑问了,为什么OkHttp中的一个Call对象只能发起一次请求?这个和OkHttp中的连接池有关系,我们会在后面讲ConnectInterceptor拦截器的时候详细分析。==

如果excuted判断没有问题之后,就会执行:

transmitter.timeoutEnter();
transmitter.callStart();
try {
    client.dispatcher().executed(this);
    return getResponseWithInterceptorChain();
} finally {
    client.dispatcher().finished(this);
}

我们抓住重点,直接从第4行开始看,这里执行了 client.dispatcher().executed(this),注意这个client是我们刚才传进来的OkHttpClient对象,dispather对象是我们刚才在上面提到的OkHttpClient.Builder中初始化的,我们来看看这个Dispatcher.excuted()方法:

 /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

可以看到这里主要是将当前这次请求的call对象加入到了runningSyncCalls中,我们看看这份runningSyncCalls的声明:

 /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

这个runningSyncCalls是一个队列,从其源代码的注释我们可以得知这个runningSyncCalls的作用是存储当前OkHttpClient正在执行的同步请求。

好,下一步我们来分析:

@Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        transmitter.timeoutEnter();
        transmitter.callStart();
        try {
          client.dispatcher().executed(this);
          return getResponseWithInterceptorChain();
        } finally {
          client.dispatcher().finished(this);
        }
  }

中第9行之后的代码,可以看到,第10行直接返回了一个getResponseWithInterceptorChain(),而public Response execute()方法返回的是一个Response对象,所以说这个getResponseWithInterceptorChain()方法返回的也是一个Response对象,即这个getResponseWithInterceptorChain()方法中执行了真正的同步请求的逻辑并返回了Response对象,其具体实现细节我们后面详细分析。

注意,Response execute()方法的第11行到第13行,这是try...finally语句块中的finally体,也就是说无论try中语句的执行结构如何,都会执行这个finally块中代码,其中只有一行代码:

client.dispatcher().finished(this);

我们来看看Dispatcher.finished()方法的实现:

 /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

  private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

client.dispatcher().finished(this)调用了dispatcher().finished(this)方法并将自身(call)传递了进去,在finished(RealCall call)方法中又调用了finished(Deque<T> calls, T call)方法,传入了runningSyncCalls和我们当前的call对象,还记得这个runningSyncCalls在哪里出现过吗?对的,它在dispater.excuted()方法中出现过,当时的操作是将当前call对象加入到这个runningSyncCalls队列中,那么现在请求结束了,finished()方法中应该做什么?当然是将当前call对象从runningSyncCalls队列中移除,在代码中就是:

synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

可以看到为了考虑线程安全,这里使用了synchronized锁保证了线程同步,然后将当前callrunningSyncCalls队列中移除。

到这里我们就分析完了同步请求的大致流程,现在我们来看一下OkHttp中发起请求的核心方法getResponseWithInterceptorChain(),可以看到在同步请求中仅仅调用了这一个方法就得到了返回的Response,我们来看一下它的源码:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

第3行到第12行是new了一个List并依次将用户自定义的应用拦截器集合(第4行)、OkHttp内置的拦截器集合(第5-8行)、用户自定义的网络拦截器集合(第9-11行)添加了进去,构成了一个大的拦截器集合。

然后执行:

 Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());

再看看RealInterceptorChain类的构造方法:

public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
      @Nullable Exchange exchange, int index, Request request, Call call,
      int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.transmitter = transmitter;
    this.exchange = exchange;
    this.index = index;
    this.request = request;
    this.call = call;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;
  }

主要是初始化了一个RealInterceptorChain类的对象,注意一下传入的第1个参数(interceptors)和第4个参数(index),分别传入的是我们刚才构成的拦截器集合以及0,一会我们会用到。

初始化好RealInterceptorChain的对象后继续往下执行,关注一下第18行,可以看到,真正返回的response就是从这里的 chain.proceed(originalRequest)方法返回的,当前这个chainRealInterceptorChain类的对象,所以我们来看看RealInterceptorChain.proceed()方法中做了什么:

 @Override public Response proceed(Request request) throws IOException {
    return proceed(request, transmitter, exchange);
  }

可以看到,虽然我们调用的是chain.proceed(originalRequest),但是实际上它内部执行的是 proceed(request, transmitter, exchange),我们来看看这个方法的源码:

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.exchange != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

第2行首先检查了一下index是否超出了interceptorssize,还记得indexinterceptors是什么吗?对的,就是我们在getResponseWithInterceptorChain()源码的第14行传入的0和我们初始化的拦截器集合,为什么要检测indexinterceptorssize之间的关系呢?猜想是想通过index访问·中的元素,我们继续往下看,注意第19行到第21行,我们把这几行代码拿下来:

 RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

这里又初始化了一个RealInterceptorChain对象,那这里初始化的这个RealInterceptorChain对象和当前RealInterceptorChain有什么区别呢?我们再看看当前RealInterceptorChain对象初始化的代码:

Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());

可以发现,只有一个参数不一样,就是第4个参数,对应RealInterceptorChain构造方法中的index参数,现在这个RealInterceptorChain next对象的构造方法中的index值传入的是当前RealInterceptorChain对象的index+1。然后下一行果然通过index拿到了interceptors中的元素interceptor,这也是proceed()方法的开头为什么先检测indexinterceptors.size()大小关系的原因,就是为了防止这发生越界异常。拿到interceptor对象之后,下一行执行了interceptor.intercept(next)并返回了response,而最后也是将这个response最终作为当前proceed()方法的返回值,这时候我们就有必要深入一下interceptor.intercept(next)的源码了,我们尝试跟踪源码,会发现这个Interceptor其实是一个接口: 在这里插入图片描述

我们看看Interceptor都有哪些实现类: image-20190503114437541 我们看到了5个拦截器类,由于当前interceptor是通过interceptors.get(index)拿到的,而index当前传入的值为0,所以第一次执行的应该是第一个加入拦截器集合的那个拦截器类的intercept()方法,这里我们先不考虑用户自添加的拦截器,那么第一个拦截器就应该是RetryAndFollowUpInterceptor拦截器,我们来看看它的intercept()方法:

 @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Transmitter transmitter = realChain.transmitter();

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      transmitter.prepareToConnect(request);

      if (transmitter.isCanceled()) {
        throw new IOException("Canceled");
      }

      Response response;
      boolean success = false;
      try {
        response = realChain.proceed(request, transmitter, null);
        success = true;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), transmitter, false, request)) {
          throw e.getFirstConnectException();
        }
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, transmitter, requestSendStarted, request)) throw e;
        continue;
      } finally {
        // The network call threw an exception. Release any resources.
        if (!success) {
          transmitter.exchangeDoneDueToException();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Exchange exchange = Internal.instance.exchange(response);
      Route route = exchange != null ? exchange.connection().route() : null;
      Request followUp = followUpRequest(response, route);

      if (followUp == null) {
        if (exchange != null && exchange.isDuplex()) {
          transmitter.timeoutEarlyExit();
        }
        return response;
      }

      RequestBody followUpBody = followUp.body();
      if (followUpBody != null && followUpBody.isOneShot()) {
        return response;
      }

      closeQuietly(response.body());
      if (transmitter.hasExchange()) {
        exchange.detachWithViolence();
      }

      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      request = followUp;
      priorResponse = response;
    }
  }

代码很长,请大家暂时忽略其他代码,只关注第3行和第18行,第3行将当前传入的Chain chain对象类型转化为了RealInterceptorChain realChain对象,第18行执行了realChain.proceed(request, transmitter, null)并返回了response,注意,这个realChain是我们调用当前intercept()方法时传入的chain参数,而这个chain参数传入的是:

 RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

即由interceptorsindex+1构建的新的RealInterceptorChain,所以整个逻辑就是:image-20190505084646823

可以发现,整个拦截器集合构成了一个链式结构,当前拦截器执行完对应的拦截方法后激活下一个拦截器开始工作,直到最后一个拦截器,这也告诉我们如果要添加自定义的拦截器,则必须在重写intercept(Chain chain)方法时返回chain.proceed(),否则责任链就会断链,自然不会成功地发起网络请求。

注意由于CallServerInterceptor这个拦截器是最后一个拦截器,所以它的intercept方法中没有像之前的拦截器那样执行next.proceed(),而是直接使用传入的Chain chain参数直接发起了网络请求并返回Response

至此,我们已经分析完了OkHttp同步请求的完整流程,总结一下:

  • RealCall.excute()发起网络同步请求
  • 利用excuted标志位判断是否当前call对象已经执行过,若执行过抛出异常
  • 调用client.dispatcher.excuted(),将当前call对象加入runningSyncCalls这个队列
  • 调用getResponseWithInterceptorChain()方法,内部利用责任链模式依次执行拦截器链中的拦截器,最终发起网络请求并返回Response
  • 调用client.dispatcher.finished(),将当前call对象从runningSyncCalls队列中移除
  • 返回Response

Realcall.enqueue()

先看一下发起OkHttp异步网络请求的典型代码:

 call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });

可以看到,传入了一个CallBack的回调对象,该CallBack类中有onFailure()onResponse()两个方法,分别代表网络请求发起成功和失败的情况,这里抛出一个问题供大家思考:onFailure()onResponse()这两个方法是处于主线程的还是子线程的?这个问题我会在后面的分析中解答。

那么我们看一下RealCall.enqueue(Callback callback)的源码:

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

可以看到,和同步的excute()一样,开头先会检测excuted标志位判断当前call对象是否已经被执行过,如果已经被执行过,抛出异常。

如果当前call对象没有被执行过,则执行第7行,调用Dispatcherenqueue()方法,传入了一个AsyncCall参数,我们先看看Dispatcher.enqueue()方法的源码:

void enqueue(AsyncCall call) {
    synchronized (this) {
        readyAsyncCalls.add(call);

        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
        // the same host.
        if (!call.get().forWebSocket) {
          AsyncCall existingCall = findExistingCallWithHost(call.host());
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
    }
    promoteAndExecute();
  }

首先加锁将传入的AsyncCall call加入readyAsyncCalls这个队列,然后执行了第7到第10行,首先判断call.get().forWebSocket的值,其声明及初始化如下:

 final boolean forWebSocket;
 private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
         ......
     this.forWebSocket = forWebSocket;
 }

RealCall的构造方法的调用代码如下:

  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

forWebSocket的值默认为false,所以,会执行Dispatcher.enqueue()方法中 的第8-9行:

AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);

看看findExistingCallWithHost()方法 的源码:

@Nullable private AsyncCall findExistingCallWithHost(String host) {
    for (AsyncCall existingCall : runningAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    for (AsyncCall existingCall : readyAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    return null;
  }

可以看到这个方法就是遍历了一下当前正在执行的和准备执行的异步网络请求的call队列,看看有没有某一个callhost和当前callhost相同,如果有就返回。

然后:

if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);

判断了一下findExistingCallWithHost()返回的是否为null,如果不为null,调用call.reuseCallsPerHostFrom(existingCall)

 void reuseCallsPerHostFrom(AsyncCall other) {
      this.callsPerHost = other.callsPerHost;
    }

就是对call对象的callsPerHost进行了更新,注意这里是直接使用了=this.callsPerHost进行了赋值,而且在java中参数默认传递的是引用,所以当前callsPerHostfindExistingCallWithHost()中返回的那个AsyncCall对象的callsPerHost是同一个引用,那么再延伸一下,所有host相同的AsyncCall对象中的callsPerHost都是同一个引用,即如果改变其中一个AsyncCall对象的callsPerHost值,其他的所有AsyncCall对象的 callsPerHost的值也会随之改变,下面我们会看到作者巧妙地利用了这一点更新了所有host相同的 AsyncCall对象的callsPerHost值,实在是非常优秀。 这个callPerHost的声明如下:

 private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

即初始为0。

然后执行 promoteAndExecute()方法,我们看看其源码:

private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

这个方法中遍历了readyAsyncCalls这个队列,对于readyAsyncCalls中的每一个被遍历的当前AsyncCall对象,会首先判断runningAsyncCalls这个队列的长度是否大于了maxRequests,这个maxRequests默认是64,意思就是说要求当前正在执行的网络请求不能超过64个(为了节约资源考虑),如果runningAsyncCalls的元素数量不超过maxRequests,则判断asyncCall.callsPerHost().get()是否大于maxRequestsPerHostmaxRequestsPerHost的值默认为5,上面我们说到callsPerHost的初始为0,那么asyncCall.callsPerHost().get()初始应该是小于maxRequestsPerHost的,这里这个判断的意思就是当前正在执行的所有的请求中,与asyncCall对应的主机(host)相同请求的数量不能超过maxRequestsPerHost也就是5个,如果满足条件即同一个host的请求不超过5个,则往下执行13-16行,首先将当前AsyncCall对象从readyAsyncCalls中移除,然后执行asyncCall.callsPerHost().incrementAndGet(),就是将callsPerHost的值增1,上面我提到了,所有host相同的AsyncCall对象中的callsPerHost都是同一个引用,所以这里对当前这个callsPerHost的值增1实际上是更新了readyAsyncCalls中的所有AsyncCall对象中的callsPerHost的值,这样callsPerHost这个属性值就能够表示请求host与当前host相同的请求数量。

然后下面15行是将当前asyncCall对象加入到executableCalls中,下面会执行所有executableCalls中的请求,16行就是将当前这个asyncCall对象加入到runningAsyncCalls中表示其现在已经是正在执行了,注意这时executableCallsrunningAsyncCalls两个集合的不同。

然后下面第21行到24行,主要是对executableCalls进行了遍历,对于executableCalls中的每一个AsyncCall对象,执行asyncCall.executeOn()方法,传入了一个executorService(),我们首先看executorService()的源码:

private @Nullable ExecutorService executorService;
public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

可以看到这里是利用单例模式返回了一个线程池的对象,线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE即不加限制,这里将线程池的最大线程数置位Integer.MAX_VALUE的原因是我们在Dispatcher中默认使用了maxRequests控制了同时并发的最大请求数量,所以这里就不用在线程池中加以控制了,然后设置了最大存活时间为60s,也就是说如果当前线程的任务执行完成了,60s内本线程不会被销毁,如果此时有其他网络请求的任务,就不用新建线程,可以直接复用之前的线程,如果60s后还没有被复用,则本线程会被销毁。

然后我们看asyncCall.executeOn()的源码:

void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

重点是第5行,主要是调用线程池的execute()方法执行当前asyncCall,所以AsyncCall类应该实现了Runnable接口,我们看看AsyncCall类的声明:

final class AsyncCall extends NamedRunnable

再看看NamedRunnable接口的源码:

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

可以看到,NamedRunnable实现了Runnable接口,其run()方法中主要是调用了抽象方法execute(),而execute()方法会被AsyncCall类实现,所以,AsyncCall类的run()方法实际上执行的是其execute()中的内容:

@Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

可以看到,execute()方法中发起了真正的网络请求,核心方法是getResponseWithInterceptorChain(),这个方法我们在解析RealCall.excute()方法时已经解释过,其作用是发起网络请求,返回Response,注意这里使用了try...catch语句块对异常进行了捕捉,如果发生异常,则调用responseCallback.onFailure(RealCall.this, e),而这个responseCallback就是我们发起异步请求时传入的那个CallBack对象,所以就是在这里回调的onFailure()方法。如果请求成功,则调用responseCallback.onResponse(RealCall.this, response)即我们的onResponse()回调方法。那么由于当前execute()方法是在Runnable接口的run()方法中被调用的,而asyncCall又被传入了executorService.execute()中,所以当前execute()方法会在线程池中被执行,即onFailure()onResponse()这两个回调方法会在子线程被调用,这也说明了我们不能再RealCall.enqueue()方法的回调中直接更新UI,因为其回调方法都是在子线程被调用的。

最后关注一下第16行的finally语句块中的内容,主要是执行了client.dispatcher().finished(this)this指的是当前asyncCall对象,看看这个Dispatcher.finished()的源码:

/** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    call.callsPerHost().decrementAndGet();
    finished(runningAsyncCalls, call);
  }

可以看到,首先执行 call.callsPerHost().decrementAndGet()asyncCall对象的callsPerHost的值减1,因为当前asyncCall请求结束了,那么就应该将与本asyncCall具有相同host的请求数量减1,然后调用了:

private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

主要做的工作就是将当前call对象从runningAsyncCalls中移除。

至此,我们分析完了OkHttp异步网络请求的整体流程,可以发现,在异步网络请求中Dispatcher类扮演了相当重要的角色。

总结一下OkHttp异步请求的步骤:

  • 调用call.enqueue(CallBack callback),传入callback回调
  • 将当前call对象转化为asyncCall对象,调用client.dispater.enqueue(),调用OkHttpClientdispatcher对象的enqueue()方法
  • 将当前asyncCall对象加入readyAsyncCalls队列
  • 遍历readyAsyncCalls,将符合条件的asyncCall对象移除并加入executableCallsrunningAsyncCalls集合
  • 遍历executableCalls集合,执行每一个asyncCall对象的executeOn()方法,传入线程池
  • 在线程池中发起当前asyncCall的网络请求
  • 回调成功或失败对应的回调方法
  • 将当前asyncCall对象从runningAsyncCalls中移除

到这里就分析完了OkHttp中同步请求和异步请求的执行流程,之后会推出OkHttp中内置的5大拦截器的源码分析,深入分析每一个拦截器的实现原理与作用,欢迎大家关注。

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:1年以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:1年以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:1年以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:1年以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:1年以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:1年以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:1年以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:1年以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:1年以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:1年以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:1年以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:1年以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:1年以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:1年以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:1年以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:1年以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:1年以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:1年以前  |  398次阅读  |  详细内容 »
 相关文章
简化Android的UI开发 5年以前  |  521240次阅读
Android 深色模式适配原理分析 4年以前  |  29635次阅读
Android阴影实现的几种方案 2年以前  |  12221次阅读
Android 样式系统 | 主题背景覆盖 4年以前  |  10293次阅读
 目录