Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ package object config {
ConfigBuilder("spark.redaction.regex")
.doc("Regex to decide which Spark configuration properties and environment variables in " +
"driver and executor environments contain sensitive information. When this regex matches " +
"a property, its value is redacted from the environment UI and various logs like YARN " +
"and event logs.")
"a property key or value, the value is redacted from the environment UI and various logs " +
"like YARN and event logs.")
.regexConf
.createWithDefault("(?i)secret|password".r)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,17 @@ private[spark] class EventLoggingListener(

private[spark] def redactEvent(
event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
// "Spark Properties" entry will always exist because the map is always populated with it.
val redactedProps = Utils.redact(sparkConf, event.environmentDetails("Spark Properties"))
val redactedEnvironmentDetails = event.environmentDetails +
("Spark Properties" -> redactedProps)
SparkListenerEnvironmentUpdate(redactedEnvironmentDetails)
// environmentDetails maps a string descriptor to a set of properties
// Similar to:
// "JVM Information" -> jvmInformation,
// "Spark Properties" -> sparkProperties,
// ...
// where jvmInformation, sparkProperties, etc. are sequences of tuples.
// We go through each set of properties and redact sensitive information from it.
val redactedProps = event.environmentDetails.map{ case (name, props) =>
name -> Utils.redact(sparkConf, props)
}
SparkListenerEnvironmentUpdate(redactedProps)
}

}
Expand Down
22 changes: 18 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2606,10 +2606,24 @@ private[spark] object Utils extends Logging {
}

private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = {
kvs.map { kv =>
redactionPattern.findFirstIn(kv._1)
.map { _ => (kv._1, REDACTION_REPLACEMENT_TEXT) }
.getOrElse(kv)
// If the sensitive information regex matches either the key or the value, redact the value.
// While the original intent was to only redact the value if the key matched the regex,
// we've found that, especially in verbose mode, the value of the property may itself
// contain sensitive information, like so:
// "sun.java.command":"org.apache.spark.deploy.SparkSubmit ... \
// --conf spark.executorEnv.HADOOP_CREDSTORE_PASSWORD=secret_password ...
//
// And, in such cases, simply searching for the sensitive information regex in the key name is
// not sufficient. The values themselves have to be searched as well and redacted if matched.
// This does mean we may produce more false positives - for example, if the value of an
// arbitrary property contained the term 'password', we would redact the value from the UI and
// logs. To work around this, the user would have to make the spark.redaction.regex property
// more specific.
kvs.map { case (key, value) =>
redactionPattern.findFirstIn(key)
.orElse(redactionPattern.findFirstIn(value))
.map { _ => (key, REDACTION_REPLACEMENT_TEXT) }
.getOrElse((key, value))
}
}

Expand Down
34 changes: 34 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import java.io._
import java.nio.charset.StandardCharsets

import scala.collection.mutable.ArrayBuffer
import scala.io.Source

import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
Expand All @@ -34,6 +36,7 @@ import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.scheduler.EventLoggingListener
import org.apache.spark.util.{CommandLineUtils, ResetSystemProperties, Utils}


Expand Down Expand Up @@ -404,6 +407,37 @@ class SparkSubmitSuite
runSparkSubmit(args)
}

test("launch simple application with spark-submit with redaction") {
  // Runs an application through spark-submit with event logging enabled and a sensitive
  // value passed via --conf, then checks that the value never appears in the event log.
  val testDir = Utils.createTempDir()
  testDir.deleteOnExit()
  val testDirPath = new Path(testDir.getAbsolutePath())
  val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
  val fileSystem = Utils.getHadoopFileSystem("/",
    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
  try {
    val args = Seq(
      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
      "--name", "testApp",
      "--master", "local",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--conf", "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD=secret_password",
      "--conf", "spark.eventLog.enabled=true",
      "--conf", "spark.eventLog.testing=true",
      "--conf", s"spark.eventLog.dir=${testDirPath.toUri.toString}",
      "--conf", "spark.hadoop.fs.defaultFS=unsupported://example.com",
      unusedJar.toString)
    runSparkSubmit(args)
    val listStatus = fileSystem.listStatus(testDirPath)
    // Fail with a readable message instead of NoSuchElementException from `.last`
    // if the application did not produce an event log.
    assert(listStatus.nonEmpty, s"No event log was written to $testDirPath")
    val logData = EventLoggingListener.openEventLog(listStatus.last.getPath, fileSystem)
    try {
      Source.fromInputStream(logData).getLines().foreach { line =>
        assert(!line.contains("secret_password"))
      }
    } finally {
      // Close the event log stream even when an assertion above fails.
      logData.close()
    }
  } finally {
    Utils.deleteRecursively(testDir)
  }
}

test("includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
Expand Down
10 changes: 7 additions & 3 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1010,15 +1010,19 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
"spark.my.password",
"spark.my.sECreT")
secretKeys.foreach { key => sparkConf.set(key, "secret_password") }
secretKeys.foreach { key => sparkConf.set(key, "sensitive_value") }
// Set a non-secret key
sparkConf.set("spark.regular.property", "not_a_secret")
sparkConf.set("spark.regular.property", "regular_value")
// Set a property with a regular key but secret in the value
sparkConf.set("spark.sensitive.property", "has_secret_in_value")

// Redact sensitive information
val redactedConf = Utils.redact(sparkConf, sparkConf.getAll).toMap

// Assert that secret information got redacted while the regular property remained the same
secretKeys.foreach { key => assert(redactedConf(key) === Utils.REDACTION_REPLACEMENT_TEXT) }
assert(redactedConf("spark.regular.property") === "not_a_secret")
assert(redactedConf("spark.regular.property") === "regular_value")
assert(redactedConf("spark.sensitive.property") === Utils.REDACTION_REPLACEMENT_TEXT)

}
}
4 changes: 2 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ Apart from these, the following properties are also available, and may be useful
<td>(?i)secret|password</td>
<td>
Regex to decide which Spark configuration properties and environment variables in driver and
executor environments contain sensitive information. When this regex matches a property, its
value is redacted from the environment UI and various logs like YARN and event logs.
executor environments contain sensitive information. When this regex matches a property key or
value, the value is redacted from the environment UI and various logs like YARN and event logs.
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.util.{HashMap => JHashMap}

import scala.collection.mutable
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
Expand Down Expand Up @@ -87,24 +88,30 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
testBasicYarnApp(false)
}

test("run Spark in yarn-client mode with different configurations") {
test("run Spark in yarn-client mode with different configurations, ensuring redaction") {
testBasicYarnApp(true,
Map(
"spark.driver.memory" -> "512m",
"spark.executor.cores" -> "1",
"spark.executor.memory" -> "512m",
"spark.executor.instances" -> "2"
"spark.executor.instances" -> "2",
// Sending some sensitive information, which we'll make sure gets redacted
"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD,
"spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD
))
}

test("run Spark in yarn-cluster mode with different configurations") {
test("run Spark in yarn-cluster mode with different configurations, ensuring redaction") {
testBasicYarnApp(false,
Map(
"spark.driver.memory" -> "512m",
"spark.driver.cores" -> "1",
"spark.executor.cores" -> "1",
"spark.executor.memory" -> "512m",
"spark.executor.instances" -> "2"
"spark.executor.instances" -> "2",
// Sending some sensitive information, which we'll make sure gets redacted
"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD,
"spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD
))
}

Expand Down Expand Up @@ -349,6 +356,7 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matc
private object YarnClusterDriver extends Logging with Matchers {

val WAIT_TIMEOUT_MILLIS = 10000
val SECRET_PASSWORD = "secret_password"

def main(args: Array[String]): Unit = {
if (args.length != 1) {
Expand Down Expand Up @@ -395,6 +403,13 @@ private object YarnClusterDriver extends Logging with Matchers {
assert(executorInfos.nonEmpty)
executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty)
info.logUrlMap.values.foreach { url =>
val log = Source.fromURL(url).mkString
assert(
!log.contains(SECRET_PASSWORD),
s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
)
}
}

// If we are running in yarn-cluster mode, verify that driver logs links are present and are
Expand All @@ -406,8 +421,13 @@ private object YarnClusterDriver extends Logging with Matchers {
assert(driverLogs.contains("stderr"))
assert(driverLogs.contains("stdout"))
val urlStr = driverLogs("stderr")
// Ensure that this is a valid URL, else this will throw an exception
new URL(urlStr)
driverLogs.foreach { kv =>
val log = Source.fromURL(kv._2).mkString
assert(
!log.contains(SECRET_PASSWORD),
s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
)
}
val containerId = YarnSparkHadoopUtil.get.getContainerId
val user = Utils.getCurrentUserName()
assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
Expand Down