Spark的Shuffle機制

MapReduce中的Shuffle

轉載於: http://www.jianshu.com/p/60bab35bc01e

在MapReduce框架中,shuffle是連接Map和Reduce之間的橋樑,Map的輸出要用到Reduce中必須經過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。
Shuffle是MapReduce框架中的一個特定的phase,介於Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果需要按key哈希,並且分發到每一個Reducer上去,這個過程就是shuffle。由於shuffle涉及到了磁盤的讀寫和網絡的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。
下圖描述了MapReduce算法的整個流程,其中shuffle phase是介於Map phase和Reduce phase之間:

在Hadoop, 在mapper端每次當memory buffer中的數據快滿的時候, 先將memory中的數據, 按partition進行劃分, 然後各自存成小文件, 這樣當buffer不斷的spill的時候, 就會產生大量的小文件。
所以Hadoop後面直到reduce之前做的所有的事情其實就是不斷的merge, 基於文件的多路並歸排序,在map端的將相同partition的merge到一起, 在reduce端, 把從mapper端copy來的數據文件進行merge, 以用於最終的reduce
多路歸併排序, 達到兩個目的。
merge, 把相同key的value都放到一個arraylist裏面;sort, 最終的結果是按key排序的。

這個方案擴展性很好, 面對大數據也沒有問題, 當然問題在效率, 畢竟需要多次進行基於文件的多路歸併排序,多輪的和磁盤進行數據讀寫。

Spark的Shuffle機制


下圖爲例簡單描述一下Spark中shuffle的整一個流程:

·     首先每一個Mapper會根據Reducer的數量創建出相應的bucket,bucket的數量是M×RM×R,其中MM是Map的個數,RR是Reduce的個數。

·     其次Mapper產生的結果會根據設置的partition算法填充到每個bucket中去。這裏的partition算法是可以自定義的,當然默認的算法是根據key哈希到不同的bucket中去。

·     當Reducer啓動時,它會根據自己task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket作爲Reducer的輸入進行處理。

· 這裏的bucket是一個抽象概念,在實現中每個bucket可以對應一個文件,可以對應文件的一部分或是其他等。

·      Apache Spark 的 Shuffle 過程與 Apache Hadoop 的 Shuffle 過程有着諸多類似,一些概念可直接套用,例如,Shuffle 過程中,提供數據的一端,被稱作 Map 端, Map 端每個生成數據的任務稱爲 Mapper,對應的,接收數據的一端,被稱作 Reduce 端,Reduce 端每個拉取數據的任務稱爲 Reducer。

· Shuffle 過程本質上都是將 Map 端獲得的數據使用分區器進行劃分,並將數據發送給對應的 Reducer 的過程。

shuffle 和 stage
shuffle 是劃分 DAG 中 stage 的標識,同時影響 Spark 執行速度的關鍵步驟. 
RDD 的 Transformation 函數中,又分爲窄依賴(narrow dependency)和寬依賴(wide dependency)的操作.窄依賴跟寬依賴的區別是是否發生 shuffle(洗牌) 操作.寬依賴會發生 shuffle 操作. 窄依賴是子 RDD的各個分片(partition)不依賴於其他分片,能夠獨立計算得到結果,寬依賴指子 RDD 的各個分片會依賴於父RDD 的多個分片,所以會造成父 RDD 的各個分片在集羣中重新分片, 看如下兩個示例:

第一個 Map 操作將 RDD 裏的各個元素進行映射, RDD 的各個數據元素之間不存在依賴,可以在集羣的各個內存中獨立計算,也就是並行化,第二個 groupby 之後的 Map 操作,爲了計算相同 key 下的元素個數,需要把相同 key 的元素聚集到同一個 partition 下,所以造成了數據在內存中的重新分佈,即 shuffle 操作.shuffle 操作是 spark 中最耗時的操作,應儘量避免不必要的 shuffle. 

shuffle過程可能需要完成以下過程
重新進行數據分區

數據傳輸

數據壓縮

磁盤I/O

shuffle工作流程
在運行job時,spark是一個stage一個stage執行的。先把任務分成stage,在任務提交階段會把任務形成taskset,在執行任務。 
    spark的DAGScheduler根據RDD的ShuffleDependency來構建Stages:

例如:ShuffleRDD/CoGroupedRDD有一個ShuffleDependency。

很多操作通過鉤子函數來創建ShuffleRDD

每個ShuffleDependency會map到spark的job的一個stage,然後會導致一個shuffle過程。

基於sort的shuffle
sort-based的shuffle,會把輸入的記錄根據目標分區id(partition ids)進行排序。然後寫入單個的map輸出文件中。爲了讀取map的輸出部分,Reducers獲取此文件的連續區域 。當map輸出的數據太大而內存無法存放時,輸出的排序子集可以保存到磁盤,這些磁盤文件被合併後,生成最終的輸出文件。 
sort shuffle有兩個不同的輸出路徑來產生map的輸出文件:

序列化排序(Serialized sorting) 
在使用序列化排序時,需要滿足以下3個條件: 

shuffle不指定聚合(aggregation)或輸出排序方法。

shuffle的序列化程序支持序列化值的重定位(KryoSerializer和Spark SQL的自定義序列化程序目前支持此操作)。

shuffle產生小於16777216個輸出分區。

反序列化排序(Deserialized sorting) 
用來處理所有其他情況。

Sort Shuffle Writer
·    每個map任務都會產生一個shuffle數據文件,和一個Index文件 

· 通過外部排序類ExternalSorter對數據進行排序

·     若map-side需要進行合併(combine)操作,數據將會按key和分區進行排序,若沒有合併操作數據只會根據分區進行排序。

參考

https://blog.csdn.net/zg_hover/article/details/73018918