core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.scheduler

 import java.net.URI
+import java.util.Properties

+import scala.collection.JavaConverters._
 import scala.collection.mutable

 import org.apache.hadoop.conf.Configuration
@@ -103,7 +105,7 @@ private[spark] class EventLoggingListener(

   // Events that do not trigger a flush
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
-    logEvent(event)
+    logEvent(event.copy(properties = redactProperties(event.properties)))
     if (shouldLogStageExecutorMetrics) {
       // record the peak metrics for the new stage
       liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
@@ -156,7 +158,9 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   }

-  override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true)
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    logEvent(event.copy(properties = redactProperties(event.properties)), flushLogger = true)
+  }

   override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)

@@ -246,6 +250,22 @@ private[spark] class EventLoggingListener(
     logWriter.stop()
   }

+  private def redactProperties(properties: Properties): Properties = {
+    if (properties == null) {
+      return properties
+    }
+    val redactedProperties = new Properties
+    // properties may contain some custom local properties such as stage/job description
+    // only properties in sparkConf need to be redacted.
+    val (globalProperties, localProperties) = properties.asScala.toSeq.partition {
+      case (key, _) => sparkConf.contains(key)
+    }
+    (Utils.redact(sparkConf, globalProperties) ++ localProperties).foreach {
+      case (key, value) => redactedProperties.setProperty(key, value)
+    }
+    redactedProperties
+  }
+
   private[spark] def redactEvent(
       event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
     // environmentDetails maps a string descriptor to a set of properties
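For readers skimming the diff, here is a minimal, self-contained sketch of what the new redactProperties helper does. Everything Spark-specific is stubbed out: confKeys stands in for sparkConf.contains, and redactionPattern plus redactionReplacement stand in for Utils.redact, which in Spark matches key names against the spark.redaction.regex setting (by default a pattern along the lines of "(?i)secret|password|token"). All names in this sketch are invented for illustration and are not part of the PR.

import java.util.Properties

import scala.collection.JavaConverters._
import scala.util.matching.Regex

object RedactPropertiesSketch {
  // Stand-in for sparkConf.contains(key): the set of keys known to the SparkConf.
  val confKeys = Set("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")
  // Stand-ins for Utils.redact: mask the value of any conf key whose *name*
  // matches the redaction pattern.
  val redactionPattern: Regex = "(?i)secret|password|token".r
  val redactionReplacement = "*********(redacted)"

  def redactProperties(properties: Properties): Properties = {
    if (properties == null) {
      return properties
    }
    val redacted = new Properties
    // Same split as in the PR: only entries whose key also exists in the
    // SparkConf are redaction candidates; job-local properties (for example
    // a stage/job description) pass through untouched.
    val (globalProperties, localProperties) = properties.asScala.toSeq.partition {
      case (key, _) => confKeys.contains(key)
    }
    val redactedGlobal = globalProperties.map {
      case (key, _) if redactionPattern.findFirstIn(key).isDefined =>
        key -> redactionReplacement
      case other => other
    }
    (redactedGlobal ++ localProperties).foreach {
      case (key, value) => redacted.setProperty(key, value)
    }
    redacted
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.setProperty("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", "secret_password")
    props.setProperty("parse_token", "secret_password") // local property, not in confKeys
    // Prints the SparkConf-backed key as *********(redacted) and parse_token
    // verbatim (iteration order of Properties is unspecified).
    redactProperties(props).asScala.foreach(println)
  }
}

Run under Scala 2.12 and the sketch prints the SparkConf-backed key redacted while the local key keeps its value, which is exactly the asymmetry the new test below pins down.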
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler

 import java.io.{File, InputStream}
-import java.util.Arrays
+import java.util.{Arrays, Properties}

 import scala.collection.immutable.Map
 import scala.collection.mutable
@@ -96,6 +96,67 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     assert(redactedProps(key) == "*********(redacted)")
   }

+  test("SPARK-33504 sensitive attributes redaction in properties") {
+    val (secretKey, secretPassword) = ("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
+      "secret_password")
+    val (customKey, customValue) = ("parse_token", "secret_password")
+
+    val conf = getLoggingConf(testDirPath, None).set(secretKey, secretPassword)
+
+    val properties = new Properties()
+    properties.setProperty(secretKey, secretPassword)
+    properties.setProperty(customKey, customValue)
+
+    val logName = "properties-redaction-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    val stageId = 1
+    val jobId = 1
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details")
+
+    val events = Array(SparkListenerStageSubmitted(stageInfo, properties),
+      SparkListenerJobStart(jobId, 0, Seq(stageInfo), properties))
+
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    events.foreach(event => listenerBus.post(event))
+    listenerBus.stop()
+    eventLogger.stop()
+
+    val logData = EventLogFileReader.openEventLog(new Path(eventLogger.logWriter.logPath),
+      fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 3)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerStageSubmitted"))
+      assert(lines(2).contains("SparkListenerJobStart"))
+
+      lines.foreach { line =>
+        JsonProtocol.sparkEventFromJson(parse(line)) match {
+          case logStartEvent: SparkListenerLogStart =>
+            assert(logStartEvent == logStart)
+
+          case stageSubmittedEvent: SparkListenerStageSubmitted =>
+            assert(stageSubmittedEvent.properties.getProperty(secretKey) == "*********(redacted)")
+            assert(stageSubmittedEvent.properties.getProperty(customKey) == customValue)
+
+          case jobStartEvent: SparkListenerJobStart =>
+            assert(jobStartEvent.properties.getProperty(secretKey) == "*********(redacted)")
+            assert(jobStartEvent.properties.getProperty(customKey) == customValue)
+
+          case _ => assert(false)
+        }
+      }
+    } finally {
+      logData.close()
+    }
+  }
+
   test("Executor metrics update") {
     testStageExecutorMetricsEventLogging()
   }
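A note on the design the test encodes: only properties whose key also exists in the SparkConf are redacted, so the SparkConf-backed spark.executorEnv.HADOOP_CREDSTORE_PASSWORD comes back as *********(redacted), while parse_token, a job-local property, is logged verbatim even though its value here is also a secret string. Job-local entries such as spark.job.description are user-visible text that redaction would mangle, hence the partition on sparkConf.contains. Since the matching itself is driven by spark.redaction.regex, operators can widen the net to cover site-specific conf keys; a small sketch (the extended pattern is an example, not a recommendation):

import org.apache.spark.SparkConf

object RedactionConfigSketch {
  def main(args: Array[String]): Unit = {
    // spark.redaction.regex decides which conf *key names* get their values
    // masked by Utils.redact, both in the event log (after this PR) and on the
    // UI's environment page. Here the default set is extended so any key
    // containing "credstore" is also masked.
    val conf = new SparkConf()
      .set("spark.redaction.regex", "(?i)secret|password|token|credstore")
    println(conf.get("spark.redaction.regex"))
  }
}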