@@ -223,4 +223,13 @@ package object config {
" bigger files.")
.longConf
.createWithDefault(4 * 1024 * 1024)

private[spark] val SECRET_REDACTION_PATTERN =
ConfigBuilder("spark.redaction.regex")
.doc("Regex to decide which Spark configuration properties and environment variables in " +
"driver and executor environments contain sensitive information. When this regex matches " +
"a property , its value is redacted from the environment UI and various logs like YARN " +
Contributor: nit: no space before comma.

Author: Fixed.

"and event logs")
Contributor: nit: missing period.

Author: Fixed.

.stringConf
.createWithDefault("(?i)secret|password")
}
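
As a quick illustration of how the default pattern behaves (the keys below are made up for the example; this is a sketch, not code from the PR):

```scala
// Illustrative only: how the default "(?i)secret|password" pattern classifies keys.
val redactionPattern = "(?i)secret|password".r

Seq(
  "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", // matches "password", case-insensitively
  "spark.my.sECreT",                             // matches "secret", case-insensitively
  "spark.app.name"                               // no match; value left as-is
).foreach { key =>
  val sensitive = redactionPattern.findFirstIn(key).isDefined
  println(s"$key -> ${if (sensitive) "redact" else "keep"}")
}
```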

@@ -153,7 +153,9 @@ private[spark] class EventLoggingListener(

override def onTaskEnd(event: SparkListenerTaskEnd): Unit = logEvent(event)

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = logEvent(event)
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
logEvent(redactEvent(event))
}

// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
@@ -231,6 +233,19 @@
}
}


Contributor: nit: don't add this blank line.

Author: Fixed.

private def redactEvent(event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
Contributor: Any reason why you chose this instead of redacting at the source of SparkListenerEnvironmentUpdate?

Author: Good question. I thought about this quite a bit when making this change. I should have posted that decision in the PR description; apologies for not providing that context. The way I see it, there are three places where redaction can be done:

  1. Right at the source of SparkListenerEnvironmentUpdate (here, in SparkContext.scala).
  2. In JsonProtocol.scala, when converting the event to JSON.
  3. In EventLoggingListener.scala (where it is right now), when being persisted to disk.

A user could write a custom listener that listens to environment updates and does something useful with them, and I didn't want to impose redaction on them. They could be using it to create a clone of their environment, for example, and may need the same sensitive properties. So I ruled out 1.

JsonProtocol seemed like a generic utility for converting events to JSON. While I could be selective about redacting only events of the SparkListenerEnvironmentUpdate type, I didn't want to assume that everyone translating the environment update to JSON should only see redacted configuration. That made me rule out 2.

I decided it was best to redact "closest to disk", which is why I put the redaction code where I did, in EventLoggingListener. Hope that makes sense; happy to hear your thoughts if you think otherwise.

Contributor: The downside is that with this choice you have more complicated tests, and you have to do redaction at every place where this information might be written (which at the moment is just two: the event logger and the UI).

JsonProtocol is not really a choice because it doesn't cover the UI.

> A user could write a custom listener that listened to the environment updates and did something useful with them. And, I didn't want to impose redaction on them.

That's the only argument I can buy, and said user has different ways to get that information.

Author:

> That's the only argument I can buy, and said user has different ways to get that information.

True, sparkConf.getAll is always available.

I still think it's better to put the redaction closer to the "sinks" for now. The good thing is that if we see the number of "sinks" increasing, with everyone wanting redaction, we can move redaction further upstream. For now, two sinks seem manageable, and it's hard to guess whether future sinks will want redaction or not. So, unless you strongly object, I'd like to keep the redaction closer to the "sinks".

// "Spark Properties" entry will always exist because the map is always populated with it.
val props = event
.environmentDetails
.get("Spark Properties")
.get
Contributor: Instead of using map.get(key).get, just use map(key).

Author: Fixed.

val redactedProps = Utils.redact(sparkConf, props)
val redactedEnvironmentDetails = event.environmentDetails +
("Spark Properties" -> redactedProps)
SparkListenerEnvironmentUpdate(redactedEnvironmentDetails)
}

}
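
For illustration, here is roughly what redactEvent looks like once the map(key) suggestion above is applied (a sketch against the class members shown in this diff, not necessarily the final merged code):

```scala
// Sketch only: redactEvent after the review feedback.
private def redactEvent(
    event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
  // "Spark Properties" always exists because the map is always populated with it,
  // so environmentDetails(key) is safe and avoids the map.get(key).get pattern.
  val redactedProps = Utils.redact(sparkConf, event.environmentDetails("Spark Properties"))
  SparkListenerEnvironmentUpdate(
    event.environmentDetails + ("Spark Properties" -> redactedProps))
}
```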

private[spark] object EventLoggingListener extends Logging {

@@ -22,21 +22,17 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils

private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
private val listener = parent.listener

private def removePass(kv: (String, String)): (String, String) = {
if (kv._1.toLowerCase.contains("password") || kv._1.toLowerCase.contains("secret")) {
(kv._1, "******")
} else kv
}

def render(request: HttpServletRequest): Seq[Node] = {
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.sparkProperties.map(removePass), fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(propertyHeader, propertyRow,
Utils.redact(parent.conf, listener.sparkProperties), fixedWidth = true)

val systemPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
val classpathEntriesTable = UIUtils.listingTable(

@@ -23,6 +23,7 @@ import org.apache.spark.ui._

private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
val listener = parent.environmentListener
val conf = parent.conf
attachPage(new EnvironmentPage(this))
}

15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -55,7 +55,7 @@ import org.slf4j.Logger
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.internal.config._
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.util.logging.RollingFileAppender
@@ -2555,6 +2555,19 @@ private[spark] object Utils extends Logging {
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}
}

private[util] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"

def redact(conf: SparkConf, kvs: Seq[(String, String)]): Seq[(String, String)] = {
val redactionPattern = conf.get(SECRET_REDACTION_PATTERN).r
Contributor: This is very expensive. How about a version that takes a list of tuples and redacts them?

Author: What part do you think is expensive? Going through all the configuration properties and matching them with the regex? If so, I agree. However, that has to be done somewhere. All the callers of this function have a SparkConf that they want stuff redacted from, so if this function accepts a list of tuples, they have to run the regex check to find that list first before sending it to redact(). So, overall, unless I am missing something, I don't think we can avoid the expense.

Contributor: Compiling the regex once for every item in the list being redacted, instead of doing it once for the whole list.

Author: Ah, good point. Let me fix this.

kvs.map { kv =>
if (redactionPattern.findFirstIn(kv._1).isDefined) {
(kv._1, REDACTION_REPLACEMENT_TEXT)
}
Contributor: nit: indented too far.

else kv
Contributor: Style here is wrong. Better to use the Option API:

regex.findFirstIn(...).map(...).getOrElse(...)

Author: That's much better, thanks.

}
}

}
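
Putting both suggestions together (compile the pattern once for the whole list, use the Option API instead of if/else), the revised helper might look like this sketch:

```scala
// Sketch only: redact() after the review feedback, not necessarily the merged version.
def redact(conf: SparkConf, kvs: Seq[(String, String)]): Seq[(String, String)] = {
  // Compile the regex once per call rather than once per entry.
  val redactionPattern = conf.get(SECRET_REDACTION_PATTERN).r
  kvs.map { case (key, value) =>
    redactionPattern.findFirstIn(key)
      .map(_ => (key, REDACTION_REPLACEMENT_TEXT))
      .getOrElse((key, value))
  }
}
```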

private[util] object CallerContext extends Logging {

@@ -95,6 +95,30 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}
}

test("Event logging with password redaction") {
val secretPassword = "secret_password"
val conf = getLoggingConf(testDirPath, None).set("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
secretPassword)
sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
Contributor: This is a pretty expensive way of testing this. Why not just call the redaction method and make sure it's doing the right thing?

Author: Yeah, I wanted a little more than just a "unit" test. This was broader and checked for actual redaction taking place in the event logs, so I have it here.

You have a valid point though: if you think this is too expensive, the method in UtilsSuite.scala does a pretty good job of "unit testing" redact(), so I'd rather take this out completely. Thoughts?

Contributor: Both are not doing the exact same thing. There's logic in the EventLoggingListener code that also needs to be tested. But you can have a more targeted unit test instead of running a distributed Spark application.

Author: Ok, I have updated this test to be less bloated. Thanks!

assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get

sc.parallelize(1 to 10000).count()
sc.stop()

val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
val eventLog = Source.fromInputStream(logData).mkString
// Make sure nothing secret shows up anywhere
assert(!eventLog.contains(secretPassword), s"Secret password ($secretPassword) not redacted " +
s"from event logs:\n $eventLog")
val expected = """"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD":"*********(redacted)""""
Contributor (vanzin), Nov 22, 2016: This is pretty hacky. It makes assumptions about how things are formatted in the event log.

The previous assert should be enough (ignoring my previous comment about changing this test).

// Make sure every occurrence of the property is accompanied by a redaction text.
val regex = """"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD":"([^"]*)"""".r
val matches = regex.findAllIn(eventLog)
assert(matches.nonEmpty)
matches.foreach{ matched => assert(matched.equals(expected)) }
}
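
For reference, a more targeted test along the lines suggested above might look like the sketch below; it assumes redactEvent is widened from private to private[spark] so the suite can call it directly.

```scala
// Sketch only: exercises EventLoggingListener's redaction logic without
// launching a distributed application. Assumes redactEvent is private[spark].
test("redaction of sensitive properties in environment update events") {
  val conf = getLoggingConf(testDirPath, None)
    .set("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", "secret_password")
  val listener = new EventLoggingListener("test", None, testDirPath.toUri, conf)
  val event = SparkListenerEnvironmentUpdate(Map(
    "Spark Properties" ->
      Seq("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD" -> "secret_password")))
  val redactedProps = listener.redactEvent(event).environmentDetails("Spark Properties").toMap
  assert(redactedProps("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD") !== "secret_password")
}
```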

test("Log overwriting") {
val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
val logPath = new URI(logUri).getPath
24 changes: 24 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -974,4 +974,28 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

assert(pValue > threshold)
}

test("redact sensitive information") {
val sparkConf = new SparkConf

// Set some secret keys
val secretKeys = Seq("" +
Contributor: What is "" + accomplishing here?

Author: Me trying to format things in my editor led to this, apologies. Fixed it.

"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
"spark.my.password",
"spark.my.sECreT")
secretKeys.foreach { key =>
sparkConf.set(key, "secret_password")
}
// Set a non-secret key
sparkConf.set("spark.regular.property", "not_a_secret")

// Redact sensitive information
val redactedConf = Utils.redact(sparkConf, sparkConf.getAll).toMap

// Assert that secret information got redacted while the regular property remained the same
secretKeys.foreach { key =>
assert(redactedConf.get(key).get == Utils.REDACTION_REPLACEMENT_TEXT)
Contributor: assert(conf(key) === ...)

Author: Fixed.

}
assert(redactedConf.get("spark.regular.property").get == "not_a_secret")
Contributor: Same as above.

Author: Fixed.

}
}
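
After the two comments above, the assertions presumably end up using ScalaTest's === and Map's apply, roughly:

```scala
// Sketch only: the assertions after the review feedback.
secretKeys.foreach { key =>
  assert(redactedConf(key) === Utils.REDACTION_REPLACEMENT_TEXT)
}
assert(redactedConf("spark.regular.property") === "not_a_secret")
```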
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -356,6 +356,15 @@ Apart from these, the following properties are also available, and may be useful
process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
<td><code>spark.redaction.regex</code></td>
<td>(?i)secret|password</td>
<td>
Regex to decide which Spark configuration properties and environment variables in driver and
executor environments contain sensitive information. When this regex matches a property, its
value is redacted from the environment UI and various logs like YARN and event logs.
</td>
</tr>
<tr>
<td><code>spark.python.profile</code></td>
<td>false</td>
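As a usage note, a deployment that also treats, say, tokens as sensitive could widen the pattern; the extra keyword here is illustrative:

```scala
// Illustrative only: extending the redaction pattern beyond the default.
val conf = new SparkConf()
  .set("spark.redaction.regex", "(?i)secret|password|token")
```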
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
@@ -75,7 +74,7 @@ private[yarn] class ExecutorRunnable(
|===============================================================================
|YARN executor launch context:
| env:
|${env.map { case (k, v) => s" $k -> $v\n" }.mkString}
|${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString}
| command:
| ${commands.mkString(" \\ \n ")}
|