diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 783cf47df169..165e1f347494 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -157,24 +157,35 @@ private[spark] class SparkSubmit extends Logging { def doRunMain(): Unit = { if (args.proxyUser != null) { - val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, - UserGroupInformation.getCurrentUser()) - try { - proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { - override def run(): Unit = { - runMain(args, uninitLog) - } - }) - } catch { - case e: Exception => - // Hadoop's AuthorizationException suppresses the exception's stack trace, which - // makes the message printed to the output by the JVM not very helpful. Instead, - // detect exceptions with empty stack traces here, and treat them differently. - if (e.getStackTrace().length == 0) { - error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}") - } else { - throw e - } + // Here we are checking for client mode because when job is sumbitted in cluster + // deploy mode with k8s resource manager, the spark submit in the driver container + // is done in client mode. + val isKubernetesClusterModeDriver = args.master.startsWith("k8s") && + args.deployMode.equals("client") && + args.toSparkConf().getBoolean("spark.kubernetes.submitInDriver", false) + if (isKubernetesClusterModeDriver) { + logInfo("Running driver with proxy user. Cluster manager: Kubernetes") + SparkHadoopUtil.get.runAsSparkUser(() => runMain(args, uninitLog)) + } else { + val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, + UserGroupInformation.getCurrentUser()) + try { + proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + runMain(args, uninitLog) + } + }) + } catch { + case e: Exception => + // Hadoop's AuthorizationException suppresses the exception's stack trace, which + // makes the message printed to the output by the JVM not very helpful. Instead, + // detect exceptions with empty stack traces here, and treat them differently. + if (e.getStackTrace().length == 0) { + error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}") + } else { + throw e + } + } } } else { runMain(args, uninitLog)