併發07–線程池及Executor框架

一、JAVA中的線程池

線程池的實現原理及流程如下圖所示:

 

 

   如上圖所示,當一個線程提交到線程池時(execute()或submit()),先判斷核心線程數(corePoolSize)是否已滿,如果未滿,則直接創建線程執行任務;如果已滿,則判斷隊列(BlockingQueue)是否已滿,如果未滿,則將線程添加到隊列中;如果已滿,則判斷線程池(maximumPoolSize)是否已滿,如果未滿,則創建線程池執行任務;如果線程池已滿,則交給飽和策略(RejectedExecutionHandler.rejectExcution())來處理。

  可以看下線程池ThreadPoolExecutor的全參構造函數源碼:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

對其入參釋義如下:

參數 描述 作用
coolPoolSize 線程核心線程數 當一個任務提交到線程池時,線程池會創建一個線程來執行任務,即使其他的核心線程足夠執行新任務,也會創建線程,直到需要執行的任務數大於核心線程數后才不再創建;如果線程池先調用了preStartAllCoreThread()方法,則會先啟動所有核心線程。
maximumPoolSize 線程池最大線程數 如果隊列滿了,並且已創建的線程數小於該值,則會創建新的線程執行任務。這裏需要說明一點,如果使用的隊列時無界隊列,那麼該值無用。
keepAliveTime 存活時間 當線程池中線程超過超時時間沒有新的任務進入,則停止該線程;只會停止多於核心線程數的那幾個線程。
unit 線程存活的時間單位 可以有天、小時、分鐘、秒、毫秒、微妙、納秒
workQueue 任務隊列

用於保存等待執行任務的阻塞隊列。可以選擇如下幾個隊列:數組結構的有界隊列ArrayBlockingQueue、鏈表結果的有界隊列LinkedBlockingQueue、不存儲元素的阻塞隊列SynchronousQueue、一個具有優先級的無界阻塞隊列PriortyBlockingQueue

threadFactory 創建線程的工廠

可以通過工廠給每個線程創建更有意義的名字。使用Guava提供的ThreadFactoryBuilder可以快速的給線程池裡的線程創建有意義的名字,代碼如下

new ThreadFactoryBuilder().setNameFormat(“aaaaaaaa”).build();

handler 包和策略

當隊列和線程都滿了,說明線程池處於飽和狀態,那麼必須採取一種策略來處理新提交的任務。

AbortPolicy(默認),表示無法處理新任務時拋出異常。

CallerRunsPolicy:只有調用者所在線程來運行

DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務

DiscardPolicy:不處理,直接丟棄

  上面說到,向線程池提交任務有兩種方法,分別是execute()和submit(),兩者的區別主要是execute()提交的是不需要有返回值的任務,而submit提交的是需要有返回值的任務,並且submit()會返回一個Furure對象,並且可以使用future.get()方法獲取返回值,並且get方法會阻塞,直到有返回值。

  線程池的關閉有shutdown()和shutdownNow兩個方法,他們的原理是遍歷線程池中的工作線程,然後逐個調用interrupt方法來中斷線程,所以無法中斷的線程可能永遠無法終止;但是二者也有區別,shutdownNow是將線程池的狀態設置為STOP,然後嘗試停止所有正在執行或者暫停的線程,並返回等待執行任務列表;而shutdown只是將線程池的狀態設置成SHUTDOWN,然後中斷所有沒有正在執行的任務。當調用這兩個方法中的任何一個后,isShutdown方法就會返回true,當所有任務都已經關閉后,調用isTerminaed方法會返回true。

  使用線程池時,需要從任務的性質(IO密集型還是CPU密集型或是混合型)、任務的優先級、任務的執行時常、任務的依賴性(是否依賴其他系統資源,如數據庫連接等)來綜合判斷,比如說,CPU密集型,就可以就可以配置N+1個線程個數,其中N為CPU核數,如果是IO密集型,則可以配置2*N個線程數;如果是混合型的任務,可以將其拆分成IO密集型和CPU密集型,但是如果兩個任務的執行時間相差較大,則沒有必要進行拆分;優先級不同的任務可以使用優先級隊列PriortyBlockingQueue來處理;依賴數據出等其它資源的線程池,比如說依賴數據庫,那麼就可以加大線程數量,因為在等待sql執行的時候,線程是處於空閑狀態;另外,最好使用有界隊列,因為無界隊列,因為有界隊列可以增加系統的穩定性和預警能力。

  對於線程的監控,還有以下幾個方法可以使用:

方法 描述
taskCount() 線程池需要執行的任務數量
completedTskCount 線程池運行過程中已經執行完畢的任務數量
IarestPoolSize 線程池中曾經創建過的最大線程數
getPoolSize 線程池的線程數量
getActiveCount 獲取活動的線程數

二、Exector框架

   在java中,是用線程來異步執行任務,java線程的創建與銷毀需要一定的開銷。如果我們為每一個任務創建一個線程的話,這些線程就會消耗大量的計算資源,會使處於高負荷的應用崩潰。

  在HotSpot虛擬機中,JAVA線程被一對一的映射為本地操作系統線程。JAVA線程啟動時會創建一個本地操作系統線程,當該JAVA線程終止時,這個操作系統線程也會被收回,操作系統會調用多有線程並將他們分配給可用的CPU。

 

   Executor框架的兩級調度模型如上圖所示,應用程序通過Executor控制上層的調度,而下層的調用由操作系統內核控制,將線程映射到硬件處理器上,下層的調用不受應用程序的控制。

   關於Executor的組成部分如下所示:

元素 描述
任務 包括被執行任務需要實現的接口Runnable和Callable接口
任務的執行 包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor接口有兩個關鍵的實現類實現了ExecutorService接口:ThreadPoolExecutor和ScheduledThreadPoolExecutor
異步計算的結果 包括接口Future和實現Future接口的FurureTask類

  Executor框架使用示意圖如下:

 

 

   如上圖所示,主線程首先創建實現Runnable或Callable接口的任務對象,然後把任務對象提交給ExecutorService執行,如果使用的是submit提交,執行完畢后將返回一個實現Future接口的對象,最後,主線程可以執行FutureTask.get()方法來獲取返回值;主線程也可以調用FutureTask.cancel()方法來取消此任務的執行。

  Executor框架的成員如下:

成員 描述 子類 描述
ThreadPoolExecutor

通常使用工廠類Executors來創建,Executors可以創建三種類型的ThreadPoolExecutor

固定線程數的FixedThreadPool

適用於為了滿足資源管理的需求,而需要限制當前線程數量的應用場景,它適用於負載比較重的應用。
單一線程的SingleThreadPool 適用於需要保證順序的執行各個任務,並且在任意時間點都不會有多個線程活動的場景。
根據需要創建線程的CacheThreadPool 這是一個無界的線程池,適用於執行很多短期異步任務的小程序,或者是負載比較輕的服務器。
ScheduledThreadPoolExecutor 通常使用工廠類Executors創建,Executors可以創建兩種類型的ScheduledThreadPoolExecutor 包含若干線程的ScheduledThreadPoolExecutor 適用於需要多個後台線程執行周期任務,同時為了滿足資源管理的需求而需要限制後台線程數量的應用場景。
只包含一個線程的SingleThreadScheduledExecutor                         適用於需要單個後台線程執行周期任務,同時需要保證順序的執行各個任務的場景。                                               
ForkJoinsPool

 newWorkStealingPool適合使用在很耗時的操作,但是newWorkStealingPool不是ThreadPoolExecutor的擴展,它是新的線程池類ForkJoinPool的擴展,但是都是在統一的一個Executors類中實現,由於能夠合理的使用CPU進行對任務操作(并行操作),所以適合使用在很耗時的任務中 

   
Future Future接口和實現了該接口的FutureTask類來表示異步計算的結果    
Runnable和Callable接口

Runnable和Callable接口的實現類,都可以被ThreadPoolExecutor、ScheduledThreadPool、ForkJoinThred執行;除了可以自己實現Callable接口外,我們還可以使用工廠類Executors來把一個Runnable包裝成一個Callable

   

 

ThreadPoolExecutor詳解

1、ThreadPoolExecutor

  (1)FixedThreadPool

  構造函數如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

  構造函數中,核心線程數和最大線程數一致,keepAliveTime為0,隊列使用的是無界阻塞隊列LinkedBlockingQueue(最大值是Integer.MAX_VALUE);

  核心線程數和最大線程數保持一致,表明:如果隊列滿了之後,不會再創建新的線程;

  keepAliveTime為0,表明:如果運行線程數大於核心線程數時,如果線程執行完畢,空閑線程立刻被終止;

  使用無界阻塞隊列,表明:當運行線程到達核心線程數時,不會再創建線程,只會將任務加入阻塞隊列;因此最大線程數參數無效;因此keepAliveTime參數無效;且不會拒絕任務(既不會執行包和策略)

  (2)SingleThreadExecutor

  構造函數如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

  構造函數中,核心線程數和最大線程數均為1,keepAliveTime為0,隊列使用的是無界阻塞隊列LinkedBlockingQueue(最大值是Integer.MAX_VALUE)

  除了固定了核心線程數和最大線程數為1外,其餘的參數均與FixedThreadPool一致,那麼就是只有一個線程會反覆循環從阻塞隊列中獲取任務執行

  (3)CacheThreadPool

  構造函數如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

  構造函數中,核心線程數為0,最大線程數為Integer.MAX_VALUE,意味着無界,keepAliveTime為60秒,阻塞隊列使用沒有存儲空間的SynchronousQueue

  核心線程數為0,最大線程數為無界,表明:只要隊列滿了,就會創建新的線程放入線程池

  使用沒有存儲空間的SynchronousQueue表明:線程提交的速度高於線程被消費的速度,那麼線程會被不斷的創建,最終會因為線程創建過多而耗盡CPU和內存資源

2、ScheduledThreadPoolExecutor

  ScheduledThreadPoolExecutor的運行機制如下:

   (1)當調用ScheduledTreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時會向ScheduledThreadPoolExecutor的DelayQueue添加一個實現了RunnableScheduledFuture接口的ScheduledFutureTask

  (2)線程池中的線程從DelayQueue中獲取ScheduledFutureTask,然後執行任務。

  ScheduledFutureTask主要包含以下三個成員變量

成員變量 描述
long time 表示這個任務要被執行的時間
long sequenceNumber 表示該任務被添加到ScheduledThreadPoolExecutor中的序號
long period 表示任務執行的間隔周期

  DelayQueue封裝了一個PriorityQueue,當添加任務時,這個PriorityQueue會對隊列中的ScheduledFutureTask進行排序,time最小的在最前面(最先被執行),如果time一致,就比較sequenceNumber,sequenceNumber小的排在前面。

  當線程執行任務時,先從DelayQueue隊列中獲取已經到期的任務(time大於當前時間),然後執行該任務,執行完畢后,根據任務的執行周期,修改任務下次的執行時間time,並重新將任務添加到DelayQueue

 

 

  FutureTask詳解

  Future接口和實現該接口的FutureTask類,代表異步計算的結果。

  FutureTask的使用方法是將其交給Executor執行,也可以通過ExecutorService.submit()方法返回一個FutureTask,然後執行FutureTask.get()方法或FutureTask.cancel()方法,除此之外,還可以但是使用FutureTask。

  FutureTask有三種狀態:未啟動(FutureTask.run()沒有被執行之前的狀態)、已啟動(FutureTask.run()方法執行過程中)、已完成(FutureTask.run()方法執行完成或被取消),這三種狀態的流轉如下圖所示:

 

  FutureTask的實現是基於AQS(AbstractQueuedSynchrouizer)來實現的,之前已經說過,每一個基於AQS實現的同步器都會至少包含一個acquire操作和至少一個release操作。AQS被作為模板方法模式的基礎類提供給FutureTask的內部子類Sync實現了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通過這兩個方法來檢查和更新同步狀態。

  FutureTask涉及示意圖如下圖所示:

 

  如上圖所示,FutureTask.get()方法會調用AQS的acquireSharedInterruptibly(int)方法,該方法首先會回調在子類Sync中的tryAcquireShared()方法來判斷acquire操作是否成功(state狀態狀態是否為執行完成RAN或取消狀態CANCELED&runner不為null),如果成功則get()方法立刻返回,如果失敗則到線程等待隊列中去等待其他線程執行release操作;當其他線程執行release操作(比如FutureTask.run()或FutureTask.cancel())喚醒當前線程后,當前線程再次執行tryAcquireShared()將返回正值1,當前線程將離開線程等待隊列並喚醒它的後繼線程。

  Run方法執行過程如下:

  執行在構造函數中指定的任務(Callable.call()),然後以原子方式來更新狀態(調用AQS.compareAndSetState(int expect, int update),設置state的狀態為RAN),如果這個原子操作成功,就設置代表計算結果的變量result的值為Callable.call()的返回值,然後調用AQS.release(int)。

  AQS.rease首先會調用子類Sync中實現的tryReleaseShared方法來執行release操作(設置運行任務的線程為null,然後返回false),然後喚醒等待隊列中的第一個線程。

  最後調用Future.done()方法。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務