@@ -20,11 +20,12 @@ package org.apache.spark
 import scala.collection.mutable.{ArrayBuffer, HashSet}

 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}

-/** Spark class responsible for passing RDDs split contents to the BlockManager and making
-  sure a node doesn't load two copies of an RDD at once.
-  */
+/**
+ * Spark class responsible for passing RDDs split contents to the BlockManager and making
+ * sure a node doesn't load two copies of an RDD at once.
+ */
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

   /** Keys of RDD splits that are being computed/loaded. */
@@ -49,11 +50,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               try {loading.wait()} catch {case _ : Throwable =>}
             }
             logInfo("Finished waiting for %s".format(key))
-            // See whether someone else has successfully loaded it. The main way this would fail
-            // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
-            // partition but we didn't want to make space for it. However, that case is unlikely
-            // because it's unlikely that two threads would work on the same RDD partition. One
-            // downside of the current code is that threads wait serially if this does happen.
+            /* See whether someone else has successfully loaded it. The main way this would fail
+             * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
+             * partition but we didn't want to make space for it. However, that case is unlikely
+             * because it's unlikely that two threads would work on the same RDD partition. One
+             * downside of the current code is that threads wait serially if this does happen. */
             blockManager.get(key) match {
               case Some(values) =>
                 return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
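The hunk above relies on the `loading` set as a per-partition guard: the first thread to miss on a key computes it, while later threads `wait()` on the set and re-check the BlockManager once they wake up. The standalone sketch below illustrates that wait-then-recheck pattern in isolation; the `getOrCompute` signature, `String` keys, and the `TrieMap`-backed cache are simplified stand-ins for illustration, not Spark's actual API.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.HashSet

object LoadingGuardSketch {
  private val loading = new HashSet[String]
  private val cache = new TrieMap[String, Seq[Int]]

  def getOrCompute(key: String)(compute: => Seq[Int]): Seq[Int] = {
    loading.synchronized {
      while (loading.contains(key)) {
        // Another thread is computing this key; wait for it to finish
        try { loading.wait() } catch { case _: InterruptedException => }
      }
      // Re-check after waiting: the other thread may already have cached the value
      cache.get(key) match {
        case Some(values) => return values
        case None => loading.add(key) // we are now responsible for computing it
      }
    }
    try {
      val values = compute
      cache.put(key, values)
      values
    } finally {
      loading.synchronized {
        loading.remove(key)
        loading.notifyAll() // wake any threads waiting on this key
      }
    }
  }
}
```

As the removed comment in the real code notes, the main downside of this pattern is that waiters are serialized if two threads ever do race on the same key.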
@@ -69,32 +70,45 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           // If we got here, we have to load the split
           logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
+
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
-          if (storageLevel.useDisk && !storageLevel.useMemory) {
-            // In the case that this RDD is to be persisted using DISK_ONLY
-            // the iterator will be passed directly to the blockManager (rather than
-            // caching it to an ArrayBuffer first), then the resulting block data iterator
-            // will be passed back to the user. If the iterator generates a lot of data,
-            // this means that it doesn't all have to be held in memory at one time.
-            // This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
-            // blocks aren't dropped by the block store before enabling that.
-            blockManager.put(key, computedValues, storageLevel, tellMaster = true)
-            return blockManager.get(key) match {
-              case Some(values) =>
-                return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
-              case None =>
-                logInfo("Failure to store %s".format(key))
-                throw new Exception("Block manager failed to return persisted values")
+
+          // Keep track of blocks with updated statuses
+          var updatedBlocks = Seq[(BlockId, BlockStatus)]()
+          val returnValue: Iterator[T] = {
+            if (storageLevel.useDisk && !storageLevel.useMemory) {
+              /* In the case that this RDD is to be persisted using DISK_ONLY
+               * the iterator will be passed directly to the blockManager (rather than
+               * caching it to an ArrayBuffer first), then the resulting block data iterator
+               * will be passed back to the user. If the iterator generates a lot of data,
+               * this means that it doesn't all have to be held in memory at one time.
+               * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
+               * blocks aren't dropped by the block store before enabling that. */
+              updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+              blockManager.get(key) match {
+                case Some(values) =>
+                  new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+                case None =>
+                  logInfo("Failure to store %s".format(key))
+                  throw new Exception("Block manager failed to return persisted values")
+              }
+            } else {
+              // In this case the RDD is cached to an array buffer. This will save the results
+              // if we're dealing with a 'one-time' iterator
+              val elements = new ArrayBuffer[Any]
+              elements ++= computedValues
+              updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
+              elements.iterator.asInstanceOf[Iterator[T]]
             }
-          } else {
-            // In this case the RDD is cached to an array buffer. This will save the results
-            // if we're dealing with a 'one-time' iterator
-            val elements = new ArrayBuffer[Any]
-            elements ++= computedValues
-            blockManager.put(key, elements, storageLevel, tellMaster = true)
-            return elements.iterator.asInstanceOf[Iterator[T]]
           }
+
+          // Update task metrics to include any blocks whose storage status is updated
+          val metrics = context.taskMetrics
+          metrics.updatedBlocks = Some(updatedBlocks)
+
+          returnValue
+
         } finally {
           loading.synchronized {
             loading.remove(key)
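With this change, `blockManager.put` reports the status of every block it wrote or updated, and the task records those statuses in its metrics via `metrics.updatedBlocks = Some(updatedBlocks)`. The sketch below shows one way such per-task updates could be aggregated into a per-RDD storage summary downstream; the `RDDBlockId`/`BlockStatus` case classes and the `onTaskEnd` hook are simplified stand-ins for illustration, not Spark's real classes or listener API.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's BlockId/BlockStatus, for illustration only
case class RDDBlockId(rddId: Int, splitIndex: Int)
case class BlockStatus(memSize: Long, diskSize: Long)

// Aggregates the updated block statuses reported by finished tasks into a
// running view of how much memory and disk each cached RDD is using
class CachedRDDUsage {
  // Latest known status for each block; a later update replaces an earlier one
  private val blocks = mutable.HashMap[RDDBlockId, BlockStatus]()

  // Hypothetical hook: called with the updatedBlocks a task reported in its metrics
  def onTaskEnd(updatedBlocks: Seq[(RDDBlockId, BlockStatus)]): Unit = {
    for ((blockId, status) <- updatedBlocks) {
      blocks(blockId) = status
    }
  }

  // Sum the latest per-block statuses into (bytes in memory, bytes on disk) per RDD
  def usageByRdd(): Map[Int, (Long, Long)] =
    blocks.toSeq
      .groupBy { case (blockId, _) => blockId.rddId }
      .map { case (rddId, statuses) =>
        val mem = statuses.map(_._2.memSize).sum
        val disk = statuses.map(_._2.diskSize).sum
        (rddId, (mem, disk))
      }
}
```

Keeping only the latest status per block avoids double counting when the same block is written, dropped, and rewritten across tasks.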