@@ -39,9 +39,10 @@ import org.apache.spark.sql.TestingUDT.IntervalData
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA, UTC}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{FormattedMode, SparkPlan}
import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
import org.apache.spark.sql.test.SharedSparkSession
@@ -1808,7 +1809,7 @@ class AvroV1Suite extends AvroSuite {
.set(SQLConf.USE_V1_SOURCE_LIST, "avro")
}

class AvroV2Suite extends AvroSuite {
class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
import testImplicits._

override protected def sparkConf: SparkConf =
@@ -1907,4 +1908,32 @@ class AvroV2Suite extends AvroSuite {
assert(scan1.sameResult(scan2))
}
}

test("explain formatted on an avro data source v2") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath + "/avro"
val expected_plan_fragment =
s"""
|\\(1\\) BatchScan
|Output \\[2\\]: \\[value#xL, id#x\\]
|DataFilters: \\[isnotnull\\(value#xL\\), \\(value#xL > 2\\)\\]
|Format: avro
|Location: InMemoryFileIndex\\[.*\\]
|PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\]
|ReadSchema: struct\\<value:bigint\\>
|""".stripMargin.trim
spark.range(10)
.select(col("id"), col("id").as("value"))
.write.option("header", true)
.partitionBy("id")
.format("avro")
.save(basePath)
val df = spark
.read
.format("avro")
.load(basePath).where($"id" > 1 && $"value" > 2)
val normalizedOutput = getNormalizedExplain(df, FormattedMode)
assert(expected_plan_fragment.r.findAllMatchIn(normalizedOutput).length == 1)
}
}
}
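
For reference, the escaped pattern in the test above corresponds to a formatted-explain node of roughly this shape (the #x / #xL suffixes are normalized expression IDs, and the InMemoryFileIndex path varies per run):

(1) BatchScan
Output [2]: [value#xL, id#x]
DataFilters: [isnotnull(value#xL), (value#xL > 2)]
Format: avro
Location: InMemoryFileIndex[<path>]
PartitionFilters: [isnotnull(id#x), (id#x > 1)]
ReadSchema: struct<value:bigint>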
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.internal.connector

/**
* A mix-in interface for {@link FileScan}. This can be used to report metadata
* for a file based scan operator. This is currently used for supporting formatted
* explain.
*/
Member: We need @Evolving here?

Contributor Author: @maropu Will add.

Contributor Author: @maropu On second thought, this is not an external interface, right? So I don't think we need any annotations here.

Member: Not sure, but if we expose this, developers could improve the explain output for their custom scans. cc: @cloud-fan

trait SupportsMetadata {
Member: Please add a comment about what this class is used for.

def getMetaData(): Map[String, String]
}
Member: Don't we need to move this file to the Java side along with Batch and SupportsReportStatistics?

Contributor Author: @maropu I don't see the need to make it part of the external V2 contract. We are only using it for explain now, so I thought of keeping it internal, just as we use the Logging trait.
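
As a minimal sketch of the question above about custom scans: a scan implementation could mix in this trait to get per-key lines in EXPLAIN FORMATTED. This assumes the trait stays a Spark-internal mix-in as discussed; the SimpleCountScan name and its metadata keys are illustrative only and not part of this PR.

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.internal.connector.SupportsMetadata
import org.apache.spark.sql.types.StructType

// Hypothetical scan; readSchema() is the only method Scan requires.
class SimpleCountScan(schema: StructType) extends Scan with SupportsMetadata {
  override def readSchema(): StructType = schema

  // Each key/value pair is rendered as one "key: value" line under the scan node;
  // empty values are dropped by DataSourceV2ScanExecBase.verboseStringWithOperatorId.
  override def getMetaData(): Map[String, String] = Map(
    "Format" -> "simple-count",
    "ReadSchema" -> schema.catalogString)
}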

@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.connector.SupportsMetadata
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils

@@ -43,7 +44,32 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
override def simpleString(maxFields: Int): String = {
val result =
s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, result)
redact(result)
}

/**
* Shorthand for calling redact() without specifying redacting rules
*/
protected def redact(text: String): String = {
Member (dongjoon-hyun, Jul 11, 2020): Do we need to declare this as protected? Never mind.
Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, text)
}

override def verboseStringWithOperatorId(): String = {
val metaDataStr = scan match {
case s: SupportsMetadata =>
s.getMetaData().toSeq.sorted.flatMap {
case (_, value) if value.isEmpty || value.equals("[]") => None
case (key, value) => Some(s"$key: ${redact(value)}")
case _ => None
}
case _ =>
Seq(scan.description())
}
s"""
|$formattedNodeName
|${ExplainUtils.generateFieldString("Output", output)}
|${metaDataStr.mkString("\n")}
|""".stripMargin
}

override def outputPartitioning: physical.Partitioning = scan match {
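A rough usage sketch (not part of the diff) of how this rendering is exercised, assuming a SparkSession named spark, Parquet data at a hypothetical path, and the v1 fallback disabled so the file source is planned as a DS v2 BatchScan:

import spark.implicits._

// Route the built-in file sources through the DS v2 scan path instead of the v1 fallback.
spark.conf.set("spark.sql.sources.useV1SourceList", "")

spark.read.format("parquet")
  .load("/tmp/example-data")   // hypothetical path
  .where($"id" > 1)
  .explain("formatted")        // per-node output comes from verboseStringWithOperatorId above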
@@ -29,11 +29,13 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.connector.SupportsMetadata
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

trait FileScan extends Scan with Batch with SupportsReportStatistics with Logging {
trait FileScan extends Scan
with Batch with SupportsReportStatistics with SupportsMetadata with Logging {
/**
* Returns whether a file with `path` could be split or not.
*/
@@ -93,23 +95,28 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Logging {

override def hashCode(): Int = getClass.hashCode()

val maxMetadataValueLength = 100

override def description(): String = {
val maxMetadataValueLength = 100
val metadataStr = getMetaData().toSeq.sorted.map {
case (key, value) =>
val redactedValue =
Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
}.mkString(", ")
s"${this.getClass.getSimpleName} $metadataStr"
}

override def getMetaData(): Map[String, String] = {
val locationDesc =
fileIndex.getClass.getSimpleName +
Utils.buildLocationMetadata(fileIndex.rootPaths, maxMetadataValueLength)
val metadata: Map[String, String] = Map(
Map(
"Format" -> s"${this.getClass.getSimpleName.replace("Scan", "").toLowerCase(Locale.ROOT)}",
Member: Just a question. Is this new metadata?
Contributor Author (dilipbiswal, Jul 12, 2020): @dongjoon-hyun Previously we printed the scan node class name; in the new format it gets its own line. Please see the old output in the PR description, where it appears as ParquetScan.

"ReadSchema" -> readDataSchema.catalogString,
"PartitionFilters" -> seqToString(partitionFilters),
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)
val metadataStr = metadata.toSeq.sorted.map {
case (key, value) =>
val redactedValue =
Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength)
}.mkString(", ")
s"${this.getClass.getSimpleName} $metadataStr"
}

protected def partitions: Seq[FilePartition] = {
@@ -107,4 +107,8 @@ case class CSVScan(
override def description(): String = {
super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
}

override def getMetaData(): Map[String, String] = {
super.getMetaData() ++ Map("PushedFilers" -> seqToString(pushedFilters))
}
}
@@ -65,6 +65,10 @@ case class OrcScan(
super.description() + ", PushedFilters: " + seqToString(pushedFilters)
}

override def getMetaData(): Map[String, String] = {
super.getMetaData() ++ Map("PushedFilers" -> seqToString(pushedFilters))
}

override def withFilters(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan =
this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
@@ -93,6 +93,10 @@ case class ParquetScan(
super.description() + ", PushedFilters: " + seqToString(pushedFilters)
}

override def getMetaData(): Map[String, String] = {
super.getMetaData() ++ Map("PushedFilers" -> seqToString(pushedFilters))
}

override def withFilters(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan =
this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala (49 additions, 1 deletion)
@@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

trait ExplainSuiteHelper extends QueryTest with SharedSparkSession {

@@ -360,6 +360,54 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
}
}
}

test("Explain formatted output for scan operator for datasource V2") {
Contributor: Can we add a table-scan-explain.sql to test it? It's easier to see the result.

Contributor Author: @cloud-fan Agree. I had actually tried, but could not get the V2 scan set up through SQL. Could you please tell me how to do it?

Contributor: Oh I see. Currently DS v2 scan is enabled only in DataFrameReader, so we can't get it through pure SQL. Then this is fine.

Contributor Author: +1. Thank you.

Member: Yea, I think so...

withTempDir { dir =>
Seq("parquet", "orc", "csv", "json").foreach { fmt =>
val basePath = dir.getCanonicalPath + "/" + fmt
val pushFilterMaps = Map (
"parquet" ->
"|PushedFilers: \\[.*\\(id\\), .*\\(value\\), .*\\(id,1\\), .*\\(value,2\\)\\]",
"orc" ->
"|PushedFilers: \\[.*\\(id\\), .*\\(value\\), .*\\(id,1\\), .*\\(value,2\\)\\]",
"csv" ->
"|PushedFilers: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
"json" ->
"|remove_marker"
Member: Can we simply put ""?

Contributor Author: @dongjoon-hyun I had tried that and it didn't work for me; perhaps there is a better way to do this. For JSON, I don't want a line printed for pushed filters at all. Putting "" results in the following expected output, and I wanted to get rid of the empty line between PartitionFilters and ReadSchema:

\(1\) BatchScan
Output \[2\]: \[value#x, id#x\]
DataFilters: \[isnotnull\(value#x\), \(value#x > 2\)\]
Format: json
Location: InMemoryFileIndex\[.*\]
PartitionFilters: \[isnotnull\(id#x\), \(id#x > 1\)\]

ReadSchema: struct\<value:int\>

)
val expected_plan_fragment1 =
s"""
|\\(1\\) BatchScan
|Output \\[2\\]: \\[value#x, id#x\\]
|DataFilters: \\[isnotnull\\(value#x\\), \\(value#x > 2\\)\\]
|Format: $fmt
|Location: InMemoryFileIndex\\[.*\\]
|PartitionFilters: \\[isnotnull\\(id#x\\), \\(id#x > 1\\)\\]
${pushFilterMaps.get(fmt).get}
|ReadSchema: struct\\<value:int\\>
|""".stripMargin.replaceAll("\nremove_marker", "").trim
Member: It seems that we can remove .replaceAll("\nremove_marker", "") if we fix line 376. WDYT?

Contributor Author: @dongjoon-hyun Please see my response above.


spark.range(10)
.select(col("id"), col("id").as("value"))
.write.option("header", true)
.partitionBy("id")
.format(fmt)
.save(basePath)
val readSchema =
StructType(Seq(StructField("id", IntegerType), StructField("value", IntegerType)))
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
val df = spark
.read
.schema(readSchema)
.option("header", true)
.format(fmt)
.load(basePath).where($"id" > 1 && $"value" > 2)
val normalizedOutput = getNormalizedExplain(df, FormattedMode)
assert(expected_plan_fragment1.r.findAllMatchIn(normalizedOutput).length == 1)
}
}
}
}
}
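
On the remove_marker discussion above, an alternative sketch (not part of the PR) would assemble the expected fragment from optional lines, so the JSON case simply contributes no PushedFilers line; fmt refers to the loop variable in the test above:

val pushedFilterLine: Option[String] = fmt match {
  case "json" => None  // JSON reports no pushed filters, so no line is expected
  case "csv"  => Some("""PushedFilers: \[IsNotNull\(value\), GreaterThan\(value,2\)\]""")
  case _      => Some("""PushedFilers: \[.*\(id\), .*\(value\), .*\(id,1\), .*\(value,2\)\]""")
}
val expectedPlanFragment = (Seq(
  """\(1\) BatchScan""",
  """Output \[2\]: \[value#x, id#x\]""",
  """DataFilters: \[isnotnull\(value#x\), \(value#x > 2\)\]""",
  s"Format: $fmt",
  """Location: InMemoryFileIndex\[.*\]""",
  """PartitionFilters: \[isnotnull\(id#x\), \(id#x > 1\)\]"""
) ++ pushedFilterLine ++ Seq("""ReadSchema: struct\<value:int\>""")).mkString("\n")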

class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite {