Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,14 @@ object SQLConf {
.checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be greater than 0")
.createWithDefault(100000)

val AUTO_BUCKETED_SCAN_ENABLED =
buildConf("spark.sql.sources.bucketing.autoBucketedScan.enabled")
.doc("When true, decide whether to do bucketed scan on input tables based on query plan " +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since our user documents are generated based on this statement, could you describe a bit more about how to decide whether to do bucketed scans or not?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure, wondering what do you think of below?

When true, decide whether to do bucketed scan on input tables
based on query plan automatically. Do not use bucketed scan if
(1).query does not have operators to utilize bucketing (e.g. join, group-by, etc),
or (2).there's an exchange operator between these operators and table scan.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks okay, but I have the same suggestion with #29804 (comment)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure, updated.

"automatically.")
.version("3.1.0")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, we need to make this config external? If we just add this config for keeping the current behaviour, is it okay to add it as internal one?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure, just for my own education, what does it indicate to make a config internal/external?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC we don't have any strict rule for that. But, I think this new rule works well in most queries, so adding this as external looks less meaning because I think most users don't turn this feature off.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure, updated.

.booleanConf
.createWithDefault(true)

@viirya viirya Sep 28, 2020

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If spark.sql.sources.bucketing.enabled is disabled, does this config still work? Please describe it in the config doc too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya - this config will not take any effect in that case, updated.


val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
.internal()
.doc("When false, we will throw an error if a query contains a cartesian product without " +
Expand Down Expand Up @@ -3164,6 +3172,8 @@ class SQLConf extends Serializable with Logging {

def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)

def autoBucketedScanEnabled: Boolean = getConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)

def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ case class RowDataSourceScanExec(
* @param optionalBucketSet Bucket ids for bucket pruning.
* @param optionalNumCoalescedBuckets Number of coalesced buckets.
* @param dataFilters Filters on non-partition columns.
* @param tableIdentifier identifier for the table in the metastore.
* @param tableIdentifier Identifier for the table in the metastore.
* @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
* [[DisableUnnecessaryBucketedScan]] for details.
*/
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
Expand All @@ -166,7 +168,8 @@ case class FileSourceScanExec(
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
dataFilters: Seq[Expression],
tableIdentifier: Option[TableIdentifier])
tableIdentifier: Option[TableIdentifier],
disableBucketedScan: Boolean = false)
extends DataSourceScanExec {

// Note that some vals referring the file-based relation are lazy intentionally
Expand Down Expand Up @@ -257,7 +260,8 @@ case class FileSourceScanExec(

// exposed for testing
lazy val bucketedScan: Boolean = {
if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined) {
if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined
&& !disableBucketedScan) {
val spec = relation.bucketSpec.get
val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
bucketColumns.size == spec.bucketColumnNames.size
Expand Down Expand Up @@ -339,7 +343,7 @@ case class FileSourceScanExec(
location.getClass.getSimpleName +
Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
val metadata =
Map(
HashMap(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: HashMap -> mutable.HashMap for readability.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - updated. Was just following other code in the same file.

"Format" -> relation.fileFormat.toString,
"ReadSchema" -> requiredSchema.catalogString,
"Batched" -> supportsColumnar.toString,
Expand All @@ -348,20 +352,22 @@ case class FileSourceScanExec(
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)

val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
val numSelectedBuckets = optionalBucketSet.map { b =>
b.cardinality()
} getOrElse {
spec.numBuckets
if (bucketedScan) {
relation.bucketSpec.map { spec =>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map -> foreach

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - just for my own education, why does it matter? Updated anyway.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I remember the previous discussion: https://issues.apache.org/jira/browse/SPARK-16694

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, please only use map when you care about the return value. foreach is better if you just want to do some calculation if Option is Some

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan , @maropu - I changed the code during iterations. The current change is just adding a if (bucketedScan) { ... } else { ... } on top of original code, where we still need to use map as it's returning value.

val numSelectedBuckets = optionalBucketSet.map { b =>
b.cardinality()
} getOrElse {
spec.numBuckets
}
metadata += ("SelectedBucketsCount" ->
(s"$numSelectedBuckets out of ${spec.numBuckets}" +
optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
}
metadata + ("SelectedBucketsCount" ->
(s"$numSelectedBuckets out of ${spec.numBuckets}" +
optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
} getOrElse {
metadata
} else if (disableBucketedScan) {
metadata += ("DisableBucketedScan" -> "true")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need it? It's kind of the reason why there is no bucket scan in this node. The reason can be: 1. the table is not bucketed. 2. the bucket column is not read. 3. the planner decides to disable it as it has no benefits.

If we do need the reason, we should make it completed. Let's not just put the disableBucketedScan flag.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least we don't need to do it in this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - how about I update the explain with a BucketedScan: [NON_BUCKETED, BUCKETED_COLUMNS_NOT_READ, DISABLED] in a followup PR? cc @maropu .

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's kind of the reason why there is no bucket scan in this node. The reason can be: 1. the table is not bucketed. 2. the bucket column is not read. 3. the planner decides to disable it as it has no benefits.

The intention in my comment meant that users need to be able to see why bucket scans are disabled as Wenchen pointed it out above. Anyway, the followup looks okay.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a TODO comment for followup jira - https://issues.apache.org/jira/browse/SPARK-32986 .

}

withSelectedBucketsCount
metadata.toMap
}

override def verboseStringWithOperatorId(): String = {
Expand Down Expand Up @@ -624,6 +630,7 @@ case class FileSourceScanExec(
optionalBucketSet,
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None)
None,
disableBucketedScan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
Expand Down Expand Up @@ -344,6 +344,7 @@ object QueryExecution {
PlanSubqueries(sparkSession),
RemoveRedundantProjects(sparkSession.sessionState.conf),
EnsureRequirements(sparkSession.sessionState.conf),
DisableUnnecessaryBucketedScan(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.bucketing

import org.apache.spark.sql.catalyst.expressions.aggregate.{Partial, PartialMerge}
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf

/**
* Disable unnecessary bucketed table scan based on actual physical query plan.
* NOTE: this rule is designed to be applied right after [[EnsureRequirements]],
* where all [[ShuffleExchangeExec]] and [[SortExec]] have been added to plan properly.
*
* When BUCKETING_ENABLED and AUTO_BUCKETED_SCAN_ENABLED are set to true, go through
* query plan to check where bucketed table scan is unnecessary, and disable bucketed table
* scan if needed.
*
* For all operators which [[hasInterestingPartition]] (i.e., require [[ClusteredDistribution]]
* or [[HashClusteredDistribution]]), check if the sub-plan for operator has [[Exchange]] and
* bucketed table scan. If yes, disable the bucketed table scan in the sub-plan.
* Only allow certain operators in sub-plan, which guarantees each sub-plan is single lineage
* (i.e., each operator has only one child). See details in
* [[disableBucketWithInterestingPartition]]).
*
* Examples:
* (1).join:
* SortMergeJoin(t1.i = t2.j)
* / \
* Sort(i) Sort(j)
* / \
* Shuffle(i) Scan(t2: i, j)
* / (bucketed on column j, enable bucketed scan)
* Scan(t1: i, j)
* (bucketed on column j, DISABLE bucketed scan)
*
* (2).aggregate:
* HashAggregate(i, ..., Final)
* |
* Shuffle(i)
* |
* HashAggregate(i, ..., Partial)
* |
* Filter
* |
* Scan(t1: i, j)
* (bucketed on column j, DISABLE bucketed scan)
*
* The idea of [[hasInterestingPartition]] is inspired from "interesting order" in
* the paper "Access Path Selection in a Relational Database Management System"
* (http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we use a link to the ACM page instead? https://dl.acm.org/doi/10.1145/582095.582099

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure, updated. Was just following the reference in CBO join reorder. Updated the link there as well.

*/
case class DisableUnnecessaryBucketedScan(conf: SQLConf) extends Rule[SparkPlan] {

/**
* Disable bucketed table scan with pre-order traversal of plan.
*
* @param withInterestingPartition The traversed plan has operator with interesting partition.
* @param withExchange The traversed plan has [[Exchange]] operator.
*/
private def disableBucketWithInterestingPartition(
plan: SparkPlan,
withInterestingPartition: Boolean,
withExchange: Boolean): SparkPlan = {
plan match {
case p if hasInterestingPartition(p) =>
// Operators with interesting partition, propagates `withInterestingPartition` as true
// to its children.
p.mapChildren(disableBucketWithInterestingPartition(_, true, false))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question; I read the PR description and I thought first that this rule is to find exchange plan nodes (inserted by EnsureRequirements) having bucket scans and then it disables the scans if the exchanges make bucket read meaningless. What I imagined is like this;

  private def disableBucketWithInterestingPartition(...): SparkPlan = {
    plan match {
      case exchange: Exchange if isAllowedUnaryExecNode(exchange.child) =>
        val newPlan = (disable bucket scan if the scan is meaningless)
        exchange.withNewChildren(newPlan:: Nil)

      case o =>
        o.mapChildren(disableBucketWithInterestingPartition(...))
    }
  }

But, the current code looks more general. Any reason to propagate required distributions in a top-down manner?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - this is good question. I think currently only matching Exchange will also do the same job. Either way is fine with me. InterestingPartition and later on InterestingOrder (for bucketed sorted scan) looks like more general and we can extend them later.

One interesting extension I can think of - if bucketed scan parallelism is too low (to few # of buckets), we may decide to not do a bucketed scan for join to trade-off for query run-time vs extra shuffle cost (in this case, there's no Exchange before join).

case exchange: Exchange if withInterestingPartition =>
// Exchange operator propagates `withExchange` as true to its child
// if the plan has interesting partition.
exchange.mapChildren(disableBucketWithInterestingPartition(
_, withInterestingPartition, true))
case scan: FileSourceScanExec
if withInterestingPartition && withExchange && isBucketedScanWithoutFilter(scan) =>
// Disable bucketed table scan if the plan has interesting partition,
// and [[Exchange]] in the plan.
scan.copy(disableBucketedScan = true)
case o =>
if (isAllowedUnaryExecNode(o)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make the rule clear. I think there are 2 cases when doing traverse here:

  1. When we are looking for node with interesting partition, we don't need a whitelist, we can just go through any node. If we hit a file scan, disable its bucketed scan.
  2. When we hit a node with interesting partition, and walk through its sub-plan, we need a whitelist and stop earlier if we hit a node outside of the whitelist.

I'd expect to see code like

if (withInterestingPartition) {
  if (isAllowedUnaryExecNode(o)) {
    ...
  } else {
    // stop traversing down here.
    o
  }
} else {
   o.mapChildren(disableBucketWithInterestingPartition(_, false, false))
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - as discussed offline, we still need to traverse sub-plan for node with interesting partition, even though this node cannot disable bucketed scan (e.g. multiple join cases).

Updated the code to disable bucketed table scan if:

  1. The sub-plan from root to bucketed table scan, only contains node with interesting partition, exchange, and allowed whitelisted single-child nodes.
  2. The sub-plan from root to bucketed table scan, does not contain node with interesting partition.

// Propagates `withInterestingPartition` and `withExchange` from parent
// for only allowed single-child nodes.
o.mapChildren(disableBucketWithInterestingPartition(
_, withInterestingPartition, withExchange))
} else {
o.mapChildren(disableBucketWithInterestingPartition(_, false, false))
}
}
}

private def hasInterestingPartition(plan: SparkPlan): Boolean = {
plan.requiredChildDistribution.exists {
case _: ClusteredDistribution | _: HashClusteredDistribution => true
case _ => false
}
}

private def isAllowedUnaryExecNode(plan: SparkPlan): Boolean = {

@viirya viirya Sep 28, 2020

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add description why this is needed? HasInterestingPartition and at lease one Exchange sounds obvious condition, but this allowed unary exec node is not. Why we can disable bucketed scan only if those exec nodes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya - this is good question. I agree we can be more bold and we probably don't need a whitelist operators here, e.g. SMJ - shuffle - BHJ - scan, SMJ - Shuffle - union - Scan (and another scan) should also work, but my feeling is to start with more confidence change first and improve later. With a whitelist operators here, we have a high confidence that this feature should work without introducing regression, but much less confidence if we allow arbitrary operators in the middle (at least for me). For now, to be honest, I cannot find a case why arbitrary operators cannot work. But I want to play safer in the beginning and any future improvement for this is much welcomed. cc @cloud-fan and @maropu for thoughts.

Added a comment for now.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opnion, it is okay for this PR to focus on a basic (minimal) support for the auto bucket scan. In followup activities, I think we can optimize it step-by-step by adding test cases and checking performance improvements... (Anyway, it would be better to leave some comment there about it as @viirya suggested above)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make the code clear for developer and maintainer, so leaving some comment is nicer if we want to constrain the scope of this rule for now.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already added a comment in last iteration. Please suggest concretely for alternative comment if it's not looking good. Thanks.

plan match {
case _: SortExec | _: Exchange | _: ProjectExec | _: FilterExec |
_: FileSourceScanExec => true

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_: FileSourceScanExec

Is the case only allowed if FileSourceScanExec has a relation with bucket specs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - FileSourceScanExec should be pattern matched earlier in disableBucketWithInterestingPartition before isAllowedUnaryExecNode check. But thanks for catch, we don't need to add Exchange and FileSourceScanExec here anyway, removed both.

case partialAgg: BaseAggregateExec =>
val modes = partialAgg.aggregateExpressions.map(_.mode)
modes.nonEmpty && modes.forall(mode => mode == Partial || mode == PartialMerge)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of checking the mode, shall we just check requiredChildDistributionExpressions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - sure, updated. Also thanks for catch, I think requiredChildDistributionExpressions covers all cases where we can have partial aggregate without any aggregate expression (e.g. SELECT i from t GROUP BY i).

case _ => false
}
}

private def isBucketedScanWithoutFilter(scan: FileSourceScanExec): Boolean = {
// Do not disable bucketed table scan if it has filter pruning,
// because bucketed table scan is still useful here to save CPU/IO cost with
// only reading selected bucket files.
scan.bucketedScan && scan.optionalBucketSet.isEmpty

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if a scan operator reads most buckets? e.g., 999 of 1000 buckets. We select bucket scans even in this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - this is a good question, and I think it is kind of out of scope for this PR and needs more thoughts later. We don't have a cost model to decide whether to do (bucketed filter + bucketed scan) vs (normal filter + non-bucketed scan). It can depend on number of buckets, size of filtered buckets, CPU cost for filter, etc.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with it for now. Technically I think filter by bucket ID and bucketed scan don't need to be coupled. We can always filter files by bucket id, and then do bucketed scan or not according to this rule.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, okay. Could you file jira later, @c21 ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

private def disableAllBucketedScan(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case scan: FileSourceScanExec if isBucketedScanWithoutFilter(scan) =>
scan.copy(disableBucketedScan = true)
}
}

def apply(plan: SparkPlan): SparkPlan = {
if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a plan doesn't have any bucket spec, we do nothing in this rule? Could we filter out the case at the beginning?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure, updated.

plan
} else if (plan.find(hasInterestingPartition).isDefined) {
disableBucketWithInterestingPartition(plan, false, false)
} else {
// Disable all bucketed scans if there's no operator with interesting partition
// found in query plan.
disableAllBucketedScan(plan)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class DataFrameJoinSuite extends QueryTest
}
assert(broadcastExchanges.size == 1)
val tables = broadcastExchanges.head.collect {
case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent)) => tableIdent
case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent), _) => tableIdent
}
assert(tables.size == 1)
assert(tables.head === TableIdentifier(table1Name, Some(dbName)))
Expand Down
12 changes: 12 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,18 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
}
}

test("SPARK-32859: disable unnecessary bucketed table scan based on query plan") {
withTable("t1", "t2") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
Seq(2, 3).toDF("i").write.saveAsTable("t2")
val joined = sql("SELECT * FROM t1 JOIN t2 ON t1.i + 1 = t2.i")
checkKeywordsExistsInExplain(joined, keywords = "DisableBucketedScan: true")
}
}
}

test("Coalesced bucket info should be a part of explain string") {
withTable("t1", "t2") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1314,7 +1314,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
// need to execute the query before we can examine fs.inputRDDs()
assert(stripAQEPlan(df.queryExecution.executedPlan) match {
case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _)))) =>
fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _, _)))) =>
partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,22 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
"p1=2/file7_0000" -> 1),
buckets = 3)

// No partition pruning
checkScan(table) { partitions =>
assert(partitions.size == 3)
assert(partitions(0).files.size == 5)
assert(partitions(1).files.size == 0)
assert(partitions(2).files.size == 2)
}

// With partition pruning
checkScan(table.where("p1=2")) { partitions =>
assert(partitions.size == 3)
assert(partitions(0).files.size == 3)
assert(partitions(1).files.size == 0)
assert(partitions(2).files.size == 1)
withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "false") {
// No partition pruning
checkScan(table) { partitions =>
assert(partitions.size == 3)
assert(partitions(0).files.size == 5)
assert(partitions(1).files.size == 0)
assert(partitions(2).files.size == 2)
}

// With partition pruning
checkScan(table.where("p1=2")) { partitions =>
assert(partitions.size == 3)
assert(partitions(0).files.size == 3)
assert(partitions(1).files.size == 0)
assert(partitions(2).files.size == 1)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,22 +432,24 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils
// join1 is a broadcast join where df2 is broadcasted. Note that output partitioning on the
// streamed side (t1) is HashPartitioning (bucketed files).
val join1 = t1.join(df2, t1("i1") === df2("i2") && t1("j1") === df2("j2"))
val plan1 = join1.queryExecution.executedPlan
assert(collect(plan1) { case e: ShuffleExchangeExec => e }.isEmpty)
val broadcastJoins = collect(plan1) { case b: BroadcastHashJoinExec => b }
assert(broadcastJoins.size == 1)
assert(broadcastJoins(0).outputPartitioning.isInstanceOf[PartitioningCollection])
val p = broadcastJoins(0).outputPartitioning.asInstanceOf[PartitioningCollection]
assert(p.partitionings.size == 4)
// Verify all the combinations of output partitioning.
Seq(Seq(t1("i1"), t1("j1")),
Seq(t1("i1"), df2("j2")),
Seq(df2("i2"), t1("j1")),
Seq(df2("i2"), df2("j2"))).foreach { expected =>
val expectedExpressions = expected.map(_.expr)
assert(p.partitionings.exists {
case h: HashPartitioning => expressionsEqual(h.expressions, expectedExpressions)
})
withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "false") {
val plan1 = join1.queryExecution.executedPlan
assert(collect(plan1) { case e: ShuffleExchangeExec => e }.isEmpty)
val broadcastJoins = collect(plan1) { case b: BroadcastHashJoinExec => b }
assert(broadcastJoins.size == 1)
assert(broadcastJoins(0).outputPartitioning.isInstanceOf[PartitioningCollection])
val p = broadcastJoins(0).outputPartitioning.asInstanceOf[PartitioningCollection]
assert(p.partitionings.size == 4)
// Verify all the combinations of output partitioning.
Seq(Seq(t1("i1"), t1("j1")),
Seq(t1("i1"), df2("j2")),
Seq(df2("i2"), t1("j1")),
Seq(df2("i2"), df2("j2"))).foreach { expected =>
val expectedExpressions = expected.map(_.expr)
assert(p.partitionings.exists {
case h: HashPartitioning => expressionsEqual(h.expressions, expectedExpressions)
})
}
}

// Join on the column from the broadcasted side (i2, j2) and make sure output partitioning
Expand Down
Loading