多線程與線程池

前言: 目前進入的多線程的深刻學習,此次仍是在學習進階之光的一個階段php

1、什麼是進程?什麼是線程?


  相信這是許多剛學多線程的人會被問傻的一個問題,明明本身在java se的時候是學過了java多線程編寫,new一個Thread啊,Runnable接口啊之類的,可是被人問到這個問題的時候,我相信沒有去繼續專研的你90%答不出來(排除大牛)…

好了,廢話很少說,讓咱們看看什麼是進程?
  在操做系統裏:進程是操做系統結構的基礎,是程序在一個數據集合上運行的過程,是系統資源分配和調度的基本單位。

上面的描述可能很專業,我來通俗的說一下把:
  其實咱們平時控制管理器的每個內容就是一個進程(以我mac系統爲例):
進程
如圖上的微信,Chrome這些應用程序其實就是一個進程,因此它們擁有內存資源的分配和調度。因此也能夠說進程就是應用程序的實體。
  好了,那咱們知道了進程是什麼,那線程那?
什麼是線程?
  線程其實就是進程的各個內部任務,就拿微信來講事,你的聊天,我的信息,支付,全都是用各個線程來完成的,因此線程也說是操做系統的最小調度單元,一個進程裏能夠建立多個線程。
  而且線程有一個最關鍵的不一樣於進程的東西,就是線程沒有內存資源的分配,因此大大減小了資源的浪費。
那麼我再問一個問題:爲何要使用多線程呢?我開多幾個進程難道就不行嗎?
  這裏我就列出幾點線程的優點之處:java

  • 使用多線程能夠減小程序的響應時間。
  • 相比進程,線程的開銷就顯得很是小了,同時多線程在數據共享方面效率很高。
  • 多CPU或多核計算機自己就具有多線程的功能。若是隻使用單線程,那麼沒法重複利用好計算機的資源,形成資源浪費。
  • 使用多線程能夠簡化程序的結構,便於理解和維護。

就如同Activity有生命週期同樣,線程也有生命週期。線程擁有6種不一樣的狀態:web

  • New:線程被新建立時。
  • Runnable:線程可運行時(調用了start方法時)。
  • Blocked:阻塞狀態。被阻塞的線程不能活動
  • Waiting:等待狀態。線程暫時不活動,可是隨時能夠從新激活它。
  • Timed Waiting:超時等待狀態。它能夠在指定的時間自行返回。
  • Terminated: 終止狀態。表示當前線程已經執行完畢。一種是run方法完畢後正常退出;另外一種就是終止了run方法(異常或手動),致使線程終止。

2、建立線程


1.繼承Thread類,重寫run()方法

  Thread本質上也是實現了Runnable接口的一個實例
代碼以下:sql

public class TestThread extends Thread{
        @Override
        public void run() {
            super.run();
        }
    }

主方法中:數組

Thread mThread = new TestThread();
mThread.start();

2.實現Runnable接口,並實現該接口的run()方法

public class TestThread implements Runnable{

        @Override
        public void run() {

        }
}

主方法中:緩存

TestThread testThread = new TestThread();
        Thread mThread = new Thread(testThread);
        mThread.start();

3.實現Callable接口,重寫call()方法

  Callable接口實現多線程有兩種途徑:
FutureTask和Callable實現:安全

Callable<Integer> callable = new Callable<Integer>() {
            public Integer call() throws Exception {
               return "咱們要返回的內容";
            }
        };
        FutureTask<Integer> future = new FutureTask<Integer>(callable);
        new Thread(future).start();
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    //也能夠寫成
public static class MyTestCallable implements Callable<E>{
    public E call() throws Exception {
               return "咱們要返回的內容";
            }

MyTestCallable callable = new MyTestCallable();
FutureTask<E> future = new FutureTask<E>(callable);
Thread mThread = new Thread(future);
mThread.start();
try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

Future和Callable實現:(經過ExecutorService的submit方法執行Callable)微信

public static class MyTestCallable implements Callable{

        @Override
        public String call() throws Exception {
            return "咱們要返回的內容";
        }
    }

主方法中:多線程

MyTestCallable myTestCallable = new MyTestCallable();
//利用線程池實現
ExecutorService mExecutorService = Executors.newSingleThreadExecutor();
Future mFuture = mExecutorService.submit(myTestCallable);
try{
//等待線程結束,並返回結果
System.out.println(future.get());
}catch (Exception e){
            e.printStackTrace();
  }
}


3、線程中斷/終止


  咱們能夠利用interrupt()方法來中斷線程,調用Thread.currentThread().isInterrupted()方法來判斷是否終止線程。

安全的終止線程
  有兩種方式去終止線程:
thread.interrupt()方法:ide

public class StopThread{
 public static void main(String[] args)throws InterruptedException{
 MoonRunner runnable = new Thread(runnable,"MoonThread");
 thread.start();
 TimeUnit.MILLISECONDS.sleep(10);
 thread.interrupt();
 }
public static class MoonRunner implements Runnable{
   private long i;
   @Override
   public void run(){
   while(!Thread.currentThread().isInterrupted()){
   i++;
   System.out.println("i="+i);
   }
   System.out.println("stop");
  }
 }
}

採用volatile關鍵字的boolean變量判斷:

public class StopThread{
 public static void main(String[] args)throws InterruptedException{
 MoonRunner runnable = new Thread(runnable,"MoonThread");
 thread.start();
 TimeUnit.MILLISECONDS.sleep(10);
 runnable.cancel;
 }
 public static class MoonRunner implements Runnable{
   private long i;
   private volatile boolean on = true;
   @Override
   public void run(){
   while(on){
   i++;
   System.out.println("i="+i);
   }
   System.out.println("stop");
  }
  public void cancel(){
    on = false;
    }
  }
}


4、同步


  java提供了兩種同步的方式
重入鎖ReentrantLock():

Lock mLock = new ReentrantLock();
mLock.lock();
try{
...
}
finally{
mLock.unlock();
}

咱們能夠利用條件對象對重入鎖添加條件判斷阻塞線程,並放棄鎖。

public  class Alipay{
    private double[] accounts;
    private Lock alipaylock;
    private Condition condition;
    public Alipay(int n,double money){
        accounts = new double[n];
        alipaylock = new ReentrantLock();
        condition = alipaylock.newCondition();
        for(int i=0;i<accounts.length;i++){
            accounts[i] = money;
        }
    }
    public void transfer(int from,int to,int amount) throws InterruptedException{
            alipaylock.lock();
            try{

                while(accounts[from]<amount){
                    System.out.println("線程阻塞中..."+Thread.currentThread());
                    condition.await();
                }
                accounts[from] = accounts[from]-amount;
                accounts[to] = accounts[to]+amount;
                condition.signalAll();
            }finally{
                alipaylock.unlock();
            }
            System.out.println("傳出帳號:"+accounts[from]+"轉入帳號:"+accounts[to]);
        }
}

只要調用了condition.await()語句,便會阻塞當前線程,並放棄鎖
  一旦一個線程調用了await方法,它就會進入一個等待集而且處於阻塞狀態,直到另外一個線程調用了同一條件下的signalAll方法時,從新激活由於這一條件而等待的因此線程,從新判斷條件。

  好比代碼的這個例子,第一個帳戶向第二個帳戶轉入60元,但是第一個帳戶並無這麼多錢accounts[from] < amount 即是true,線程執行condition.await()便處於阻塞狀態,並放棄鎖。而當第三個帳戶向第一個帳戶帳戶轉入30元的時候,執行了condition.signalAll();第一個帳戶向第二個帳戶轉入60元的線程就從新的激活了,再次判斷accounts[from] < amount 時,第一個帳戶已經收到了第第三個帳戶轉來的錢,因此判斷爲false,執行轉帳語句。

public class ThreadTest {

    public static void main(String[] args)  {
        Alipay a = new Alipay(3,50.0);
        new Thread(new Runnable(){
            public void run(){
                try {
                    a.transfer(0, 1, 60);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable(){
            public void run(){
                try {
                    a.transfer(2, 0, 30);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }).start(); 
    }   
}

關鍵字 synchronized:

public synchronized void method(){ ... } //等價於上面的重入鎖方法

對於上面的轉帳例子,咱們也能夠用聲明synchronized實現:

public class AlipaySyn {
        private double[] accounts;
        public AlipaySyn(int n,double money){
            accounts = new double[n];
            for(int i=0;i<accounts.length;i++){
                accounts[i] = money;
            }
        }
public synchronized void transfer(int from,int to,int amount) throws InterruptedException{

                    while(accounts[from]<amount){
                        System.out.println("線程阻塞中..."+Thread.currentThread());
                        wait();
                    }
                    accounts[from] = accounts[from]-amount;
                    accounts[to] = accounts[to]+amount;
                    notifyAll();
                System.out.println("傳出帳號:"+accounts[from]+"轉入帳號:"+accounts[to]);
            }
    }

主方法中:

public class ThreadTest {

    public static void main(String[] args)  {
        AlipaySyn aSyn = new AlipaySyn(3,50.0);
        new Thread(new Runnable(){
            public void run(){
                try {
                    aSyn.transfer(0, 1, 60);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable(){
            public void run(){
                try {
                    aSyn.transfer(2, 0, 30);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }).start(); 
    }   
}

能夠對比得出結論,使用synchronized關鍵字寫代碼要簡潔不少。wait()方法至關於condition.await();notifyAll()至關於condition.signalAll()。

volatile
1.java內存模型:
java內存模型
線程A與線程B之間若要通訊,必需要經歷下面兩個步驟:

  1. 線程A把線程A本地內存中更新過的共享變量刷新到主內存中去。
  2. 線程B到主內存中去讀取線程A以前已經更新過的共享變量。

2.原子性、可見性、有序性
原子性:
  對基本數據類型變量的讀取和賦值操做是原子性的操做,即這些操做是不能夠被中斷的,要麼執行完畢,要麼不執行。
  而自增和自減就不是原子性操做。
可見性:
  當一個共享變量被volatile修飾時,它會保證修改的值馬上被更新到主內存中去,因此對其餘線程是可見的。
有序性:
  synchronized和lock能夠保證有序性;由於它們保證每一個時刻只有一個線程執行同步代碼,這至關因而讓線程順序執行同步代碼,從而保證了有序性。

3.volatile關鍵字
  volatile不保證原子性
  volatile保證有序性
使用場景:
(1) 狀態標誌

volatile boolean shutdownRequested;
...
public void shutdown(){
  shutdownRequested = true;
  }
public void doWork(){
  while(!shutdownRequested){
  ...
  }
}

(2) 雙重檢查模式

public class Singleton{
private volatile static Singleton instance =null;
public static Singleton getInstance(){
  if(instance == null){
    synchronized(Singleton.class){
      if(instance == null){
        instance = new Singleton();
        }
       }
      }
      return instance;
     }
  }


5、阻塞隊列


BlockingQueue的核心方法

放入數據:

  • offer(object):將object加到BlockingQueue裏。若是能夠容納,則返回true,不然返回false。
  • offer(E o,long timeout,TimeUnit unit): 能夠設定等待的時間。指定時間內還不能往隊列中加入BlockingQueue,則返回失敗
  • put(object): 將object加到BlockingQueue裏。

    獲取數據:

  • poll(time): 取走BlockingQueue裏排在首位的對象。若不能取出能夠等time參數規定的時間。取不到返回null。

  • poll(long timeout,TimeUnit unit): 取走BlockingQueue裏排在首位的對象。在必定時間內一旦有數據能夠取,則馬上返回隊列中的數據;不然直到時間超過尚未數據能夠取,返回false。
  • take(): 取走BlockingQueue裏排在首位的對象。若BlockingQueue爲空,則阻斷進入等待狀態,直到BlockingQueue有新數據加入。
  • drainTo(): 一次性取走BlockingQueue全部獲取的數據對象。

java中的阻塞隊列
  java中提供了7個阻塞隊列:

  • ArrayBlockingQueue: 由數組結構組成的有界阻塞隊列
  • LinkedBlockingQueue: 由鏈表結構組成的阻塞隊列(不設定長度時爲無界阻塞隊列)
  • PriorityBlockingQueue: 支持優先級的無界阻塞隊列
  • DelayQueue: 支持延時獲取元素的無界阻塞隊列,使用優先級PriorityQueue隊列來實現
  • SynchronousQueue: 不儲存元素的阻塞隊列
  • LinkedTransferQueue: 由鏈表結構組成的無界阻塞隊列
  • LinkedBlockingDeque: 由鏈表結構組成的雙向阻塞隊列


6、線程池


  能夠經過ThreadPoolExecutor來建立一個線程池
擁有最多參數的構造方法以下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
                              ...
                              }
  • corePoolSize: 核心線程數。若是當前運行的線程數少於corePoolSize,則建立新線程來處理任務。
  • maximumPoolSize: 線程池容許建立的最大線程數。若是任務隊列滿了而且線程數小於maximumPoolSize時,則線程池仍然會建立線程來處理任務。
  • keepAliveTime: 非核心線程閒置的超時時間。超過這個時間則回收。
  • TimeUnit: keepAliveTime參數的時間單位。可選的單位有天DAYS、小時HOURS、分鐘MINUTES、秒SECONDS、毫秒MILLSECONDS。
  • workQueue: 任務隊列。若是當前線程數大於corePoolSize,則將任務添加到任務隊列中。
  • ThreadFactory: 線程工廠。能夠爲每一個線程設置名字。
  • RejectedExecutionHandler: 飽和策略。默認是AbordPolicy。

    線程池的處理流程和原理咱們能夠看兩個圖便可:
    線程池的處理流程

線程池原理

線程池的種類
  線程池有經常使用的4大類,其實都是基於ThreadPoolExecutor實現的;分別是FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool。
  1.FixedThreadPool
FixedThreadPool只有核心線程,而且線程數量固定,keepAliveTime爲無效參數,採用無界阻塞隊列LinkedBlockingQueue。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool
  2.CachedThreadPool
CachedThreadPool沒有核心線程,非核心線程是無界的。keepAliveTime設置爲60秒,空閒線程等待新任務的最長時間爲60秒。採用了SynchronousQueue不儲存的元素阻塞隊列。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool
  3.SingleThreadExecutor
SingleThreadExecutor核心線程數和總線程數都爲1,其餘配置都和FixedThreadPool相同。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor

  4.ScheduledThreadPool
ScheduledThreadPool的corePoolSize爲固定值,採用了DelayWorkQueue,因此maximumPoolSize是無效的。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
        public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

ScheduledThreadPool

4種線程池的使用
1.FixedThreadPool

// 1. 建立定長線程池對象 & 設置線程池線程數量固定爲3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

// 2. 建立好Runnable類線程對象 & 需執行的任務
Runnable task =new Runnable(){
  public void run(){
    System.out.println("執行任務啦");
     }
    };

// 3. 向線程池提交任務:execute()
fixedThreadPool.execute(task);

// 4. 關閉線程池
fixedThreadPool.shutdown();

2.CachedThreadPool

// 1. 建立可緩存線程池對象
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

// 2. 建立好Runnable類線程對象 & 需執行的任務
Runnable task =new Runnable(){
  public void run(){
        System.out.println("執行任務啦");
            }
    };

// 3. 向線程池提交任務:execute()
cachedThreadPool.execute(task);

// 4. 關閉線程池
cachedThreadPool.shutdown();

//當執行第二個任務時第一個任務已經完成
//那麼會複用執行第一個任務的線程,而不用每次新建線程。

3.SingleThreadExecutor

// 1. 建立單線程化線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

// 2. 建立好Runnable類線程對象 & 需執行的任務
Runnable task =new Runnable(){
  public void run(){
        System.out.println("執行任務啦");
            }
    };

// 3. 向線程池提交任務:execute()
singleThreadExecutor.execute(task);

// 4. 關閉線程池
singleThreadExecutor.shutdown();

4.ScheduledThreadPool

// 1. 建立 定時線程池對象 & 設置線程池線程數量固定爲5
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

// 2. 建立好Runnable類線程對象 & 需執行的任務
Runnable task =new Runnable(){
       public void run(){
              System.out.println("執行任務啦");
          }
    };
// 3. 向線程池提交任務:schedule()
scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延遲1s後執行任務
scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);
// 延遲10ms後、每隔1000ms執行任務

// 4. 關閉線程池
scheduledThreadPool.shutdown();