《java 8 實戰》讀書筆記 -第七章 並行數據處理與性能

1、並行流

1.將順序流轉換爲並行流

對順序流調用parallel方法:java

public static long parallelSum(long n) { 
 return Stream.iterate(1L, i -> i + 1) 
 .limit(n) 
 .parallel() 
 .reduce(0L, Long::sum); 
}

它在內部實際上就是設了一個boolean標誌,表示你想讓調用parallel以後進行的全部操做都並行執行。相似地,你只須要對並行流調用sequential方法就能夠把它變成順序流。但最後一次parallel或sequential調用會影響整個流水線。算法

2.測量流性能

  • iterate生成的是裝箱的對象,必須拆箱成數字才能求和;
  • 咱們很難把iterate分紅多個獨立塊來並行執行。

iterate很難分割成可以獨立執行的小塊,由於每次應用這個函數都要依賴前一次應用的結果,整張數字列表在概括過程開始時沒有準備好,於是沒法有效地把流劃分爲小塊來並行處理。把流標記成並行,你實際上是給順序處理增長了開銷,它還要把每次求和操做分到一個不一樣的線程上。segmentfault

3.正確使用並行流

錯用並行流而產生錯誤的首要緣由,就是使用的算法改變了某些共享狀態。數據結構

public class Accumulator { 
 public long total = 0; 
 public void add(long value) { total += value; } 
}

public static long sideEffectParallelSum(long n) { 
 Accumulator accumulator = new Accumulator(); 
 LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); 
 return accumulator.total; 
}
上面的示例在本質上就是順序的,每次訪問total都會出現數據競爭.因爲多個線程在同時訪問累加器,執行total += value,而這一句雖然看似簡單,卻不是一個原子操做。所得的結果也是不可控的(錯誤的)。

4.高效使用並行流

  • 留意裝箱
  • 有些操做自己在並行流上的性能就比順序流差
  • 還要考慮流的操做流水線的總計算成本。設N是要處理的元素的總數,Q是一個元素經過流水線的大體處理成本,則N*Q就是這個對成本的一個粗略的定性估計。Q值較高就意味着使用並行流時性能好的可能性比較大
  • 對於較小的數據量,選擇並行流幾乎歷來都不是一個好的決定
  • 要考慮流背後的數據結構是否易於分解
  • 流自身的特色,以及流水線中的中間操做修改流的方式,均可能會改變分解過程的性能。
  • 還要考慮終端操做中合併步驟的代價是大是小

圖片描述

2、分支/合併框架(Fork/Join)

詳見第六章相關內容
注意:不該該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直接調用compute或fork方法,只有順序代碼才應該用invoke來啓動並行計算。框架

3、Spliterator

Spliterator是Java 8中加入的另外一個新接口;這個名字表明「可分迭代器」(splitable iterator)。和Iterator同樣,Spliterator也用於遍歷數據源中的元素,但它是爲了並行執行而設計的。
Spliterator接口ide

public interface Spliterator<T> { 
 boolean tryAdvance(Consumer<? super T> action); 
 Spliterator<T> trySplit(); 
 long estimateSize(); 
 int characteristics(); 
}

與往常同樣,T是Spliterator遍歷的元素的類型。tryAdvance方法的行爲相似於普通的Iterator,由於它會按順序一個一個使用Spliterator中的元素,而且若是還有其餘元素要遍歷就返回true。但trySplit是專爲Spliterator接口設計的,由於它能夠把一些元素劃出去分給第二個Spliterator(由該方法返回),讓它們兩個並行處理。Spliterator還可經過estimateSize方法估計還剩下多少元素要遍歷,由於即便不那麼確切,能快速算出來是一個值也有助於讓拆分均勻一點.函數

1.拆分過程

將Stream拆分紅多個部分的算法是一個遞歸過程,如圖所示。第一步是對第一個Spliterator調用trySplit,生成第二個Spliterator。第二步對這兩個Spliterator調用trysplit,這樣總共就有了四個Spliterator。這個框架不斷對Spliterator調用trySplit直到它返回null,代表它處理的數據結構不能再分割,如第三步所示。最後,這個遞歸拆分過程到第四步就終止了,這時全部的Spliterator在調用trySplit時都返回了null。
圖片描述性能

2.實現你本身的 Spliterator

文中提到了reduce的三參數重載方法this

<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)

它的三個參數:spa

  • identity: 一個初始化的值;這個初始化的值其類型是泛型U,與Reduce方法返回的類型一致;注意此時Stream中元素的類型是T,與U能夠不同也能夠同樣,這樣的話操做空間就大了;無論Stream中存儲的元素是什麼類型,U均可以是任何類型,如U能夠是一些基本數據類型的包裝類型Integer、Long等;或者是String,又或者是一些集合類型ArrayList等;後面會說到這些用法。
  • accumulator: 其類型是BiFunction,輸入是U與T兩個類型的數據,而返回的是U類型;也就是說返回的類型與輸入的第一個參數類型是同樣的,而輸入的第二個參數類型與Stream中元素類型是同樣的。
  • combiner: 其類型是BinaryOperator,支持的是對U類型的對象進行操做;

第三個參數combiner主要是使用在並行計算的場景下;若是Stream是非並行時,第三個參數其實是不生效的。

代碼實現:

class WordCounter {
    private final int counter;
    private final boolean lastSpace;

    public WordCounter(int counter, boolean lastSpace) {
        this.counter = counter;
        this.lastSpace = lastSpace;
    }

    public WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ? this : new WordCounter(counter, true);
        } else {
            return lastSpace ? new WordCounter(counter + 1, false) : this;
        }
    }

    public WordCounter combine(WordCounter wordCounter) {
        return new WordCounter(counter + wordCounter.counter,
            wordCounter.lastSpace);
    }

    public int getCounter() {
        return counter;
    }
}
class WordCounterSpliterator implements Spliterator<Character> {
    private final String string;
    private int currentChar = 0;

    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer<?super Character> action) {
        action.accept(string.charAt(currentChar++));

        return currentChar < string.length();
    }

    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentChar;

        if (currentSize < 10) {
            return null;
        }

        for (int splitPos = (currentSize / 2) + currentChar;
                splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(
                            currentChar, splitPos));
                currentChar = splitPos;

                return spliterator;
            }
        }

        return null;
    }

    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}
final String SENTENCE = 
 " Nel mezzo del cammin di nostra vita " + 
 "mi ritrovai in una selva oscura" + 
 " ché la dritta via era smarrita ";


private int countWords(Stream<Character> stream) { 
 WordCounter wordCounter = stream.reduce(new WordCounter(0, true), 
 WordCounter::accumulate, 
 WordCounter::combine); 
 return wordCounter.getCounter(); 
}

Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); 
Stream<Character> stream = StreamSupport.stream(spliterator, true);

System.out.println("Found " + countWords(stream) + " words");

最後打印顯示

Found 19 words