大數據開發-Spark Join原理詳解

數據分析中將兩個數據集進行 Join 操做是很常見的場景。在 Spark 的物理計劃階段,Spark 的 Join Selection 類會根 據 Join hints 策略、Join 表的大小、 Join 是等值 Join 仍是不等值以及參與 Join 的 key 是否能夠排序等條件來選擇最 終的 Join 策略,最後 Spark 會利用選擇好的 Join 策略執行最終的計算。當前 Spark 一共支持五種 Join 策略:算法

  • Broadcast hash join (BHJ)sql

  • Shuffle hash join(SHJ)oop

  • Shuffle sort merge join (SMJ)大數據

  • Shuffle-and-replicate nested loop join,又稱笛卡爾積(Cartesian product join)spa

  • Broadcast nested loop join (BNLJ)code

其中 BHJ SMJ 這兩種 Join 策略是咱們運行 Spark 做業最多見的。JoinSelection 會先根據 Join 的 Key 爲等值 Join 來選擇 Broadcast hash joinShuffle hash join 以及 Shuffle sort merge join 中的一個;若是 Join 的 Key 爲不等值 Join 或者沒有指定 Join 條件,則會選擇 Broadcast nested loop joinShuffle-and-replicate nested loop join。 不一樣的 Join 策略在執行上效率差異很大,瞭解每種 Join 策略的執行過程和適用條件是頗有必要的。blog

一、Broadcast Hash Join

Broadcast Hash Join 的實現是將小表的數據廣播到 Spark 全部的 Executor 端,這個廣播過程和咱們本身去廣播數 據沒什麼區別:排序

利用 collect 算子將小表的數據從 Executor 端拉到 Driver 端 在 Driver 端調用 sparkContext.broadcast 廣播到全部 Executor 端 在 Executor 端使用廣播的數據與大表進行 Join 操做(其實是執行map操做)內存

這種 Join 策略避免了 Shuffle 操做。通常而言,Broadcast Hash Join 會比其餘 Join 策略執行的要快。開發

file

使用這種 Join 策略必須知足如下條件: 小表的數據必須很小,能夠經過 spark.sql.autoBroadcastJoinThreshold 參數來配置,默認是 10MB 若是內存比較大,能夠將閾值適當加大 將 spark.sql.autoBroadcastJoinThreshold 參數設置爲 -1,能夠關閉這種鏈接方式 只能用於等值 Join,不要求參與 Join 的 keys 可排序

二、Shuffle Hash Join

當表中的數據比較大,又不適合使用廣播,這個時候就能夠考慮使用 Shuffle Hash JoinShuffle Hash Join 一樣是在大表和小表進行 Join 的時候選擇的一種策略。它的計算思想是:把大表和小表按照相同 的分區算法和分區數進行分區(根據參與 Join 的 keys 進行分區),這樣就保證了 hash 值同樣的數據都分發到同一 個分區中,而後在同一個 Executor 中兩張表 hash 值同樣的分區就能夠在本地進行 hash Join 了。在進行 Join 之 前,還會對小表的分區構建 Hash Map。Shuffle hash join 利用了分治思想,把大問題拆解成小問題去解決。

file

要啓用 Shuffle Hash Join 必須知足如下條件: 僅支持等值 Join,不要求參與 Join 的 Keys 可排序 spark.sql.join.preferSortMergeJoin 參數必須設置爲 false,參數是從 Spark 2.0.0 版本引入的,默認值爲 true,也就是默認狀況下選擇 Sort Merge Join 小表的大小(plan.stats.sizeInBytes)必須小於 spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默認值200) 並且小表大小(stats.sizeInBytes)的三倍必須小於等於大表的大小(stats.sizeInBytes),也就是 a.stats.sizeInBytes * 3 < = b.stats.sizeInBytes

三、Shuffle Sort Merge Join

前面兩種 Join 策略對錶的大小都有條件的,若是參與 Join 的表都很大,這時候就得考慮用 Shuffle Sort Merge Join 了。 Shuffle Sort Merge Join 的實現思想: 將兩張表按照 join key 進行shuffle,保證join key值相同的記錄會被分在相應的分區 對每一個分區內的數據進行排序 排序後再對相應的分區內的記錄進行鏈接 不管分區有多大,Sort Merge Join都不用把一側的數據所有加載到內存中,而是即用即丟;由於兩個序列都有序。從 頭遍歷,碰到key相同的就輸出,若是不一樣,左邊小就繼續取左邊,反之取右邊。從而大大提升了大數據量下sql join 的穩定性。

file

要啓用 Shuffle Sort Merge Join 必須知足如下條件:

僅支持等值 Join,而且要求參與 Join 的 Keys 可排序

四、Cartesian product join

若是 Spark 中兩張參與 Join 的表沒指定鏈接條件,那麼會產生 Cartesian product join,這個 Join 獲得的結果其實

就是兩張錶行數的乘積。

五、Broadcast nested loop join

能夠把 Broadcast nested loop join 的執行看作下面的計算:

for record_1 in relation_1:

for record_2 in relation_2:

join condition is executed

能夠看出 Broadcast nested loop join 在某些狀況會對某張表重複掃描屢次,效率很是低下。從名字能夠看出,這種

join 會根據相關條件對小表進行廣播,以減小表的掃描次數。

Broadcast nested loop join 支持等值和不等值 Join,支持全部的 Join 類型。 大數據開發,更多關注查看我的資料