Commit 331f05e

wangyum authored and GitHub Enterprise committed
[CARMEL-7330][CARMEL-4598] Refine Context Cleaner to improve the clean performance (apache#93)
1 parent c7508aa commit 331f05e

File tree

8 files changed: 319 additions, 119 deletions


core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 274 additions & 109 deletions
Large diffs are not rendered by default.
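The refactored cleaner itself is hidden by the rendering limit, but the call sites changed below imply its shape: instead of a single cleaning thread, cleanup work is split across per-task-type workers (RDDCleanupWorker, ShuffleCleanupWorker, BroadcastCleanupWorker) that are looked up by task class name and drain their own queues, with a summary hook for monitoring. A minimal sketch of that model follows; all internals here are assumptions, not taken from the unrendered diff, and the real workers are typed subclasses that also expose blocking variants such as doCleanupShuffle(shuffleId, blocking).

    import java.util.concurrent.LinkedBlockingQueue

    sealed trait CleanupTask
    case class CleanRDD(rddId: Int) extends CleanupTask
    case class CleanShuffle(shuffleId: Int) extends CleanupTask
    case class CleanBroadcast(broadcastId: Long) extends CleanupTask

    // One queue and daemon thread per task type, so a backlog of shuffle
    // cleanups no longer delays RDD or broadcast cleanups.
    class ContextCleanupWorker(handle: CleanupTask => Unit) {
      private val queue = new LinkedBlockingQueue[CleanupTask]()
      private val thread = new Thread(() => while (true) handle(queue.take()))
      thread.setDaemon(true)
      def start(): Unit = thread.start()
      def doCleanup(task: CleanupTask): Unit = queue.put(task) // asynchronous enqueue
      def queueSize: Int = queue.size()
    }

    class SketchContextCleaner {
      // The handlers are stubs; the real workers would delete RDD, shuffle
      // and broadcast state via the block manager and map output tracker.
      private val workers: Map[String, ContextCleanupWorker] = Map(
        classOf[CleanRDD].getName -> new ContextCleanupWorker(_ => ()),
        classOf[CleanShuffle].getName -> new ContextCleanupWorker(_ => ()),
        classOf[CleanBroadcast].getName -> new ContextCleanupWorker(_ => ()))

      def start(): Unit = workers.values.foreach(_.start())

      def getContextCleanupWorker(name: String): ContextCleanupWorker = workers(name)

      // (worker name, pending task count) pairs, exposed through the new
      // sql/context-cleaner REST endpoint added at the bottom of this commit.
      def contextCleanupWorkerSummary(): Seq[(String, Int)] =
        workers.map { case (name, w) => (name, w.queueSize) }.toSeq
    }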

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 0 deletions
@@ -1780,6 +1780,12 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING_BROADCAST =
+    ConfigBuilder("spark.cleaner.referenceTracking.blocking.broadcast")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS =
     ConfigBuilder("spark.cleaner.referenceTracking.cleanCheckpoints")
       .version("1.4.0")

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 1 deletion
@@ -1847,7 +1847,9 @@ abstract class RDD[T: ClassTag](
       dep match {
         case dependency: ShuffleDependency[_, _, _] =>
           val shuffleId = dependency.shuffleId
-          cleaner.doCleanupShuffle(shuffleId, blocking)
+          cleaner.getContextCleanupWorker(classOf[CleanShuffle].getName).
+            asInstanceOf[ShuffleCleanupWorker].
+            doCleanupShuffle(shuffleId, blocking)
         case _ => // do nothing
       }
       val rdd = dep.rdd
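Note the two-step call: getContextCleanupWorker apparently returns the untyped base worker, so call sites that need the typed blocking variant doCleanupShuffle(shuffleId, blocking) downcast to ShuffleCleanupWorker, while fire-and-forget callers (see the test changes below) submit a generic doCleanup(CleanShuffle(...)) instead.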

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 7 additions & 3 deletions
@@ -122,7 +122,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))
 
     // Explicit cleanup
-    cleaner.doCleanupRDD(rdd.id, blocking = true)
+    cleaner.getContextCleanupWorker(classOf[CleanRDD].getName).
+      asInstanceOf[RDDCleanupWorker].
+      doCleanupRDD(rdd.id, blocking = true)
     tester.assertCleanup()
 
     // Verify that RDDs can be re-executed after cleaning up
@@ -135,7 +137,8 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
 
     // Explicit cleanup
-    shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
+    shuffleDeps.foreach(s => cleaner.getContextCleanupWorker(classOf[CleanShuffle].getName).
+      asInstanceOf[ShuffleCleanupWorker].doCleanupShuffle(s.shuffleId, blocking = true))
     tester.assertCleanup()
 
     // Verify that shuffles can be re-executed after cleaning up
@@ -147,7 +150,8 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
 
     // Explicit cleanup
-    cleaner.doCleanupBroadcast(broadcast.id, blocking = true)
+    cleaner.getContextCleanupWorker(classOf[CleanBroadcast].getName).
+      asInstanceOf[BroadcastCleanupWorker].doCleanupBroadcast(broadcast.id, blocking = true)
     tester.assertCleanup()
   }

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 8 additions & 3 deletions
@@ -245,12 +245,17 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
         assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty)
       }
 
-      sc.cleaner.foreach(_.doCleanupShuffle(0, true))
+      sc.cleaner.foreach(
+        _.getContextCleanupWorker(classOf[CleanShuffle].getName).doCleanup(CleanShuffle(0)))
 
       if (enabled) {
-        assert(filesToCheck.forall(!_.exists()))
+        eventually(timeout(2.seconds), interval(100.milliseconds)) {
+          assert(filesToCheck.forall(!_.exists()))
+        }
       } else {
-        assert(filesToCheck.forall(_.exists()))
+        eventually(timeout(2.seconds), interval(100.milliseconds)) {
+          assert(filesToCheck.forall(_.exists()))
+        }
       }
     } finally {
       rpcHandler.applicationRemoved(sc.conf.getAppId, true)
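Replacing the blocking doCleanupShuffle(0, true) with doCleanup(CleanShuffle(0)) makes the request asynchronous: doCleanup presumably only enqueues the task, so the assertions are wrapped in eventually to poll until the worker thread has processed it and the shuffle-file state settles.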

core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala

Lines changed: 4 additions & 1 deletion
@@ -182,7 +182,10 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
   test("internal accumulators are registered for cleanups") {
     sc = new SparkContext("local", "test") {
       private val myCleaner = new SaveAccumContextCleaner(this)
-      override def cleaner: Option[ContextCleaner] = Some(myCleaner)
+      override def cleaner: Option[ContextCleaner] = {
+        myCleaner.start()
+        Some(myCleaner)
+      }
     }
     assert(AccumulatorContext.numAccums == 0)
     sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
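The added myCleaner.start() hints that a cleaner's workers only run after an explicit start: this custom cleaner overrides SparkContext's own field, so it is presumably never started by SparkContext itself and must be started before its workers can accept tasks.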

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 9 additions & 2 deletions
@@ -27,6 +27,9 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.roaringbitmap.RoaringBitmap
+import org.scalatest.concurrent.Eventually.{eventually, interval}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.broadcast.BroadcastManager
@@ -966,8 +969,12 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
         true
       })
 
-      sc.cleaner.get.doCleanupShuffle(SHUFFLE_ID, blocking = true)
-      assert(foundHosts.asScala == mergerLocs.map(_.host).toSet)
+      sc.cleaner.foreach(
+        _.getContextCleanupWorker(classOf[CleanShuffle].getName)
+          .doCleanup(CleanShuffle(SHUFFLE_ID)))
+      eventually(Eventually.timeout(2.seconds), interval(100.milliseconds)) {
+        assert(foundHosts.asScala == mergerLocs.map(_.host).toSet)
+      }
     }
   }

sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala

Lines changed: 8 additions & 0 deletions
@@ -118,6 +118,14 @@ private[v1] class SqlResource extends BaseAppResource {
     }
   }
 
+  @GET
+  @Path("context-cleaner")
+  def contextCleaner(): Seq[(String, Int)] = {
+    withUI { ui =>
+      ui.sc.get.cleaner.get.contextCleanupWorkerSummary()
+    }
+  }
+
   private def printableMetrics(allNodes: collection.Seq[SparkPlanGraphNode],
       metricValues: Map[Long, String]): collection.Seq[Node] = {
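Assuming SqlResource keeps its usual mounting in the REST API, the new endpoint should be reachable at something like GET /api/v1/applications/&lt;app-id&gt;/sql/context-cleaner (base path inferred, not shown in this diff), returning the (worker name, pending-task count) pairs from contextCleanupWorkerSummary().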
