Skip to content

Commit b08b3c9

Browse files
committed
Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint
1 parent e5949c2 commit b08b3c9

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
236236
try {
237237
logDebug("Cleaning rdd checkpoint data " + rddId)
238238
RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
239+
listeners.foreach(_.checkpointCleaned(rddId))
239240
logInfo("Cleaned rdd checkpoint data " + rddId)
240241
}
241242
catch {
@@ -260,4 +261,5 @@ private[spark] trait CleanerListener {
260261
def shuffleCleaned(shuffleId: Int)
261262
def broadcastCleaned(broadcastId: Long)
262263
def accumCleaned(accId: Long)
264+
def checkpointCleaned(rddId: Long)
263265
}

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,12 +406,14 @@ class CleanerTester(
406406
sc: SparkContext,
407407
rddIds: Seq[Int] = Seq.empty,
408408
shuffleIds: Seq[Int] = Seq.empty,
409-
broadcastIds: Seq[Long] = Seq.empty)
409+
broadcastIds: Seq[Long] = Seq.empty,
410+
checkpointIds: Seq[Long] = Seq.empty)
410411
extends Logging {
411412

412413
val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
413414
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
414415
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
416+
val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
415417
val isDistributed = !sc.isLocal
416418

417419
val cleanerListener = new CleanerListener {
@@ -433,6 +435,11 @@ class CleanerTester(
433435
def accumCleaned(accId: Long): Unit = {
434436
logInfo("Cleaned accId " + accId + " cleaned")
435437
}
438+
439+
def checkpointCleaned(rddId: Long): Unit = {
440+
toBeCheckpointIds -= rddId
441+
logInfo("checkpoint rddId " + rddId + " cleaned")
442+
}
436443
}
437444

438445
val MAX_VALIDATION_ATTEMPTS = 10
@@ -547,7 +554,8 @@ class CleanerTester(
547554
private def isAllCleanedUp =
548555
toBeCleanedRDDIds.isEmpty &&
549556
toBeCleanedShuffleIds.isEmpty &&
550-
toBeCleanedBroadcstIds.isEmpty
557+
toBeCleanedBroadcstIds.isEmpty &&
558+
toBeCheckpointIds.isEmpty
551559

552560
private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
553561
blockManager.master.getMatchingBlockIds( _ match {

0 commit comments

Comments
 (0)