diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 1b7bb8af79a9..9290b5b36a8f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -35,7 +35,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, WrappedArray} import scala.concurrent.duration._ import scala.util.control.NonFatal -import com.google.common.cache.CacheBuilder +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification} import com.google.common.util.concurrent.ThreadFactoryBuilder import org.slf4j.MDC @@ -196,11 +196,24 @@ private[spark] class Executor( // Classloader isolation // The default isolation group - val defaultSessionState = newSessionState(JobArtifactState("default", None)) + val defaultSessionState: IsolatedSessionState = newSessionState(JobArtifactState("default", None)) - val isolatedSessionCache = CacheBuilder.newBuilder() + val isolatedSessionCache: Cache[String, IsolatedSessionState] = CacheBuilder.newBuilder() .maximumSize(100) .expireAfterAccess(30, TimeUnit.MINUTES) + .removalListener(new RemovalListener[String, IsolatedSessionState]() { + override def onRemoval( + notification: RemovalNotification[String, IsolatedSessionState]): Unit = { + val state = notification.getValue + // Cache is always used for isolated sessions. + assert(!isDefaultState(state.sessionUUID)) + val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), state.sessionUUID) + if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) { + Utils.deleteRecursively(sessionBasedRoot) + } + logInfo(s"Session evicted: ${state.sessionUUID}") + } + }) .build[String, IsolatedSessionState] // Set the classloader for serializer