Skip to content

Commit f2f6027

Browse files
author
Evan Chan
committed
CR from @andrewor14
1 parent 553d8c2 commit f2f6027

File tree

3 files changed

+11
-9
lines changed

3 files changed

+11
-9
lines changed

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {
8686

8787
case class KillDriver(driverId: String) extends DeployMessage
8888

89+
// Worker internal
90+
91+
case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
92+
8993
// AppClient to Master
9094

9195
case class RegisterApplication(appDescription: ApplicationDescription)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ private[spark] class Worker(
6565
val REGISTRATION_RETRIES = 3
6666

6767
// How often worker will clean up old app folders
68-
val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup_interval", 60 * 30) * 1000
68+
val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanupInterval", 60 * 30) * 1000
6969
// TTL for app folders/data; after TTL expires it will be cleaned up
70-
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.app_data_ttl", 7 * 24 * 3600)
70+
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.appDataTTL", 7 * 24 * 3600)
7171

7272
// Index into masterUrls that we're currently trying to register with.
7373
var masterIndex = 0
@@ -185,19 +185,19 @@ private[spark] class Worker(
185185
changeMaster(masterUrl, masterWebUiUrl)
186186
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
187187
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
188-
CLEANUP_INTERVAL_MILLIS millis, self, Worker.AppDirCleanup)
188+
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
189189

190190
case SendHeartbeat =>
191191
masterLock.synchronized {
192192
if (connected) { master ! Heartbeat(workerId) }
193193
}
194194

195-
case Worker.AppDirCleanup =>
195+
case WorkDirCleanup =>
196196
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
197197
val cleanupFuture = concurrent.future {
198198
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
199199
Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
200-
.foreach(Utils.deleteRecursively(_))
200+
.foreach(Utils.deleteRecursively)
201201
}
202202
cleanupFuture onFailure {
203203
case e: Throwable =>
@@ -350,8 +350,6 @@ private[spark] class Worker(
350350
}
351351

352352
private[spark] object Worker {
353-
case object AppDirCleanup // Sent to Worker actor periodically for cleaning up app folders
354-
355353
def main(argStrings: Array[String]) {
356354
val args = new WorkerArguments(argStrings)
357355
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

docs/configuration.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -587,15 +587,15 @@ Apart from these, the following properties are also available, and may be useful
587587
</td>
588588
</tr>
589589
<tr>
590-
<td>spark.worker.cleanup_interval</td>
590+
<td>spark.worker.cleanupInterval</td>
591591
<td>1800 (30 minutes)</td>
592592
<td>
593593
Controls the interval, in seconds, at which the worker cleans up old application work directories
 on the local machine.
595595
</td>
596596
</tr>
597597
<tr>
598-
<td>spark.worker.app_data_ttl</td>
598+
<td>spark.worker.appDataTTL</td>
599599
<td>7 * 24 * 3600 (7 days)</td>
600600
<td>
601601
The number of seconds to retain application work directories on each worker. This is a Time To Live

0 commit comments

Comments
 (0)