@@ -34,16 +34,17 @@ import org.apache.spark.deploy.master.DriverState.DriverState
3434import org .apache .spark .deploy .master .DriverState
3535import org .apache .spark .deploy .worker .DriverRunner
3636
37- import java .io .File
37+ import java .io .{ IOException , File }
3838import java .util .Date
3939import java .text .SimpleDateFormat
40+ import scala .collection .mutable
4041
41- /*
42- * A dispatcher actor that is responsible for managing drivers, that is intended to
43- * used for Mesos cluster mode.
44- * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
45- * a daemon to launch drivers as Mesos frameworks upon request.
46- */
42+ /*
43+ * A dispatcher actor responsible for managing drivers, intended to be
44+ * used for Mesos cluster mode.
45+ * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
46+ * a daemon to launch drivers as Mesos frameworks upon request.
47+ */
4748class MesosClusterDispatcher (
4849 host : String ,
4950 serverPort : Int ,
@@ -67,6 +68,21 @@ class MesosClusterDispatcher(
6768
6869 def createWorkDir () {
6970 workDir = workDirPath.map(new File (_)).getOrElse(new File (sparkHome, " work" ))
71+
72+ // Attempt to remove the work directory if it exists on startup.
73+ // This keeps the work directory from growing without bound, because driver
74+ // directories are only deleted once the retained count is exceeded at runtime.
75+ // We don't fail startup if the directory cannot be removed, as this is
76+ // a short-term solution.
77+ try {
78+ if (workDir.exists()) {
79+ workDir.delete()
80+ }
81+ } catch {
82+ case e : IOException =>
83+ logError(" Unable to remove work directory " + workDir, e)
84+ }
85+
7086 try {
7187 // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
7288 // So attempting to create and then check if directory was created or not.
@@ -112,6 +128,7 @@ class MesosClusterDispatcher(
112128
113129 override def postStop () {
114130 server.stop()
131+ runners.values.foreach(_.kill())
115132 }
116133
117134 override def receiveWithLogging = {
@@ -161,6 +178,15 @@ class MesosClusterDispatcher(
161178 def removeDriver (driverId : String , state : DriverState , exception : Option [Exception ]) {
162179 if (completedDrivers.size >= RETAINED_DRIVERS ) {
163180 val toRemove = math.max(RETAINED_DRIVERS / 10 , 1 )
181+ for (i <- 0 to (toRemove - 1 )) {
182+ val driverId = completedDrivers(i).id
183+ try {
184+ new File (workDir, driverId).delete()
185+ } catch {
186+ case e : Exception =>
187+ logWarning(" Unable to remove work dir for completed driver " + driverId, e)
188+ }
189+ }
164190 completedDrivers.trimStart(toRemove)
165191 }
166192 val driverInfo = drivers.remove(driverId).get
@@ -175,6 +201,13 @@ object MesosClusterDispatcher {
175201 val conf = new SparkConf
176202 val clusterArgs = new ClusterDispatcherArguments (args, conf)
177203 val actorSystem = startSystemAndActor(clusterArgs)
204+ Runtime .getRuntime().addShutdownHook(new Thread (" MesosClusterDispatcherShutdownHook" ) {
205+ override def run () = {
206+ // Makes sure we shut down the actor, which will kill all the drivers.
207+ actorSystem.shutdown()
208+ actorSystem.awaitTermination()
209+ }
210+ })
178211 actorSystem.awaitTermination()
179212 }
180213
0 commit comments