Commit a0f6f76
Author: Nathan Kronenfeld
Parent: f8f565a

Add parameter to RDD.toDebugString to allow detailed memory info to be shown or not. Default is for it not to be shown.
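As a quick orientation, here is a minimal sketch of the new parameter in use, assuming a local demo context; the RDD and names below are illustrative, not part of the commit:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext("local", "toDebugStringDemo")
    val doubled = sc.parallelize(1 to 100).map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
    doubled.count()  // materialize the cache so storage info is populated

    println(doubled.toDebugString())                   // default: lineage only, as before
    println(doubled.toDebugString(debugMemory = true)) // adds CachedPartitions/MemorySize/
                                                       //   TachyonSize/DiskSize details
    sc.stop()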

3 files changed, +18 -10 lines

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
(6 additions, 1 deletion)

@@ -487,7 +487,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   /** A description of this RDD and its recursive dependencies for debugging. */
   def toDebugString(): String = {
-    rdd.toDebugString
+    rdd.toDebugString()
+  }
+
+  /** A description of this RDD and its recursive dependencies for debugging. */
+  def toDebugString(debugMemory: Boolean): String = {
+    rdd.toDebugString(debugMemory)
   }
 
   /**
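On the Java API side the change becomes two overloads rather than a default argument, presumably because Java callers cannot use Scala default parameters. A minimal sketch of exercising both from Scala, assuming a local demo context; the names are illustrative:

    import org.apache.spark.SparkContext
    import org.apache.spark.api.java.JavaRDD

    val sc = new SparkContext("local", "javaApiDemo")
    val javaRdd = JavaRDD.fromRDD(sc.parallelize(1 to 3))

    println(javaRdd.toDebugString())     // zero-arg overload delegates to rdd.toDebugString()
    println(javaRdd.toDebugString(true)) // new overload passes debugMemory through
    sc.stop()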

core/src/main/scala/org/apache/spark/rdd/RDD.scala
(8 additions, 5 deletions)

@@ -1268,16 +1268,19 @@ abstract class RDD[T: ClassTag](
   }
 
   /** A description of this RDD and its recursive dependencies for debugging. */
-  def toDebugString: String = {
+  @DeveloperApi
+  def toDebugString (debugMemory: Boolean = false): String = {
     // Get a debug description of an rdd without its children
     def debugSelf (rdd: RDD[_]): Seq[String] = {
       import Utils.bytesToString
 
       val persistence = storageLevel.description
-      val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
-        " CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
-          info.numCachedPartitions, bytesToString(info.memSize),
-          bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
+      val storageInfo = if (debugMemory)
+        rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
+          " CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
+            info.numCachedPartitions, bytesToString(info.memSize),
+            bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
+      else Array[String]()
 
       s"$rdd [$persistence]" +: storageInfo
     }
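The else Array[String]() branch keeps the default output identical to the old behavior: +: prepends the s"$rdd [$persistence]" header to an empty sequence, so memory lines appear only on request. A toy distillation of that logic (names hypothetical; Seq used in place of Array for brevity):

    // Hypothetical reduction of debugSelf's return value for one RDD.
    def debugSelfLines(header: String, debugMemory: Boolean, storage: Seq[String]): Seq[String] = {
      val storageInfo = if (debugMemory) storage else Seq.empty[String]
      header +: storageInfo
    }

    assert(debugSelfLines("rdd [Memory]", debugMemory = false,
      Seq(" CachedPartitions: 2")) == Seq("rdd [Memory]"))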

core/src/test/scala/org/apache/spark/CheckpointSuite.scala
(4 additions, 4 deletions)

@@ -255,13 +255,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val partitionsBeforeCheckpoint = operatedRDD.partitions
 
     // Find serialized sizes before and after the checkpoint
-    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString())
     val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
     operatedRDD.checkpoint()
     val result = operatedRDD.collect()
     operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
     val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
-    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString())
 
     // Test whether the checkpoint file has been created
     assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
@@ -313,13 +313,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     initializeRdd(operatedRDD)
 
     // Find serialized sizes before and after the checkpoint
-    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString())
     val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
     parentRDDs.foreach(_.checkpoint()) // checkpoint the parent RDD, not the generated one
     val result = operatedRDD.collect() // force checkpointing
     operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
     val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
-    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString())
 
     // Test whether the data in the checkpointed RDD is same as original
     assert(operatedRDD.collect() === result)
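The added parentheses in these call sites are required, not cosmetic: once toDebugString takes a parameter list, a bare operatedRDD.toDebugString is no longer a complete invocation in Scala 2, even with the argument defaulted. A minimal illustration (hypothetical snippet, not from the commit):

    val rdd = sc.parallelize(1 to 3)
    rdd.toDebugString()      // ok: debugMemory defaults to false
    rdd.toDebugString(true)  // ok: request memory details
    // rdd.toDebugString     // error: missing argument list for method toDebugString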
