Skip to content

Commit 11a14a3

Browse files
committed
make sure client process could clean the staging files without getting token expire issue
Change-Id: I851f829d38ab93a4caafb7ca029d34da76f92f76
1 parent ebc124d commit 11a14a3

File tree

1 file changed

+23
-9
lines changed
  • resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn

1 file changed

+23
-9
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
2121
import java.net.{InetAddress, UnknownHostException, URI}
2222
import java.nio.ByteBuffer
2323
import java.nio.charset.StandardCharsets
24+
import java.security.PrivilegedExceptionAction
2425
import java.util.{Locale, Properties, UUID}
2526
import java.util.zip.{ZipEntry, ZipOutputStream}
2627

@@ -192,16 +193,29 @@ private[spark] class Client(
192193
* Cleanup application staging directory.
193194
*/
194195
private def cleanupStagingDir(appId: ApplicationId): Unit = {
195-
val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
196-
try {
197-
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
198-
val fs = stagingDirPath.getFileSystem(hadoopConf)
199-
if (!preserveFiles && fs.delete(stagingDirPath, true)) {
200-
logInfo(s"Deleted staging directory $stagingDirPath")
196+
def cleanupStagingDirInternal(): Unit = {
197+
val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
198+
try {
199+
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
200+
val fs = stagingDirPath.getFileSystem(hadoopConf)
201+
if (!preserveFiles && fs.delete(stagingDirPath, true)) {
202+
logInfo(s"Deleted staging directory $stagingDirPath")
203+
}
204+
} catch {
205+
case ioe: IOException =>
206+
logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
201207
}
202-
} catch {
203-
case ioe: IOException =>
204-
logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
208+
}
209+
210+
if (isClusterMode && principal != null && keytab != null) {
211+
val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
212+
newUgi.doAs(new PrivilegedExceptionAction[Unit] {
213+
override def run(): Unit = {
214+
cleanupStagingDirInternal()
215+
}
216+
})
217+
} else {
218+
cleanupStagingDirInternal()
205219
}
206220
}
207221

0 commit comments

Comments
 (0)