spark rdd緩存及緩存清理

首先咱們看看官方的定義和用法介紹html


RDD Persistence(持久化)java

Spark 中一個很重要的能力是將數據persisting持久化(或稱爲caching緩存),在多個操做間均可以訪問這些持久化的數據。當持久化一個 RDD 時,每一個節點的其它分區均可以使用 RDD 在內存中進行計算,在該數據上的其餘 action 操做將直接使用內存中的數據。這樣會讓之後的 action 操做計算速度加快(一般運行速度會加速 10 倍)。緩存是迭代算法和快速的交互式使用的重要工具。python

RDD 可使用persist()方法或cache()方法進行持久化。數據將會在第一次 action 操做時進行計算,並緩存在節點的內存中。Spark 的緩存具備容錯機制,若是一個緩存的 RDD 的某個分區丟失了,Spark 將按照原來的計算過程,自動從新計算並進行緩存。算法

另外,每一個持久化的 RDD 可使用不一樣的storage level存儲級別進行緩存,例如,持久化到磁盤、已序列化的 Java 對象形式持久化到內存(能夠節省空間)、跨節點間複製、以 off-heap 的方式存儲在 Tachyon。這些存儲級別經過傳遞一個StorageLevel對象 (Scala,Java,Python) 給persist()方法進行設置。cache()方法是使用默認存儲級別的快捷設置方法,默認的存儲級別是StorageLevel.MEMORY_ONLY(將反序列化的對象存儲到內存中)。詳細的存儲級別介紹以下:apache

Storage Level(存儲級別)Meaning(含義)api

MEMORY_ONLY  將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中. 若是內存空間不夠,部分數據分區將再也不緩存,在每次須要用到這些數據時從新進行計算. 這是默認的級別.數組

MEMORY_AND_DISK  將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。若是內存空間不夠,將未緩存的數據分區存儲到磁盤,在須要使用這些分區時從磁盤讀取.緩存

MEMORY_ONLY_SER網絡

(Java and Scala)ide

將 RDD 以序列化的 Java 對象的形式進行存儲(每一個分區爲一個 byte 數組)。這種方式會比反序列化對象的方式節省不少空間,尤爲是在使用fast serializer時會節省更多的空間,可是在讀取時會增長 CPU 的計算負擔.

MEMORY_AND_DISK_SER

(Java and Scala)

相似於 MEMORY_ONLY_SER ,可是溢出的分區會存儲到磁盤,而不是在用到它們時從新計算.

DISK_ONLY  只在磁盤上緩存 RDD.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.  與上面的級別功能相同,只不過每一個分區在集羣中兩個節點上創建副本.

OFF_HEAP (experimental 實驗性) 相似於 MEMORY_ONLY_SER, 可是將數據存儲在off-heap memory中. 這須要啓用 off-heap 內存.

Note:在 Python 中, stored objects will 老是使用Picklelibrary 來序列化對象, 因此不管你選擇序列化級別都不要緊. 在 Python 中可用的存儲級別有MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY, 和DISK_ONLY_2.

在 shuffle 操做中(例如reduceByKey),即使是用戶沒有調用persist方法,Spark 也會自動緩存部分中間數據.這麼作的目的是,在 shuffle 的過程當中某個節點運行失敗時,不須要從新計算全部的輸入數據。若是用戶想屢次使用某個 RDD,強烈推薦在該 RDD 上調用 persist 方法.

如何選擇存儲級別 ?

Spark 的存儲級別的選擇,核心問題是在 memory 內存使用率和 CPU 效率之間進行權衡。建議按下面的過程進行存儲級別的選擇:

若是您的 RDD 適合於默認存儲級別 (MEMORY_ONLY), leave them that way.這是CPU效率最高的選項,容許RDD上的操做盡量快地運行.

若是不是, 試着使用MEMORY_ONLY_SER和selecting a fast serialization library以使對象更加節省空間,但仍然可以快速訪問。 (Java和Scala)

不要溢出到磁盤,除非計算您的數據集的函數是昂貴的, 或者它們過濾大量的數據. 不然, 從新計算分區可能與從磁盤讀取分區同樣快.

若是須要快速故障恢復,請使用複製的存儲級別 (e.g. 若是使用Spark來服務來自網絡應用程序的請求).All存儲級別經過從新計算丟失的數據來提供完整的容錯能力,但複製的數據可以讓您繼續在 RDD 上運行任務,而無需等待從新計算一個丟失的分區.

刪除數據

Spark 會自動監視每一個節點上的緩存使用狀況,並使用 least-recently-used(LRU)的方式來丟棄舊數據分區。 若是您想手動刪除 RDD 而不是等待它掉出緩存,使用RDD.unpersist()方法。



而後咱們摘下要點:

緩存操做是lazy的,只有在active操做時才能觸發

```

final def iterator(split:Partition,context:TaskContext):Iterator[T]={

if(storageLevel!=StorageLevel.NONE){

getOrCompute(split,context)

}else{

computeOrReadCheckpoint(split,context)}

}

```

該方法返回一個迭代器,迭代可遍歷的全部數據。若是沒有被緩存,那麼就調用第二個方法。若是有緩存,緩存水平就不爲空。


若是有緩存進入

```

private[spark] def getOrCompute(partition:Partition,context:TaskContext):Iterator[T]=

{

val blockId=RDDBlockId(id,partition.index)

varreadCachedBlock=true

sc.env.SparkEnv.get.blockManager.getOrElseUpdate(blockId,storageLevel,elementClassTag,()=>{

readCachedBlock=falsecomputeOrReadCheckpoint(partition,context)

})

match{...}

}

```

該方法經過RDDid和partition獲得一個blockId,並經過blockManager獲得該block。


下面進入blockManager的方法中

```

def getOrElseUpdate[T](blockId:BlockId,

level:StorageLevel,

classTag:ClassTag[T],

makeIterator:()=>Iterator[T]):Either[BlockResult,Iterator[T]]={

get[T](blockId)(classTag)match{

case Some(block)=>returnLeft(block)

case_=>// Need to compute the block.}

block.doPutIterator(blockId,makeIterator,level,classTag,keepReadLock=true)

match{...}

}

若是沒有緩存,就會去checkpoint中查找,若是也沒有被check過,那麼之可以從新的計算了。


這段代碼說明了cache()和persist()的區別。

```

def cache():this.type=persist()

def persist():this.type=persist(StorageLevel.MEMORY_ONLY)

defpersist(newLevel:StorageLevel):this.type={

if(isLocallyCheckpointed){//該RDD以前被checkpoint過,說明RDD已經被緩存過。//咱們只須要直接覆蓋原來的存儲級別便可persist(LocalRDDCheckpointData.transformStorageLevel(newLevel),allowOverride=true)}

else{persist(newLevel,allowOverride=false)}}

privatedefpersist(newLevel:StorageLevel,allowOverride:Boolean):this.type={// 原來的存儲級別不爲NONE;新存儲級別!=原來的存儲界別;不容許覆蓋if(storageLevel!=StorageLevel.NONE&&newLevel!=storageLevel&&!allowOverride){thrownewUnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")}

if(storageLevel==StorageLevel.NONE){// 第一次調用persistsc.cleaner.foreach(_.registerRDDForCleanup(this))// 經過sc來清理註冊sc.persistRDD(this)//緩存RDD}storageLevel=newLevel//跟新存儲級別

this}

```

其實cache()內部調用了persist(),並且默認其爲Memory_Noly等級。


Spark緩存清理機制:

MetadataCleaner對象中有一個定時器,用於清理下列的元數據信息:

MAP_OUTPUT_TRACKER:Maptask的輸出元信息

SPARK_CONTEXT:persistentRdds中的rdd

HTTP_BROADCAST, http廣播的元數據

BLOCK_MANAGER:blockmanager中存儲的數據

SHUFFLE_BLOCK_MANAGER:shuffle的輸出數據

BROADCAST_VARS:Torrent方式廣播broadcast的元數據

contextcleaner清理真實數據:

ContextCleaner爲RDD、shuffle、broadcast、accumulator、Checkpoint維持了一個弱引用,當相關對象不可達時,就會將對象插入referenceQueue中。有一個單獨的線程來處理這個隊列中的對象。

RDD:最終從各節點的blockmanager的memoryStore、diskStore中刪除RDD數據

shuffle:刪除driver中的mapstatuses關於該shuffleId的信息;刪除全部節點中關於該shuffleId的全部分區的數據文件和索引文件

broadcast:最終從各節點的blockmanager的memoryStore、diskStore中刪除broadcast數據

Checkpoint:清理checkpointDir目錄下關於該rddId的文件

舉個RDD的例子,說明一下這樣作有什麼好處?

默認狀況下,RDD是不緩存的,即計算完以後,下一次用須要從新計算。若是要避免從新計算的開銷,就要將RDD緩存起來,這個道理誰都明白。可是,緩存的RDD何時去釋放呢?這就用到了上面提到的弱引用。當咱們調用persist緩存一個RDD時,會調用registerRDDForCleanup(this),這就是將自己的RDD註冊到一個弱引用中。當這個RDD變爲不可達時,會自動將該RDD對象插入到referenceQueue中,等到下次GC時就會走doCleanupRDD分支。RDD可能保存在內存或者磁盤中,這樣就能保證,不可達的RDD在GC到來時能夠釋放blockmanager中的RDD真實數據。

再考慮一下,何時RDD不可達了呢?爲了讓出內存供其餘地方使用,除了手動unpersist以外,須要有機制定時清理緩存的RDD數據,這就是MetadataCleaner的SPARK_CONTEXT乾的事情。它就是按期的清理persistentRdds中過時的數據,其實與unpersist產生的做用是同樣的。一旦清理了,那這個緩存的RDD就沒有強引用了。

清理機制原文:https://blog.csdn.net/u014033218/article/details/77853323