@@ -114,7 +114,7 @@ case class OrderedJoin(
/**
* Reorder the joins using a dynamic programming algorithm. This implementation is based on the
* paper: Access Path Selection in a Relational Database Management System.
- * http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf
+ * https://dl.acm.org/doi/10.1145/582095.582099
*
* First we put all items (basic joined nodes) into level 0, then we build all two-way joins
* at level 1 from plans at level 0 (single items), then build all 3-way joins from plans
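
The doc comment above describes the classic bottom-up construction. A self-contained sketch of that idea (illustrative names only, not Spark's actual `CostBasedJoinReorder` implementation) might look like:

```scala
// Hypothetical sketch of the bottom-up dynamic programming described above:
// level 0 holds single items; each higher level combines two smaller,
// disjoint plans and keeps only the cheapest plan per item set.
// `Plan` and `joinCost` are illustrative stand-ins, not Spark classes.
case class Plan(items: Set[String], cost: Long)

def reorderJoins(items: Seq[String], joinCost: (Plan, Plan) => Long): Plan = {
  // Level 0: each basic item is a zero-cost plan.
  var best: Map[Set[String], Plan] =
    items.map(i => Set(i) -> Plan(Set(i), 0L)).toMap
  // Level k: build all k-way joins from two disjoint smaller plans.
  for (level <- 2 to items.size) {
    val candidates = for {
      (s1, p1) <- best.toSeq
      (s2, p2) <- best.toSeq
      if (s1 & s2).isEmpty && s1.size + s2.size == level
    } yield Plan(s1 ++ s2, p1.cost + p2.cost + joinCost(p1, p2))
    // Keep only the cheapest plan per item set.
    best ++= candidates.groupBy(_.items).map { case (k, ps) => k -> ps.minBy(_.cost) }
  }
  best(items.toSet)
}
```

With a trivial cost function the cheapest three-way plan is found by enumerating all splits, mirroring how the real rule enumerates plans level by level.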
@@ -951,6 +951,17 @@ object SQLConf {
.checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be greater than 0")
.createWithDefault(100000)

val AUTO_BUCKETED_SCAN_ENABLED =
buildConf("spark.sql.sources.bucketing.autoBucketedScan.enabled")
.doc("When true, decide whether to do bucketed scan on input tables based on query plan " +
Member:

Since our user documents are generated based on this statement, could you describe a bit more about how to decide whether to do bucketed scans or not?

Contributor Author:

@maropu - sure, wondering what do you think of below?

When true, decide whether to do bucketed scan on input tables
based on query plan automatically. Do not use bucketed scan if
(1).query does not have operators to utilize bucketing (e.g. join, group-by, etc),
or (2).there's an exchange operator between these operators and table scan.

Member:

It looks okay, but I have the same suggestion with #29804 (comment)

Contributor Author:

@maropu - sure, updated.

"automatically. Do not use bucketed scan if 1. query does not have operators to utilize " +
"bucketing (e.g. join, group-by, etc), or 2. there's an exchange operator between these " +
s"operators and table scan. Note when '${BUCKETING_ENABLED.key}' is set to " +
"false, this configuration does not take any effect.")
.version("3.1.0")
Member:

Btw, do we need to make this config external? If we just add this config to keep the current behaviour, is it okay to add it as an internal one?

Contributor Author:

@maropu - sure, just for my own education, what does it indicate to make a config internal/external?

Member:

IIUC we don't have any strict rule for that. But I think this new rule works well for most queries, so adding it as an external config seems less meaningful, because I think most users won't turn this feature off.

Contributor Author:

@maropu - sure, updated.

.booleanConf
.createWithDefault(false)
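
As a hypothetical usage sketch (assuming an existing `SparkSession` named `spark`), the new entry is toggled like any other SQLConf key; note it has no effect unless `spark.sql.sources.bucketing.enabled` is also true:

```scala
// Hypothetical usage; `spark` is an existing SparkSession.
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")
// Plans that cannot exploit bucketing (no join/group-by, or an exchange
// between those operators and the table scan) now fall back to non-bucketed scans.
```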
Contributor Author:

@cloud-fan - thanks for pointing it out. Created https://issues.apache.org/jira/browse/SPARK-33075 for followup, cc @viirya in case there's any other regression for enabling auto bucketed scan, except cached query.


val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
.internal()
.doc("When false, we will throw an error if a query contains a cartesian product without " +
@@ -3164,6 +3175,8 @@ class SQLConf extends Serializable with Logging {

def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)

def autoBucketedScanEnabled: Boolean = getConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)

def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)

@@ -156,7 +156,9 @@ case class RowDataSourceScanExec(
* @param optionalBucketSet Bucket ids for bucket pruning.
* @param optionalNumCoalescedBuckets Number of coalesced buckets.
* @param dataFilters Filters on non-partition columns.
- * @param tableIdentifier identifier for the table in the metastore.
+ * @param tableIdentifier Identifier for the table in the metastore.
* @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
* [[DisableUnnecessaryBucketedScan]] for details.
*/
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
@@ -166,7 +168,8 @@ case class FileSourceScanExec(
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
dataFilters: Seq[Expression],
-    tableIdentifier: Option[TableIdentifier])
+    tableIdentifier: Option[TableIdentifier],
+    disableBucketedScan: Boolean = false)
extends DataSourceScanExec {

// Note that some vals referring the file-based relation are lazy intentionally
@@ -257,7 +260,8 @@ case class FileSourceScanExec(

// exposed for testing
lazy val bucketedScan: Boolean = {
-    if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined) {
+    if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined
+      && !disableBucketedScan) {
val spec = relation.bucketSpec.get
val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
bucketColumns.size == spec.bucketColumnNames.size
@@ -348,20 +352,23 @@ case class FileSourceScanExec(
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)

-    val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
-      val numSelectedBuckets = optionalBucketSet.map { b =>
-        b.cardinality()
+    // TODO(SPARK-32986): Add bucketed scan info in explain output of FileSourceScanExec
+    if (bucketedScan) {
+      relation.bucketSpec.map { spec =>
Member:

map -> foreach

Contributor Author:

@maropu - just for my own education, why does it matter? Updated anyway.

Member:

Yea, I remember the previous discussion: https://issues.apache.org/jira/browse/SPARK-16694

Contributor:

Yea, please only use map when you care about the return value. foreach is better if you just want to do some computation when the Option is Some.

Contributor Author:

@cloud-fan, @maropu - I changed the code during iterations. The current change just adds an if (bucketedScan) { ... } else { ... } on top of the original code, where we still need map because its return value is used.
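
To illustrate the `map` vs. `foreach` convention discussed in this thread (a standalone example, not Spark code):

```scala
// Use map when the transformed value is needed afterwards; use foreach when
// only a side effect is performed and the return value would be discarded.
val selected: Option[Int] = Some(3)

// map: the resulting Option is used later, so map is appropriate.
val doubled: Option[Int] = selected.map(_ * 2)

// foreach: only a side effect; returning Unit makes the intent explicit.
var total = 0
selected.foreach(total += _)
```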

+        val numSelectedBuckets = optionalBucketSet.map { b =>
+          b.cardinality()
+        } getOrElse {
+          spec.numBuckets
+        }
+        metadata + ("SelectedBucketsCount" ->
+          (s"$numSelectedBuckets out of ${spec.numBuckets}" +
+            optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
+      } getOrElse {
+        metadata
+      }
-      } getOrElse {
-        spec.numBuckets
-      }
-      metadata + ("SelectedBucketsCount" ->
-        (s"$numSelectedBuckets out of ${spec.numBuckets}" +
-          optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
-    } getOrElse {
+    } else {
       metadata
     }
-
-    withSelectedBucketsCount
   }

override def verboseStringWithOperatorId(): String = {
@@ -539,6 +546,7 @@ case class FileSourceScanExec(
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}

// TODO(SPARK-32985): Decouple bucket filter pruning and bucketed table scan
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
@@ -624,6 +632,7 @@ case class FileSourceScanExec(
optionalBucketSet,
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
-      None)
+      None,
+      disableBucketedScan)
}
}
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
-import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
+import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -344,6 +344,7 @@ object QueryExecution {
PlanSubqueries(sparkSession),
RemoveRedundantProjects(sparkSession.sessionState.conf),
EnsureRequirements(sparkSession.sessionState.conf),
DisableUnnecessaryBucketedScan(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.bucketing

import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf

/**
* Disable unnecessary bucketed table scan based on actual physical query plan.
* NOTE: this rule is designed to be applied right after [[EnsureRequirements]],
* where all [[ShuffleExchangeExec]] and [[SortExec]] have been added to plan properly.
*
* When BUCKETING_ENABLED and AUTO_BUCKETED_SCAN_ENABLED are set to true, go through
* query plan to check where bucketed table scan is unnecessary, and disable bucketed table
* scan if:
*
* 1. The sub-plan from root to bucketed table scan, does not contain
* [[hasInterestingPartition]] operator.
*
* 2. The sub-plan from the nearest downstream [[hasInterestingPartition]] operator
* to the bucketed table scan, contains only [[isAllowedUnaryExecNode]] operators
* and at least one [[Exchange]].
*
* Examples:
* 1. no [[hasInterestingPartition]] operator:
* Project
* |
* Filter
* |
* Scan(t1: i, j)
* (bucketed on column j, DISABLE bucketed scan)
*
* 2. join:
* SortMergeJoin(t1.i = t2.j)
* / \
* Sort(i) Sort(j)
* / \
* Shuffle(i) Scan(t2: i, j)
* / (bucketed on column j, enable bucketed scan)
* Scan(t1: i, j)
* (bucketed on column j, DISABLE bucketed scan)
*
* 3. aggregate:
* HashAggregate(i, ..., Final)
* |
* Shuffle(i)
* |
* HashAggregate(i, ..., Partial)
* |
* Filter
* |
* Scan(t1: i, j)
* (bucketed on column j, DISABLE bucketed scan)
*
* The idea of [[hasInterestingPartition]] is inspired from "interesting order" in
* the paper "Access Path Selection in a Relational Database Management System"
* (https://dl.acm.org/doi/10.1145/582095.582099).
*/
case class DisableUnnecessaryBucketedScan(conf: SQLConf) extends Rule[SparkPlan] {

/**
* Disable bucketed table scan with pre-order traversal of plan.
*
* @param withInterestingPartition The traversed plan has operator with interesting partition.
* @param withExchange The traversed plan has [[Exchange]] operator.
* @param withAllowedNode The traversed plan has only [[isAllowedUnaryExecNode]] operators.
*/
private def disableBucketWithInterestingPartition(
plan: SparkPlan,
withInterestingPartition: Boolean,
withExchange: Boolean,
withAllowedNode: Boolean): SparkPlan = {
plan match {
case p if hasInterestingPartition(p) =>
// Operator with interesting partition, propagates `withInterestingPartition` as true
// to its children, and resets `withExchange` and `withAllowedNode`.
p.mapChildren(disableBucketWithInterestingPartition(_, true, false, true))
case exchange: Exchange =>
// Exchange operator propagates `withExchange` as true to its child.
exchange.mapChildren(disableBucketWithInterestingPartition(
_, withInterestingPartition, true, withAllowedNode))
case scan: FileSourceScanExec =>
if (isBucketedScanWithoutFilter(scan)) {
if (!withInterestingPartition || (withExchange && withAllowedNode)) {
scan.copy(disableBucketedScan = true)
} else {
scan
}
} else {
scan
}
case o =>
o.mapChildren(disableBucketWithInterestingPartition(
_,
withInterestingPartition,
withExchange,
withAllowedNode && isAllowedUnaryExecNode(o)))
}
}

private def hasInterestingPartition(plan: SparkPlan): Boolean = {
plan.requiredChildDistribution.exists {
case _: ClusteredDistribution | _: HashClusteredDistribution => true
case _ => false
}
}

/**
* Check if the operator is allowed single-child operator.
* We may revisit this method later as we probably can
* remove this restriction to allow arbitrary operator between
* bucketed table scan and operator with interesting partition.
*/
private def isAllowedUnaryExecNode(plan: SparkPlan): Boolean = {
Member (@viirya, Sep 28, 2020):

Can you add a description of why this is needed? hasInterestingPartition and at least one Exchange sound like obvious conditions, but this allowed unary exec node is not. Why can we disable bucketed scan only under those exec nodes?

Contributor Author:

@viirya - this is a good question. I agree we can be more bold, and we probably don't need a whitelist of operators here; e.g. SMJ - Shuffle - BHJ - Scan, or SMJ - Shuffle - Union - Scan (and another scan) should also work. But my feeling is to start with the more conservative change first and improve later. With a whitelist of operators, we have high confidence this feature works without introducing regressions, but much less confidence if we allow arbitrary operators in the middle (at least for me). For now, to be honest, I cannot find a case where arbitrary operators would not work, but I want to play it safe in the beginning, and any future improvement here is much welcomed. cc @cloud-fan and @maropu for thoughts.

Added a comment for now.

Member:

In my opinion, it is okay for this PR to focus on basic (minimal) support for the auto bucketed scan. In followup activities, I think we can optimize it step-by-step by adding test cases and checking performance improvements... (Anyway, it would be better to leave a comment there about it, as @viirya suggested above.)

Member:

We should make the code clear for developers and maintainers, so leaving a comment is nicer if we want to constrain the scope of this rule for now.

Contributor Author:

I already added a comment in last iteration. Please suggest concretely for alternative comment if it's not looking good. Thanks.

plan match {
case _: SortExec | _: ProjectExec | _: FilterExec => true
case partialAgg: BaseAggregateExec =>
partialAgg.requiredChildDistributionExpressions.isEmpty
case _ => false
}
}

private def isBucketedScanWithoutFilter(scan: FileSourceScanExec): Boolean = {
// Do not disable bucketed table scan if it has filter pruning,
// because bucketed table scan is still useful here to save CPU/IO cost with
// only reading selected bucket files.
scan.bucketedScan && scan.optionalBucketSet.isEmpty
Member:

What if a scan operator reads most buckets, e.g., 999 of 1000 buckets? Do we select bucketed scans even in this case?

Contributor Author:

@maropu - this is a good question, and I think it is kind of out of scope for this PR and needs more thought later. We don't have a cost model to decide whether to do (bucket filter pruning + bucketed scan) vs (normal filter + non-bucketed scan). It can depend on the number of buckets, the size of the filtered buckets, the CPU cost of the filter, etc.

Contributor:

I'm fine with it for now. Technically I think filtering by bucket ID and bucketed scan don't need to be coupled. We can always filter files by bucket ID, and then do bucketed scan or not according to this rule.

Member:

Yea, okay. Could you file jira later, @c21 ?

}

def apply(plan: SparkPlan): SparkPlan = {
lazy val hasBucketedScanWithoutFilter = plan.find {
case scan: FileSourceScanExec => isBucketedScanWithoutFilter(scan)
case _ => false
}.isDefined

if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScanWithoutFilter) {
plan
} else {
disableBucketWithInterestingPartition(plan, false, false, true)
}
}
}
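
As a toy model (illustrative classes, not the real `SparkPlan` API), the flag propagation performed by `disableBucketWithInterestingPartition` can be traced on the doc comment's examples:

```scala
// Toy model of the pre-order flag propagation in the rule above.
// Agg stands for an operator with interesting partition, Shuffle for an
// Exchange, Filter for an allowed unary node; all names are illustrative.
sealed trait Node { def children: Seq[Node] }
case class Agg(child: Node) extends Node { def children = Seq(child) }
case class Shuffle(child: Node) extends Node { def children = Seq(child) }
case class Filter(child: Node) extends Node { def children = Seq(child) }
case class Scan(var disabled: Boolean = false) extends Node { def children = Nil }

def walk(n: Node, interesting: Boolean, exchange: Boolean, allowed: Boolean): Unit = n match {
  // Interesting partition: set the flag, reset the other two.
  case a: Agg => a.children.foreach(walk(_, true, false, true))
  // Exchange: remember that one was seen on this path.
  case s: Shuffle => s.children.foreach(walk(_, interesting, true, allowed))
  // Scan: disable bucketed scan per the two conditions in the rule's doc.
  case s: Scan => if (!interesting || (exchange && allowed)) s.disabled = true
  // Anything else: only allowed unary nodes keep `allowed` true.
  case o => o.children.foreach(walk(_, interesting, exchange, allowed && o.isInstanceOf[Filter]))
}
```

Running the doc comment's example plans through this model reproduces the DISABLE / enable decisions annotated there.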
@@ -348,7 +348,7 @@ class DataFrameJoinSuite extends QueryTest
}
assert(broadcastExchanges.size == 1)
val tables = broadcastExchanges.head.collect {
-        case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent)) => tableIdent
+        case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent), _) => tableIdent
}
assert(tables.size == 1)
assert(tables.head === TableIdentifier(table1Name, Some(dbName)))
@@ -1314,7 +1314,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
// need to execute the query before we can examine fs.inputRDDs()
assert(stripAQEPlan(df.queryExecution.executedPlan) match {
case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-          fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _)))) =>
+          fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _, _)))) =>
partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(