Java多線程 - 線程池

在咱們開發中常常會使用到多線程,好比在Android中,網絡請求或一些耗時操做必須放在子線程中運行,每每會經過Thread開啓一個子線程去執行耗時操做,待子線程執行完畢後再經過Handler切換到主線程中運行;若是N屢次建立子線程,就沒法管理所建立的子線程,它們相互之間競爭,頗有可能因爲佔用過多資源而致使死機或者OOM。因此java提供了線程池來管理咱們建立的線程。html

線程池的優點:java

1. 下降資源消耗。經過重複利用已經建立的線程下降線程的建立與銷燬形成的消耗。數組

2. 提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。緩存

3. 提升線程的可管理性。線程是稀缺資源,若是無限制第建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。網絡

ThreadPoolExecutor

能夠經過ThreadPoolExecutor來建立一個線程池,ThreadPoolExecutor類一共用4個構造函數。多線程

ExecutorService threadPool = new ThreadPoolExecutor( …… );

下面是擁有最多參數的構造函數:less

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
……
}

1. corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會建立ide

一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到須要執行的任務數大於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads方法,線程池會提早建立並啓動全部基本線程。函數

2. maximumPoolSize(線程池最大數量):線程池容許建立的最大線程數。若是隊列滿了,而且建立的線程數小於最大線程數,則線程池會再建立新的執行任務。this

3. keepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。因此,若是任務不少,而且每一個任務執行的時間比較短,能夠調大時間,提升線程的利用率。

4. TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒等

5. workQueue(線程池中保存等待執行任務的阻塞隊列):經過線程池中的execute方法提交的Runable對象會存儲在該隊列中。咱們能夠選擇下面幾個阻塞隊列:

 

阻塞隊列 說明
ArrayBlockingQueue 基於數組實現的有界阻塞隊列,該隊列按照FIFO(先進先出)原則對隊列中的元素進行排序
LinkedBlockingQueue       基於鏈表實現的阻塞隊列,該隊列按照FIFO(先進先出)原則對隊列中的元素進行排序。
SyschronousQueue 內部沒有任何容量的阻塞隊列。在它內部沒有任何的緩存空間,對於該隊列中的數據元素只有當咱們試着取走的時候纔可能存在
PriorityBlockingQueue 具備優先級的無限阻塞隊列

能夠經過實現BlockingQueue接口來自定義所需的阻塞隊列。

6. ThreadFactory(線程工廠,爲線程池提供新線程的建立):能夠給每一個建立出來的線程設置名字。ThreadFactory是一個接口,裏面只有一個newThread方法。

7. RejectedExecutionHandler(飽和策略):RejectedExecutionHandler是一個接口,裏面只有rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法。當任務隊列已滿而且線程池中的活動線程已經達到所限定的最大值或者沒法成功執行任務,這時候ThreadPoolExecutor 會調用RejectedExecutionHandler中的rejectedExecution方法。在線程池中默認是AbortPolicy,在沒法處理新任務是會拋出RejectedExecutionException異常。此外還有3中策略,分別是:

可選值 說明
CallerRunsPolicy 用調用者所在的線程來處理任務。此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度
DiscardPolicy 不能執行的任務,並將該任務刪除
DiscardOldestPolicy 丟棄隊列最近的任務,並執行當前任務

ThreadPoolExecutor的使用

1. 建立:一個擁有5個核心線程,最大能容入10個線程的線程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,
				10, TimeUnit.SECONDS, new ArrayBlockingQueue(10));

2. 線程池的任務提交有兩種方式:execute方法和submit方法。就是將線程放入線程池

execute方法:沒有返回值,沒法判斷任務是否被線程池執行成功。

executor.execute(new Runnable() {			
	@Override
	public void run() {
		System.out.println("線程:"+ Thread.currentThread().getName());
					
    }
});

 

submit方法:使用submit方法提交任務,會返回一個Future(接口)對象,能夠經過該對象來判斷任務是否執行成功,經過還能夠改對象的get方法來獲取返回值。若是子線程任務沒有執行完畢,會一直阻塞在get無參方法中直到執行完成。若是使用getget(long timeout, TimeUnit unit)方法則會阻塞等待所設的時間值,這時可能子線程中任務還沒執行完畢。關於Future的介紹點擊打開連接

 

final int result=i;
Future<Integer> future = executor.submit(new Callable<Integer>() {

	@Override
	public Integer call() throws Exception {
		System.out.println("線程:"+Thread.currentThread().getName());
			return result;
		}
				
});
						
try {
    Integer integer = future.get();
    System.out.println(integer);
} catch (InterruptedException e) {
	e.printStackTrace();
} catch (ExecutionException e) {
	e.printStackTrace();
}

關於sbumit(Callable)源碼

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

 

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

從源碼上可看出,把Callable實例做爲參數傳入,生成FutureTask對象,而後把這個對象當成一個Runnable,做爲參數另起線程。
 

3. 線程池的關閉

關閉線程池提供了兩個方法:shutdown()shutdownNow()

shutdown:將線程池狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程

shutdownNow:將線程池的狀態設置成STOP狀態,而後中斷全部任務(包括正在執行的線程)的線程,而且返回執行任務列表。

例子:

public class ThreadPoolTest {

	public static void main(String[] args) {

		ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10,
				TimeUnit.SECONDS, new ArrayBlockingQueue(10));

		
		for (int i = 0; i < 10; i++) {
			//提交任務方法一:execute方法
			executor.execute(new Runnable() {

				@Override
				public void run() {
					System.out
							.println("線程:" + Thread.currentThread().getName());

				}
			});

			//提交任務方法一:submit方法
			final int result = i;
			Future<Integer> future = executor.submit(new Callable<Integer>() {

				@Override
				public Integer call() throws Exception {
					System.out
							.println("線程:" + Thread.currentThread().getName());
					return result;
				}

			});

			try {
				Integer integer = future.get();
				System.out.println(integer);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}

		}
		executor.shutdown();

	}

}

execute方法的執行結果:

線程:pool-1-thread-1
線程:pool-1-thread-2
線程:pool-1-thread-2
線程:pool-1-thread-2
線程:pool-1-thread-2
線程:pool-1-thread-2
線程:pool-1-thread-2
線程:pool-1-thread-3
線程:pool-1-thread-5
線程:pool-1-thread-4

submit方法的執行結果:

線程:pool-1-thread-1
0
線程:pool-1-thread-2
1
線程:pool-1-thread-3
2
線程:pool-1-thread-4
3
線程:pool-1-thread-5
4
線程:pool-1-thread-1
5
線程:pool-1-thread-2
6
線程:pool-1-thread-3
7
線程:pool-1-thread-4
8
線程:pool-1-thread-5
9

線程池的執行流程

1. 提交任務後,首先會判斷核心線程數是否已滿,若是在線程池中的線程數量沒有達到核心線程數,這時會建立核心線程來執行任務;不然,進入下一步操做。

2. 接着判斷線程池任務隊列是否已滿。若是沒滿,則將任務添加到任務隊列中;不然,進入下一步操做。

3. 接着判斷線程池中的線程數是否達到最大線程數(執行這步的前提是任務隊列已滿了)。若是未達到,則建立非核心線程執行任務;不然,就執行飽和策略,默認會拋出RejectedExecutionException異常。

線程類種類

java提供了常見四種線程池,分別能夠經過Executors類獲取:

FixedThreadPool、CachedThreadPool、ScheduledThreadPool、SingleThreadExecutor。

FixedThreadPool

經過Executors中的newFixedThreadPool方法來建立,該線程池是一種線程數量固定的線程池。

ExecutorService service = Executors.newFixedThreadPool(4);

 

在這個線程池中 所容納最大的線程數就是咱們設置的核心線程數。 若是線程池的線程處於空閒狀態的話,它們並不會被回收,除非是這個線程池被關閉。若是全部的線程都處於活動狀態的話,新任務就會處於等待狀態,直到有線程空閒出來。

因爲newFixedThreadPool只有核心線程,而且這些線程都不會被回收,也就是 它可以更快速的響應外界請求 。從下面的newFixedThreadPool方法的實現能夠看出,newFixedThreadPool只有核心線程,而且不存在超時機制,採用LinkedBlockingQueue,因此對於任務隊列的大小也是沒有限制的。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}

CachedThreadPool

經過Executors中的newCachedThreadPool方法來建立。

ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

經過newCachedThreadPool方法在這裏咱們能夠看出它的 核心線程數爲0, 線程池的最大線程數Integer.MAX_VALUE。而Integer.MAX_VALUE是一個很大的數,也差很少能夠說 這個線程池中的最大線程數能夠任意大。當線程池中的線程都處於活動狀態的時候,線程池就會建立一個新的線程來處理任務。該線程池中的線程超時時長爲60秒,因此當線程處於閒置狀態超過60秒的時候便會被回收。 這也就意味着如果整個線程池的線程都處於閒置狀態超過60秒之後,在newCachedThreadPool線程池中是不存在任何線程的,因此這時候它幾乎不佔用任何的系統資源。

對於newCachedThreadPool他的任務隊列採用的是SynchronousQueue,上面說到在SynchronousQueue內部沒有任何容量的阻塞隊列。SynchronousQueue內部至關於一個空集合,咱們沒法將一個任務插入到SynchronousQueue中。因此說在線程池中若是現有線程沒法接收任務,將會建立新的線程來執行任務。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
}

 

ScheduledThreadPool

 

經過Executors中的newScheduledThreadPool方法來建立。

ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(4);

它的核心線程數是固定的,對於非核心線程幾乎能夠說是沒有限制的,而且當非核心線程處於限制狀態的時候就會當即被回收。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

SingleThreadExecutor

經過Executors中的newSingleThreadExecutor方法來建立

ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();

在這個線程池中只有一個核心線程,對於任務隊列沒有大小限制,也就意味着這一個任務處於活動狀態時,其餘任務都會在任務隊列中排隊等候依次執行

newSingleThreadExecutor將全部的外界任務統一到一個線程中支持,因此在這個任務執行之間咱們不須要處理線程同步的問題。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()));
}