From 7d7c91400334e6721a5e52f7c945997ae01731d9 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 27 Sep 2019 18:17:00 -0700 Subject: [PATCH 1/6] SPARK-29277: Add early DSv2 filter and projection pushdown. --- .../sql/catalyst/optimizer/Optimizer.scala | 14 +- .../datasources/v2/DataSourceV2Relation.scala | 32 ++++- .../spark/sql/execution/SparkOptimizer.scala | 7 +- .../datasources/v2/DataSourceV2Strategy.scala | 126 ++++++------------ .../datasources/v2/PushDownUtils.scala | 103 ++++++++++++++ .../v2/V2ScanRelationPushDown.scala | 64 +++++++++ 6 files changed, 251 insertions(+), 95 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index eab4c3efe4f7f..85b65edbb64b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -119,7 +119,7 @@ abstract class Optimizer(catalogManager: CatalogManager) rulesWithoutInferFiltersFromConstraints: _*) :: Nil } - (Batch("Eliminate Distinct", Once, EliminateDistinct) :: + val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) :: // Technically some of the rules in Finish Analysis are not optimizer rules and belong more // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). // However, because we also use the analyzer to canonicalized queries (for view definition), @@ -170,6 +170,10 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+ + // This batch pushes filters and projections into scan nodes. Before this batch, the logical + // plan may contain nodes that do not report stats. Anything that uses stats must run after + // this batch. + Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+ // Since join costs in AQP can change between multiple runs, there is no reason that we have an // idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once. Batch("Join Reorder", FixedPoint(1), @@ -196,6 +200,9 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveNoopOperators) :+ // This batch must be executed after the `RewriteSubquery` batch, which creates joins. Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) + + // remove any batches with no rules. this may happen when subclasses do not add optional rules. + batches.filter(_.rules.nonEmpty) } /** @@ -253,6 +260,11 @@ abstract class Optimizer(catalogManager: CatalogManager) */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + /** + * Override to provide additional rules for early projection and filter pushdown to scans. + */ + def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil + /** * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that * eventually run in the Optimizer. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 7da502fc298ea..cabfbcbf1ea8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -54,8 +54,34 @@ case class DataSourceV2Relation( table.asReadable.newScanBuilder(options) } + override def newInstance(): DataSourceV2Relation = { + copy(output = output.map(_.newInstance())) + } +} + +/** + * A logical plan for a DSv2 table with a scan already created. + * + * This is used in the optimizer to push filters and projection down before conversion to physical + * plan. This ensures that the stats that are used by the optimizer account for the filters and + * projection that will be pushed down. + * + * @param table a DSv2 [[Table]] + * @param scan a DSv2 [[Scan]] + * @param output the output attributes of this relation + */ +case class DataSourceV2ScanRelation( + table: Table, + scan: Scan, + output: Seq[AttributeReference]) extends LeafNode with NamedRelation { + + override def name: String = table.name() + + override def simpleString(maxFields: Int): String = { + s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" + } + override def computeStats(): Statistics = { - val scan = newScanBuilder().build() scan match { case r: SupportsReportStatistics => val statistics = r.estimateStatistics() @@ -64,10 +90,6 @@ case class DataSourceV2Relation( Statistics(sizeInBytes = conf.defaultSizeInBytes) } } - - override def newInstance(): DataSourceV2Relation = { - copy(output = output.map(_.newInstance())) - } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 4a8b56faddf7e..2619273a89ea8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -20,10 +20,13 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.ExperimentalMethods import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning} import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions import org.apache.spark.sql.execution.datasources.SchemaPruning +import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs} class SparkOptimizer( @@ -32,9 +35,11 @@ class SparkOptimizer( experimentalMethods: ExperimentalMethods) extends Optimizer(catalogManager) { + override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = + PruneFileSourcePartitions :: V2ScanRelationPushDown :: Nil + override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ - Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+ Batch("Schema Pruning", Once, SchemaPruning) :+ Batch("PartitionPruning", Once, PartitionPruning, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 3041e9e82d59f..24d7bfc75c97f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -18,102 +18,40 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} -import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} -import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, Table, TableCapability} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} -import org.apache.spark.sql.sources import org.apache.spark.sql.util.CaseInsensitiveStringMap object DataSourceV2Strategy extends Strategy with PredicateHelper { - /** - * Pushes down filters to the data source reader - * - * @return pushed filter and post-scan filters. - */ - private def pushFilters( - scanBuilder: ScanBuilder, - filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { - scanBuilder match { - case r: SupportsPushDownFilters => - // A map from translated data source leaf node filters to original catalyst filter - // expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially - // pushed down. This map can be used to construct a catalyst filter expression from the - // input filter, or a superset(partial push down filter) of the input filter. - val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] - val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter] - // Catalyst filter expression that can't be translated to data source filters. - val untranslatableExprs = mutable.ArrayBuffer.empty[Expression] - - for (filterExpr <- filters) { - val translated = - DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr)) - if (translated.isEmpty) { - untranslatableExprs += filterExpr - } else { - translatedFilters += translated.get - } - } - - // Data source filters that need to be evaluated again after scanning. which means - // the data source cannot guarantee the rows returned can pass these filters. - // As a result we must return it so Spark can plan an extra filter operator. - val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter => - DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) - } - // The filters which are marked as pushed to this data source - val pushedFilters = r.pushedFilters().map { filter => - DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) - } - (pushedFilters, untranslatableExprs ++ postScanFilters) - - case _ => (Nil, filters) - } - } - - /** - * Applies column pruning to the data source, w.r.t. the references of the given expressions. - * - * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown), - * and new output attributes after column pruning. - */ - // TODO: nested column pruning. - private def pruneColumns( - scanBuilder: ScanBuilder, - relation: DataSourceV2Relation, - exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = { - scanBuilder match { - case r: SupportsPushDownRequiredColumns => - val requiredColumns = AttributeSet(exprs.flatMap(_.references)) - val neededOutput = relation.output.filter(requiredColumns.contains) - if (neededOutput != relation.output) { - r.pruneColumns(neededOutput.toStructType) - val scan = r.build() - val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap - scan -> scan.readSchema().toAttributes.map { - // We have to keep the attribute id during transformation. - a => a.withExprId(nameToAttr(a.name).exprId) - } - } else { - r.build() -> relation.output - } - - case _ => scanBuilder.build() -> relation.output - } - } - import DataSourceV2Implicits._ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) => + // projection and filters were already pushed down in the optimizer. + // this uses PhysicalOperation to get the projection and ensure that if the batch scan does + // not support columnar, a projection is added to convert the rows to UnsafeRow. + val batchExec = BatchScanExec(relation.output, relation.scan) + + val filterCondition = filters.reduceLeftOption(And) + val withFilter = filterCondition.map(FilterExec(_, batchExec)).getOrElse(batchExec) + + val withProjection = if (withFilter.output != project || !batchExec.supportsColumnar) { + ProjectExec(project, withFilter) + } else { + batchExec + } + + withProjection :: Nil + case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => val scanBuilder = relation.newScanBuilder() @@ -125,9 +63,10 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. val (pushedFilters, postScanFiltersWithoutSubquery) = - pushFilters(scanBuilder, normalizedFilters) + PushDownUtils.pushFilters(scanBuilder, normalizedFilters) val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery - val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters) + val (scan, output) = PushDownUtils.pruneColumns( + scanBuilder, relation, project ++ postScanFilters) logInfo( s""" |Pushing operators to ${relation.name} @@ -254,19 +193,19 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { OverwritePartitionsDynamicExec( r.table.asWritable, writeOptions.asOptions, planLater(query)) :: Nil - case DeleteFromTable(r: DataSourceV2Relation, condition) => + case DeleteFromTable(V2TableRelation(table, output), condition) => if (condition.exists(SubqueryExpression.hasSubquery)) { throw new AnalysisException( s"Delete by condition with subquery is not supported: $condition") } // fail if any filter cannot be converted. correctness depends on removing all matching data. - val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, r.output) + val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, output) .flatMap(splitConjunctivePredicates(_).map { f => DataSourceStrategy.translateFilter(f).getOrElse( throw new AnalysisException(s"Exec update failed:" + s" cannot translate expression to source filter: $f")) }).toArray - DeleteFromTableExec(r.table.asDeletable, filters) :: Nil + DeleteFromTableExec(table.asDeletable, filters) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil @@ -283,8 +222,8 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { Nil } - case desc @ DescribeTable(r: DataSourceV2Relation, isExtended) => - DescribeTableExec(desc.output, r.table, isExtended) :: Nil + case desc @ DescribeTable(V2TableRelation(table, _), isExtended) => + DescribeTableExec(desc.output, table, isExtended) :: Nil case DropTable(catalog, ident, ifExists) => DropTableExec(catalog, ident, ifExists) :: Nil @@ -309,4 +248,15 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case _ => Nil } + + private object V2TableRelation { + def unapply(plan: LogicalPlan): Option[(Table, Seq[AttributeReference])] = plan match { + case r: DataSourceV2Relation => + Some((r.table, r.output)) + case r: DataSourceV2ScanRelation => + Some((r.table, r.output)) + case _ => + None + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala new file mode 100644 index 0000000000000..634ecfdf7e1d5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, PredicateHelper} +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources + +object PushDownUtils extends PredicateHelper { + /** + * Pushes down filters to the data source reader + * + * @return pushed filter and post-scan filters. + */ + def pushFilters( + scanBuilder: ScanBuilder, + filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { + scanBuilder match { + case r: SupportsPushDownFilters => + // A map from translated data source leaf node filters to original catalyst filter + // expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially + // pushed down. This map can be used to construct a catalyst filter expression from the + // input filter, or a superset(partial push down filter) of the input filter. + val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] + val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter] + // Catalyst filter expression that can't be translated to data source filters. + val untranslatableExprs = mutable.ArrayBuffer.empty[Expression] + + for (filterExpr <- filters) { + val translated = + DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr)) + if (translated.isEmpty) { + untranslatableExprs += filterExpr + } else { + translatedFilters += translated.get + } + } + + // Data source filters that need to be evaluated again after scanning. which means + // the data source cannot guarantee the rows returned can pass these filters. + // As a result we must return it so Spark can plan an extra filter operator. + val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter => + DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) + } + // The filters which are marked as pushed to this data source + val pushedFilters = r.pushedFilters().map { filter => + DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) + } + (pushedFilters, untranslatableExprs ++ postScanFilters) + + case _ => (Nil, filters) + } + } + + /** + * Applies column pruning to the data source, w.r.t. the references of the given expressions. + * + * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown), + * and new output attributes after column pruning. + */ + // TODO: nested column pruning. + def pruneColumns( + scanBuilder: ScanBuilder, + relation: DataSourceV2Relation, + exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = { + scanBuilder match { + case r: SupportsPushDownRequiredColumns => + val requiredColumns = AttributeSet(exprs.flatMap(_.references)) + val neededOutput = relation.output.filter(requiredColumns.contains) + if (neededOutput != relation.output) { + r.pruneColumns(neededOutput.toStructType) + val scan = r.build() + val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap + scan -> scan.readSchema().toAttributes.map { + // We have to keep the attribute id during transformation. + a => a.withExprId(nameToAttr(a.name).exprId) + } + } else { + r.build() -> relation.output + } + + case _ => scanBuilder.build() -> relation.output + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala new file mode 100644 index 0000000000000..c659816eb244e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.{And, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +object V2ScanRelationPushDown extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { + case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => + val scanBuilder = relation.newScanBuilder() + + val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery) + val normalizedFilters = DataSourceStrategy.normalizeFilters( + withoutSubquery, relation.output) + + // `pushedFilters` will be pushed down and evaluated in the underlying data sources. + // `postScanFilters` need to be evaluated after the scan. + // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. + val (pushedFilters, postScanFiltersWithoutSubquery) = + PushDownUtils.pushFilters(scanBuilder, normalizedFilters) + val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery + val (scan, output) = PushDownUtils.pruneColumns( + scanBuilder, relation, project ++ postScanFilters) + logInfo( + s""" + |Pushing operators to ${relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + |Output: ${output.mkString(", ")} + """.stripMargin) + + val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output) + + val filterCondition = postScanFilters.reduceLeftOption(And) + val withFilter = filterCondition.map(Filter(_, scanRelation)).getOrElse(scanRelation) + + val withProjection = if (withFilter.output != project) { + Project(project, withFilter) + } else { + withFilter + } + + withProjection + } +} From e7beb5b8a987210694b0b0efd96d90daba3a5a0c Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 2 Oct 2019 12:58:57 -0700 Subject: [PATCH 2/6] Update after test failures. --- .../spark/sql/execution/SparkOptimizer.scala | 4 +-- .../datasources/v2/DataSourceV2Strategy.scala | 6 ++-- .../v2/V2ScanRelationPushDown.scala | 4 +-- .../spark/sql/FileBasedDataSourceSuite.scala | 10 +++++-- .../execution/datasources/orc/OrcTest.scala | 18 +++++++++--- .../orc/OrcV2SchemaPruningSuite.scala | 2 +- .../parquet/ParquetFilterSuite.scala | 28 ++++++++++++++++--- .../datasources/orc/OrcFilterSuite.scala | 15 +++++++--- 8 files changed, 64 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 2619273a89ea8..3ee423f05fca6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -36,11 +36,11 @@ class SparkOptimizer( extends Optimizer(catalogManager) { override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = - PruneFileSourcePartitions :: V2ScanRelationPushDown :: Nil + // TODO: move SchemaPruning into catalyst + SchemaPruning :: PruneFileSourcePartitions :: V2ScanRelationPushDown :: Nil override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ - Batch("Schema Pruning", Once, SchemaPruning) :+ Batch("PartitionPruning", Once, PartitionPruning, OptimizeSubqueries) :+ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 24d7bfc75c97f..10f79aed21c76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -47,7 +47,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { val withProjection = if (withFilter.output != project || !batchExec.supportsColumnar) { ProjectExec(project, withFilter) } else { - batchExec + withFilter } withProjection :: Nil @@ -62,8 +62,8 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. - val (pushedFilters, postScanFiltersWithoutSubquery) = - PushDownUtils.pushFilters(scanBuilder, normalizedFilters) + val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( + scanBuilder, normalizedFilters) val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery val (scan, output) = PushDownUtils.pruneColumns( scanBuilder, relation, project ++ postScanFilters) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index c659816eb244e..e3fa8e5bededc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -35,8 +35,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] { // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. - val (pushedFilters, postScanFiltersWithoutSubquery) = - PushDownUtils.pushFilters(scanBuilder, normalizedFilters) + val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( + scanBuilder, normalizedFilters) val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery val (scan, output) = PushDownUtils.pruneColumns( scanBuilder, relation, project ++ postScanFilters) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index d08f4b9066d2b..278fcffad4720 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.io.{File, FilenameFilter, FileNotFoundException} +import java.io.{File, FileNotFoundException} import java.nio.file.{Files, StandardOpenOption} import java.util.Locale @@ -27,9 +27,11 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} -import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT} +import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ @@ -722,6 +724,8 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { df.queryExecution.optimizedPlan match { case PhysicalOperation(_, _, DataSourceV2Relation(table: ParquetTable, _, _)) => assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet) + case PhysicalOperation(_, _, DataSourceV2ScanRelation(table: ParquetTable, _, _)) => + assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet) case _ => throw new AnalysisException("Can not match ParquetTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index adbd93dcb4fe8..49c16e4012f02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -28,8 +28,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} +import org.apache.spark.sql.execution.datasources.v2.orc.{OrcScan, OrcTable} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION @@ -118,8 +118,7 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor .where(Column(predicate)) query.queryExecution.optimizedPlan match { - case PhysicalOperation(_, filters, - DataSourceV2Relation(orcTable: OrcTable, _, options)) => + case PhysicalOperation(_, filters, DataSourceV2Relation(orcTable: OrcTable, _, options)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") val scanBuilder = orcTable.newScanBuilder(options) scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) @@ -132,6 +131,17 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters") } + case PhysicalOperation(_, filters, + DataSourceV2ScanRelation(_, OrcScan(_, _, _, _, _, _, _, pushedFilters), _)) => + assert(filters.nonEmpty, "No filter is analyzed from the given query") + if (noneSupported) { + assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters") + } else { + assert(pushedFilters.nonEmpty, "No filter is pushed down") + val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters) + assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters") + } + case _ => throw new AnalysisException("Can not match OrcTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala index b626edf5dc28e..80cfbd6a02676 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.orc import org.apache.spark.SparkConf -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.execution.datasources.SchemaPruningSuite import org.apache.spark.sql.execution.datasources.v2.BatchScanExec diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 9671866fe1535..c022ef419a15f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -33,9 +33,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.execution.datasources.orc.OrcFilters -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} +import org.apache.spark.sql.execution.datasources.v2.parquet.{ParquetScan, ParquetTable} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType @@ -1484,7 +1483,7 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { query.queryExecution.optimizedPlan.collectFirst { case PhysicalOperation(_, filters, - DataSourceV2Relation(parquetTable: ParquetTable, _, options)) => + DataSourceV2Relation(parquetTable: ParquetTable, _, options)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") val scanBuilder = parquetTable.newScanBuilder(options) val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter).toArray @@ -1506,6 +1505,27 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { checker(stripSparkFilter(query), expected) + case PhysicalOperation(_, filters, + DataSourceV2ScanRelation(_, scan: ParquetScan, _)) => + assert(filters.nonEmpty, "No filter is analyzed from the given query") + val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter).toArray + val pushedFilters = scan.pushedFilters + assert(pushedFilters.nonEmpty, "No filter is pushed down") + val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) + val parquetFilters = createParquetFilters(schema) + // In this test suite, all the simple predicates are convertible here. + assert(parquetFilters.convertibleFilters(sourceFilters) === pushedFilters) + val pushedParquetFilters = pushedFilters.map { pred => + val maybeFilter = parquetFilters.createFilter(pred) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") + maybeFilter.get + } + // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) + assert(pushedParquetFilters.exists(_.getClass === filterClass), + s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") + + checker(stripSparkFilter(query), expected) + case _ => throw new AnalysisException("Can not match ParquetTable in the query.") } diff --git a/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index b1a907f9cba27..55084b31453c4 100644 --- a/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -30,8 +30,8 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} +import org.apache.spark.sql.execution.datasources.v2.orc.{OrcScan, OrcTable} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -54,8 +54,7 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { .where(Column(predicate)) query.queryExecution.optimizedPlan match { - case PhysicalOperation(_, filters, - DataSourceV2Relation(orcTable: OrcTable, _, options)) => + case PhysicalOperation(_, filters, DataSourceV2Relation(orcTable: OrcTable, _, options)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") val scanBuilder = orcTable.newScanBuilder(options) scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) @@ -65,6 +64,14 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pushedFilters") checker(maybeFilter.get) + case PhysicalOperation(_, filters, + DataSourceV2ScanRelation(_, OrcScan(_, _, _, _, _, _, _, pushedFilters), _)) => + assert(filters.nonEmpty, "No filter is analyzed from the given query") + assert(pushedFilters.nonEmpty, "No filter is pushed down") + val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pushedFilters") + checker(maybeFilter.get) + case _ => throw new AnalysisException("Can not match OrcTable in the query.") } From fb7f54d210c5fc89139342b76e6612530e4130cd Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 3 Oct 2019 15:11:01 -0700 Subject: [PATCH 3/6] Fix more test failures. --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- .../org/apache/spark/sql/FileBasedDataSourceSuite.scala | 2 +- .../org/apache/spark/sql/connector/DataSourceV2Suite.scala | 7 ++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5f6e0a82be4ce..09986876e53d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -51,7 +51,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable} import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.execution.stat.StatFunctions import org.apache.spark.sql.internal.SQLConf @@ -3218,6 +3218,8 @@ class Dataset[T] private[sql]( fr.inputFiles case r: HiveTableRelation => r.tableMeta.storage.locationUri.map(_.toString).toArray + case DataSourceV2ScanRelation(table: FileTable, _, _) => + table.fileIndex.inputFiles case DataSourceV2Relation(table: FileTable, _, _) => table.fileIndex.inputFiles }.flatten diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 278fcffad4720..982f82d5109f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -666,7 +666,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { dir.delete() spark.range(1000).write.orc(dir.toString) val df = spark.read.orc(dir.toString) - assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(getLocalDirSize(dir))) + assert(df.queryExecution.optimizedPlan.stats.sizeInBytes === BigInt(getLocalDirSize(dir))) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 138bbc3f04f64..6d090fbf5b13f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapabil import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.read._ import org.apache.spark.sql.connector.read.partitioning.{ClusteredDistribution, Distribution, Partitioning} -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, DataSourceV2ScanRelation} import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.functions._ @@ -196,6 +196,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession { val df = spark.read.format(cls.getName).load() val logical = df.queryExecution.optimizedPlan.collect { case d: DataSourceV2Relation => d + case d: DataSourceV2ScanRelation => d }.head val statics = logical.computeStats() @@ -332,7 +333,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession { test("SPARK-23315: get output from canonicalized data source v2 related plans") { def checkCanonicalizedOutput( df: DataFrame, logicalNumOutput: Int, physicalNumOutput: Int): Unit = { - val logical = df.queryExecution.optimizedPlan.collect { + val logical = df.queryExecution.logical.collect { case d: DataSourceV2Relation => d }.head assert(logical.canonicalized.output.length == logicalNumOutput) @@ -356,7 +357,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession { .read .option(optionName, false) .format(classOf[DataSourceV2WithSessionConfig].getName).load() - val options = df.queryExecution.optimizedPlan.collectFirst { + val options = df.queryExecution.logical.collectFirst { case d: DataSourceV2Relation => d.options }.get assert(options.get(optionName) === "false") From 621224a85ae760714dda902120e4ba5e88b70c55 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 4 Oct 2019 15:31:33 -0700 Subject: [PATCH 4/6] Add computeStats back to DataSourceV2Relation. --- .../datasources/v2/DataSourceV2Relation.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index cabfbcbf1ea8b..ed0a07fd0ecab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2S import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.Utils /** * A logical plan representing a data source v2 table. @@ -54,6 +55,25 @@ case class DataSourceV2Relation( table.asReadable.newScanBuilder(options) } + override def computeStats(): Statistics = { + if (Utils.isTesting) { + // when testing, throw an exception if this computeStats method is called because stats should + // not be accessed before pushing the projection and filters to create a scan. otherwise, the + // stats are not accurate because they are based on a full table scan of all columns. + throw new UnsupportedOperationException( + s"BUG: computeStats called before pushdown on DSv2 relation: $name") + } else { + // when not testing, return stats because bad stats are better than failing a query + newScanBuilder() match { + case r: SupportsReportStatistics => + val statistics = r.estimateStatistics() + DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes) + case _ => + Statistics(sizeInBytes = conf.defaultSizeInBytes) + } + } + } + override def newInstance(): DataSourceV2Relation = { copy(output = output.map(_.newInstance())) } From 42207236a814c8578806962d5f263c90c6c42488 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 17 Oct 2019 16:36:00 -0700 Subject: [PATCH 5/6] Require conversion to DataSourceV2ScanRelation. --- .../sql/catalyst/analysis/Analyzer.scala | 14 ++++- .../catalyst/plans/logical/v2Commands.scala | 6 +- .../sql/connector/catalog/CatalogV2Util.scala | 7 ++- .../datasources/v2/DataSourceV2Relation.scala | 8 +-- .../scala/org/apache/spark/sql/Dataset.scala | 4 +- .../spark/sql/execution/SparkOptimizer.scala | 3 +- .../datasources/v2/DataSourceV2Strategy.scala | 55 ++----------------- .../v2/V2ScanRelationPushDown.scala | 4 +- .../spark/sql/FileBasedDataSourceSuite.scala | 6 +- .../sql/connector/DataSourceV2Suite.scala | 1 - .../execution/datasources/orc/OrcTest.scala | 19 +------ .../parquet/ParquetFilterSuite.scala | 27 +-------- .../datasources/orc/OrcFilterSuite.scala | 15 +---- 13 files changed, 39 insertions(+), 130 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 21bf926af50d7..e50b829030f40 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -681,10 +681,18 @@ class Analyzer( .map(v2Relation => i.copy(table = v2Relation)) .getOrElse(i) + case desc @ DescribeTable(u: UnresolvedV2Relation, _) => + CatalogV2Util.loadRelation(u.catalog, u.tableName) + .map(rel => desc.copy(table = rel)) + .getOrElse(desc) + + case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) => + CatalogV2Util.loadRelation(u.catalog, u.tableName) + .map(rel => alter.copy(table = rel)) + .getOrElse(alter) + case u: UnresolvedV2Relation => - CatalogV2Util.loadTable(u.catalog, u.tableName).map { table => - DataSourceV2Relation.create(table) - }.getOrElse(u) + CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index f587ee2928fc2..3c625e9acb5a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -271,7 +271,7 @@ case class ShowNamespaces( */ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command { - override def children: Seq[LogicalPlan] = Seq(table) + override lazy val resolved: Boolean = table.resolved override def output: Seq[Attribute] = DescribeTableSchema.describeTableAttributes() } @@ -313,9 +313,7 @@ case class AlterTable( table: NamedRelation, changes: Seq[TableChange]) extends Command { - override def children: Seq[LogicalPlan] = Seq(table) - - override lazy val resolved: Boolean = childrenResolved && { + override lazy val resolved: Boolean = table.resolved && { changes.forall { case add: AddColumn => add.fieldNames match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 6d8c6f8456412..0f313e7b96d86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -24,9 +24,10 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation} +import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation} import org.apache.spark.sql.catalyst.plans.logical.AlterTable import org.apache.spark.sql.connector.catalog.TableChange._ +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} private[sql] object CatalogV2Util { @@ -224,6 +225,10 @@ private[sql] object CatalogV2Util { case _: NoSuchNamespaceException => None } + def loadRelation(catalog: CatalogPlugin, ident: Identifier): Option[NamedRelation] = { + loadTable(catalog, ident).map(DataSourceV2Relation.create) + } + def isSessionCatalog(catalog: CatalogPlugin): Boolean = { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index ed0a07fd0ecab..87d3419e8115f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -51,20 +51,16 @@ case class DataSourceV2Relation( s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" } - def newScanBuilder(): ScanBuilder = { - table.asReadable.newScanBuilder(options) - } - override def computeStats(): Statistics = { if (Utils.isTesting) { // when testing, throw an exception if this computeStats method is called because stats should // not be accessed before pushing the projection and filters to create a scan. otherwise, the // stats are not accurate because they are based on a full table scan of all columns. - throw new UnsupportedOperationException( + throw new IllegalStateException( s"BUG: computeStats called before pushdown on DSv2 relation: $name") } else { // when not testing, return stats because bad stats are better than failing a query - newScanBuilder() match { + table.asReadable.newScanBuilder(options) match { case r: SupportsReportStatistics => val statistics = r.estimateStatistics() DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 09986876e53d1..607f495139ec1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -51,7 +51,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable} import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.execution.stat.StatFunctions import org.apache.spark.sql.internal.SQLConf @@ -3220,8 +3220,6 @@ class Dataset[T] private[sql]( r.tableMeta.storage.locationUri.map(_.toString).toArray case DataSourceV2ScanRelation(table: FileTable, _, _) => table.fileIndex.inputFiles - case DataSourceV2Relation(table: FileTable, _, _) => - table.fileIndex.inputFiles }.flatten files.toSet.toArray } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 3ee423f05fca6..e65faefad5b9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -69,7 +69,8 @@ class SparkOptimizer( override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+ ExtractPythonUDFFromJoinCondition.ruleName :+ ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+ - ExtractPythonUDFs.ruleName + ExtractPythonUDFs.ruleName :+ + V2ScanRelationPushDown.ruleName /** * Optimization batches that are executed before the regular optimization batches (also before diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 10f79aed21c76..bc66c154b57ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ import org.apache.spark.sql.{AnalysisException, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} -import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy @@ -52,42 +52,6 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { withProjection :: Nil - case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.newScanBuilder() - - val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery) - val normalizedFilters = DataSourceStrategy.normalizeFilters( - withoutSubquery, relation.output) - - // `pushedFilters` will be pushed down and evaluated in the underlying data sources. - // `postScanFilters` need to be evaluated after the scan. - // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. - val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( - scanBuilder, normalizedFilters) - val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery - val (scan, output) = PushDownUtils.pruneColumns( - scanBuilder, relation, project ++ postScanFilters) - logInfo( - s""" - |Pushing operators to ${relation.name} - |Pushed Filters: ${pushedFilters.mkString(", ")} - |Post-Scan Filters: ${postScanFilters.mkString(",")} - |Output: ${output.mkString(", ")} - """.stripMargin) - - val batchExec = BatchScanExec(output, scan) - - val filterCondition = postScanFilters.reduceLeftOption(And) - val withFilter = filterCondition.map(FilterExec(_, batchExec)).getOrElse(batchExec) - - val withProjection = if (withFilter.output != project || !batchExec.supportsColumnar) { - ProjectExec(project, withFilter) - } else { - withFilter - } - - withProjection :: Nil - case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined => val microBatchStream = r.stream.asInstanceOf[MicroBatchStream] val scanExec = MicroBatchScanExec( @@ -193,7 +157,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { OverwritePartitionsDynamicExec( r.table.asWritable, writeOptions.asOptions, planLater(query)) :: Nil - case DeleteFromTable(V2TableRelation(table, output), condition) => + case DeleteFromTable(DataSourceV2ScanRelation(table, _, output), condition) => if (condition.exists(SubqueryExpression.hasSubquery)) { throw new AnalysisException( s"Delete by condition with subquery is not supported: $condition") @@ -222,7 +186,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { Nil } - case desc @ DescribeTable(V2TableRelation(table, _), isExtended) => + case desc @ DescribeTable(DataSourceV2Relation(table, _, _), isExtended) => DescribeTableExec(desc.output, table, isExtended) :: Nil case DropTable(catalog, ident, ifExists) => @@ -248,15 +212,4 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case _ => Nil } - - private object V2TableRelation { - def unapply(plan: LogicalPlan): Option[(Table, Seq[AttributeReference])] = plan match { - case r: DataSourceV2Relation => - Some((r.table, r.output)) - case r: DataSourceV2ScanRelation => - Some((r.table, r.output)) - case _ => - None - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index e3fa8e5bededc..6aa8d989583d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -24,9 +24,11 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.DataSourceStrategy object V2ScanRelationPushDown extends Rule[LogicalPlan] { + import DataSourceV2Implicits._ + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.newScanBuilder() + val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery) val normalizedFilters = DataSourceStrategy.normalizeFilters( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 982f82d5109f9..a7f3e81904de0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -29,9 +29,7 @@ import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ @@ -722,8 +720,6 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { .option("path", paths.head.getCanonicalPath) .parquet(paths(1).getCanonicalPath, paths(2).getCanonicalPath) df.queryExecution.optimizedPlan match { - case PhysicalOperation(_, _, DataSourceV2Relation(table: ParquetTable, _, _)) => - assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet) case PhysicalOperation(_, _, DataSourceV2ScanRelation(table: ParquetTable, _, _)) => assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet) case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 6d090fbf5b13f..55c71c7d02d2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -195,7 +195,6 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession { withClue(cls.getName) { val df = spark.read.format(cls.getName).load() val logical = df.queryExecution.optimizedPlan.collect { - case d: DataSourceV2Relation => d case d: DataSourceV2ScanRelation => d }.head diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index 49c16e4012f02..528c3474a17c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -27,9 +27,9 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest} -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} -import org.apache.spark.sql.execution.datasources.v2.orc.{OrcScan, OrcTable} +import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION @@ -118,19 +118,6 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor .where(Column(predicate)) query.queryExecution.optimizedPlan match { - case PhysicalOperation(_, filters, DataSourceV2Relation(orcTable: OrcTable, _, options)) => - assert(filters.nonEmpty, "No filter is analyzed from the given query") - val scanBuilder = orcTable.newScanBuilder(options) - scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) - val pushedFilters = scanBuilder.pushedFilters() - if (noneSupported) { - assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters") - } else { - assert(pushedFilters.nonEmpty, "No filter is pushed down") - val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters) - assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters") - } - case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, OrcScan(_, _, _, _, _, _, _, pushedFilters), _)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c022ef419a15f..286bb1e920266 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -33,8 +33,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} -import org.apache.spark.sql.execution.datasources.v2.parquet.{ParquetScan, ParquetTable} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType @@ -1482,29 +1482,6 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { .where(Column(predicate)) query.queryExecution.optimizedPlan.collectFirst { - case PhysicalOperation(_, filters, - DataSourceV2Relation(parquetTable: ParquetTable, _, options)) => - assert(filters.nonEmpty, "No filter is analyzed from the given query") - val scanBuilder = parquetTable.newScanBuilder(options) - val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter).toArray - scanBuilder.pushFilters(sourceFilters) - val pushedFilters = scanBuilder.pushedFilters() - assert(pushedFilters.nonEmpty, "No filter is pushed down") - val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) - val parquetFilters = createParquetFilters(schema) - // In this test suite, all the simple predicates are convertible here. - assert(parquetFilters.convertibleFilters(sourceFilters) === pushedFilters) - val pushedParquetFilters = pushedFilters.map { pred => - val maybeFilter = parquetFilters.createFilter(pred) - assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") - maybeFilter.get - } - // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) - assert(pushedParquetFilters.exists(_.getClass === filterClass), - s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") - - checker(stripSparkFilter(query), expected) - case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, scan: ParquetScan, _)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") diff --git a/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 55084b31453c4..80e330b3f2ae2 100644 --- a/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -29,9 +29,8 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} -import org.apache.spark.sql.execution.datasources.v2.orc.{OrcScan, OrcTable} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -54,16 +53,6 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { .where(Column(predicate)) query.queryExecution.optimizedPlan match { - case PhysicalOperation(_, filters, DataSourceV2Relation(orcTable: OrcTable, _, options)) => - assert(filters.nonEmpty, "No filter is analyzed from the given query") - val scanBuilder = orcTable.newScanBuilder(options) - scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) - val pushedFilters = scanBuilder.pushedFilters() - assert(pushedFilters.nonEmpty, "No filter is pushed down") - val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters) - assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pushedFilters") - checker(maybeFilter.get) - case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, OrcScan(_, _, _, _, _, _, _, pushedFilters), _)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") From c223e0537796c1e0a52bc5a655660f89ab539192 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 30 Oct 2019 11:20:37 -0700 Subject: [PATCH 6/6] Add Describe and Alter checks to CheckAnalysis. --- .../sql/catalyst/analysis/CheckAnalysis.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index d9dc9ebbcaf3c..72612d1dc76c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -104,6 +104,20 @@ trait CheckAnalysis extends PredicateHelper { case u: UnresolvedV2Relation => u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) => + u.failAnalysis( + s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") + + case AlterTable(_, _, u: UnresolvedV2Relation, _) => + failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + + case DescribeTable(u: UnresolvedV2Relation, _) if isView(u.originalNameParts) => + u.failAnalysis( + s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") + + case DescribeTable(u: UnresolvedV2Relation, _) => + failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + case operator: LogicalPlan => // Check argument data types of higher-order functions downwards first. // If the arguments of the higher-order functions are resolved but the type check fails,