spark基礎之shuffle機制和原理分析

一 概述緩存

Shuffle就是對數據進行重組,因爲分佈式計算的特性和要求,在實現細節上更加繁瑣和複雜網絡

在MapReduce框架,Shuffle是鏈接Map和Reduce之間的橋樑,Map階段經過shuffle讀取數據並輸出到對應的Reduce;而Reduce階段負責從Map端拉取數據並進行計算。在整個shuffle過程當中,每每伴隨着大量的磁盤和網絡I/O。因此shuffle性能的高低也直接決定了整個程序的性能高低。Spark也會有本身的shuffle實現過程併發



在DAG調度的過程當中,Stage階段的劃分是根據是否有shuffle過程,也就是存在ShuffleDependency寬依賴的時候,須要進行shuffle,這時候會將做業job劃分紅多個Stage;而且在劃分Stage的時候,構建ShuffleDependency的時候進行shuffle註冊,獲取後續數據讀取所須要的ShuffleHandle,最終每個job提交後都會生成一個ResultStage和若干個ShuffleMapStage,其中ResultStage表示生成做業的最終結果所在的Stage. ResultStage與ShuffleMapStage中的task分別對應着ResultTask與ShuffleMapTask。一個做業,除了最終的ResultStage外,其餘若干ShuffleMapStage中各個ShuffleMapTask都須要將最終的數據根據相應的Partitioner對數據進行分組,而後持久化分區的數據。app

 

一 HashShuffle機制框架

1.1 HashShuffle概述分佈式

在spark-1.6版本以前,一直使用HashShuffle,在spark-1.6版本以後使用Sort-Base Shuffle,由於HashShuffle存在的不足因此就替換了HashShuffle. oop

 

咱們知道,Spark的運行主要分爲2部分:一部分是驅動程序,其核心是SparkContext;另外一部分是Worker節點上Task,它是運行實際任務的。程序運行的時候,Driver和Executor進程相互交互:運行什麼任務,即Driver會分配Task到Executor,Driver 跟 Executor 進行網絡傳輸; 任務數據從哪兒獲取,即Task要從 Driver 抓取其餘上游的 Task 的數據結果,因此有這個過程當中就不斷的產生網絡結果。其中,下一個 Stage 向上一個 Stage 要數據這個過程,咱們就稱之爲 Shuffle。性能

 

1.2 沒有優化以前的HashShuffle機制優化


在HashShuffle沒有優化以前,每個ShufflleMapTask會爲每個ReduceTask建立一個bucket緩存,而且會爲每個bucket建立一個文件。這個bucket存放的數據就是通過Partitioner操做(默認是HashPartitioner)以後找到對應的bucket而後放進去,最後將數據spa

刷新bucket緩存的數據到磁盤上,即對應的block file.

 

而後ShuffleMapTask將輸出做爲MapStatus發送到DAGScheduler的MapOutputTrackerMaster,每個MapStatus包含了每個ResultTask要拉取的數據的位置和大小

ResultTask而後去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster獲取MapStatus,看哪一份數據是屬於本身的,而後底層經過BlockManager將數據拉取過來

 

拉取過來的數據會組成一個內部的ShuffleRDD,優先放入內存,內存不夠用則放入磁盤,而後ResulTask開始進行聚合,最後生成咱們但願獲取的那個MapPartitionRDD

 

缺點:

如上圖所示:在這裏有1個worker,2個executor,每個executor運行2個ShuffleMapTask,有三個ReduceTask,因此總共就有4 * 3=12個bucket和12個block file。

# 若是數據量較大,將會生成M*R個小文件,好比ShuffleMapTask有100個,ResultTask有100個,這就會產生100*100=10000個小文件

# bucket緩存很重要,須要將ShuffleMapTask全部數據都寫入bucket,纔會刷到磁盤,那麼若是Map端數據過多,這就很容易形成內存溢出,儘管後面有優化,bucket寫入的數據達到刷新到磁盤的閥值以後,就會將數據一點一點的刷新到磁盤,可是這樣磁盤I/O就多了

 

1.3 優化後的HashShuffle


每個Executor進程根據核數,決定Task的併發數量,好比executor核數是2,就是能夠併發運行兩個task,若是是一個則只能運行一個task

假設executor核數是1,ShuffleMapTask數量是M,那麼它依然會根據ResultTask的數量R,建立R個bucket緩存,而後對key進行hash,數據進入不一樣的bucket中,每個bucket對應着一個block file,用於刷新bucket緩存裏的數據

 

而後下一個task運行的時候,那麼不會再建立新的bucket和block file,而是複用以前的task已經建立好的bucket和block file。即所謂同一個Executor進程裏全部Task都會把相同的key放入相同的bucket緩衝區中

 

這樣的話,生成文件的數量就是(本地worker的executor數量*executor的cores*ResultTask數量)如上圖所示,即2 * 1* 3 = 6個文件,每個Executor的shuffleMapTask數量100,ReduceTask數量爲100,那麼

未優化的HashShuffle的文件數是2 *1* 100*100 =20000,優化以後的數量是2*1*100 = 200文件,至關於少了100倍

 

缺點:若是 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生不少小文件。

 

 

二 Sort-Based Shuffle

2.1 Sort-Based Shuffle概述

HashShuffle回顧

HashShuffle寫數據的時候,內存有一個bucket緩衝區,同時在本地磁盤有對應的本地文件,若是本地有文件,那麼在內存應該也有文件句柄也是須要耗費內存的。也就是說,從內存的角度考慮,即有一部分存儲數據,一部分管理文件句柄。若是Mapper分片數量爲1000,Reduce分片數量爲1000,那麼總共就須要1000000個小文件。因此就會有不少內存消耗,頻繁IO以及GC頻繁或者出現內存溢出。

並且Reducer端讀取Map端數據時,Mapper有這麼多小文件,就須要打開不少網絡通道讀取,很容易形成Reducer(下一個stage)經過driver去拉取上一個stage數據的時候,說文件找不到,其實不是文件找不到而是程序不響應,由於正在GC.

 

2.2 Sorted-Based Shuffle介紹

爲了緩解Shuffle過程產生文件數過多和Writer緩存開銷過大的問題,spark引入了相似於hadoop Map-Reduce的shuffle機制。該機制每個ShuffleMapTask不會爲後續的任務建立單獨的文件,而是會將全部的Task結果寫入同一個文件,而且對應生成一個索引文件。之前的數據是放在內存緩存中,等到數據完了再刷到磁盤,如今爲了減小內存的使用,在內存不夠用的時候,能夠將輸出溢寫到磁盤,結束的時候,再將這些不一樣的文件聯合內存的數據一塊兒進行歸併,從而減小內存的使用量。一方面文件數量顯著減小,另外一方面減小Writer緩存所佔用的內存大小,並且同時避免GC的風險和頻率。

 


Sort-Based Shuffle有幾種不一樣的策略:BypassMergeSortShuffleWriter、SortShuffleWriter和UnasfeSortShuffleWriter。

 

對於BypassMergeSortShuffleWriter,使用這個模式特色:

# 主要用於處理不須要排序和聚合的Shuffle操做,因此數據是直接寫入文件,數據量較大的時候,網絡I/O和內存負擔較重

# 主要適合處理Reducer任務數量比較少的狀況下

# 將每個分區寫入一個單獨的文件,最後將這些文件合併,減小文件數量;可是這種方式須要併發打開多個文件,對內存消耗比較大

 

由於BypassMergeSortShuffleWriter這種方式比SortShuffleWriter更快,因此若是在Reducer數量不大,又不須要在map端聚合和排序,並且

Reducer的數目 <  spark.shuffle.sort.bypassMergeThrshold指定的閥值,就是用的是這種方式。

對於SortShuffleWriter,使用這個模式特色:

# 比較適合數據量很大的場景或者集羣規模很大

# 引入了外部外部排序器,能夠支持在Map端進行本地聚合或者不聚合

# 若是外部排序器enable了spill功能,若是內存不夠,能夠先將輸出溢寫到本地磁盤,最後將內存結果和本地磁盤的溢寫文件進行合併

 

對於UnsafeShuffleWriter因爲須要謹慎使用,咱們暫不作分析。

 

另外這個Sort-Based ShuffleExecutor核數沒有關係,即跟併發度沒有關係,它是每個ShuffleMapTask都會產生一個data文件和index文件,所謂合併也只是將該ShuffleMapTask的各個partition對應的分區文件合併到data文件而已。因此這個就須要個Hash-BasedShuffleconsolidation機制區別開來。