從零到一spark進階之路(一)

從零到一spark進階之路(一)web

1.RDD

RDD是spark特有的數據模型,談到RDD就會提到什麼彈性分佈式數據集,什麼有向無環圖,本文暫時不去展開這些高深概念。算法

(0)隨便找個點理解如下

最重要的記住,RRD是不可變的,也就是說,已有的RDD不能被修改或者更新,但能夠從已有的RDD轉化成一個新的RDD.緩存

上面的特性解讀:
RDD能夠cache到內存中,每次對RDD數據集的操做以後的結果,均可以存放到內存中,下一個操做能夠直接從內存中輸入,省去了MapReduce大量的磁盤IO操做。這對於迭代運算比較常見的機器學習算法, 交互式數據挖掘來講,效率提高比較大。
下面咱們來列舉如下RDD的基本特性:機器學習

(1)RDD的特色
1)建立:只能經過轉換 ( transformation ,如map/filter/groupBy/join 等,區別於動做 action) 從兩種數據源中建立 RDD 1 )穩定存儲中的數據; 2 )其餘 RDD。
  2)只讀:狀態不可變,不能修改。
  3)分區:支持使 RDD 中的元素根據那個 key 來分區 ( partitioning ) ,保存到多個結點上。還原時只會從新計算丟失分區的數據,而不會影響整個系統。
  4)路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的信息關於它是如何從其餘 RDD 產生而來的。
  5)持久化:支持將會被重用的 RDD 緩存 ( 如 in-memory 或溢出到磁盤 )。
  6)延遲計算: Spark 也會延遲計算 RDD ,使其可以將轉換管道化 (pipeline transformation)。
  7)操做:豐富的轉換(transformation)和動做 ( action ) , count/reduce/collect/save 等。
  執行了多少次transformation操做,RDD都不會真正執行運算(記錄lineage),只有當action操做被執行時,運算纔會觸發。
(2)RDD的好處
1)RDD只能從持久存儲或經過Transformations操做產生,相比於分佈式共享內存(DSM)能夠更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可從新計算出來,而不須要作特定的Checkpoint。
  2)RDD的不變性,能夠實現類Hadoop MapReduce的推測式執行。
  3)RDD的數據分區特性,能夠經過數據的本地性來提升性能,這不Hadoop MapReduce是同樣的。
  4)RDD都是可序列化的,在內存不足時可自動降級爲磁盤存儲,把RDD存儲於磁盤上,這時性能會有大的降低但不會差於如今的MapReduce。
  5)批量操做:任務可以根據數據本地性 (data locality) 被分配,從而提升性能。
(3)RDD的內部屬性
經過RDD的內部屬性,用戶能夠獲取相應的元數據信息。經過這些信息能夠支持更復雜的算法或優化。
  1)分區列表:經過分區列表能夠找到一個RDD中包含的全部分區及其所在地址。
  2)計算每一個分片的函數:經過函數能夠對每一個數據塊進行RDD須要進行的用戶自定義函數運算。
  3)對父RDD的依賴列表,依賴還具體分爲寬依賴和窄依賴,但並非全部的RDD都有依賴。
  4)可選:key-value型的RDD是根據哈希來分區的,相似於mapreduce當中的Paritioner接口,控制key分到哪一個reduce。
  5)可選:每個分片的優先計算位置(preferred locations),好比HDFS的block的所在位置應該是優先計算的位置。(存儲的是一個表,能夠將處理的分區「本地化」)

2. RDD操做

2.1 轉化操做

RDD的轉化操做是返回新RDD的操做, 經常使用轉化操做總結以下:分佈式

表1: 對一個數據爲{1,2,3,3}的RDD進行基本的轉化操做svg

函數名 目的 示例 結果
map() 將函數應用於RDD中每一個元素, 將返回值構成新的RDD rdd.map(x=>x+1) {2,3,4,5}
flatMap() 將函數應用於RDD中的每一個元素, 將返回的迭代器的全部內容構成新的RDD, 經常使用來切分單詞 rdd.flatMap(x=>x.to(2)) {1,2,2}
filter() 返回一個經過傳入給filter()的函數的元素組成的RDD rdd.filter(x=> x>2) {3,3}
distinct() 去重 rdd.distinct() {1,2,3}
sample(withReplacement, fraction, [seed]) 對RDD採樣, 以及是否替換 rdd.sample(false, 0.5) 非肯定的

表2: 對數據分別爲{1,2,3}和{2,3,4}RDD進行鍼對2個RDD的轉化操做函數

函數名 目的 示例 結果
redcue() 並行整合RDD中的全部元素 rdd.reduce((x, y) => x+y) 9
collect() 返回RDD中的全部元素 rdd.collect() {1,2,3,4}
count() 求RDD中的元素個數 rdd.count() 4
countByValue() 各元素在RDD中出現的次數 rdd.countByValue() {1,1}, {2, 1}, {3,2}
take(n) 從RDD中返回n個元素 rdd.take(2) {1,2}
top(n) 從RDD中返回前n個元素 rdd.top(3) {3,3,2}
foreach(func) 對RDD中的每一個元素使用給定的函數 rdd.foreach(print) 1,2,3,3
2.2 行動操做

RDD的行動操做會把最終求得的結果返回驅動器程序, 或者寫入外部存儲系統中。oop

表3: 對一個數據爲{1,2,3,3}的RDD進行基本RDD的行動操做性能

函數名 目的 示例 結果
redcue() 並行整合RDD中的全部元素 rdd.reduce((x, y) => x+y) 9
collect() 返回RDD中的全部元素 rdd.collect() {1,2,3,4}
count() 求RDD中的元素個數 rdd.count() 4
countByValue() 各元素在RDD中出現的次數 rdd.countByValue() {1,1}, {2, 1}, {3,2}
take(n) 從RDD中返回n個元素 rdd.take(2) {1,2}
top(n) 從RDD中返回前n個元素 rdd.top(3) {3,3,2}
foreach(func) 對RDD中的每一個元素使用給定的函數 rdd.foreach(print) 1,2,3,3