Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -542,10 +542,9 @@ case class FileSourceScanExec(
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
.getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}"))
}

// TODO(SPARK-32985): Decouple bucket filter pruning and bucketed table scan
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
Expand Down Expand Up @@ -591,20 +590,48 @@ case class FileSourceScanExec(
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

// Filter files with bucket pruning if possible

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit odd that we call method name as createNonBucketedReadRDD but do something with buckets. I guess we could name createNonBucketedReadRDD like just createReadRDD or createStandardReadRDD

lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles
val canPrune: Path => Boolean = optionalBucketSet match {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: perhaps rename this to shouldNotPrune or shouldProcess? canPrune sounds like the path should be ignored.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am very bad at naming :) This is suggested from #31413 (comment). Shall I change again? cc @maropu .

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, okay.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to shouldProcess as I feel shouldNotPrune is hard to reason about.

case Some(bucketSet) =>
filePath => {
BucketingUtils.getBucketId(filePath.getName) match {
case Some(id) => bucketSet.get(id)
case None =>
if (ignoreCorruptFiles) {
// If ignoring corrupt file, do not prune when bucket file name is invalid

@sunchao sunchao Feb 2, 2021

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious what's the previous behavior of this, or is this newly introduced? we may need to add the info to the PR description (user-facing change).

Also I'm not sure if this is the best choice: if a bucketed table is corrupted, should we read the corrupt file? it will likely lead to incorrect results. On the other hand we can choose to ignore the file which seems to be more aligned with the name of the config, although result could still be incorrect.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sunchao - this is newly introduced. Updated PR description.

Also I'm not sure if this is the best choice: if a bucketed table is corrupted, should we read the corrupt file? it will likely lead to incorrect results. On the other hand we can choose to ignore the file which seems to be more aligned with the name of the config, although result could still be incorrect.

Note by default the exception will be thrown here and query will be failed loud. We allow a config here to help existing users to work around if they want. See relevant discussion in #31413 (comment) .

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool thanks for pointing to the discussion. I'm just not sure whether the corrupted file should be ignored or processed if the flag is turned on. ignoreCorruptedFiles seems to indicate that the problematic file should be ignored so it is a bit confusing that we still process it here. Also IMO ignoring it seems to be slightly safer (thinking someone dump garbage files into the bucketed partition dir)?

cc @maropu @viirya

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel either skipping or processing the file is no way perfect. There can be other corruption case, where e.g. the table (specified with 1024 buckets), but only had 500 files underneath. This could be due to some other compute engines or users accidentally dump data here without respecting spark bucketing metadata. We have no efficient way to handle if number of files fewer than number of buckets.

The existing usage of ignoreCorruptFiles skip reading some of content of file, so it's also not completely ignoring. But I am fine if we think we need another config name for this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given users explicitly disable bucketing here for reading the table, I would assume they want to read the table as a non-bucketed table, so they would like to read all of input files, no? cc @viirya what's the use case you are thinking here? Thanks.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel either skipping or processing the file is no way perfect.

Yes agreed.

Given users explicitly disable bucketing here for reading the table, I would assume they want to read the table as a non-bucketed table, so they would like to read all of input files

Good point. Although it seems a bit weird that someone would do this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems unrelated to "decouple bucket scan and bucket filter pruning". Can we do it in a followup PR and discuss there? Let's not introduce extra behavior change when not necessary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - sorry which part you are suggesting to do in a followup PR? Here we anyway need to decide how do we handle when file name is not a valid bucket file name (process or not process the file) for pruning. Does I miss anything?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we anyway need to decide how do we handle when file name is not a valid bucket file name (process or not process the file) for pruning

We should follow whatever behavior before this PR, since the correct behavior is not obvious and triggers discussion here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - okay. The behavior before this PR is to process all files for bucketed table if disabling bucketing. I changed the code to not prune the file if bucket file name is invalid. So this should follow previous behavior, and we can discuss whether to throw exception/ignore file/process file in followup PR. cc @maropu and @viirya for code change here.

true
} else {
throw new IllegalStateException(
s"Invalid bucket file $filePath when doing bucket pruning. " +
s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to ignore exception " +
"and read the file.")
}
}
}
case None =>
_ => true
}

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)

if (canPrune(filePath)) {
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)
} else {
Seq.empty
}
}
}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
exchange.mapChildren(disableBucketWithInterestingPartition(
_, withInterestingPartition, true, withAllowedNode))
case scan: FileSourceScanExec =>
if (isBucketedScanWithoutFilter(scan)) {
if (scan.bucketedScan) {
if (!withInterestingPartition || (withExchange && withAllowedNode)) {
val nonBucketedScan = scan.copy(disableBucketedScan = true)
scan.logicalLink.foreach(nonBucketedScan.setLogicalLink)
Expand Down Expand Up @@ -140,20 +140,13 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
}
}

private def isBucketedScanWithoutFilter(scan: FileSourceScanExec): Boolean = {
// Do not disable bucketed table scan if it has filter pruning,
// because bucketed table scan is still useful here to save CPU/IO cost with
// only reading selected bucket files.
scan.bucketedScan && scan.optionalBucketSet.isEmpty
}

def apply(plan: SparkPlan): SparkPlan = {
lazy val hasBucketedScanWithoutFilter = plan.find {
case scan: FileSourceScanExec => isBucketedScanWithoutFilter(scan)
lazy val hasBucketedScan = plan.find {
case scan: FileSourceScanExec => scan.bucketedScan
case _ => false
}.isDefined

if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScanWithoutFilter) {
if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScan) {
plan
} else {
disableBucketWithInterestingPartition(plan, false, false, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
bucketValues: Seq[Any],
filterCondition: Column,
originalDataFrame: DataFrame): Unit = {
// This test verifies parts of the plan. Disable whole stage codegen.
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
// This test verifies parts of the plan. Disable whole stage codegen,
// automatically bucketed scan, and filter push down for json data source.
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "false",
SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "false") {
val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k")
val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
// Limit: bucket pruning only works when the bucket column has one and only one column
Expand Down Expand Up @@ -148,19 +151,53 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
if (invalidBuckets.nonEmpty) {
fail(s"Buckets ${invalidBuckets.mkString(",")} should have been pruned from:\n$plan")
}

withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
// Bucket pruning should still work when bucketing is disabled

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then do we have a config to turn on/off bucketing file pruning, or it's always applied?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - always applied. I can add one if this makes surprise to existing users/queries, and they can turn off if needed. e.g. spark.sql.sources.bucketing.pruning.enabled?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can keep the previous behavior: when SQLConf.BUCKETING_ENABLED is off, don't do any bucket optimization, including bucket scan and bucket pruning.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - make sense to me. Updated the change.

val planWithBucketDisabled = spark.table("bucketed_table").select("i", "j", "k")
.filter(filterCondition).queryExecution.executedPlan
val fileScanWithBucketDisabled = getFileScan(planWithBucketDisabled)
assert(!fileScanWithBucketDisabled.bucketedScan,
"except no bucketed scan when disabling bucketing but found\n" +
s"$fileScanWithBucketDisabled")

val tableSchema = fileScanWithBucketDisabled.schema
val bucketColumnIndex = tableSchema.fieldIndex(bucketColumnNames.head)
val bucketColumn = tableSchema.toAttributes(bucketColumnIndex)
val bucketColumnType = tableSchema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScanWithBucketDisabled.execute().filter(row => {
// Return rows should have been pruned
val bucketColumnValue = row.get(bucketColumnIndex, bucketColumnType)
val bucketId = BucketingUtils.getBucketIdFromValue(
bucketColumn, numBuckets, bucketColumnValue)
!matchedBuckets.get(bucketId)
}).collect()

if (rowsWithInvalidBuckets.nonEmpty) {
fail(s"Rows ${rowsWithInvalidBuckets.mkString(",")} should have been pruned from:\n" +
s"$planWithBucketDisabled")
}
}
}

val expectedDataFrame = originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")
checkAnswer(
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
expectedDataFrame)

withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
checkAnswer(
spark.table("bucketed_table").select("i", "j", "k").filter(filterCondition)
.orderBy("i", "j", "k"),
expectedDataFrame)
}
}
}

test("read partitioning bucketed tables with bucket pruning filters") {
withTable("bucketed_table") {
val numBuckets = NumBucketsForPruningDF
val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil)
// json does not support predicate push-down, and thus json is used here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not true anymore as json filter push down was added in https://issues.apache.org/jira/browse/SPARK-30648 .

df.write
.format("json")
.partitionBy("i")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ abstract class DisableUnnecessaryBucketedScanSuite
("SELECT i FROM t1", 0, 1),
("SELECT j FROM t1", 0, 0),
// Filter on bucketed column
("SELECT * FROM t1 WHERE i = 1", 1, 1),
("SELECT * FROM t1 WHERE i = 1", 0, 1),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unit test change is expected, as we no longer need to do bucket scan for this kind of query. See related change in DisableUnnecessaryBucketedScan.scala

// Filter on non-bucketed column
("SELECT * FROM t1 WHERE j = 1", 0, 1),
// Join with same buckets
Expand Down