@@ -17,7 +17,7 @@
17 | 17 |
18 | 18 | package org.apache.spark.deploy.yarn |
19 | 19 |
20 | | -import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream} |
| 20 | +import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException} |
21 | 21 | import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} |
22 | 22 | import java.nio.ByteBuffer |
23 | 23 | import java.security.PrivilegedExceptionAction |
@@ -515,7 +515,28 @@ private[spark] class Client( |
515 | 515 | logInfo("Setting up container launch context for our AM") |
516 | 516 | val appId = newAppResponse.getApplicationId |
517 | 517 | val appStagingDir = getAppStagingDir(appId) |
518 | | - val localResources = prepareLocalResources(appStagingDir) |
| 518 | + var localResources: Map[String, LocalResource] = null |
| 519 | + try { |
| 520 | + localResources = prepareLocalResources(appStagingDir) |
| 521 | + } catch { |
| 522 | + case e: Throwable => |
| 523 | + var stagingDirPath: Path = null |
| 524 | + try { |
| 525 | + val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false) |
| 526 | + if (!preserveFiles) { |
| 527 | + stagingDirPath = new Path(appStagingDir) |
| 528 | + logInfo("Deleting staging directory " + stagingDirPath) |
| 529 | + val fs = FileSystem.get(hadoopConf) |
| 530 | + fs.delete(stagingDirPath, true) |
| 531 | + } |
| 532 | + } catch { |
| 533 | + case ioe: IOException => |
| 534 | + logError("Failed to cleanup staging dir " + stagingDirPath, ioe) |
| 535 | + } finally { |
| 536 | + throw e |
| 537 | + } |
| 538 | + } |
| 539 | + |
519 | 540 | val launchEnv = setupLaunchEnv(appStagingDir) |
520 | 541 | val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) |
521 | 542 | amContainer.setLocalResources(localResources) |
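
Taken together, the patch wraps `prepareLocalResources(appStagingDir)` in a try/catch so a failed submission no longer leaves the HDFS staging directory behind: unless `spark.yarn.preserve.staging.files` is set, the directory is deleted recursively, cleanup failures are only logged, and the original exception is rethrown. Below is a minimal standalone sketch of that cleanup-on-failure pattern; the helper name `withStagingCleanup`, its parameters, and the `println` logging are illustrative assumptions for this sketch, not part of the patch (the real code uses the Client's `sparkConf`, `logInfo`, and `logError`).

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingCleanupSketch {

  // Hypothetical helper illustrating the pattern in the patch: run a setup step,
  // and if it throws, best-effort delete the staging directory before rethrowing.
  def withStagingCleanup[T](
      hadoopConf: Configuration,
      appStagingDir: String,
      preserveFiles: Boolean)(setup: => T): T = {
    try {
      setup
    } catch {
      case e: Throwable =>
        if (!preserveFiles) {
          val stagingDirPath = new Path(appStagingDir)
          try {
            // Recursive delete, mirroring fs.delete(stagingDirPath, true) in the patch.
            FileSystem.get(hadoopConf).delete(stagingDirPath, true)
          } catch {
            case ioe: IOException =>
              // Cleanup is best-effort; only log, so the original failure is what surfaces.
              System.err.println(s"Failed to clean up staging dir $stagingDirPath: $ioe")
          }
        }
        // Rethrow the original exception so the caller still sees why submission failed.
        throw e
    }
  }

  // Rough usage, analogous to the call site in createContainerLaunchContext:
  // val localResources = withStagingCleanup(hadoopConf, appStagingDir, preserveFiles) {
  //   prepareLocalResources(appStagingDir)
  // }
}
```

The notable design choice the patch makes, reflected in the sketch, is that cleanup errors never mask the original failure: an `IOException` from the delete is logged and swallowed, and the exception from `prepareLocalResources` is always the one that propagates.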