Skip to content

Commit fba8ec3

Browse files
Nathan Kronenfeld and pwendell
authored and committed
Add caching information to rdd.toDebugString
I find it useful to see where in an RDD's DAG data is cached, so I figured others might too.

I've added both the caching level, and the actual memory state of the RDD.

Some of this is redundant with the web UI (notably the actual memory state), but (a) that is temporary, and (b) putting it in the DAG tree shows some context that can help a lot.

For example:

```
(4) ShuffledRDD[3] at reduceByKey at <console>:14
 +-(4) MappedRDD[2] at map at <console>:14
    |  MapPartitionsRDD[1] at mapPartitions at <console>:12
    |  ParallelCollectionRDD[0] at parallelize at <console>:12
```

should change to

```
(4) ShuffledRDD[3] at reduceByKey at <console>:14 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 4; MemorySize: 50.8 MB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 +-(4) MappedRDD[2] at map at <console>:14 [Memory Deserialized 1x Replicated]
    |  MapPartitionsRDD[1] at mapPartitions at <console>:12 [Memory Deserialized 1x Replicated]
    |      CachedPartitions: 4; MemorySize: 109.1 MB; TachyonSize: 0.0 B; DiskSize: 0.0 B
    |  ParallelCollectionRDD[0] at parallelize at <console>:12 [Memory Deserialized 1x Replicated]
```

Author: Nathan Kronenfeld <[email protected]>

Closes #1535 from nkronenfeld/feature/debug-caching2 and squashes the following commits:

40490bc [Nathan Kronenfeld] Back out DeveloperAPI and arguments to RDD.toDebugString, reinstate memory output
794e6a3 [Nathan Kronenfeld] Attempt to merge mima changes from master
6fe9e80 [Nathan Kronenfeld] Add exclusions to allow for signature change in toDebugString (will back out if necessary)
31d6769 [Nathan Kronenfeld] Attempt to get rid of style errors. Add comments for the new memory usage parameter.
a0f6f76 [Nathan Kronenfeld] Add parameter to RDD.toDebugString to allow detailed memory info to be shown or not. Default is for it not to be shown.
f8f565a [Nathan Kronenfeld] Fix code style error
8f54287 [Nathan Kronenfeld] Changed string addition to string interpolation as per PR comments
2a0cd4d [Nathan Kronenfeld] Fixed a small formatting issue I forgot to copy over from the old branch
8fbecb6 [Nathan Kronenfeld] Add caching information to rdd.toDebugString
1 parent e1b85f3 commit fba8ec3

File tree

1 file changed

+25
-5
lines changed
  • core/src/main/scala/org/apache/spark/rdd

1 file changed

+25
-5
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 25 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1299,6 +1299,19 @@ abstract class RDD[T: ClassTag](
12991299

13001300
/** A description of this RDD and its recursive dependencies for debugging. */
13011301
def toDebugString: String = {
1302+
// Get a debug description of an rdd without its children
1303+
/**
 * Describe a single RDD, without descending into its dependencies.
 *
 * The first element is the RDD's string form followed by the description of its
 * storage level in square brackets. One further line is appended for every entry of
 * `rdd.context.getRDDStorageInfo` whose id equals `rdd.id`, reporting the number of
 * cached partitions and the memory, Tachyon and disk sizes.
 *
 * @param rdd the RDD to describe
 * @return the description lines for `rdd`, header line first
 */
def debugSelf(rdd: RDD[_]): Seq[String] = {
  import Utils.bytesToString

  // Read the storage level of the RDD being described. The unqualified `storageLevel`
  // resolves to the enclosing RDD (the one toDebugString was invoked on), which labels
  // every node of the lineage with the root's persistence level.
  // NOTE(review): assumes `storageLevel` is reachable on another RDD instance (i.e. it is
  // not `private[this]`) -- confirm against its declaration, else use the public accessor.
  val persistence = rdd.storageLevel.description
  val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
    " CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
      info.numCachedPartitions, bytesToString(info.memSize),
      bytesToString(info.tachyonSize), bytesToString(info.diskSize)))

  s"$rdd [$persistence]" +: storageInfo
}
1314+
13021315
// Apply a different rule to the last child
13031316
def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
13041317
val len = rdd.dependencies.length
@@ -1324,7 +1337,11 @@ abstract class RDD[T: ClassTag](
13241337
val partitionStr = "(" + rdd.partitions.size + ")"
13251338
val leftOffset = (partitionStr.length - 1) / 2
13261339
val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
1327-
Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
1340+
1341+
debugSelf(rdd).zipWithIndex.map{
1342+
case (desc: String, 0) => s"$partitionStr $desc"
1343+
case (desc: String, _) => s"$nextPrefix $desc"
1344+
} ++ debugChildren(rdd, nextPrefix)
13281345
}
13291346
def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
13301347
val partitionStr = "(" + rdd.partitions.size + ")"
@@ -1334,17 +1351,20 @@ abstract class RDD[T: ClassTag](
13341351
thisPrefix
13351352
+ (if (isLastChild) " " else "| ")
13361353
+ (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
1337-
Seq(thisPrefix + "+-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
1354+
1355+
debugSelf(rdd).zipWithIndex.map{
1356+
case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc"
1357+
case (desc: String, _) => s"$nextPrefix$desc"
1358+
} ++ debugChildren(rdd, nextPrefix)
13381359
}
13391360
def debugString(rdd: RDD[_],
13401361
prefix: String = "",
13411362
isShuffle: Boolean = true,
13421363
isLastChild: Boolean = false): Seq[String] = {
13431364
if (isShuffle) {
13441365
shuffleDebugString(rdd, prefix, isLastChild)
1345-
}
1346-
else {
1347-
Seq(prefix + rdd) ++ debugChildren(rdd, prefix)
1366+
} else {
1367+
debugSelf(rdd).map(prefix + _) ++ debugChildren(rdd, prefix)
13481368
}
13491369
}
13501370
firstDebugString(this).mkString("\n")

0 commit comments

Comments
 (0)