【併發隊列】無界阻塞優先級隊列 PriorityBlockingQueue源碼解析

1、 前言

PriorityBlockingQueue是帶優先級的無界阻塞隊列,每次出隊都返回優先級最高的元素,是二叉樹最小堆的實現,研究過數組方式存放最小堆節點的都知道,直接遍歷隊列元素是無序的。java

2、 PriorityBlockingQueue類圖結構

如圖PriorityBlockingQueue內部有個數組queue用來存放隊列元素,size用來存放隊列元素個數,allocationSpinLockOffset是用來在擴容隊列時候作cas的,目的是保證只有一個線程能夠進行擴容。算法

因爲這是一個優先級隊列因此有個比較器comparator用來比較元素大小。lock獨佔鎖對象用來控制同時只能有一個線程能夠進行入隊出隊操做。notEmpty條件變量用來實現take方法阻塞模式。這裏沒有notFull 條件變量是由於這裏的put操做是非阻塞的,爲啥要設計爲非阻塞的是由於這是無界隊列。
最後PriorityQueue q用來搞序列化的。數組

以下構造函數,默認隊列容量爲11,默認比較器爲null;併發

private static final int DEFAULT_INITIAL_CAPACITY = 11;
 
 
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
 
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
 
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

 

 

3、 offer操做

在隊列插入一個元素,因爲是無界隊列,因此一直爲成功返回true;框架

public boolean offer(E e) {
 
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
 
    //若是當前元素個數>=隊列容量,則擴容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
 
 
    try {
        Comparator<? super E> cmp = comparator;
 
        //默認比較器爲null
        if (cmp == null)(2)
            siftUpComparable(n, e, array);
        else
            //自定義比較器(3)
            siftUpUsingComparator(n, e, array, cmp);
 
        //隊列元素增長1,而且激活notEmpty的條件隊列裏面的一個阻塞線程
        size = n + 1;(9)
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

 

主流程比較簡單,下面看看兩個主要函數函數

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); //must release and then re-acquire main lock
    Object[] newArray = null;
 
    //cas成功則擴容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64則擴容新增oldcap+2,否者擴容50%,而且最大爲MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
 
    //第一個線程cas成功後,第二個線程會進入這個地方,而後第二個線程讓出cpu,儘可能讓第一個線程執行下面點獲取鎖,可是這得不到確定的保證。(5)
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }

tryGrow目的是擴容,這裏要思考下爲啥在擴容前要先釋放鎖,而後使用cas控制只有一個線程能夠擴容成功。個人理解是爲了性能,由於擴容時候是須要花時間的,若是這些操做時候還佔用鎖那麼其餘線程在這個時候是不能進行出隊操做的,也不能進行入隊操做,這大大下降了併發性。oop

因此在擴容前釋放鎖,這容許其餘出隊線程能夠進行出隊操做,可是因爲釋放了鎖,因此也容許在擴容時候進行入隊操做,這就會致使多個線程進行擴容會出現問題,因此這裏使用了一個spinlock用cas控制只有一個線程能夠進行擴容,失敗的線程調用Thread.yield()讓出cpu,目的意在讓擴容線程擴容後優先調用lock.lock從新獲取鎖,可是這得不到必定的保證,有可能調用Thread.yield()的線程先獲取了鎖。性能

那copy元素數據到新數組爲啥放到獲取鎖後面那?緣由應該是由於可見性問題,由於queue並無被volatile修飾。另外有可能在擴容時候進行了出隊操做,若是直接拷貝可能看到的數組元素不是最新的。而經過調用Lock後,獲取的數組則是最新的,而且在釋放鎖前 數組內容不會變化。ui

具體建堆算法:this

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
 
    //隊列元素個數>0則判斷插入位置,否者直接入隊(7)
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;(8)
}


第一次offer(2)時候下面用圖說話模擬下過程:
假設隊列容量爲2

執行(1)爲false因此執行(2),因爲k=n=size=0;因此執行(8)元素入隊,然執行(9)size+1;
如今隊列狀態:

  • 第二次offer(4)時候

執行(1)爲false,因此執行(2)因爲k=1,因此進入while循環,parent=0;e=2;key=4;key>e因此break;而後把4存到數據下標爲1的地方,這時候隊列狀態爲:

  • 第三次offer(6)時候

執行(1)爲true,因此調用tryGrow,因爲2<64因此newCap=2 + (2+2)=6;而後建立新數組並拷貝,而後調用siftUpComparable;k=2>0進入循環 parent=0;e=2;key=6;key>e因此break;而後把6放入下標爲2的地方,如今隊列狀態:

  • 第四次offer(1)時候

執行(1)爲false,因此執行(2)因爲k=3,因此進入while循環,parent=0;e=2;key=1; key<e;因此把2複製到數組下標爲3的地方,而後k=0退出循環;而後把2存放到下標爲0地方,如今狀態:

4、 poll操做

在隊列頭部獲取並移除一個元素,若是隊列爲空,則返回null

 

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

主要看dequeue

private E dequeue() {
 
    //隊列爲空,則返回null
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        //獲取隊頭元素(1)
        Object[] array = queue;
        E result = (E) array[0];
 
        //獲取對尾元素,並值null(2)
        E x = (E) array[n];
        array[n] = null;
 
        Comparator<? super E> cmp = comparator;
        if (cmp == null)//cmp=null則調用這個,把對尾元素位置插入到0位置,而且調整堆爲最小堆(3)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;(4)
        return result;
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                            int n) {
     if (n > 0) {
         Comparable<? super T> key = (Comparable<? super T>)x;
         int half = n >>> 1;           // loop while a non-leaf
         while (k < half) {
             int child = (k << 1) + 1; // assume left child is least
             Object c = array[child];(5)
             int right = child + 1;(6)
             if (right < n &&
                 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
                 c = array[child = right];
             if (key.compareTo((T) c) <= 0)(8)
                 break;
             array[k] = c;
             k = child;
         }
         array[k] = key;(9)
     }
 }


第一次調用poll()下面用圖說話模擬下過程:

首先執行(1) result=1;而後執行(2)x=2;這時候隊列狀態

而後執行(3)後狀態爲:

執行(4)後的結果:

下面重點說說siftDownComparable這個屌屌的創建最小堆的算法:

首先說下思想,其中k一開始爲0,x爲數組裏面最後一個元素,因爲第0個元素爲樹根,被出隊時候要被搞掉,因此建堆要從它的左右孩子節點找一個最小的值來當樹根,子樹根被搞掉後,會找子樹的左右孩子最小的元素來代替,直到樹節點爲止,還不明白,不要緊,看圖說話:
假如當前隊列元素:

那麼對於樹爲:

這時候若是調用了poll();那麼result=2;x=11;如今樹爲:

而後看leftChildVal = 4;rightChildVal = 6; 4<6;因此c=4;也就是獲取根節點的左右孩子值小的那一個; 而後看11>4也就是key>c;而後把c放入樹根,如今樹爲:

而後看根的左邊孩子4爲根的子樹咱們要爲這個字樹找一個根節點

看leftChildVal = 8;rightChildVal = 10; 8<10;因此c=8;也就是獲取根節點的左右孩子值小的那一個; 而後看11>8也就是key>c;而後把c放入樹根,如今樹爲:

這時候k=3;half=3因此推出循環,執行(9)後結果爲:

這時候隊列爲:

5、 put操做

內部調用的offer,因爲是無界隊列,因此不須要阻塞

public void put(E e) {
    offer(e); // never need to block
}


獲取隊列頭元素,若是隊列爲空則阻塞。

6、 take操做

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
 
        //若是隊列爲空,則阻塞,把當前線程放入notEmpty的條件隊列
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}


7、 size操做

這裏是阻塞實現,阻塞後直到入隊操做調用notEmpty.signal 纔會返回。

獲取隊列元個數,因爲加了獨佔鎖因此返回結果是精確的

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

8、 開源框架中使用

9、總結

PriorityBlockingQueue相似於ArrayBlockingQueue內部使用一個獨佔鎖來控制同時只有一個線程能夠進行入隊和出隊,另外前者只使用了一個notEmpty條件變量而沒有notFull這是由於前者是無界隊列,當put時候永遠不會處於await因此也不須要被喚醒。

PriorityBlockingQueue始終保證出隊的元素是優先級最高的元素,而且能夠定製優先級的規則,內部經過使用一個二叉樹最小堆算法來維護內部數組,這個數組是可擴容的,噹噹前元素個數>=最大容量時候會經過算法擴容。

值得注意的是爲了不在擴容操做時候其餘線程不能進行出隊操做,實現上使用了先釋放鎖,而後經過cas保證同時只有一個線程能夠擴容成功。