diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index e2ae489446d85..83a7ef0061fb2 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -39,9 +39,10 @@ import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA, UTC}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{FormattedMode, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -1808,7 +1809,7 @@ class AvroV1Suite extends AvroSuite {
     .set(SQLConf.USE_V1_SOURCE_LIST, "avro")
 }
 
-class AvroV2Suite extends AvroSuite {
+class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
   import testImplicits._
 
   override protected def sparkConf: SparkConf =
@@ -1907,4 +1908,32 @@ class AvroV2Suite extends AvroSuite {
       assert(scan1.sameResult(scan2))
     }
   }
+
+  test("explain formatted on an avro data source v2") {
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath + "/avro"
+      val expected_plan_fragment =
+        s"""
+           |\\(1\\) BatchScan
+           |Output \\[2\\]: \\[value#xL, id#x\\]
+           |DataFilters: \\[isnotnull\\(value#xL\\), \\(value#xL > 2\\)\\]
+           |Format: avro
+           |Location: InMemoryFileIndex\\[.*\\]
+           |PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\]
+           |ReadSchema: struct\\<value:bigint\\>
+           |""".stripMargin.trim
+      spark.range(10)
+        .select(col("id"), col("id").as("value"))
+        .write.option("header", true)
+        .partitionBy("id")
+        .format("avro")
+        .save(basePath)
+      val df = spark
+        .read
+        .format("avro")
+        .load(basePath).where($"id" > 1 && $"value" > 2)
+      val normalizedOutput = getNormalizedExplain(df, FormattedMode)
+      assert(expected_plan_fragment.r.findAllMatchIn(normalizedOutput).length == 1)
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsMetadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsMetadata.scala
new file mode 100644
index 0000000000000..b2cb19b009141
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsMetadata.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.sql.internal.connector + +/** + * A mix-in interface for {@link FileScan}. This can be used to report metadata + * for a file based scan operator. This is currently used for supporting formatted + * explain. + */ +trait SupportsMetadata { + def getMetaData(): Map[String, String] +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 211f61279ddd5..083c6bc7999bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning} -import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode} import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.internal.connector.SupportsMetadata import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils @@ -43,7 +44,32 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { override def simpleString(maxFields: Int): String = { val result = s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}" - Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, result) + redact(result) + } + + /** + * Shorthand for calling redact() without specifying redacting rules + */ + protected def redact(text: String): String = { + Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, text) + } + + override def verboseStringWithOperatorId(): String = { + val metaDataStr = scan match { + case s: SupportsMetadata => + s.getMetaData().toSeq.sorted.flatMap { + case (_, value) if value.isEmpty || value.equals("[]") => None + case (key, value) => Some(s"$key: ${redact(value)}") + case _ => None + } + case _ => + Seq(scan.description()) + } + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Output", output)} + |${metaDataStr.mkString("\n")} + |""".stripMargin } override def outputPartitioning: physical.Partitioning = scan match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 7e8e0ed2dc675..f090d7861b629 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -29,11 +29,13 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics} import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.connector.SupportsMetadata import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils -trait FileScan extends Scan with Batch with SupportsReportStatistics with Logging { +trait FileScan extends Scan 
+  with Batch with SupportsReportStatistics with SupportsMetadata with Logging {
   /**
    * Returns whether a file with `path` could be split or not.
    */
@@ -93,23 +95,28 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin
 
   override def hashCode(): Int = getClass.hashCode()
 
+  val maxMetadataValueLength = 100
+
   override def description(): String = {
-    val maxMetadataValueLength = 100
+    val metadataStr = getMetaData().toSeq.sorted.map {
+      case (key, value) =>
+        val redactedValue =
+          Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
+        key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
+    }.mkString(", ")
+    s"${this.getClass.getSimpleName} $metadataStr"
+  }
+
+  override def getMetaData(): Map[String, String] = {
     val locationDesc =
       fileIndex.getClass.getSimpleName +
         Utils.buildLocationMetadata(fileIndex.rootPaths, maxMetadataValueLength)
-    val metadata: Map[String, String] = Map(
+    Map(
+      "Format" -> s"${this.getClass.getSimpleName.replace("Scan", "").toLowerCase(Locale.ROOT)}",
       "ReadSchema" -> readDataSchema.catalogString,
       "PartitionFilters" -> seqToString(partitionFilters),
       "DataFilters" -> seqToString(dataFilters),
       "Location" -> locationDesc)
-    val metadataStr = metadata.toSeq.sorted.map {
-      case (key, value) =>
-        val redactedValue =
-          Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
-        key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
-    }.mkString(", ")
-    s"${this.getClass.getSimpleName} $metadataStr"
   }
 
   protected def partitions: Seq[FilePartition] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 4f510322815ef..efb21e1c1e597 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -107,4 +107,8 @@ case class CSVScan(
   override def description(): String = {
     super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
   }
+
+  override def getMetaData(): Map[String, String] = {
+    super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index 62894fa7a2538..38b8ced51a141 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -65,6 +65,10 @@ case class OrcScan(
     super.description() + ", PushedFilters: " + seqToString(pushedFilters)
   }
 
+  override def getMetaData(): Map[String, String] = {
+    super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
+  }
+
   override def withFilters(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan =
     this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index bb315262a8211..c9c1e28a36960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -93,6 +93,10 @@ case class ParquetScan(
     super.description() + ", PushedFilters: " + seqToString(pushedFilters)
   }
 
+  override def getMetaData(): Map[String, String] = {
+    super.getMetaData() ++ Map("PushedFilters" -> seqToString(pushedFilters))
+  }
+
   override def withFilters(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan =
     this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 1ad97185a564a..70303792fdf1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, E
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
 trait ExplainSuiteHelper extends QueryTest with SharedSparkSession {
 
@@ -360,6 +360,54 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
       }
     }
   }
+
+  test("Explain formatted output for scan operator for datasource V2") {
+    withTempDir { dir =>
+      Seq("parquet", "orc", "csv", "json").foreach { fmt =>
+        val basePath = dir.getCanonicalPath + "/" + fmt
+        val pushFilterMaps = Map(
+          "parquet" ->
+            "|PushedFilters: \\[.*\\(id\\), .*\\(value\\), .*\\(id,1\\), .*\\(value,2\\)\\]",
+          "orc" ->
+            "|PushedFilters: \\[.*\\(id\\), .*\\(value\\), .*\\(id,1\\), .*\\(value,2\\)\\]",
+          "csv" ->
+            "|PushedFilters: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
+          "json" ->
+            "|remove_marker"
+        )
+        val expected_plan_fragment1 =
+          s"""
+             |\\(1\\) BatchScan
+             |Output \\[2\\]: \\[value#x, id#x\\]
+             |DataFilters: \\[isnotnull\\(value#x\\), \\(value#x > 2\\)\\]
+             |Format: $fmt
+             |Location: InMemoryFileIndex\\[.*\\]
+             |PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\]
+             ${pushFilterMaps.get(fmt).get}
+             |ReadSchema: struct\\<value:int\\>
+             |""".stripMargin.replaceAll("\nremove_marker", "").trim
+
+        spark.range(10)
+          .select(col("id"), col("id").as("value"))
+          .write.option("header", true)
+          .partitionBy("id")
+          .format(fmt)
+          .save(basePath)
+        val readSchema =
+          StructType(Seq(StructField("id", IntegerType), StructField("value", IntegerType)))
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+          val df = spark
+            .read
+            .schema(readSchema)
+            .option("header", true)
+            .format(fmt)
+            .load(basePath).where($"id" > 1 && $"value" > 2)
+          val normalizedOutput = getNormalizedExplain(df, FormattedMode)
+          assert(expected_plan_fragment1.r.findAllMatchIn(normalizedOutput).length == 1)
+        }
+      }
+    }
+  }
 }
 
 class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite {
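
Note: for readers outside the Spark build, the following is a minimal, self-contained Scala sketch of the reporting pattern this patch introduces, not Spark code. The SupportsMetadata trait is re-declared as a stand-in, and DemoParquetScan, formattedExplain, and DemoFormattedExplain are hypothetical names invented for illustration; only the field names (Format, Location, ReadSchema, PartitionFilters, DataFilters, PushedFilters) and the sort/filter/render steps mirror what FileScan.getMetaData and DataSourceV2ScanExecBase.verboseStringWithOperatorId do in the diff above.

// Standalone model of the metadata-reporting pattern (stand-ins, not the real Spark traits).
trait SupportsMetadata {
  def getMetaData(): Map[String, String]
}

// Hypothetical scan with hard-coded metadata, mirroring the fields a FileScan reports.
case class DemoParquetScan(
    location: String,
    readSchema: String,
    partitionFilters: Seq[String],
    dataFilters: Seq[String],
    pushedFilters: Seq[String]) extends SupportsMetadata {

  private def seqToString(seq: Seq[String]): String = seq.mkString("[", ", ", "]")

  override def getMetaData(): Map[String, String] = Map(
    "Format" -> "parquet",
    "Location" -> location,
    "ReadSchema" -> readSchema,
    "PartitionFilters" -> seqToString(partitionFilters),
    "DataFilters" -> seqToString(dataFilters),
    "PushedFilters" -> seqToString(pushedFilters))

  // Same rendering steps as the patch: sort entries by key, drop empty values,
  // and emit one "key: value" line per remaining field.
  def formattedExplain: String =
    getMetaData().toSeq.sorted.collect {
      case (key, value) if value.nonEmpty && value != "[]" => s"$key: $value"
    }.mkString("\n")
}

object DemoFormattedExplain extends App {
  val scan = DemoParquetScan(
    location = "InMemoryFileIndex[/tmp/explain-demo]",
    readSchema = "struct<value:int>",
    partitionFilters = Seq("isnotnull(id#x)", "(id#x > 1)"),
    dataFilters = Seq("isnotnull(value#x)", "(value#x > 2)"),
    pushedFilters = Seq("IsNotNull(value)", "GreaterThan(value,2)"))
  // Prints DataFilters, Format, Location, PartitionFilters, PushedFilters, ReadSchema
  // in sorted-key order, the same layout the new tests assert against.
  println(scan.formattedExplain)
}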