@@ -681,10 +681,18 @@ class Analyzer(
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)

case desc @ DescribeTable(u: UnresolvedV2Relation, _) =>

Review comment (Contributor): Thanks for incorporating these!

CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => desc.copy(table = rel))
.getOrElse(desc)

case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>

Review comment (Contributor, PR author): I tested out a trait that worked for all of the plans that need to be resolved here, but the code was longer with the trait and implementations. If we need it later because we have more cases in this rule, it should be easy to add. I don't think we need it right now.
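
For reference, a minimal sketch of the trait-based alternative described above; the trait and method names are invented here for illustration and are not code from this PR or from Spark.

// Hypothetical sketch only -- illustrates the rejected alternative, not code from this PR.
trait ResolvableV2Command extends Command {
  def table: NamedRelation
  def withTable(newTable: NamedRelation): Command
}

// With DescribeTable and AlterTable extending it, one Analyzer case could replace the
// per-command cases above:
//   case c: ResolvableV2Command =>
//     c.table match {
//       case u: UnresolvedV2Relation =>
//         CatalogV2Util.loadRelation(u.catalog, u.tableName).map(c.withTable).getOrElse(c)
//       case _ => c
//     }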

CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => alter.copy(table = rel))
.getOrElse(alter)

case u: UnresolvedV2Relation =>
CatalogV2Util.loadTable(u.catalog, u.tableName).map { table =>
DataSourceV2Relation.create(table)
}.getOrElse(u)
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
}
}

@@ -104,6 +104,20 @@ trait CheckAnalysis extends PredicateHelper {
case u: UnresolvedV2Relation =>
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case DescribeTable(u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case DescribeTable(u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,
@@ -119,7 +119,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
rulesWithoutInferFiltersFromConstraints: _*) :: Nil
}

(Batch("Eliminate Distinct", Once, EliminateDistinct) ::
val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
@@ -170,6 +170,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) :: Nil ++
operatorOptimizationBatch) :+
// This batch pushes filters and projections into scan nodes. Before this batch, the logical
// plan may contain nodes that do not report stats. Anything that uses stats must run after
// this batch.
Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+
// Since join costs in AQP can change between multiple runs, there is no reason that we have an
// idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once.
Batch("Join Reorder", FixedPoint(1),
@@ -196,6 +200,9 @@
RemoveNoopOperators) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)

// remove any batches with no rules. this may happen when subclasses do not add optional rules.
batches.filter(_.rules.nonEmpty)
}

/**
@@ -253,6 +260,11 @@ abstract class Optimizer(catalogManager: CatalogManager)
*/
def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Override to provide additional rules for early projection and filter pushdown to scans.
*/
def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that
* eventually run in the Optimizer.
@@ -271,7 +271,7 @@ case class ShowNamespaces(
*/
case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command {

override def children: Seq[LogicalPlan] = Seq(table)
override lazy val resolved: Boolean = table.resolved

override def output: Seq[Attribute] = DescribeTableSchema.describeTableAttributes()
}
@@ -313,9 +313,7 @@ case class AlterTable(
table: NamedRelation,
changes: Seq[TableChange]) extends Command {

override def children: Seq[LogicalPlan] = Seq(table)

override lazy val resolved: Boolean = childrenResolved && {
override lazy val resolved: Boolean = table.resolved && {
changes.forall {
case add: AddColumn =>
add.fieldNames match {
@@ -24,9 +24,10 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.plans.logical.AlterTable
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

private[sql] object CatalogV2Util {
@@ -224,6 +225,10 @@ private[sql] object CatalogV2Util {
case _: NoSuchNamespaceException => None
}

def loadRelation(catalog: CatalogPlugin, ident: Identifier): Option[NamedRelation] = {
loadTable(catalog, ident).map(DataSourceV2Relation.create)
}

def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
}
@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2S
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

/**
* A logical plan representing a data source v2 table.
@@ -50,12 +51,53 @@ case class DataSourceV2Relation(
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
}

def newScanBuilder(): ScanBuilder = {
table.asReadable.newScanBuilder(options)
override def computeStats(): Statistics = {
if (Utils.isTesting) {
// when testing, throw an exception if this computeStats method is called because stats should
// not be accessed before pushing the projection and filters to create a scan. otherwise, the
// stats are not accurate because they are based on a full table scan of all columns.
throw new IllegalStateException(
s"BUG: computeStats called before pushdown on DSv2 relation: $name")
} else {
// when not testing, return stats because bad stats are better than failing a query
table.asReadable.newScanBuilder(options) match {
case r: SupportsReportStatistics =>
val statistics = r.estimateStatistics()
DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
}
}

override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
}

/**
* A logical plan for a DSv2 table with a scan already created.
*
* This is used in the optimizer to push filters and projection down before conversion to physical
* plan. This ensures that the stats that are used by the optimizer account for the filters and
* projection that will be pushed down.
*
* @param table a DSv2 [[Table]]
* @param scan a DSv2 [[Scan]]
* @param output the output attributes of this relation
*/
case class DataSourceV2ScanRelation(
table: Table,
scan: Scan,
output: Seq[AttributeReference]) extends LeafNode with NamedRelation {

override def name: String = table.name()

override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
}

override def computeStats(): Statistics = {
val scan = newScanBuilder().build()
scan match {
case r: SupportsReportStatistics =>
val statistics = r.estimateStatistics()
@@ -64,10 +106,6 @@
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
}

override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
}

/**
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -51,7 +51,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.internal.SQLConf
@@ -3218,7 +3218,7 @@ class Dataset[T] private[sql](
fr.inputFiles
case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
case DataSourceV2Relation(table: FileTable, _, _) =>
case DataSourceV2ScanRelation(table: FileTable, _, _) =>
table.fileIndex.inputFiles
}.flatten
files.toSet.toArray
@@ -20,10 +20,13 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning}
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.SchemaPruning
import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs}

class SparkOptimizer(
@@ -32,10 +35,12 @@
experimentalMethods: ExperimentalMethods)
extends Optimizer(catalogManager) {

override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
// TODO: move SchemaPruning into catalyst
SchemaPruning :: PruneFileSourcePartitions :: V2ScanRelationPushDown :: Nil

override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("Schema Pruning", Once, SchemaPruning) :+
Batch("PartitionPruning", Once,
PartitionPruning,
OptimizeSubqueries) :+
@@ -64,7 +69,8 @@ class SparkOptimizer(
override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+
ExtractPythonUDFFromJoinCondition.ruleName :+
ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+
ExtractPythonUDFs.ruleName
ExtractPythonUDFs.ruleName :+
V2ScanRelationPushDown.ruleName

Review comment (Contributor, PR author): Only DataSourceV2ScanRelation will be converted to a physical scan node, so the early push-down rule is now required.
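
As a rough illustration of what that early push-down step does, here is a simplified sketch of a rule in the shape of V2ScanRelationPushDown; it is not the actual implementation (filter push-down and column pruning are omitted, as are imports).

// Simplified sketch only -- the real V2ScanRelationPushDown also pushes filters and prunes
// columns into the ScanBuilder before building the Scan. Imports omitted for brevity.
object ExampleScanPushDown extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case relation: DataSourceV2Relation =>
      // Build a concrete Scan from the relation's table and options...
      val scan = relation.table.asReadable.newScanBuilder(relation.options).build()
      // ...and replace the relation, so stats used later in planning reflect the pushed-down scan.
      DataSourceV2ScanRelation(relation.table, scan, relation.output)
  }
}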


/**
* Optimization batches that are executed before the regular optimization batches (also before