Skip to content

Commit c64ff80

Browse files
sarutak
authored and srowen committed
[SPARK-7503] [YARN] Resources in .sparkStaging directory can't be cleaned up on error
When we run applications on YARN with cluster mode, uploaded resources on .sparkStaging directory can't be cleaned up in case of failure of uploading local resources. You can see this issue by running following command. ``` bin/spark-submit --master yarn --deploy-mode cluster --class <someClassName> <non-existing-jar> ``` Author: Kousuke Saruta <[email protected]> Closes apache#6026 from sarutak/delete-uploaded-resources-on-error and squashes the following commits: caef9f4 [Kousuke Saruta] Fixed style 882f921 [Kousuke Saruta] Wrapped Client#submitApplication with try/catch blocks in order to delete resources on error 1786ca4 [Kousuke Saruta] Merge branch 'master' of https://github.com/apache/spark into delete-uploaded-resources-on-error f61071b [Kousuke Saruta] Fixed cleanup problem
1 parent fdf5bba commit c64ff80

File tree

1 file changed

+47
-25
lines changed
  • yarn/src/main/scala/org/apache/spark/deploy/yarn

1 file changed

+47
-25
lines changed

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.deploy.yarn
1919

20-
import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
20+
import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
2121
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
2222
import java.nio.ByteBuffer
2323
import java.security.PrivilegedExceptionAction
@@ -91,30 +91,52 @@ private[spark] class Client(
9191
* available in the alpha API.
9292
*/
9393
def submitApplication(): ApplicationId = {
94-
// Setup the credentials before doing anything else, so we have don't have issues at any point.
95-
setupCredentials()
96-
yarnClient.init(yarnConf)
97-
yarnClient.start()
98-
99-
logInfo("Requesting a new application from cluster with %d NodeManagers"
100-
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
101-
102-
// Get a new application from our RM
103-
val newApp = yarnClient.createApplication()
104-
val newAppResponse = newApp.getNewApplicationResponse()
105-
val appId = newAppResponse.getApplicationId()
106-
107-
// Verify whether the cluster has enough resources for our AM
108-
verifyClusterResources(newAppResponse)
109-
110-
// Set up the appropriate contexts to launch our AM
111-
val containerContext = createContainerLaunchContext(newAppResponse)
112-
val appContext = createApplicationSubmissionContext(newApp, containerContext)
113-
114-
// Finally, submit and monitor the application
115-
logInfo(s"Submitting application ${appId.getId} to ResourceManager")
116-
yarnClient.submitApplication(appContext)
117-
appId
94+
var appId: ApplicationId = null
95+
try {
96+
// Setup the credentials before doing anything else,
97+
// so we have don't have issues at any point.
98+
setupCredentials()
99+
yarnClient.init(yarnConf)
100+
yarnClient.start()
101+
102+
logInfo("Requesting a new application from cluster with %d NodeManagers"
103+
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
104+
105+
// Get a new application from our RM
106+
val newApp = yarnClient.createApplication()
107+
val newAppResponse = newApp.getNewApplicationResponse()
108+
appId = newAppResponse.getApplicationId()
109+
110+
// Verify whether the cluster has enough resources for our AM
111+
verifyClusterResources(newAppResponse)
112+
113+
// Set up the appropriate contexts to launch our AM
114+
val containerContext = createContainerLaunchContext(newAppResponse)
115+
val appContext = createApplicationSubmissionContext(newApp, containerContext)
116+
117+
// Finally, submit and monitor the application
118+
logInfo(s"Submitting application ${appId.getId} to ResourceManager")
119+
yarnClient.submitApplication(appContext)
120+
appId
121+
} catch {
122+
case e: Throwable =>
123+
if (appId != null) {
124+
val appStagingDir = getAppStagingDir(appId)
125+
try {
126+
val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
127+
val stagingDirPath = new Path(appStagingDir)
128+
val fs = FileSystem.get(hadoopConf)
129+
if (!preserveFiles && fs.exists(stagingDirPath)) {
130+
logInfo("Deleting staging directory " + stagingDirPath)
131+
fs.delete(stagingDirPath, true)
132+
}
133+
} catch {
134+
case ioe: IOException =>
135+
logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
136+
}
137+
}
138+
throw e
139+
}
118140
}
119141

120142
/**

0 commit comments

Comments
 (0)