IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    OKhttp源码学习(九)—— 任务管理(Dispatcher)

    shendao发表于 2017-06-02 13:09:29
    love 0

    源码地址:https://github.com/square/okhttp

    针对具体一个请求的流程,前面已经做了学习分析,现在对OkHttp的请求任务管理进行分析学习。

    使用过OkHttp的都知道,调用分为同步阻塞式的请求execute(),以及异步调用 enqueue(Callback responseCallback)
    ,同步请求没有什么好分析的,基本就是直接发起了请求。这里主要分析异步请求,是如何进行请求的管理和分配的。

    主要的类:Dispatcher.
    主要内容:

    1. 线程池
    2. 任务分发模型

    1. 线程池

    线程池,为解决的问题,很多资料都有具体的阐述,这里就引用一些专业的解释多线程:

    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:
    假设在一台服务器完成一项任务的时间为T

    T1 创建线程的时间
    T2 在线程中执行任务的时间,包括线程间同步所需时间
    T3 线程销毁的时间
    显然T = T1+T2+T3。注意这是一个极度简化的假设。
    可以看出T1,T3是多线程本身的带来的开销(在Java中,通过映射pThead,并进一步通过SystemCall实现native线程),我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。

    线程池,就是针对解决减少T1 和 T3的时间,提高服务的性能。

    1.1 OkHttp的线程池

      public synchronized ExecutorService executorService() {     if (executorService == null) {       executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,           new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));     }     return executorService;   }

    Dispatcher 通过单例创建了一个线程池,针对几个参数,可以发现,OkHttp的线程池具备特点:

    1. 线程数区间[0,Integer.MAX_VALUE],不保留最少线程数,随时创建更多线程 ;
    2. 当线程空闲的时候,最多保活时间为60s;
    3. 使用一个同步队列作为工作队列,先进先出;
    4. 创建一个名为“OkHttp Dispatcher” 的线程工厂 ThreadFactory 。

    线程池的处理,就到这里,下面就开始,分析OkHttp是如何使用这个线程池来进行请求任务的调度和分配的。

    2. 任务分发模型

    在我们发起一个异步请求的时候,其实是交给了Dispatcher来处理的
    RealCall.java

      @Override public void enqueue(Callback responseCallback) {     synchronized (this) {       if (executed) throw new IllegalStateException("Already Executed");       executed = true;     }     captureCallStackTrace();     //处理再这里     client.dispatcher().enqueue(new AsyncCall(responseCallback));   }

    在Dispatcher中:

      synchronized void enqueue(AsyncCall call) {     if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {       // 添加到runningAsyncCalls队列中       runningAsyncCalls.add(call);       //线程池的调用       executorService().execute(call);     } else {       //添加到准备中的队列中       readyAsyncCalls.add(call);     }   }

    上面的代码中几个重要的变量:

    1. runningAsyncCalls 储存运行中的异步请求队列
    2. readyAsyncCalls 储存准备中的异步请求队列
    3. maxRequests 最大请求数量(64个)
    4. maxRequestsPerHost 相同Host最大请求数量(5)

    当如果运行中的请求少用64个以及相同 Host的请求小于5个的时候,直接添加到runningAsyncCalls并且调用线程池来执行这个AsyncCall,交给线程池去调度。 如果已经超出了这个限制,就把这个请求添加到 readyAsyncCalls,等待调用。但是这个准备中的队列是什么时候被调用的呢?

    撸一下代码,发现在AsyncCall的execute方法里面。也就是这个异步线程的执行方法里面:

        @Override protected void execute() {       boolean signalledCallback = false;       try {         //执行真正的请求         Response response = getResponseWithInterceptorChain();          //通过CallBack 回调给用户         if (retryAndFollowUpInterceptor.isCanceled()) {           signalledCallback = true;           responseCallback.onFailure(RealCall.this, new IOException("Canceled"));         } else {           signalledCallback = true;           responseCallback.onResponse(RealCall.this, response);         }       } catch (IOException e) {         if (signalledCallback) {           // Do not signal the callback twice!           Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);         } else {           responseCallback.onFailure(RealCall.this, e);         }       } finally {         //重点在这里         client.dispatcher().finished(this);       }     }

    最后的时候调用了Dispatcher的finished,继续向下撸:

      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {     int runningCallsCount;     Runnable idleCallback;     synchronized (this) {       if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");       //重点在这里       if (promoteCalls) promoteCalls();       runningCallsCount = runningCallsCount();       idleCallback = this.idleCallback;     }      if (runningCallsCount == 0 && idleCallback != null) {       idleCallback.run();     }   }

    这里会调用一个 promoteCalls()方法:

      private void promoteCalls() {     if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.     if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {       AsyncCall call = i.next();        if (runningCallsForHost(call) < maxRequestsPerHost) {         i.remove();        // 加入到运行中的队列,并执行。         runningAsyncCalls.add(call);         executorService().execute(call);       }        if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.     }   }

    终于找到了,在请求完成的时候,调用Dispatcher的finished同时,会检查这个时候准备中的请求,是否有可以添加到运行请求中线程池中去的。

    3. 总结

    整个任务管理的流程,其实也不复杂:

    1. 通过两个请求队列,来管理请求数量以及准备中的请求的数量;
    2. 请求交一个线程池来完成。

    最后用一个图的形式总结一下,OkHttp的请求任务管理的实现:

    OKhttp源码学习(九)—— 任务管理(Dispatcher)
    分配管理

    系列:
    OKhttp源码学习(一)—— 基本请求流程
    OKhttp源码学习(二)—— OkHttpClient
    OKhttp源码学习(三)—— Request, RealCall
    OKhttp源码学习(四)—— RetryAndFollowUpInterceptor拦截器
    OKhttp源码学习(五)—— BridgeInterceptor拦截器
    OKhttp源码学习(六)—— CacheInterceptor拦截器
    OKhttp源码学习(七)—— ConnectInterceptor拦截器
    OKhttp源码学习(八)—— CallServerInterceptor拦截器



沪ICP备19023445号-2号
友情链接