Skip to content

Commit b92752b

Browse files
author
Evan Chan
committed
SPARK-1154: Add a periodic task to clean up app directories
This adds two config params: spark.worker.cleanup_interval and spark.worker.app_data_ttl
1 parent 47ebea5 commit b92752b

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ private[spark] class Worker(
6464
val REGISTRATION_TIMEOUT = 20.seconds
6565
val REGISTRATION_RETRIES = 3
6666

67+
// How often worker will clean up old app folders
68+
val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup_interval", 60 * 30) * 1000
69+
// TTL for app folders/data; after TTL expires it will be cleaned up
70+
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.app_data_ttl", 15 * 24 * 3600)
71+
6772
// Index into masterUrls that we're currently trying to register with.
6873
var masterIndex = 0
6974

@@ -179,12 +184,26 @@ private[spark] class Worker(
179184
registered = true
180185
changeMaster(masterUrl, masterWebUiUrl)
181186
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
187+
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
188+
CLEANUP_INTERVAL_MILLIS millis, self, Worker.AppDirCleanup)
182189

183190
case SendHeartbeat =>
184191
masterLock.synchronized {
185192
if (connected) { master ! Heartbeat(workerId) }
186193
}
187194

195+
case Worker.AppDirCleanup =>
196+
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
197+
val cleanupFuture = concurrent.future {
198+
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
199+
Utils.findOldestFiles(workDir, APP_DATA_RETENTION_SECS)
200+
.foreach(Utils.deleteRecursively(_))
201+
}
202+
cleanupFuture onFailure {
203+
case e: Throwable =>
204+
logError("App dir cleanup failed: " + e.getMessage, e)
205+
}
206+
188207
case MasterChanged(masterUrl, masterWebUiUrl) =>
189208
logInfo("Master has changed, new master is at " + masterUrl)
190209
changeMaster(masterUrl, masterWebUiUrl)
@@ -331,6 +350,7 @@ private[spark] class Worker(
331350
}
332351

333352
private[spark] object Worker {
353+
case object AppDirCleanup // Sent to Worker actor periodically for cleaning up app folders
334354

335355
def main(argStrings: Array[String]) {
336356
val args = new WorkerArguments(argStrings)

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,22 @@ private[spark] object Utils extends Logging {
536536
}
537537
}
538538

539+
/**
 * Finds all the files in a directory whose last-modified time is more than `cutoff`
 * seconds in the past. Non-recursive: only direct children of `dir` are considered.
 *
 * NOTE: `File.lastModified` returns *milliseconds* since the epoch. The cutoff is
 * given in seconds, so it must be converted to milliseconds before comparing.
 * (Comparing against `System.currentTimeMillis / 1000 - cutoff` — seconds — would
 * never match any file, because a millisecond timestamp always exceeds a second
 * timestamp; the cleanup would silently do nothing.)
 *
 * @param dir must be the path to a directory, or IllegalArgumentException is thrown
 * @param cutoff age threshold in seconds; files with
 *               lastModified < (currentTimeMillis - cutoff * 1000) are returned
 * @return the files in `dir` older than the cutoff
 */
def findOldestFiles(dir: File, cutoff: Long): Seq[File] = {
  if (!dir.isDirectory) {
    throw new IllegalArgumentException(dir + " is not a directory!")
  }
  // Compute the threshold once, in milliseconds, to match File.lastModified units.
  val thresholdMillis = System.currentTimeMillis - (cutoff * 1000)
  listFilesSafely(dir).filter(_.lastModified < thresholdMillis)
}
554+
539555
/**
540556
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
541557
*/

0 commit comments

Comments
 (0)