diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 24e2a5e4d4a6..5673c02fb52b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -18,7 +18,9 @@ package org.apache.spark.scheduler import java.net.URI +import java.util.Properties +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.conf.Configuration @@ -103,7 +105,7 @@ private[spark] class EventLoggingListener( // Events that do not trigger a flush override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { - logEvent(event) + logEvent(event.copy(properties = redactProperties(event.properties))) if (shouldLogStageExecutorMetrics) { // record the peak metrics for the new stage liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()), @@ -156,7 +158,9 @@ private[spark] class EventLoggingListener( logEvent(event, flushLogger = true) } - override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true) + override def onJobStart(event: SparkListenerJobStart): Unit = { + logEvent(event.copy(properties = redactProperties(event.properties)), flushLogger = true) + } override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true) @@ -246,6 +250,22 @@ private[spark] class EventLoggingListener( logWriter.stop() } + private def redactProperties(properties: Properties): Properties = { + if (properties == null) { + return properties + } + val redactedProperties = new Properties + // properties may contain some custom local properties such as stage/job description + // only properties in sparkConf need to be redacted. + val (globalProperties, localProperties) = properties.asScala.toSeq.partition { + case (key, _) => sparkConf.contains(key) + } + (Utils.redact(sparkConf, globalProperties) ++ localProperties).foreach { + case (key, value) => redactedProperties.setProperty(key, value) + } + redactedProperties + } + private[spark] def redactEvent( event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = { // environmentDetails maps a string descriptor to a set of properties diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 046564d65b76..b49710401f15 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.io.{File, InputStream} -import java.util.Arrays +import java.util.{Arrays, Properties} import scala.collection.immutable.Map import scala.collection.mutable @@ -96,6 +96,67 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit assert(redactedProps(key) == "*********(redacted)") } + test("Spark-33504 sensitive attributes redaction in properties") { + val (secretKey, secretPassword) = ("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", + "secret_password") + val (customKey, customValue) = ("parse_token", "secret_password") + + val conf = getLoggingConf(testDirPath, None).set(secretKey, secretPassword) + + val properties = new Properties() + properties.setProperty(secretKey, secretPassword) + properties.setProperty(customKey, customValue) + + val logName = "properties-reaction-test" + val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) + val listenerBus = new LiveListenerBus(conf) + + val stageId = 1 + val jobId = 1 + val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details") + + val events = Array(SparkListenerStageSubmitted(stageInfo, properties), + SparkListenerJobStart(jobId, 0, Seq(stageInfo), properties)) + + eventLogger.start() + listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) + listenerBus.addToEventLogQueue(eventLogger) + events.foreach(event => listenerBus.post(event)) + listenerBus.stop() + eventLogger.stop() + + val logData = EventLogFileReader.openEventLog(new Path(eventLogger.logWriter.logPath), + fileSystem) + try { + val lines = readLines(logData) + val logStart = SparkListenerLogStart(SPARK_VERSION) + assert(lines.size === 3) + assert(lines(0).contains("SparkListenerLogStart")) + assert(lines(1).contains("SparkListenerStageSubmitted")) + assert(lines(2).contains("SparkListenerJobStart")) + + lines.foreach{ + line => JsonProtocol.sparkEventFromJson(parse(line)) match { + case logStartEvent: SparkListenerLogStart => + assert(logStartEvent == logStart) + + case stageSubmittedEvent: SparkListenerStageSubmitted => + assert(stageSubmittedEvent.properties.getProperty(secretKey) == "*********(redacted)") + assert(stageSubmittedEvent.properties.getProperty(customKey) == customValue) + + case jobStartEvent : SparkListenerJobStart => + assert(jobStartEvent.properties.getProperty(secretKey) == "*********(redacted)") + assert(jobStartEvent.properties.getProperty(customKey) == customValue) + + case _ => assert(false) + } + } + } finally { + logData.close() + } + } + test("Executor metrics update") { testStageExecutorMetricsEventLogging() }