Java多線程:線程池

1、 背景

線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,合理的使用線程池能夠對線程進行統一的分配、調優和監控,並有如下好處:
    第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
    第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。
    第三:提升線程的可管理性。
 

2、線程池的架構

3、Executors

    用於建立線程池, 包含五種建立線程池的方法

newFixedThreadPool(固定大小線程池)

   
    初始化一個定長線程數的線程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene做爲阻塞隊列,不過當線程池沒有可執行任務時,也不會釋放線程,超出的線程會在隊列中等待
 
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
 

newCachedThreadPool(無界線程池,能夠進行自動線程回收)

    一、初始化一個能夠緩存線程的線程池,默認緩存60s,使用SynchronousQueue做爲阻塞隊列;
    二、newCachedThreadPool在沒有任務執行時,當線程的空閒時間超過keepAliveTime,會自動釋放線程資源,當提交新任務時,若是沒有空閒線程,則建立新線程執行任務,會致使必定的系統開銷;
因此,該方法返回的線程池是沒有線程上限的,在使用時必定要小心,由於沒有辦法控制整體的線程數量,而每一個線程都是消耗內存的,這可能會致使過多的內存被佔用。建議儘可能不要用這個方法返回的線程池,而要使用有上限的線程池
 
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 
 
 

newSingleThreadExecutor(單個後臺線程)

   
    初始化的線程池中只有一個線程,若是該線程異常結束,會從新建立一個新的線程繼續執行任務,惟一的線程能夠保證所提交任務的順序執行,內部使用LinkedBlockingQueue做爲阻塞隊列。
 
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 
 
 

newScheduledThreadPool

  建立一個固定長度的線程池,並且以延遲或定時的方式來執行任務。
 
經過如上配置的線程池的建立方法源代碼,咱們能夠發現:
   1. 除了CachedThreadPool使用的是直接提交策略的緩衝隊列之外,其他兩個採用的都是無界緩衝隊列
   2. 三個線程池採用的ThreadPoolExecutor構造方法都是同一個,使用的都是默認的ThreadFactory和handler:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
 

4、ExecutorService任務週期管理接口

    Executor的實現一般都會建立線程來執行任務,可是使用異步方式來執行任務時,因爲以前提交任務的狀態不是當即可見的,因此若是要關閉應用程序時,就須要將受影響的任務狀態反饋給應用程序。
    爲了解決執行服務的生命週期問題,Executor擴展了EecutorService接口,添加了一些用於生命週期管理的方法。以下:
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 省略部分方法 
}

submit() 與execute()區別數據庫

 

一、接收的參數不同 緩存

  submit()能夠接受runnable無返回值和callable有返回值 
  execute()接受runnable 無返回值多線程

 

二、submit有返回值,而execute沒有架構

 

  Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion.app

 

  用到返回值的例子,好比說我有不少個作validation的task,我但願全部的task執行完,而後每一個task告訴我它的執行結果,是成功仍是失敗,若是是失敗,緣由是什麼。dom

 

三、submit方便Exception處理異步

 

  There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don’t have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task’s return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.ide

 

  意思就是若是你在你的task裏會拋出checked或者unchecked exception,而你又但願外面的調用者可以感知這些exception並作出及時的處理,那麼就須要用到submit,經過捕獲Future.get拋出的異常。this

public class ExecutorServiceTest {  
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        List<Future<String>> resultList = new ArrayList<Future<String>>();  

        // 建立10個任務並執行  
        for (int i = 0; i < 10; i++) {  
            // 使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中  
            Future<String> future = executorService.submit(new TaskWithResult(i));  
            // 將任務執行結果存儲到List中  
            resultList.add(future);  
        }  
        executorService.shutdown();  

        // 遍歷任務的結果  
        for (Future<String> fs : resultList) {  
            try {  
                System.out.println(fs.get()); // 打印各個線程(任務)執行的結果  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (ExecutionException e) {                    
                e.printStackTrace();    
            }  finally {
        executorService.shutdownNow();
       }
        }  
    }  
}  

class TaskWithResult implements Callable<String> {  
    private int id;  

    public TaskWithResult(int id) {  
        this.id = id;  
    }  

    /** 
     * 任務的具體過程,一旦任務傳給ExecutorService的submit方法,則該方法自動在一個線程上執行。 
     *  
     * @return 
     * @throws Exception 
     */  
    public String call() throws Exception {  
        System.out.println("call()方法被自動調用,幹活!!!             " + Thread.currentThread().getName());  
        if (new Random().nextBoolean())  
            throw new TaskException("Meet error in task." + Thread.currentThread().getName());  
        // 一個模擬耗時的操做  
        for (int i = 999999999; i > 0; i--)  
            ;  
        return "call()方法被自動調用,任務的結果是:" + id + "    " + Thread.currentThread().getName();  
    }  
}  

class TaskException extends Exception {  
    public TaskException(String message) {  
        super(message);  
    }  
}

 

 

5、ThreadPoolExecutor提交任務流程

線程池的主要工做流程以下圖:
 
 
 
類中定義的重要變量,以下:
 
  1. private final BlockingQueue<Runnable> workQueue;              // 阻塞隊列 
  2. private final ReentrantLock mainLock = new ReentrantLock();   // 互斥鎖 
  3. private final HashSet<Worker> workers = new HashSet<Worker>();// 線程集合.一個Worker對應一個線程 
  4. private final Condition termination = mainLock.newCondition();// 終止條件 
  5. private int largestPoolSize;           // 線程池中線程數量曾經達到過的最大值。 
  6. private long completedTaskCount;       // 已完成任務數量 
  7. private volatile ThreadFactory threadFactory;     // ThreadFactory對象,用於建立線程。 
  8. private volatile RejectedExecutionHandler handler;// 拒絕策略的處理句柄 
  9. private volatile long keepAliveTime;   // 線程池維護線程所容許的空閒時間 
  10. private volatile boolean allowCoreThreadTimeOut; 11. private volatile int corePoolSize;     // 線程池維護線程的最小數量,哪怕是空閒的 
  12. private volatile int maximumPoolSize;  // 線程池維護的最大線程數量

 

其中有幾個重要的規則須要說明一下:

一、 corePoolSize與maximumPoolSize  

   線程池將根據 corePoolSize和 maximumPoolSize設置的邊界自動調整池大小,當新任務在方法 execute() 中提交時:
  • 若是當前線程池中的線程數目<corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
  • 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;當隊列滿時才建立新線程去處理請求;
  • 若是當前線程池中的線程數目達到maximumPoolSize,即隊列已經滿了,則經過handler所指定的任務拒絕策略來處理新請求;
  • 若是線程池中的線程數量大於corePoolSize時,而且某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;
 
也就是說,處理任務的優先級爲: 
  • 1. 核心線程corePoolSize > 任務隊列workQueue > 最大線程maximumPoolSize,若是三者都滿了,使用handler處理被拒絕的任務。
  • 2. 當池中的線程數大於corePoolSize的時候,多餘的線程會等待keepAliveTime長的時間,若是無請求可處理就自行銷燬。
 

二、 workQueue 

線程池所使用的緩衝隊列,該緩衝隊列的長度決定了可以緩衝的最大數量,緩衝隊列有三種通用策略:
   1) 直接提交。SynchronousQueue,它將任務直接提交給線程執行而不保存它們。在此,若是不存在可用於當即運行任務的線程,則試圖把任務加入隊列將失敗,所以會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖。直接提交一般要求無界 maximumPoolSizes 以免拒絕新提交的任務。
   2) 無界隊列。使用無界隊列將致使在全部 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize 的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;
   3) 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量.

三、ThreadFactory  

    使用 ThreadFactory 建立新線程。經過提供不一樣的 ThreadFactory,能夠改變線程的名稱、線程組、優先級、守護進程狀態等等。若是從 newThread 返回 null 時 ThreadFactory 未能建立線程,則執行程序將繼續運行,但不能執行任何任務。
public interface ThreadFactory {  
    Thread newThread(Runnable r);  
}
  
而構造方法中的threadFactory對象,是經過 Executors.defaultThreadFactory()返回的。 

四、RejectedExecutionHandler   

    當Executor已經關閉(即執行了executorService.shutdown()方法後),而且Executor將有限邊界用於最大線程和工做隊列容量,且已經飽和時,在方法execute()中提交的新任務將被拒絕.
   在以上述狀況下,execute 方法將調用RejectedExecutionHandler.rejectedExecution() 方法。
下面提供了四種預約義的處理程序策略:
       1) AbortPolicy            直接拋出異常 RejectedExecutionException;
    2) CallerRunsPolicy       用調用者所在的線程來執行任務
    3) DiscardPolicy          不能執行的任務將被刪除;
    4) DiscardOldestPolicy    若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程)。

五、keepAliveTime

線程空閒時的存活時間,即當線程沒有任務執行時,繼續存活的時間;默認狀況下,該參數只在線程數大於corePoolSize時纔有用;

6、線程池的關閉

    經過調用線程池的shutdown或shutdownNow方法來關閉線程池,可是它們的實現原理不一樣:
    shutdown只是將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。
    shutdownNow是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。shutdownNow會首先將線程池的狀態設置成STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表。
    只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於咱們應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用shutdown來關閉線程池,若是任務不必定須要執行完,則能夠調用shutdownNow。
 

7、線程池的配置

能夠從如下幾個角度來進行分析:
    1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
    2. 任務的優先級:高,中和低。
    3. 任務的執行時間:長,中和短。
    4. 任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。
    CPU密集型任務:配置儘量少的線程數量,如配置Ncpu+1個線程的線程池。
    IO密集型任務:因爲須要等待IO操做,線程並非一直在執行任務,則配置儘量多的線程,如2*Ncpu。
    混合型的任務:若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。
    咱們能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。
    優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。
    執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。
    依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。
    建議使用有界隊列,有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點