Skip to content

Commit 1440154

Browse files
Evan Chan authored and pwendell committed
SPARK-1154: Clean up app folders in worker nodes
This is a fix for [SPARK-1154](https://issues.apache.org/jira/browse/SPARK-1154). The issue is that worker nodes fill up with a huge number of app-* folders after some time. This change adds a periodic cleanup task which asynchronously deletes app directories older than a configurable TTL. Three new configuration parameters have been introduced: spark.worker.cleanup.enabled, spark.worker.cleanup.interval and spark.worker.cleanup.appDataTtl. This change does not include moving the downloads of application jars to a location outside of the work directory. We will address that if we have time, but that potentially involves caching so it will come either as part of this PR or a separate PR. Author: Evan Chan <[email protected]> Author: Kelvin Chu <[email protected]> Closes #288 from velvia/SPARK-1154-cleanup-app-folders and squashes the following commits: 0689995 [Evan Chan] CR from @aarondav - move config, clarify for standalone mode 9f10d96 [Evan Chan] CR from @pwendell - rename configs and add cleanup.enabled f2f6027 [Evan Chan] CR from @andrewor14 553d8c2 [Kelvin Chu] change the variable name to currentTimeMillis since it actually tracks in seconds 8dc9cb5 [Kelvin Chu] Fixed a bug in Utils.findOldFiles() after merge. cb52f2b [Kelvin Chu] Change the name of findOldestFiles() to findOldFiles() 72f7d2d [Kelvin Chu] Fix a bug of Utils.findOldestFiles(). file.lastModified is returned in milliseconds. ad99955 [Kelvin Chu] Add unit test for Utils.findOldestFiles() dc1a311 [Evan Chan] Don't recompute current time with every new file e3c408e [Evan Chan] Document the two new settings b92752b [Evan Chan] SPARK-1154: Add a periodic task to clean up app directories
1 parent 4106558 commit 1440154

File tree

5 files changed

+83
-4
lines changed

5 files changed

+83
-4
lines changed

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ private[deploy] object DeployMessages {
8686

8787
case class KillDriver(driverId: String) extends DeployMessage
8888

89+
// Worker internal
90+
91+
case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
92+
8993
// AppClient to Master
9094

9195
case class RegisterApplication(appDescription: ApplicationDescription)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ private[spark] class Worker(
6464
val REGISTRATION_TIMEOUT = 20.seconds
6565
val REGISTRATION_RETRIES = 3
6666

67+
val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
68+
// How often the worker cleans up old app folders; configured in seconds, held here in milliseconds
69+
val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
70+
// TTL for app folders/data, in seconds; folders last modified longer ago than this are deleted
71+
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
72+
6773
// Index into masterUrls that we're currently trying to register with.
6874
var masterIndex = 0
6975

@@ -179,12 +185,28 @@ private[spark] class Worker(
179185
registered = true
180186
changeMaster(masterUrl, masterWebUiUrl)
181187
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
188+
if (CLEANUP_ENABLED) {
189+
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
190+
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
191+
}
182192

183193
case SendHeartbeat =>
184194
masterLock.synchronized {
185195
if (connected) { master ! Heartbeat(workerId) }
186196
}
187197

198+
case WorkDirCleanup =>
199+
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
200+
val cleanupFuture = concurrent.future {
201+
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
202+
Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
203+
.foreach(Utils.deleteRecursively)
204+
}
205+
cleanupFuture onFailure {
206+
case e: Throwable =>
207+
logError("App dir cleanup failed: " + e.getMessage, e)
208+
}
209+
188210
case MasterChanged(masterUrl, masterWebUiUrl) =>
189211
logInfo("Master has changed, new master is at " + masterUrl)
190212
changeMaster(masterUrl, masterWebUiUrl)
@@ -331,7 +353,6 @@ private[spark] class Worker(
331353
}
332354

333355
private[spark] object Worker {
334-
335356
def main(argStrings: Array[String]) {
336357
val args = new WorkerArguments(argStrings)
337358
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,9 +597,24 @@ private[spark] object Utils extends Logging {
597597
}
598598

599599
if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
600-
return false;
600+
return false
601601
} else {
602-
return true;
602+
return true
603+
}
604+
}
605+
606+
/**
607+
* Finds all the files in a directory that were last modified more than `cutoff` seconds ago.
608+
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
609+
* @param cutoff measured in seconds. Files older than this are returned.
610+
*/
611+
def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
612+
val currentTimeMillis = System.currentTimeMillis
613+
if (dir.isDirectory) {
614+
val files = listFilesSafely(dir)
615+
files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
616+
} else {
617+
throw new IllegalArgumentException(dir + " is not a directory!")
603618
}
604619
}
605620

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.util
1919

2020
import scala.util.Random
2121

22-
import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
22+
import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
2323
import java.nio.{ByteBuffer, ByteOrder}
2424

2525
import com.google.common.base.Charsets
@@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
154154
val iterator = Iterator.range(0, 5)
155155
assert(Utils.getIteratorSize(iterator) === 5L)
156156
}
157+
158+
test("findOldFiles") {
159+
// create some temporary directories and files
160+
val parent: File = Utils.createTempDir()
161+
val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
162+
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
163+
// set the last modified time of child1 to 10 secs old
164+
child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
165+
166+
val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
167+
assert(result.size.equals(1))
168+
assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
169+
}
157170
}
158171

docs/configuration.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,32 @@ Apart from these, the following properties are also available, and may be useful
348348
receives no heartbeats.
349349
</td>
350350
</tr>
351+
<tr>
352+
<td>spark.worker.cleanup.enabled</td>
353+
<td>true</td>
354+
<td>
355+
Enable periodic cleanup of worker / application directories. Note that this only affects standalone
356+
mode, as YARN works differently.
357+
</td>
358+
</tr>
359+
<tr>
360+
<td>spark.worker.cleanup.interval</td>
361+
<td>1800 (30 minutes)</td>
362+
<td>
363+
Controls the interval, in seconds, at which the worker cleans up old application work dirs
364+
on the local machine.
365+
</td>
366+
</tr>
367+
<tr>
368+
<td>spark.worker.cleanup.appDataTtl</td>
369+
<td>7 * 24 * 3600 (7 days)</td>
370+
<td>
371+
The number of seconds to retain application work directories on each worker. This is a Time To Live
372+
and should depend on the amount of available disk space you have. Application logs and jars are
373+
downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space,
374+
especially if you run jobs very frequently.
375+
</td>
376+
</tr>
351377
<tr>
352378
<td>spark.akka.frameSize</td>
353379
<td>10</td>

0 commit comments

Comments
 (0)