diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index a64e8bcd68175..0cf5558d49a08 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -114,7 +114,7 @@ case class OrderedJoin(
 /**
  * Reorder the joins using a dynamic programming algorithm. This implementation is based on the
  * paper: Access Path Selection in a Relational Database Management System.
- * http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf
+ * https://dl.acm.org/doi/10.1145/582095.582099
  *
  * First we put all items (basic joined nodes) into level 0, then we build all two-way joins
  * at level 1 from plans at level 0 (single items), then build all 3-way joins from plans
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0d1a3e365c918..18ffc655b2174 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -951,6 +951,17 @@ object SQLConf {
       .checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be greater than 0")
       .createWithDefault(100000)
 
+  val AUTO_BUCKETED_SCAN_ENABLED =
+    buildConf("spark.sql.sources.bucketing.autoBucketedScan.enabled")
+      .doc("When true, decide automatically whether to do bucketed scan on input tables based " +
+        "on the query plan. Do not use bucketed scan if 1. the query does not have operators " +
+        "to utilize bucketing (e.g. join, group-by), or 2. there's an exchange operator " +
+        s"between these operators and the table scan. Note that when '${BUCKETING_ENABLED.key}' " +
+        "is set to false, this configuration does not take any effect.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
     .internal()
     .doc("When false, we will throw an error if a query contains a cartesian product without " +
@@ -3164,6 +3175,8 @@ class SQLConf extends Serializable with Logging {
 
   def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)
 
+  def autoBucketedScanEnabled: Boolean = getConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
+
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
     getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 1b9ca63ea21d3..45d28ddb42fc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -156,7 +156,9 @@ case class RowDataSourceScanExec(
  * @param optionalBucketSet Bucket ids for bucket pruning.
  * @param optionalNumCoalescedBuckets Number of coalesced buckets.
  * @param dataFilters Filters on non-partition columns.
- * @param tableIdentifier identifier for the table in the metastore.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query plan; see rule
+ *                            [[DisableUnnecessaryBucketedScan]] for details.
  */
 case class FileSourceScanExec(
     @transient relation: HadoopFsRelation,
@@ -166,7 +168,8 @@ case class FileSourceScanExec(
     optionalBucketSet: Option[BitSet],
     optionalNumCoalescedBuckets: Option[Int],
     dataFilters: Seq[Expression],
-    tableIdentifier: Option[TableIdentifier])
+    tableIdentifier: Option[TableIdentifier],
+    disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {
 
   // Note that some vals referring the file-based relation are lazy intentionally
@@ -257,7 +260,8 @@ case class FileSourceScanExec(
 
   // exposed for testing
   lazy val bucketedScan: Boolean = {
-    if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined) {
+    if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined
+      && !disableBucketedScan) {
       val spec = relation.bucketSpec.get
       val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
       bucketColumns.size == spec.bucketColumnNames.size
@@ -348,20 +352,23 @@ case class FileSourceScanExec(
         "DataFilters" -> seqToString(dataFilters),
         "Location" -> locationDesc)
 
-    val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
-      val numSelectedBuckets = optionalBucketSet.map { b =>
-        b.cardinality()
+    // TODO(SPARK-32986): Add bucketed scan info in explain output of FileSourceScanExec
+    if (bucketedScan) {
+      relation.bucketSpec.map { spec =>
+        val numSelectedBuckets = optionalBucketSet.map { b =>
+          b.cardinality()
+        } getOrElse {
+          spec.numBuckets
+        }
+        metadata + ("SelectedBucketsCount" ->
+          (s"$numSelectedBuckets out of ${spec.numBuckets}" +
+            optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
       } getOrElse {
-        spec.numBuckets
+        metadata
       }
-      metadata + ("SelectedBucketsCount" ->
-        (s"$numSelectedBuckets out of ${spec.numBuckets}" +
-          optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
-    } getOrElse {
+    } else {
       metadata
     }
-
-    withSelectedBucketsCount
   }
 
   override def verboseStringWithOperatorId(): String = {
@@ -539,6 +546,7 @@ case class FileSourceScanExec(
         .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
     }
 
+    // TODO(SPARK-32985): Decouple bucket filter pruning and bucketed table scan
     val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
       val bucketSet = optionalBucketSet.get
       filesGroupedToBuckets.filter {
@@ -624,6 +632,7 @@ case class FileSourceScanExec(
       optionalBucketSet,
       optionalNumCoalescedBuckets,
       QueryPlan.normalizePredicates(dataFilters, output),
-      None)
+      None,
+      disableBucketedScan)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index dca2c5b16e8d5..a056500fa361a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
-import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
+import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -344,6 +344,7 @@ object QueryExecution {
       PlanSubqueries(sparkSession),
       RemoveRedundantProjects(sparkSession.sessionState.conf),
       EnsureRequirements(sparkSession.sessionState.conf),
+      DisableUnnecessaryBucketedScan(sparkSession.sessionState.conf),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
       CollapseCodegenStages(sparkSession.sessionState.conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
new file mode 100644
index 0000000000000..9b4f898df00b6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
+import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
+import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Disable unnecessary bucketed table scan based on the actual physical query plan.
+ * NOTE: this rule is designed to be applied right after [[EnsureRequirements]],
+ * where all [[ShuffleExchangeExec]] and [[SortExec]] have been added to the plan properly.
+ *
+ * When BUCKETING_ENABLED and AUTO_BUCKETED_SCAN_ENABLED are set to true, go through
+ * the query plan to check where bucketed table scan is unnecessary, and disable bucketed
+ * table scan if:
+ *
+ * 1. The sub-plan from root to bucketed table scan does not contain a
+ *    [[hasInterestingPartition]] operator.
+ *
+ * 2. The sub-plan from the nearest downstream [[hasInterestingPartition]] operator
+ *    to the bucketed table scan contains only [[isAllowedUnaryExecNode]] operators
+ *    and at least one [[Exchange]].
+ *
+ * Examples:
+ * 1. no [[hasInterestingPartition]] operator:
+ *                Project
+ *                   |
+ *                 Filter
+ *                   |
+ *             Scan(t1: i, j)
+ *  (bucketed on column j, DISABLE bucketed scan)
+ *
+ * 2. join:
+ *         SortMergeJoin(t1.i = t2.j)
+ *            /            \
+ *        Sort(i)        Sort(j)
+ *          /               \
+ *      Shuffle(i)       Scan(t2: i, j)
+ *        /         (bucketed on column j, enable bucketed scan)
+ *   Scan(t1: i, j)
+ * (bucketed on column j, DISABLE bucketed scan)
+ *
+ * 3. aggregate:
+ *         HashAggregate(i, ..., Final)
+ *                      |
+ *                  Shuffle(i)
+ *                      |
+ *         HashAggregate(i, ..., Partial)
+ *                      |
+ *                    Filter
+ *                      |
+ *                  Scan(t1: i, j)
+ *  (bucketed on column j, DISABLE bucketed scan)
+ *
+ * The idea of [[hasInterestingPartition]] is inspired by "interesting order" in
+ * the paper "Access Path Selection in a Relational Database Management System"
+ * (https://dl.acm.org/doi/10.1145/582095.582099).
+ */
+case class DisableUnnecessaryBucketedScan(conf: SQLConf) extends Rule[SparkPlan] {
+
+  /**
+   * Disable bucketed table scan with pre-order traversal of the plan.
+   *
+   * @param withInterestingPartition The traversed plan has an operator with interesting partition.
+   * @param withExchange The traversed plan has an [[Exchange]] operator.
+   * @param withAllowedNode The traversed plan has only [[isAllowedUnaryExecNode]] operators.
+   */
+  private def disableBucketWithInterestingPartition(
+      plan: SparkPlan,
+      withInterestingPartition: Boolean,
+      withExchange: Boolean,
+      withAllowedNode: Boolean): SparkPlan = {
+    plan match {
+      case p if hasInterestingPartition(p) =>
+        // An operator with interesting partition propagates `withInterestingPartition` as true
+        // to its children, and resets `withExchange` and `withAllowedNode`.
+        p.mapChildren(disableBucketWithInterestingPartition(_, true, false, true))
+      case exchange: Exchange =>
+        // An Exchange operator propagates `withExchange` as true to its child.
+        exchange.mapChildren(disableBucketWithInterestingPartition(
+          _, withInterestingPartition, true, withAllowedNode))
+      case scan: FileSourceScanExec =>
+        if (isBucketedScanWithoutFilter(scan)) {
+          if (!withInterestingPartition || (withExchange && withAllowedNode)) {
+            scan.copy(disableBucketedScan = true)
+          } else {
+            scan
+          }
+        } else {
+          scan
+        }
+      case o =>
+        o.mapChildren(disableBucketWithInterestingPartition(
+          _,
+          withInterestingPartition,
+          withExchange,
+          withAllowedNode && isAllowedUnaryExecNode(o)))
+    }
+  }
+
+  private def hasInterestingPartition(plan: SparkPlan): Boolean = {
+    plan.requiredChildDistribution.exists {
+      case _: ClusteredDistribution | _: HashClusteredDistribution => true
+      case _ => false
+    }
+  }
+
+  /**
+   * Check if the operator is an allowed single-child operator.
+   * We may revisit this method later, as we can probably
+   * remove this restriction and allow arbitrary operators between
+   * the bucketed table scan and the operator with interesting partition.
+   */
+  private def isAllowedUnaryExecNode(plan: SparkPlan): Boolean = {
+    plan match {
+      case _: SortExec | _: ProjectExec | _: FilterExec => true
+      case partialAgg: BaseAggregateExec =>
+        partialAgg.requiredChildDistributionExpressions.isEmpty
+      case _ => false
+    }
+  }
+
+  private def isBucketedScanWithoutFilter(scan: FileSourceScanExec): Boolean = {
+    // Do not disable bucketed table scan if it has filter pruning,
+    // because bucketed table scan is still useful here to save CPU/IO cost by
+    // reading only the selected bucket files.
+    scan.bucketedScan && scan.optionalBucketSet.isEmpty
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    lazy val hasBucketedScanWithoutFilter = plan.find {
+      case scan: FileSourceScanExec => isBucketedScanWithoutFilter(scan)
+      case _ => false
+    }.isDefined
+
+    if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScanWithoutFilter) {
+      plan
+    } else {
+      disableBucketWithInterestingPartition(plan, false, false, true)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index b463a76a74026..14d03a30453ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -348,7 +348,7 @@ class DataFrameJoinSuite extends QueryTest
     }
     assert(broadcastExchanges.size == 1)
     val tables = broadcastExchanges.head.collect {
-      case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent)) => tableIdent
+      case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent), _) => tableIdent
     }
     assert(tables.size == 1)
     assert(tables.head === TableIdentifier(table1Name, Some(dbName)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index a21c461e84588..73b23496de515 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1314,7 +1314,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     // need to execute the query before we can examine fs.inputRDDs()
     assert(stripAQEPlan(df.queryExecution.executedPlan) match {
       case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-        fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _)))) =>
+        fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _, _)))) =>
         partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
           fs.inputRDDs().forall(
             _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
new file mode 100644
index 0000000000000..1c258bc0dadb9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class DisableUnnecessaryBucketedScanWithoutHiveSupportSuite
+  extends DisableUnnecessaryBucketedScanSuite
+  with SharedSparkSession {
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assert(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+}
+
+abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest with SQLTestUtils {
+  import testImplicits._
+
+  private lazy val df1 =
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
+  private lazy val df2 =
+    (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
+
+  private def checkDisableBucketedScan(
+      query: String,
+      expectedNumScanWithAutoScanEnabled: Int,
+      expectedNumScanWithAutoScanDisabled: Int): Unit = {
+
+    def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = {
+      val plan = sql(query).queryExecution.executedPlan
+      val bucketedScan = plan.collect { case s: FileSourceScanExec if s.bucketedScan => s }
+      assert(bucketedScan.length == expectedNumBucketedScan)
+    }
+
+    withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+      checkNumBucketedScan(query, expectedNumScanWithAutoScanEnabled)
+      val result = sql(query).collect()
+
+      withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "false") {
+        checkNumBucketedScan(query, expectedNumScanWithAutoScanDisabled)
+        checkAnswer(sql(query), result)
+      }
+    }
+  }
+
+  test("SPARK-32859: disable unnecessary bucketed table scan - basic test") {
+    withTable("t1", "t2", "t3") {
+      df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+      df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+      df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t3")
+
+      Seq(
+        // Read bucketed table
+        ("SELECT * FROM t1", 0, 1),
+        ("SELECT i FROM t1", 0, 1),
+        ("SELECT j FROM t1", 0, 0),
+        // Filter on bucketed column
+        ("SELECT * FROM t1 WHERE i = 1", 1, 1),
+        // Filter on non-bucketed column
+        ("SELECT * FROM t1 WHERE j = 1", 0, 1),
+        // Join with same buckets
+        ("SELECT /*+ broadcast(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.i", 0, 2),
+        ("SELECT /*+ shuffle_hash(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.i", 2, 2),
+        ("SELECT /*+ merge(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.i", 2, 2),
+        // Join with different buckets
+        ("SELECT /*+ broadcast(t1)*/ * FROM t1 JOIN t3 ON t1.i = t3.i", 0, 2),
+        ("SELECT /*+ shuffle_hash(t1)*/ * FROM t1 JOIN t3 ON t1.i = t3.i", 1, 2),
+        ("SELECT /*+ merge(t1)*/ * FROM t1 JOIN t3 ON t1.i = t3.i", 1, 2),
+        // Join on non-bucketed column
+        ("SELECT /*+ broadcast(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.j", 0, 2),
+        ("SELECT /*+ shuffle_hash(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.j", 1, 2),
+        ("SELECT /*+ merge(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.j", 1, 2),
+        ("SELECT /*+ broadcast(t1)*/ * FROM t1 JOIN t2 ON t1.j = t2.j", 0, 2),
+        ("SELECT /*+ shuffle_hash(t1)*/ * FROM t1 JOIN t2 ON t1.j = t2.j", 0, 2),
+        ("SELECT /*+ merge(t1)*/ * FROM t1 JOIN t2 ON t1.j = t2.j", 0, 2),
+        // Aggregate on bucketed column
+        ("SELECT SUM(i) FROM t1 GROUP BY i", 1, 1),
+        // Aggregate on non-bucketed column
+        ("SELECT SUM(i) FROM t1 GROUP BY j", 0, 1),
+        ("SELECT j, SUM(i), COUNT(j) FROM t1 GROUP BY j", 0, 1)
+      ).foreach { case (query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled) =>
+        checkDisableBucketedScan(query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled)
+      }
+    }
+  }
+
+  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins test") {
+    withTable("t1", "t2", "t3") {
+      df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+      df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+      df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t3")
+
+      Seq(
+        // Multiple joins on bucketed columns
+        ("""
+         SELECT /*+ broadcast(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
+         ON t1.i = t2.i AND t2.i = t3.i
+         """.stripMargin, 0, 3),
+        ("""
+         SELECT /*+ broadcast(t1) merge(t3)*/ * FROM t1 JOIN t2 JOIN t3
+         ON t1.i = t2.i AND t2.i = t3.i
+         """.stripMargin, 2, 3),
+        ("""
+         SELECT /*+ merge(t1) broadcast(t3)*/ * FROM t1 JOIN t2 JOIN t3
+         ON t1.i = t2.i AND t2.i = t3.i
+         """.stripMargin, 2, 3),
+        ("""
+         SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
+         ON t1.i = t2.i AND t2.i = t3.i
+         """.stripMargin, 2, 3),
+        // Multiple joins on non-bucketed columns
+        ("""
+         SELECT /*+ broadcast(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
+         ON t1.i = t2.j AND t2.j = t3.i
+         """.stripMargin, 0, 3),
+        ("""
+         SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
+         ON t1.i = t2.j AND t2.j = t3.i
+         """.stripMargin, 1, 3),
+        ("""
+         SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
+         ON t1.j = t2.j AND t2.j = t3.j
+         """.stripMargin, 0, 3)
+      ).foreach { case (query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled) =>
+        checkDisableBucketedScan(query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled)
+      }
+    }
+  }
+
+  test("SPARK-32859: disable unnecessary bucketed table scan - multiple bucketed columns test") {
+    withTable("t1", "t2", "t3") {
+      df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+      df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2")
+      df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t3")
+
+      Seq(
+        // Filter on bucketed columns
+        ("SELECT * FROM t1 WHERE i = 1", 0, 1),
+        ("SELECT * FROM t1 WHERE i = 1 AND j = 1", 0, 1),
+        // Join on bucketed columns
+        ("""
+         SELECT /*+ broadcast(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j
+         """.stripMargin, 0, 2),
+        ("""
+         SELECT /*+ merge(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j
+         """.stripMargin, 2, 2),
+        ("""
+         SELECT /*+ merge(t1)*/ * FROM t1 JOIN t3 ON t1.i = t3.i AND t1.j = t3.j
+         """.stripMargin, 1, 2),
+        ("SELECT /*+ merge(t1)*/ * FROM t1 JOIN t2 ON t1.i = t2.i", 0, 2),
+        // Aggregate on bucketed columns
+        ("SELECT i, j, COUNT(*) FROM t1 GROUP BY i, j", 1, 1),
+        ("SELECT i, COUNT(i) FROM t1 GROUP BY i", 0, 0),
+        ("SELECT i, COUNT(j) FROM t1 GROUP BY i", 0, 1)
+      ).foreach { case (query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled) =>
+        checkDisableBucketedScan(query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled)
+      }
+    }
+  }
+
+  test("SPARK-32859: disable unnecessary bucketed table scan - other operators test") {
+    withTable("t1", "t2", "t3") {
+      df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+      df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+      df1.write.format("parquet").saveAsTable("t3")
+
+      Seq(
+        // Operator with interesting partition not in sub-plan
+        ("""
+         SELECT t1.i FROM t1
+         UNION ALL
+         (SELECT t2.i FROM t2 GROUP BY t2.i)
+         """.stripMargin, 1, 2),
+        // Non-allowed operator in sub-plan
+        ("""
+         SELECT COUNT(*)
+         FROM (SELECT t1.i FROM t1 UNION ALL SELECT t2.i FROM t2)
+         GROUP BY i
+         """.stripMargin, 2, 2),
+        // Multiple [[Exchange]] in sub-plan
+        ("""
+         SELECT j, SUM(i), COUNT(*) FROM t1 GROUP BY j
+         DISTRIBUTE BY j
+         """.stripMargin, 0, 1),
+        ("""
+         SELECT j, COUNT(*)
+         FROM (SELECT i, j FROM t1 DISTRIBUTE BY i, j)
+         GROUP BY j
+         """.stripMargin, 0, 1),
+        // No bucketed table scan in plan
+        ("""
+         SELECT j, COUNT(*)
+         FROM (SELECT t1.j FROM t1 JOIN t3 ON t1.j = t3.j)
+         GROUP BY j
+         """.stripMargin, 0, 0)
+      ).foreach { case (query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled) =>
+        checkDisableBucketedScan(query, numScanWithAutoScanEnabled, numScanWithAutoScanDisabled)
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanWithHiveSupportSuite.scala
new file mode 100644
index 0000000000000..30eb93cb5c3e8
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanWithHiveSupportSuite.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class DisableUnnecessaryBucketedScanWithHiveSupportSuite
+  extends DisableUnnecessaryBucketedScanSuite
+  with TestHiveSingleton {
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assert(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+}
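
Usage sketch (illustrative only; not part of the patch). Assuming a build with this patch applied and a parquet table `t1` bucketed on column `i` (mirroring the test suites above), the new config can be toggled per session and the rule's effect observed in the executed plan:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Hypothetical bucketed table, set up the same way as in the suites above.
    spark.range(50).selectExpr("id % 5 AS i", "id % 13 AS j")
      .write.format("parquet").bucketBy(8, "i").saveAsTable("t1")

    spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

    // No operator with interesting partition downstream, so the rule should
    // plan this scan with disableBucketedScan = true.
    spark.sql("SELECT * FROM t1").explain()

    // A group-by on the bucket column requires a ClusteredDistribution with no
    // intervening exchange, so the bucketed scan should stay enabled here.
    spark.sql("SELECT i, COUNT(*) FROM t1 GROUP BY i").explain()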