Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
listeners.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
Expand All @@ -260,4 +261,5 @@ private[spark] trait CleanerListener {
def shuffleCleaned(shuffleId: Int)
def broadcastCleaned(broadcastId: Long)
def accumCleaned(accId: Long)
def checkpointCleaned(rddId: Long)
}
21 changes: 15 additions & 6 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
assert(fs.exists(path))

// the checkpoint is not cleaned by default (without the configuration set)
var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
Expand All @@ -245,7 +245,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))

// Test that GC causes checkpoint data cleanup after dereferencing the RDD
postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen This is just a test code bug. new CleanerTester(sc, Seq(rddId), Nil, Nil) This code is only to ensure that the RDD is cleaned, but checkpoint. rdd and checkpoint almost simultaneously be cleaned, But there are exceptions, depending on the GC. The checkpointCleaned ensure the checkpoint is cleaned.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this fix the assertion that failed? you add a new set of IDs that must now be counted down by a new callback, and you assert it ends up empty, which is good. But the assertion that failed was earlier:

     assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tips are more confusing.
The jira display the error in: ContextCleanerSuite.scala#L252.
The code in #L252 is assert(!fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, it is the following one. Still, how does this affect whether the checkpoint files exist?

rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
Expand Down Expand Up @@ -406,12 +406,14 @@ class CleanerTester(
sc: SparkContext,
rddIds: Seq[Int] = Seq.empty,
shuffleIds: Seq[Int] = Seq.empty,
broadcastIds: Seq[Long] = Seq.empty)
broadcastIds: Seq[Long] = Seq.empty,
checkpointIds: Seq[Long] = Seq.empty)
extends Logging {

val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal

val cleanerListener = new CleanerListener {
Expand All @@ -427,12 +429,17 @@ class CleanerTester(

def broadcastCleaned(broadcastId: Long): Unit = {
toBeCleanedBroadcstIds -= broadcastId
logInfo("Broadcast" + broadcastId + " cleaned")
logInfo("Broadcast " + broadcastId + " cleaned")
}

def accumCleaned(accId: Long): Unit = {
logInfo("Cleaned accId " + accId + " cleaned")
}

def checkpointCleaned(rddId: Long): Unit = {
toBeCheckpointIds -= rddId
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen When the checkpoint is cleaned, checkpointCleaned is called asynchronous, isAllCleanedUp returns true, and then postGCTester.assertCleanup() return.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, that's a good explanation. I see that this makes it effectively wait longer to proceed, by which time some necessary conditions are true, like that checkpoint cleanup has happened. Thank you for that detail, and to the limits of my knowledge, this looks good.

logInfo("checkpoint " + rddId + " cleaned")
}
}

val MAX_VALIDATION_ATTEMPTS = 10
Expand All @@ -456,7 +463,8 @@ class CleanerTester(

/** Verify that RDDs, shuffles, etc. occupy resources */
private def preCleanupValidate() {
assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup")
assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty ||
checkpointIds.nonEmpty, "Nothing to cleanup")

// Verify the RDDs have been persisted and blocks are present
rddIds.foreach { rddId =>
Expand Down Expand Up @@ -547,7 +555,8 @@ class CleanerTester(
private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty &&
toBeCleanedShuffleIds.isEmpty &&
toBeCleanedBroadcstIds.isEmpty
toBeCleanedBroadcstIds.isEmpty &&
toBeCheckpointIds.isEmpty

private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
Expand Down