【併發隊列】阻塞隊列 LinkedBlockingQueue 源碼解析

1、概述

    LinkedBlockingQueue是基於鏈表的阻塞FIFO隊列,能夠指定一個最大的長度限制以防止過分擴展,未指定狀況下其大小爲Integer.MAX_VALUE。java

2、同步策略

    putlock只負責添加,takelock只負責刪除。這樣添加和刪除操做就能夠分開作,這樣也是其吞吐量高於ArrayBlockingQueue的緣由,而對於count則使用一個AtomicInteger來進行同步。可是對於迭代器操做和remove(Object)操做則是不一樣的,須要同時加上兩種鎖才行。對於notFull和notEmpty的條件變量這裏實際也是分開的。數組

3、可見性

讀者和寫者之間的可見性問題,當元素是隊尾元素時,獲取put鎖,而且count更新,隨後的讀者經過fullLock得到put鎖或得到take鎖來保證其對於隊尾元素的可見性。而後讀取n=count.get(),這個保證得到對第n個元素的可見性。併發

4、源碼解析

 LinkedBlockingQueue 類中定義的變量有:高併發

/** The capacity bound, or Integer.MAX_VALUE if none */  
private final int capacity;  
  
/** Current number of elements */  
private final AtomicInteger count = new AtomicInteger(0);  
  
/** Head of linked list */  
private transient Node<E> head;  
  
/** Tail of linked list */  
private transient Node<E> last;  
  
/** Lock held by take, poll, etc */  
private final ReentrantLock takeLock = new ReentrantLock();  
  
/** Wait queue for waiting takes */  
private final Condition notEmpty = takeLock.newCondition();  
  
/** Lock held by put, offer, etc */  
private final ReentrantLock putLock = new ReentrantLock();  
  
/** Wait queue for waiting puts */  
private final Condition notFull = putLock.newCondition();

該類中定義了兩個ReentrantLock鎖:putLock和takeLock,分別用於put端和take端。也就是說,生成端和消費端各自獨立擁有一把鎖,避免了讀(take)寫(put)時互相競爭鎖的狀況。性能

public boolean offer(E e)this

原理:在隊尾插入一個元素, 若是隊列沒滿,當即返回true; 若是隊列滿了,當即返回false。spa

/**
     * 在隊尾插入一個元素, 容量沒滿,能夠當即插入,返回true; 隊列滿了,直接返回false
     * 注:若是使用了限制了容量的隊列,這個方法比add()好,由於add()插入失敗就會拋出異常
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final AtomicInteger count = this.count;// 獲取隊列中的元素個數
        if (count.get() == capacity)// 隊列滿了
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();// 獲取入隊鎖
        try {
            if (count.get() < capacity) {// 容量沒滿
                enqueue(e);// 入隊
                c = count.getAndIncrement();// 容量+1,返回舊值(注意)
                if (c + 1 < capacity)// 若是添加元素後的容量,還小於指定容量(說明在插入當前元素後,至少還能夠再插一個元素)
                    notFull.signal();// 喚醒等待notFull條件的其中一個線程
            }
        } finally {
            putLock.unlock();// 釋放入隊鎖
        }
        if (c == 0)// 若是c==0,這是什麼狀況?一開始若是是個空隊列,就會是這樣的值,要注意的是,上邊的c返回的是舊值
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * 建立一個節點,並加入鏈表尾部
     * @param x
     */
    private void enqueue(E x) {
        /*
         * 封裝新節點,並賦給當前的最後一個節點的下一個節點,而後在將這個節點設爲最後一個節點
         */
        last = last.next = new Node<E>(x);
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//獲取出隊鎖
        try {
            notEmpty.signal();//喚醒等待notEmpty條件的線程中的一個
        } finally {
            takeLock.unlock();//釋放出隊鎖
        }
    }

public void put(E e)線程

原理:在隊尾插入一個元素,若是隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷指針

/**
     * 在隊尾插一個元素
     * 若是隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;//入隊鎖
        final AtomicInteger count = this.count;//當前隊列中的元素個數
        putLock.lockInterruptibly();//加鎖
        try {
            while (count.get() == capacity) {//若是隊列滿了 
                /*
                 * 加入notFull等待隊列,直到隊列元素不滿了,
                 * 被其餘線程使用notFull.signal()喚醒
                 */
                notFull.await();
            }
            enqueue(e);//入隊
            c = count.getAndIncrement();//入隊數量+1
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

public E take() code

原理:將隊頭元素出隊,若是隊列空了,一直阻塞,直到隊列不爲空或者線程被中斷

/**
     * 出隊:
     * 若是隊列空了,一直阻塞,直到隊列不爲空或者線程被中斷
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;//獲取隊列中的元素總量
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//獲取出隊鎖
        try {
            while (count.get() == 0) {//若是沒有元素,一直阻塞
                /*
                 * 加入等待隊列, 一直等待條件notEmpty(即被其餘線程喚醒)
                 * (喚醒其實就是,有線程將一個元素入隊了,而後調用notEmpty.signal()喚醒其餘等待這個條件的線程,同時隊列也不空了)
                 */
                notEmpty.await();
            }
            x = dequeue();//出隊
            c = count.getAndDecrement();//元素數量-1
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * 從隊列頭部移除一個節點
     */
    private E dequeue() {
        Node<E> h = head;//獲取頭節點:x==null
        Node<E> first = h.next;//將頭節點的下一個節點賦值給first
        h.next = h; // 將當前將要出隊的節點置null(爲了使其作head節點作準備)
        head = first;//將當前將要出隊的節點做爲了頭節點
        E x = first.item;//獲取出隊節點的值
        first.item = null;//將出隊節點的值置空
        return x;
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

public E poll()

原理:若是沒有元素,直接返回null;若是有元素,出隊

/**
     * 出隊: 
     * 一、若是沒有元素,直接返回null 
     * 二、若是有元素,出隊
     */
    public E poll() {
        final AtomicInteger count = this.count;// 獲取元素數量
        if (count.get() == 0)// 沒有元素
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();// 獲取出隊鎖
        try {
            if (count.get() > 0) {// 有元素
                x = dequeue();// 出隊
                // 元素個數-1(注意:該方法是一個無限循環,直到減1成功爲止,且返回舊值)
                c = count.getAndDecrement();
                if (c > 1)// 還有元素(若是舊值c==1的話,那麼經過上邊的操做以後,隊列就空了)
                    notEmpty.signal();// 喚醒等待在notEmpty隊列中的其中一條線程
            }
        } finally {
            takeLock.unlock();// 釋放出隊鎖
        }
        if (c == capacity)// c == capacity是怎麼發生的?若是隊列是一個滿隊列,注意:上邊的c返回的是舊值
            signalNotFull();
        return x;
    }

5、總結

一、具體入隊與出隊的原理圖

圖中每個節點前半部分表示封裝的數據x,後邊的表示指向的下一個引用。

1.一、初始化

 初始化以後,初始化一個數據爲null,且head和last節點都是這個節點。

1.二、入隊兩個元素事後

1.三、出隊一個元素後

表面上看,只是將頭節點的next指針指向了要刪除的x1.next,事實上這樣我覺的就徹底能夠,可是jdk其實是將原來的head節點刪除了,而上邊看到的這個head節點,正是剛剛出隊的x1節點,只是其值被置空了。

二、三種入隊對比:

  • offer(E e):若是隊列沒滿,當即返回true; 若是隊列滿了,當即返回false-->不阻塞
  • put(E e):若是隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷-->阻塞
  • offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,若是隊列已滿,則進入等待,直到出現如下三種狀況:-->阻塞
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

三、三種出隊對比:

  • poll():若是沒有元素,直接返回null;若是有元素,出隊
  • take():若是隊列空了,一直阻塞,直到隊列不爲空或者線程被中斷-->阻塞
  • poll(long timeout, TimeUnit unit):若是隊列不空,出隊;若是隊列已空且已經超時,返回null;若是隊列已空且時間未超時,則進入等待,直到出現如下三種狀況:
    • 被喚醒
    • 等待時間超時
    • 當前線程被中斷

四、ArrayBlockingQueue與LinkedBlockingQueue對比

  • ArrayBlockingQueue:
    • 一個對象數組+一把鎖+兩個條件
    • 入隊與出隊都用同一把鎖
    • 在只有入隊高併發或出隊高併發的狀況下,由於操做數組,且不須要擴容,性能很高
    • 採用了數組,必須指定大小,即容量有限
  • LinkedBlockingQueue:
    • 一個單向鏈表+兩把鎖+兩個條件
    • 兩把鎖,一把用於入隊,一把用於出隊,有效的避免了入隊與出隊時使用一把鎖帶來的競爭。
    • 在入隊與出隊都高併發的狀況下,性能比ArrayBlockingQueue高不少
    • 採用了鏈表,最大容量爲整數最大值,可看作容量無限