
Commit 2f4d93a

jerryshao authored and MatthewRBruce committed
[SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the staging files in long running scenario
## What changes were proposed in this pull request?

This issue happens in long running applications with yarn cluster mode, because yarn#client doesn't sync tokens with the AM, so it always keeps the initial token. This token may expire in the long running scenario, so when yarn#client tries to clean up the staging directory after the application finishes, it uses this expired token and hits the token expiry issue.

## How was this patch tested?

Manual verification in a secure cluster.

Author: jerryshao <[email protected]>

Closes apache#18617 from jerryshao/SPARK-21376.

(cherry picked from commit cb8d5cc)
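The fix applies a standard Hadoop credential pattern: log in again from the keytab with `UserGroupInformation.loginUserFromKeytabAndReturnUGI`, then run the privileged filesystem call under that fresh login via `doAs`. Below is a minimal standalone sketch of that pattern, assuming a secure (Kerberized) cluster; the principal, keytab path, and staging path are hypothetical placeholders, and this is an illustration of the technique rather than the patched Spark code (the actual change is in the diff below).

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation

object StagingCleanupSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()

    // Hypothetical inputs: the application's Kerberos principal and keytab file.
    val principal = "spark/[email protected]"
    val keytab    = "/etc/security/keytabs/spark.keytab"

    // Re-login from the keytab to obtain fresh credentials, rather than
    // reusing the (possibly expired) tokens held by the current login.
    val freshUgi: UserGroupInformation =
      UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)

    // Perform the filesystem operation under the fresh login context.
    freshUgi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        val staging = new Path("/user/spark/.sparkStaging/application_0001") // hypothetical
        val fs = staging.getFileSystem(hadoopConf)
        if (fs.delete(staging, true)) {
          println(s"Deleted staging directory $staging")
        }
      }
    })
  }
}
```

The `doAs` wrapper is what makes this work: Hadoop `FileSystem` calls authenticate with the credentials of the current login context, so without it the delete would still be attempted with the original, possibly expired tokens.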
1 parent 45997f6 · commit 2f4d93a

File tree

1 file changed: +26 −9 lines

  • resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 26 additions & 9 deletions
```diff
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}

@@ -189,16 +190,32 @@ private[spark] class Client(
    * Cleanup application staging directory.
    */
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
-    val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
-    try {
-      val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val fs = stagingDirPath.getFileSystem(hadoopConf)
-      if (!preserveFiles && fs.delete(stagingDirPath, true)) {
-        logInfo(s"Deleted staging directory $stagingDirPath")
+    if (sparkConf.get(PRESERVE_STAGING_FILES)) {
+      return
+    }
+
+    def cleanupStagingDirInternal(): Unit = {
+      val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+      try {
+        val fs = stagingDirPath.getFileSystem(hadoopConf)
+        if (fs.delete(stagingDirPath, true)) {
+          logInfo(s"Deleted staging directory $stagingDirPath")
+        }
+      } catch {
+        case ioe: IOException =>
+          logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
       }
-    } catch {
-      case ioe: IOException =>
-        logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+
+    if (isClusterMode && principal != null && keytab != null) {
+      val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+      newUgi.doAs(new PrivilegedExceptionAction[Unit] {
+        override def run(): Unit = {
+          cleanupStagingDirInternal()
+        }
+      })
+    } else {
+      cleanupStagingDirInternal()
     }
   }
```
