Spark Shuffle之SortShuffleManager機制原理

1. SparkShuffle 概念

reduceByKey 會將上一個 RDD 中的每一個 key 對應的所有 value 聚合
成一個 value,然後生成一個新的 RDD,元素類型是<key,value>對的
形式,這樣每一個 key 對應一個聚合起來的 value。
問題:聚合之前,每一個 key 對應的 value 不一定都是在一個
partition 中,也不太可能在同一個節點上,因爲 RDD 是分佈式的彈性
的數據集,RDD 的 partition 極有可能分佈在各個節點上。
如何聚合?
– Shuffle Write:上一個 stage 的每個 map task 就必須保證將自己
處理的當前分區的數據相同的 key 寫入一個分區文件中,可能會寫入多
個不同的分區文件中。
– Shuffle Read:reduce task 就會從上一個 stage 的所有 task 所在
的機器上尋找屬於己的那些分區文件,這樣就可以保證每一個 key 所對
應的 value 都會匯聚到同一個節點上去處理和聚合。
Spark 中有兩種 Shuffle 管理類型,HashShufflManager 和
SortShuffleManager,Spark1.2 之前是 HashShuffleManager,
Spark1.2 引入 SortShuffleManager,在 Spark 2.0+版本中已經將
HashShuffleManager 丟棄。

1) 普通機制

普通機制示意圖:
在這裏插入圖片描述
執行流程
a) map task 的計算結果會寫入到一個內存數據結構裏面,內存
數據結構默認是 5M
b) 在 shuffle 的時候會有一個定時器,不定期的去估算這個內存
結構的大小,當內存結構中的數據超過 5M 時,比如現在內
存結構中的數據爲 5.01M,那麼他會申請 5.012-5=5.02M
內存給內存數據結構。
c) 如果申請成功不會進行溢寫,如果申請不成功,這時候會發
生溢寫磁盤。
d) 在溢寫之前內存結構中的數據會進行排序分區
e) 然後開始溢寫磁盤,寫磁盤是以 batch 的形式去寫,一個
batch 是 1 萬條數據,
f) map task 執行完成後,會將這些磁盤小文件合併成一個大的
磁盤文件,同時生成一個索引文件。
g) reduce task 去 map 端拉取數據的時候,首先解析索引文件,
根據索引文件再去拉取對應的數據。
總結
產生磁盤小文件的個數: 2
M(map task 的個數)

2) bypass 機制

bypass 機制示意圖:
在這裏插入圖片描述
總結 1 .bypass 運行機制的觸發條件如下: 算子不能有 map 端的預聚合操作。 shuffle reduce task 的數量小於 spark.shuffle.sort.bypassMergeThreshold 的參數值。這 個值默認是 200。 2 .產生的磁盤小文件爲:2*M(map task 的個數)