【併發隊列】阻塞隊列 SynchronousQueue源碼解析

    SynchronousQueue是一種特殊的阻塞隊列,不一樣於LinkedBlockingQueue、ArrayBlockingQueue和PriorityBlockingQueue,其內部沒有任何容量,任何的入隊操做都須要等待其餘線程的出隊操做,反之亦然。若是將SynchronousQueue用於生產者/消費者模式,那麼至關於生產者和消費者手遞手交易,即生產者生產出一個貨物,則必須等到消費者過來取貨,方可完成交易。 
SynchronousQueue有一個fair選項,若是fair爲true,稱爲fair模式,不然就是unfair模式。fair模式使用一個先進先出的隊列保存生產者或者消費者線程,unfair模式則使用一個後進先出的棧保存。java

基本原理

SynchronousQueue經過將入隊出隊的線程綁定到隊列的節點上,並藉助LockSupport的park()和unpark()實現等待,先到達的線程A需調用LockSupport的park()方法將當前線程進入阻塞狀態,知道另外一個與之匹配的線程B調用LockSupport.unpark(Thread)來喚醒在該節點上等待的線程A。 
基本邏輯:node

  1. 初始狀態隊列爲null
  2. 當一個線程到達,若是隊列爲null,無與之匹配的線程,則進入隊列等待;隊列不爲null,參考3
  3. 當另外一個線程到達,若是隊列不爲null,則判斷隊列中的第一個元素(針對fair和unfair不一樣)是否與其匹配,若是匹配則完成交易,不匹配則也入隊;隊列爲null,參考2

經常使用方法解析

在深刻分析其實現機制以前,咱們先了解對於SynchronousQueue可執行哪些操做,因爲SynchronousQueue的容量爲0,因此一些針對集合的操做,如:isEmpty()/size()/clear()/remove(Object)/contains(Object)等操做都是無心義的,一樣peek()也老是返回null。因此針對SynchronousQueue只有兩類操做:安全

  • 入隊(put(E)/offer(E, long, TimeUnit)/offer(E))
  • 出隊(take()/poll(long, TimeUnit)/poll())

這兩類操做內部都是調用Transferer的transfer(Object, boolean, long)方法,經過第一個參數是否爲null,來區分是生產者仍是消費者(生產者不爲null)。 
針對以上狀況,咱們將着重分析Transferer的transfer(Object, boolean, long)方法,這裏因爲兩種不一樣的公平模式,會存在兩個Transferer的派生類:併發

public SynchronousQueue(boolean fair) {
    transferer = (fair)? new TransferQueue() : new TransferStack();
}

可見fair模式使用TransferQueue,unfair模式使用TransferStack,下面咱們將分別對這兩種模式進行着重分析。app

fair模式

fair模式使用一個FIFO的隊列保存線程,TransferQueue的結構以下:oop

/** Dual Queue */
static final class TransferQueue extends Transferer {
    /** Node class for TransferQueue. */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS'ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

        ...
    }

    /** Head of queue */
    transient volatile QNode head;
    /** Tail of queue */
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it cancelled.
     */
    transient volatile QNode cleanMe;

    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }

    ...
}

以上是TransferQueue的大體結構,能夠看到TransferQueue同一個普通的隊列,同時存在一個指向隊列頭部的指針——head,和一個指向隊列尾部的指針——tail;cleanMe的存在主要是解決不可清楚隊列的尾節點的問題,後面會介紹到;隊列的節點經過內部類QNode封裝,QNode包含四個變量:this

  • next:指向隊列中的下一個節點
  • item:節點包含的數據
  • waiter:等待在該節點上的線程
  • isData:表示該節點由生產者建立仍是由消費者建立,因爲生產者是放入數據,因此isData==true,而消費者==false

其餘的內容就是一些CAS變量以及操做,下面主要分析TransferQueue的三個重要方法:transfer(Object, boolean, long)、awaitFulfill(QNode, Object, boolean, long)、clean(QNode, QNode)。這三個方法是TransferQueue的核心,入口是transfer(),下面具體看代碼。atom

transfer

/**
 * @By Vicky:交換數據,生產者和消費者經過e==null來區分
 */
Object transfer(Object e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    int mode = (e == null)? REQUEST : DATA;// 根據e==null判斷生產者仍是消費者,對應不一樣的mode值

    for (;;) {
        SNode h = head;
        // 棧爲null或者棧頂元素的模式同當前模式,則進行入棧操做
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 不等待,則直接返回null,返回以前順帶清理下被取消的元素
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {// 入棧,更新棧頂爲新節點
                // 等待,返回值m==s,則被取消,需清除
                SNode m = awaitFulfill(s, timed, nanos);
                // m==s說明s被取消了,清除
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                // 幫忙出棧
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 消費者則返回生產者的數據,生產者則返回本身的數據
                return mode == REQUEST? m.item : s.item;
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill   // 棧頂未開始匹配,則開始匹配
            // h被取消,則出棧
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // 更新棧頂爲新插入的節點,並更新節點的mode爲FULFILLING,對應判斷是否正在出棧的方法
            // 匹配須要先將待匹配的節點入棧,因此不論是匹配仍是不匹配都須要建立一個節點入棧
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 循環直到找到一個能夠匹配的節點
                for (;;) { // loop until matched or waiters disappear
                    // m即與s匹配的節點
                    SNode m = s.next;       // m is s's match
                    // m==null說明棧s以後無元素了,直接將棧頂設置爲null,並從新進行最外層的循環
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // 將s設置爲m的匹配節點,並更新棧頂爲m.next,即將s和m同時出棧
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (mode == REQUEST)? m.item : s.item;
                    } else                  // lost match
                        // 設置匹配失敗,則說明m正準備出棧,幫助出棧
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller // 棧頂已開始匹配,幫助匹配
            // 此處的操做邏輯同上面的操做邏輯一致,目的就是幫助上面進行操做,由於此處完成匹配須要分紅兩步:
            // a.m.tryMatch(s)和b.casHead(s, mn)
            // 因此必然會插入其餘線程,只要插入的線程也按照這個步驟執行那麼就避免了不一致問題
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

從上面的代碼能夠看出TransferQueue.transfer()的總體流程:spa

  1. 判斷當前隊列是否爲null或者隊尾線程是否與當前線程匹配,爲null或者不匹配都將進行入隊操做
  2. 入隊主要很簡單,分紅兩步:修改tail的next爲新的節點,修改tail爲新的節點,這兩步操做有可能分在兩個不一樣的線程執行,不過不影響執行結果
  3. 入隊以後須要將當前線程阻塞,調用LockSupport.park()方法,直到打斷/超時/被匹配的線程喚醒
  4. 若是被取消,則須要調用clean()方法進行清除
  5. 因爲FIFO,因此匹配老是發生在隊列的頭部,匹配將修改等待節點的item屬性傳遞數據,同時喚醒等待在節點上的線程

awaitFulfill

下面看看具體如何讓一個線程進入阻塞。線程

/**
 *@ By Vicky:等待匹配,該方法會進入阻塞,直到三種狀況下才返回:
 *  a.等待被取消了,返回值爲s
 *  b.匹配上了,返回另外一個線程傳過來的值
 *  c.線程被打斷,會取消,返回值爲s
 */
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
    // timed==false,則不等待,lastTime==0便可
    long lastTime = (timed)? System.nanoTime() : 0;
    // 當前線程
    Thread w = Thread.currentThread();
    // 循環次數,原理同自旋鎖,若是不是隊列的第一個元素則不自旋,由於壓根輪不上他,自旋只是浪費CPU
    // 若是等待的話則自旋的次數少些,不等待就多些
    int spins = ((head.next == s) ?
                 (timed? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())// 支持打斷
            s.tryCancel(e);
        // 若是s的item不等於e,有三種狀況:
        // a.等待被取消了,此時x==s
        // b.匹配上了,此時x==另外一個線程傳過來的值
        // c.線程被打斷,會取消,此時x==s
        // 不論是哪一種狀況都不要再等待了,返回便可
        Object x = s.item;
        if (x != e)
            return x;
        // 等到,直接超時取消
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel(e);
                continue;
            }
        }
        // 自旋,直到spins==0,進入等待
        if (spins > 0)
            --spins;
        // 設置等待線程
        else if (s.waiter == null)
            s.waiter = w;
        // 調用LockSupport.park進入等待
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

awaitFulfill()主要涉及自旋以及LockSupport.park()兩個關鍵點,自旋可去了解自旋鎖的原理。

自旋鎖原理:經過空循環則霸佔着CPU,避免當前線程進入睡眠,由於睡眠/喚醒是須要進行線程上下文切換的,因此若是線程睡眠的時間很段,那麼使用空循環可以避免線程進入睡眠的耗時,從而快速響應。可是因爲空循環會浪費CPU,因此也不能一直循環。自旋鎖通常適合同步快很小,競爭不是很激烈的場景。

LockSupport.park()可到API文檔進行了解。

clean

下面再看看如何清除被取消的節點。

/**
 *@By Vicky:清除節點被取消的節點 
 */
void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    // 若是pred.next!=s則說明s已經出隊了
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        // 從隊列頭部開始遍歷,遇到被取消的節點則將其出隊 
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        // t==h則隊列爲null
        if (t == h)
            return;
        QNode tn = t.next;
        if (t != tail)
            continue;
        // 幫助其餘線程入隊
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
        // 只能出隊非尾節點
        if (s != t) {        // If not tail, try to unsplice
            // 出隊方式很簡單,將pred.next指向s.next便可
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))
                return;
        }
        // 若是s是隊尾元素,那麼就須要cleanMe出場了,若是cleanMe==null,則只需將pred賦值給cleanMe便可,
        // 賦值cleanMe的意思是等到s不是隊尾時再進行清除,畢竟隊尾只有一個
        // 同時將上次的cleanMe清除掉,正常狀況下此時的cleanMe已經不是隊尾了,由於當前須要清除的節點是隊尾
        // (上面說的cleanMe實際上是須要清除的節點的前繼節點)
        QNode dp = cleanMe;
        if (dp != null) {    // Try unlinking previous cancelled node
            QNode d = dp.next;
            QNode dn;
            // d==null說明須要清除的節點已經沒了
            // d==dp說明dp已經被清除了,那麼dp.next也一併被清除了
            // 若是d未被取消,說明哪裏出錯了,將cleanMe清除,不清除這個節點了
            // 後面括號將清除cleanMe的next出局,前提是cleanMe.next沒有已經被出局
            if (d == null ||               // d is gone or
                d == dp ||                 // d is off list or
                !d.isCancelled() ||        // d not cancelled or
                (d != t &&                 // d not tail and
                 (dn = d.next) != null &&  //   has successor
                 dn != d &&                //   that is on list
                 dp.casNext(d, dn)))       // d unspliced
                casCleanMe(dp, null);
            // dp==pred說明cleanMe.next已經其餘線程被更新了
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred))
            return;          // Postpone cleaning s
    }
}

清除節點時有個原則:不能清除隊尾節點。因此若是對尾節點須要被清除,則將其保存到cleanMe變量,等待下次進行清除。在清除cleanMe時可能說的有點模糊,由於涉及到太多的併發會出現不少狀況,因此if條件太多,致使難以分析所有狀況。

以上就是TransferQueue的操做邏輯,下面看看後進先出的TransferStack。

unfair模式

unfair模式使用一個LIFO的隊列保存線程,TransferStack的結構以下:

/** Dual stack */
static final class TransferStack extends Transferer {
    /* Modes for SNodes, ORed together in node fields */
    /** Node represents an unfulfilled consumer */
    static final int REQUEST    = 0;// 消費者請求數據
    /** Node represents an unfulfilled producer */
    static final int DATA       = 1;// 生產者生產數據
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2;// 正在匹配中...

    /** 只須要判斷mode的第二位是否==1便可,==1則正在匹配中...*/
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Node class for TransferStacks. */
    static final class SNode {
        volatile SNode next;        // next node in stack
        volatile SNode match;       // the node matched to this
        volatile Thread waiter;     // to control park/unpark
        Object item;                // data; or null for REQUESTs
        int mode;
        // Note: item and mode fields don't need to be volatile
        // since they are always written before, and read after,
        // other volatile/atomic operations.

        SNode(Object item) {
            this.item = item;
        }
    }

    /** The head (top) of the stack */
    volatile SNode head;

    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }
}

TransferStacks比TransferQueue的結構複雜些。使用一個head指向棧頂元素,使用內部類SNode封裝棧中的節點信息,SNode包含5個變量:

  • next:指向棧中下一個節點
  • match:與之匹配的節點
  • waiter:等待的線程
  • item:數據
  • mode:模式,對應REQUEST/DATA/FULFILLING(第三個並非FULFILLING,而是FULFILLING | REQUEST或者FULFILLING | DATA)

SNode的5個變量,三個是volatile的,另外兩個item和mode沒有volatile修飾,代碼註釋給出的解釋是:對這兩個變量的寫老是發生在volatile/原子操做的以前,讀老是發生在volatile/原子操做的以後。

上面提到SNode.mode的三個常量表示棧中節點的狀態,f分別爲:

  • REQUEST:0,消費者的請求生成的節點
  • DATA:1,生產者的請求生成的節點
  • FULFILLING:2,正在匹配中的節點,具體對應的mode值是FULFILLING | REQUEST和FULFILLING | DATA

其餘內部基本同TransferQueue,不一樣之處是當匹配到一個節點時並不是是將被匹配的節點出棧,而是將匹配的節點入棧,而後同時將匹配上的兩個節點一塊兒出棧。下面咱們參照TransferQueue來看看TransferStacks的三個方法:transfer(Object, boolean, long)、awaitFulfill(QNode, Object, boolean, long)、clean(QNode, QNode)。

transfer

/**
 * @By Vicky:交換數據,生產者和消費者經過e==null來區分
 */
Object transfer(Object e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    int mode = (e == null)? REQUEST : DATA;// 根據e==null判斷生產者仍是消費者,對應不一樣的mode值

    for (;;) {
        SNode h = head;
        // 棧爲null或者棧頂元素的模式同當前模式,則進行入棧操做
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 不等待,則直接返回null,返回以前順帶清理下被取消的元素
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {// 入棧,更新棧頂爲新節點
                // 等待,返回值m==s,則被取消,需清除
                SNode m = awaitFulfill(s, timed, nanos);
                // m==s說明s被取消了,清除
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                // 幫忙出棧
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 消費者則返回生產者的數據,生產者則返回本身的數據
                return mode == REQUEST? m.item : s.item;
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill   // 棧頂未開始匹配,則開始匹配
            // h被取消,則出棧
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // 更新棧頂爲新插入的節點,並更新節點的mode爲FULFILLING,對應判斷是否正在出棧的方法
            // 匹配須要先將待匹配的節點入棧,因此不論是匹配仍是不匹配都須要建立一個節點入棧
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 循環直到找到一個能夠匹配的節點
                for (;;) { // loop until matched or waiters disappear
                    // m即與s匹配的節點
                    SNode m = s.next;       // m is s's match
                    // m==null說明棧s以後無元素了,直接將棧頂設置爲null,並從新進行最外層的循環
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // 將s設置爲m的匹配節點,並更新棧頂爲m.next,即將s和m同時出棧
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (mode == REQUEST)? m.item : s.item;
                    } else                  // lost match
                        // 設置匹配失敗,則說明m正準備出棧,幫助出棧
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller // 棧頂已開始匹配,幫助匹配
            // 此處的操做邏輯同上面的操做邏輯一致,目的就是幫助上面進行操做,由於此處完成匹配須要分紅兩步:
            // a.m.tryMatch(s)和b.casHead(s, mn)
            // 因此必然會插入其餘線程,只要插入的線程也按照這個步驟執行那麼就避免了不一致問題
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
  •  

從上面的代碼能夠看出TransferStack.transfer()的總體流程:

  1. 判斷當前棧是否爲null或者棧頂線程是否與當前線程匹配,爲null或者不匹配都將進行入棧操做
  2. 入棧主要很簡單,分紅兩步:插入一個節點入棧,該步無需同步,第二步須要head指針指向新節點,該步經過CAS保證安全
  3. 入棧以後須要將當前線程阻塞,調用LockSupport.park()方法,直到打斷/超時/被匹配的線程喚醒
  4. 若是被取消,則須要調用clean()方法進行清除
  5. 因爲LIFO,因此匹配的節點老是棧頂的兩個節點,分紅兩步:原子性更新節點的match變量,更新head。因爲兩步沒法保證原子性,因此經過將棧頂元素的mode更新爲FULFILLING,阻止其餘線程在棧頂發生匹配時進行其餘操做,同時其餘線程需幫助棧頂進行的匹配操做

awaitFulfill

下面看看TransferStack是如何讓一個線程進入阻塞。

/**
 *@ By Vicky:等待匹配,邏輯大體同TransferQueue可參考閱讀
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    long lastTime = (timed)? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    SNode h = head;
    // 計算自旋的次數,邏輯大體同TransferQueue
    int spins = (shouldSpin(s)?
                 (timed? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();
        // 若是s的match不等於null,有三種狀況:
        // a.等待被取消了,此時x==s
        // b.匹配上了,此時match==另外一個節點
        // c.線程被打斷,會取消,此時x==s
        // 不論是哪一種狀況都不要再等待了,返回便可
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            // 等待
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel();
                continue;
            }
        }
        // 自旋
        if (spins > 0)
            spins = shouldSpin(s)? (spins-1) : 0;
        // 設置等待線程
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        // 等待
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

邏輯基本同TransferQueue,不一樣之處是經過修改SNode的match變量標示匹配,以及取消。

clean

下面再看看如何清除被取消的節點。

/**
 * @By Vicky:清除節點
 */
void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread
    // 清除
    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // Absorb cancelled nodes at head
    // 從棧頂節點開始清除,一直到遇到未被取消的節點,或者直到s.next
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // Unsplice embedded nodes
    // 若是p自己未取消(上面的while碰到一個未取消的節點就會退出,但這個節點和past節點之間可能還有取消節點),
    // 再把p到past之間的取消節點都移除。
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

以上即所有的TransferStack的操做邏輯。

看完了TransferQueue和TransferStack的邏輯,SynchronousQueue的邏輯基本清楚了。

應用場景

SynchronousQueue的應用場景得看具體業務需求,J.U.C下有一個應用案例:Executors.newCachedThreadPool()就是使用SynchronousQueue做爲任務隊列。