【併發隊列】有界阻塞隊列 ArrayBlockingQueue 源碼解析

1、 前言

上節介紹了無界鏈表方式的阻塞隊列LinkedBlockingQueue,本節來研究下有界使用數組方式實現的阻塞隊列ArrayBlockingQueue。數組

主要實現原理: 經過ReentrantLock、Condition實現。利用Condition的signal、await實現。緩存

在隊列滿時, 若是在put會觸發notFull.await(), 等待take取出操做後經過notFull.signal()喚醒put。併發

在隊列空時, 若是在take會觸發notEmpty.await(), 等待put放入操做後經過notEmpty.signal()喚醒take。函數

經過記錄inputIndex記錄下次要存放的位置,循環。this

2、 ArrayBlockingQueue類圖結構


如圖ArrayBlockingQueue內部有個數組items用來存放隊列元素,putindex下標標示入隊元素下標,takeIndex是出隊下標,count統計隊列元素個數,從定義可知道並無使用volatile修飾,這是由於訪問這些變量使用都是在鎖塊內,並不存在可見性問題。另外有個獨佔鎖lock用來對出入隊操做加鎖,這致使同時只有一個線程能夠訪問入隊出隊,另外notEmpty,notFull條件變量用來進行出入隊的同步。spa

另外構造函數必須傳入隊列大小參數,因此爲有界隊列,默認是Lock爲非公平鎖。線程

1指針

2code

3隊列

4

5

6

7

8

9

10

11

12

public ArrayBlockingQueue(int capacity) {

        this(capacity, false);

  }

 

    public ArrayBlockingQueue(int capacity, boolean fair) {

        if (capacity <= 0)

            throw new IllegalArgumentException();

        this.items = new Object[capacity];

        lock = new ReentrantLock(fair);

        notEmpty = lock.newCondition();

        notFull =  lock.newCondition();

    }

3、offer操做

在隊尾插入元素,若是隊列滿則返回false,否者入隊返回true。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

public boolean offer(E e) {

 

    //e爲null,則拋出NullPointerException異常

    checkNotNull(e);

 

    //獲取獨佔鎖

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        //若是隊列滿則返回false

        if (count == items.length)

            return false;

        else {

            //否者插入元素

            insert(e);

            return true;

        }

    } finally {

        //釋放鎖

        lock.unlock();

    }

}

 

 

private void insert(E x) {

 

    //元素入隊

    items[putIndex] = x;

 

    //計算下一個元素應該存放的下標

    putIndex = inc(putIndex);

    ++count;

    notEmpty.signal();

}

 

//循環隊列,計算下標

final int inc(int i) {

    return (++i == items.length) ? 0 : i;

}

這裏因爲在操做共享變量前加了鎖,因此不存在內存不可見問題,加過鎖後獲取的共享變量都是從主內存獲取的,而不是在CPU緩存或者寄存器裏面的值,釋放鎖後修改的共享變量值會刷新會主內存中。

另外這個隊列是使用循環數組實現,因此計算下一個元素存放下標時候有些特殊。另外insert後調用 notEmpty.signal();是爲了激活調用notEmpty.await()阻塞後放入notEmpty條件隊列中的線程。

4、put操做

在隊列尾部添加元素,若是隊列滿則等待隊列有空位置插入後返回

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

public void put(E e) throws InterruptedException {

    checkNotNull(e);

    final ReentrantLock lock = this.lock;

 

    //獲取可被中斷鎖

    lock.lockInterruptibly();

    try {

 

        //若是隊列滿,則把當前線程放入notFull管理的條件隊列

        while (count == items.length)

            notFull.await();

 

        //插入元素

        insert(e);

    } finally {

        lock.unlock();

    }

}

須要注意的是若是隊列滿了那麼當前線程會阻塞,知道出隊操做調用了notFull.signal方法激活該線程。

代碼邏輯很簡單,可是這裏須要思考一個問題爲啥調用lockInterruptibly方法而不是Lock方法。個人理解是由於調用了條件變量的await()方法,而await()方法會在中斷標誌設置後拋出InterruptedException異常後退出,因此還不如在加鎖時候先看中斷標誌是否是被設置了,若是設置了直接拋出InterruptedException異常,就不用再去獲取鎖了。而後看了其餘併發類裏面凡是調用了await的方法獲取鎖時候都是使用的lockInterruptibly方法而不是Lock也驗證了這個想法。

5、poll操做

從隊頭獲取並移除元素,隊列爲空,則返回null。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

public E poll() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        //當前隊列爲空則返回null,否者

        return (count == 0) ? null : extract();

    } finally {

        lock.unlock();

    }

}

 

private E extract() {

    final Object[] items = this.items;

 

    //獲取元素值

    E x = this.<E>cast(items[takeIndex]);

 

    //數組中值值爲null;

    items[takeIndex] = null;

 

    //隊頭指針計算,隊列元素個數減一

    takeIndex = inc(takeIndex);

    --count;

 

    //發送信號激活notFull條件隊列裏面的線程

    notFull.signal();

    return x;

}

6、take操做

從隊頭獲取元素,若是隊列爲空則阻塞直到隊列有元素。

1

2

3

4

5

6

7

8

9

10

11

12

13

public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

 

        //隊列爲空,則等待,直到隊列有元素

        while (count == 0)

            notEmpty.await();

        return extract();

    } finally {

        lock.unlock();

    }

}

須要注意的是若是隊列爲空,當前線程會被掛起放到notEmpty的條件隊列裏面,直到入隊操做執行調用notEmpty.signal後當前線程纔會被激活,await纔會返回。

7、peek操做

返回隊列頭元素但不移除該元素,隊列爲空,返回null

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public E peek() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        //隊列爲空返回null,否者返回頭元素

        return (count == 0) ? null : itemAt(takeIndex);

    } finally {

        lock.unlock();

    }

}

 

final E itemAt(int i) {

    return this.<E>cast(items[i]);

}

8、 size操做

獲取隊列元素個數,很是精確由於計算size時候加了獨佔鎖,其餘線程不能入隊或者出隊或者刪除元素

1

2

3

4

5

6

7

8

9

public int size() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        return count;

    } finally {

        lock.unlock();

    }

}

9、總結

ArrayBlockingQueue經過使用全局獨佔鎖實現同時只能有一個線程進行入隊或者出隊操做,這個鎖的粒度比較大,有點相似在方法上添加synchronized的意味。其中offer,poll操做經過簡單的加鎖進行入隊出隊操做,而put,take則使用了條件變量實現若是隊列滿則等待,若是隊列空則等待,而後分別在出隊和入隊操做中發送信號激活等待線程實現同步。另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操做的結果是精確的,由於計算前加了全局鎖。