從零到一spark進階之路(一)web
RDD是spark特有的數據模型,談到RDD就會提到什麼彈性分佈式數據集,什麼有向無環圖,本文暫時不去展開這些高深概念。算法
最重要的記住,RRD是不可變的,也就是說,已有的RDD不能被修改或者更新,但能夠從已有的RDD轉化成一個新的RDD.緩存
上面的特性解讀:
RDD能夠cache到內存中,每次對RDD數據集的操做以後的結果,均可以存放到內存中,下一個操做能夠直接從內存中輸入,省去了MapReduce大量的磁盤IO操做。這對於迭代運算比較常見的機器學習算法, 交互式數據挖掘來講,效率提高比較大。
下面咱們來列舉如下RDD的基本特性:機器學習
1)建立:只能經過轉換 ( transformation ,如map/filter/groupBy/join 等,區別於動做 action) 從兩種數據源中建立 RDD 1 )穩定存儲中的數據; 2 )其餘 RDD。 2)只讀:狀態不可變,不能修改。 3)分區:支持使 RDD 中的元素根據那個 key 來分區 ( partitioning ) ,保存到多個結點上。還原時只會從新計算丟失分區的數據,而不會影響整個系統。 4)路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的信息關於它是如何從其餘 RDD 產生而來的。 5)持久化:支持將會被重用的 RDD 緩存 ( 如 in-memory 或溢出到磁盤 )。 6)延遲計算: Spark 也會延遲計算 RDD ,使其可以將轉換管道化 (pipeline transformation)。 7)操做:豐富的轉換(transformation)和動做 ( action ) , count/reduce/collect/save 等。 執行了多少次transformation操做,RDD都不會真正執行運算(記錄lineage),只有當action操做被執行時,運算纔會觸發。
1)RDD只能從持久存儲或經過Transformations操做產生,相比於分佈式共享內存(DSM)能夠更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可從新計算出來,而不須要作特定的Checkpoint。 2)RDD的不變性,能夠實現類Hadoop MapReduce的推測式執行。 3)RDD的數據分區特性,能夠經過數據的本地性來提升性能,這不Hadoop MapReduce是同樣的。 4)RDD都是可序列化的,在內存不足時可自動降級爲磁盤存儲,把RDD存儲於磁盤上,這時性能會有大的降低但不會差於如今的MapReduce。 5)批量操做:任務可以根據數據本地性 (data locality) 被分配,從而提升性能。
經過RDD的內部屬性,用戶能夠獲取相應的元數據信息。經過這些信息能夠支持更復雜的算法或優化。 1)分區列表:經過分區列表能夠找到一個RDD中包含的全部分區及其所在地址。 2)計算每一個分片的函數:經過函數能夠對每一個數據塊進行RDD須要進行的用戶自定義函數運算。 3)對父RDD的依賴列表,依賴還具體分爲寬依賴和窄依賴,但並非全部的RDD都有依賴。 4)可選:key-value型的RDD是根據哈希來分區的,相似於mapreduce當中的Paritioner接口,控制key分到哪一個reduce。 5)可選:每個分片的優先計算位置(preferred locations),好比HDFS的block的所在位置應該是優先計算的位置。(存儲的是一個表,能夠將處理的分區「本地化」)
RDD的轉化操做是返回新RDD的操做, 經常使用轉化操做總結以下:分佈式
表1: 對一個數據爲{1,2,3,3}的RDD進行基本的轉化操做svg
函數名 | 目的 | 示例 | 結果 |
---|---|---|---|
map() | 將函數應用於RDD中每一個元素, 將返回值構成新的RDD | rdd.map(x=>x+1) | {2,3,4,5} |
flatMap() | 將函數應用於RDD中的每一個元素, 將返回的迭代器的全部內容構成新的RDD, 經常使用來切分單詞 | rdd.flatMap(x=>x.to(2)) | {1,2,2} |
filter() | 返回一個經過傳入給filter()的函數的元素組成的RDD | rdd.filter(x=> x>2) | {3,3} |
distinct() | 去重 | rdd.distinct() | {1,2,3} |
sample(withReplacement, fraction, [seed]) | 對RDD採樣, 以及是否替換 | rdd.sample(false, 0.5) | 非肯定的 |
表2: 對數據分別爲{1,2,3}和{2,3,4}RDD進行鍼對2個RDD的轉化操做函數
函數名 | 目的 | 示例 | 結果 |
---|---|---|---|
redcue() | 並行整合RDD中的全部元素 | rdd.reduce((x, y) => x+y) | 9 |
collect() | 返回RDD中的全部元素 | rdd.collect() | {1,2,3,4} |
count() | 求RDD中的元素個數 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出現的次數 | rdd.countByValue() | {1,1}, {2, 1}, {3,2} |
take(n) | 從RDD中返回n個元素 | rdd.take(2) | {1,2} |
top(n) | 從RDD中返回前n個元素 | rdd.top(3) | {3,3,2} |
foreach(func) | 對RDD中的每一個元素使用給定的函數 | rdd.foreach(print) | 1,2,3,3 |
RDD的行動操做會把最終求得的結果返回驅動器程序, 或者寫入外部存儲系統中。oop
表3: 對一個數據爲{1,2,3,3}的RDD進行基本RDD的行動操做性能
函數名 | 目的 | 示例 | 結果 |
---|---|---|---|
redcue() | 並行整合RDD中的全部元素 | rdd.reduce((x, y) => x+y) | 9 |
collect() | 返回RDD中的全部元素 | rdd.collect() | {1,2,3,4} |
count() | 求RDD中的元素個數 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出現的次數 | rdd.countByValue() | {1,1}, {2, 1}, {3,2} |
take(n) | 從RDD中返回n個元素 | rdd.take(2) | {1,2} |
top(n) | 從RDD中返回前n個元素 | rdd.top(3) | {3,3,2} |
foreach(func) | 對RDD中的每一個元素使用給定的函數 | rdd.foreach(print) | 1,2,3,3 |