core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -18,6 +18,9 @@
package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit
import java.util.regex.PatternSyntaxException

import scala.util.matching.Regex

import org.apache.spark.network.util.{ByteUnit, JavaUtils}

@@ -65,6 +68,13 @@ private object ConfigHelpers {

def byteToString(v: Long, unit: ByteUnit): String = unit.convertTo(v, ByteUnit.BYTE) + "b"

def regexFromString(str: String, key: String): Regex = {
try str.r catch {
case e: PatternSyntaxException =>
throw new IllegalArgumentException(s"$key should be a regex, but was $str", e)
}
}

}

/**
@@ -214,4 +224,7 @@ private[spark] case class ConfigBuilder(key: String) {
new FallbackConfigEntry(key, _doc, _public, fallback)
}

def regexConf: TypedConfigBuilder[Regex] = {
new TypedConfigBuilder(this, regexFromString(_, this.key), _.regex)
}
}
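
For readers outside the Spark tree, here is a self-contained sketch of what the new converter pair does. `ConfigHelpers` and `TypedConfigBuilder` are Spark-internal, so the demo object below only mirrors their behavior; `RegexConfSketch` is an illustrative name.

```scala
import java.util.regex.PatternSyntaxException
import scala.util.matching.Regex

object RegexConfSketch {
  // Same contract as ConfigHelpers.regexFromString above: compile the
  // pattern, rethrowing an invalid one as an IllegalArgumentException
  // that names the offending config key.
  def regexFromString(str: String, key: String): Regex = {
    try str.r catch {
      case e: PatternSyntaxException =>
        throw new IllegalArgumentException(s"$key should be a regex, but was $str", e)
    }
  }

  def main(args: Array[String]): Unit = {
    val key = "spark.redaction.regex"
    // regexConf round-trips through Regex#regex, so the stored string
    // is exactly the pattern the user supplied.
    val r = regexFromString("(?i)secret|password", key)
    println(r.regex)               // (?i)secret|password
    // An unbalanced character class fails fast, with the key in the message.
    try regexFromString("[.", key)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```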
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -246,8 +246,16 @@ package object config {
"driver and executor environments contain sensitive information. When this regex matches " +
"a property, its value is redacted from the environment UI and various logs like YARN " +
"and event logs.")
.stringConf
.createWithDefault("(?i)secret|password")
.regexConf
.createWithDefault("(?i)secret|password".r)

private[spark] val STRING_REDACTION_PATTERN =
ConfigBuilder("spark.redaction.string.regex")
.doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
"information. When this regex matches a string part, that string part is replaced by a " +
"dummy value. This is currently used to redact the output of SQL explain commands.")
.regexConf
.createOptional

private[spark] val NETWORK_AUTH_ENABLED =
ConfigBuilder("spark.authenticate")
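Both entries still accept plain pattern strings from users; compilation to `Regex` happens when the entry is read, and `spark.redaction.string.regex` stays unset unless explicitly configured (`createOptional`). A minimal usage sketch, assuming a Spark build that includes this patch:

```scala
import org.apache.spark.SparkConf

object RedactionConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Override the default "(?i)secret|password" used for key-based redaction.
      .set("spark.redaction.regex", "(?i)secret|password|token")
      // Opt in to string redaction of, e.g., file system paths.
      .set("spark.redaction.string.regex", "hdfs://[^ ]+")
    println(conf.get("spark.redaction.string.regex")) // raw pattern string
  }
}
```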
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2585,13 +2585,26 @@ private[spark] object Utils extends Logging {
}
}

private[util] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"

/**
* Redact the sensitive values in the given map. If a map key matches the redaction pattern,
* its value is replaced with dummy text.
*/
def redact(conf: SparkConf, kvs: Seq[(String, String)]): Seq[(String, String)] = {
val redactionPattern = conf.get(SECRET_REDACTION_PATTERN).r
val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
redact(redactionPattern, kvs)
}

/**
* Redact the sensitive information in the given string.
*/
def redact(conf: SparkConf, text: String): String = {
if (text == null || text.isEmpty || !conf.contains(STRING_REDACTION_PATTERN)) return text
val regex = conf.get(STRING_REDACTION_PATTERN).get
regex.replaceAllIn(text, REDACTION_REPLACEMENT_TEXT)
}

private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = {
kvs.map { kv =>
redactionPattern.findFirstIn(kv._1)
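The new string overload short-circuits when the optional pattern is unset, so existing deployments pay nothing. A standalone sketch of the same replace-all logic, using `scala.util.matching.Regex` directly rather than Spark's config plumbing (names are illustrative):

```scala
import scala.util.matching.Regex

object StringRedactionSketch {
  val ReplacementText = "*********(redacted)"

  // Mirrors Utils.redact(conf, text): pass the text through untouched when
  // no pattern is configured, otherwise replace every match wholesale.
  def redact(pattern: Option[Regex], text: String): String = pattern match {
    case Some(regex) if text != null && text.nonEmpty =>
      regex.replaceAllIn(text, ReplacementText)
    case _ => text
  }

  def main(args: Array[String]): Unit = {
    val pattern = Some("spark-[0-9a-f]{8}".r)
    println(redact(pattern, "path: /tmp/spark-deadbeef/app"))
    // prints: path: /tmp/*********(redacted)/app
    println(redact(None, "path: /tmp/spark-deadbeef/app")) // unchanged
  }
}
```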
core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.util.SparkConfWithEnv
@@ -98,6 +95,21 @@ class ConfigEntrySuite extends SparkFunSuite {
assert(conf.get(bytes) === 1L)
}

test("conf entry: regex") {
val conf = new SparkConf()
val rConf = ConfigBuilder(testKey("regex")).regexConf.createWithDefault(".*".r)

conf.set(rConf, "[0-9a-f]{8}".r)
assert(conf.get(rConf).regex === "[0-9a-f]{8}")

conf.set(rConf.key, "[0-9a-f]{4}")
assert(conf.get(rConf).regex === "[0-9a-f]{4}")

conf.set(rConf.key, "[.")
val e = intercept[IllegalArgumentException](conf.get(rConf))
assert(e.getMessage.contains("regex should be a regex, but was"))
}

test("conf entry: string seq") {
val conf = new SparkConf()
val seq = ConfigBuilder(testKey("seq")).stringConf.toSequence.createWithDefault(Seq())
@@ -239,5 +251,4 @@
.createWithDefault(null)
testEntryRef(nullConf, ref(nullConf))
}

}
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -41,9 +41,33 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
val relation: BaseRelation
val metastoreTableIdentifier: Option[TableIdentifier]

protected val nodeNamePrefix: String = ""

override val nodeName: String = {
s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
}

override def simpleString: String = {
val metadataEntries = metadata.toSeq.sorted.map {
case (key, value) =>
key + ": " + StringUtils.abbreviate(redact(value), 100)
}
val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
}

override def verboseString: String = redact(super.verboseString)

override def treeString(verbose: Boolean, addSuffix: Boolean): String = {
redact(super.treeString(verbose, addSuffix))
}

/**
* Shorthand for calling Utils.redact() without explicitly passing the redaction configuration
*/
private def redact(text: String): String = {
Utils.redact(SparkSession.getActiveSession.get.sparkContext.conf, text)
}
}

/** Physical plan node for scanning data from a relation. */
@@ -85,15 +109,6 @@ case class RowDataSourceScanExec(
}
}

override def simpleString: String = {
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
key + ": " + StringUtils.abbreviate(value, 100)
}

s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
rdd :: Nil
}
@@ -307,13 +322,7 @@ case class FileSourceScanExec(
}
}

override def simpleString: String = {
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
key + ": " + StringUtils.abbreviate(value, 100)
}
val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
}
override val nodeNamePrefix: String = "File"

override protected def doProduce(ctx: CodegenContext): String = {
if (supportsBatch) {
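Taken together, this file's changes are a template-method refactor: the near-identical `simpleString` overrides in `RowDataSourceScanExec` and `FileSourceScanExec` move into the trait (gaining redaction on the way), and subclasses now vary only `nodeNamePrefix`. A toy sketch of that shape, with illustrative types rather than Spark's:

```scala
// Base trait owns the formatting; subclasses override only the prefix.
trait ScanNode {
  def nodeName: String
  protected val nodeNamePrefix: String = ""
  def simpleString: String = s"$nodeNamePrefix$nodeName"
}

object ScanNodeSketch {
  def main(args: Array[String]): Unit = {
    val row = new ScanNode { val nodeName = "Scan JDBCRelation" }
    val file = new ScanNode {
      val nodeName = "Scan parquet"
      override protected val nodeNamePrefix: String = "File"
    }
    println(row.simpleString)  // Scan JDBCRelation
    println(file.simpleString) // FileScan parquet
  }
}
```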
sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils

/**
* Suite that tests the redaction of DataSourceScanExec
*/
class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {

import Utils._

override def beforeAll(): Unit = {
sparkConf.set("spark.redaction.string.regex",
"spark-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
super.beforeAll()
}

test("treeString is redacted") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
val df = spark.read.parquet(basePath)

val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
.asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
assert(rootPath.toString.contains(basePath.toString))

assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
assert(!df.queryExecution.executedPlan.treeString(verbose = true).contains(rootPath.getName))
assert(!df.queryExecution.toString.contains(rootPath.getName))
assert(!df.queryExecution.simpleString.contains(rootPath.getName))

val replacement = "*********"
assert(df.queryExecution.sparkPlan.treeString(verbose = true).contains(replacement))
assert(df.queryExecution.executedPlan.treeString(verbose = true).contains(replacement))
assert(df.queryExecution.toString.contains(replacement))
assert(df.queryExecution.simpleString.contains(replacement))
}
}
}
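
End to end, the behavior this suite pins down looks roughly like the following session; the plan text is illustrative and varies by Spark version, but the scanned location is replaced by `Utils.REDACTION_REPLACEMENT_TEXT`:

```scala
// Assuming spark.redaction.string.regex matches the temp path, as in
// beforeAll() above (illustrative output):
spark.read.parquet(basePath).explain()
// == Physical Plan ==
// *FileScan parquet [a#0L,foo#1] ... Location: InMemoryFileIndex[*********(redacted)] ...
```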