Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,6 @@ case class FileSourceScanExec(
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}

// TODO(SPARK-32985): Decouple bucket filter pruning and bucketed table scan
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
Expand Down Expand Up @@ -591,20 +590,34 @@ case class FileSourceScanExec(
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

// Filter files with bucket pruning if possible

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit odd that we call method name as createNonBucketedReadRDD but do something with buckets. I guess we could name createNonBucketedReadRDD like just createReadRDD or createStandardReadRDD

val filePruning: Path => Boolean = optionalBucketSet match {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: filePruning -> can[Bucket]Prune?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - changed.

case Some(bucketSet) =>
filePath => bucketSet.get(BucketingUtils.getBucketId(filePath.getName)
.getOrElse(sys.error(s"Invalid bucket file $filePath")))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use IllegalStateException instead of sys.error?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - I was following code path for creating bucketed RDD. Do we want to change this place as well? Just wondering why we prefer IllegalStateException here? Is it better error classification?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following code path for creating bucketed RDD. Do we want to change this place as well?

Yea, since the fix is trivial, changing it in this PR looks fine to me.

Just wondering why we prefer IllegalStateException here? Is it better error classification?

This is not a strict rule, but I think we tend to use IllegalStateException for a unexpected code path. For example, see the related previous comment:
#28810 (comment)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, sys.error crashes the JVM and should be avoided.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu , @cloud-fan - sure, updated.

@viirya viirya Feb 1, 2021

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks not good to fail the query here. The scan node actually reads bucket implicitly because bucket scan is disabled for this path. Instead of query failure, maybe log warning and skip pruning?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error here indicates there's data corruption (invalid file name) for spark data source bucketed table. The benefit for logging warning here is to unblock read these kind of corrupted bucketed tables with disabling bucketing. I feel this is dangerous. Users should not rely on disabling bucketing to read potentially wrong data from bucketed table, they should correct the table. I am more preferring to fail loud here with exception, as warning logging would be very hard to debug. But I am open to others opinions as well, cc @maropu and @cloud-fan .

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your reason sounds reasonable. But it still sounds like a potential breaking change. I'm not sure if some users read table in this way, but it is indeed possible. It will very confuse to them as they won't ask to read table in bucketed way and it works previously.

If eventually we still decide to fail the query, then a good to understand error message is necessary to let users know why we use bucketed spec here and why Spark fails to read the table. Maybe we should also provide some hints for solving it, if possible.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have some options to control missing & corrupted files for data sources, so how about following the semantics?

val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles")
.doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
"encountering corrupted files and the contents that have been read will still be returned. " +
"This configuration is effective only when using file-based sources such as Parquet, JSON " +
"and ORC.")
.version("2.1.1")
.booleanConf
.createWithDefault(false)
val IGNORE_MISSING_FILES = buildConf("spark.sql.files.ignoreMissingFiles")
.doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
"encountering missing files and the contents that have been read will still be returned. " +
"This configuration is effective only when using file-based sources such as Parquet, JSON " +
"and ORC.")
.version("2.3.0")
.booleanConf
.createWithDefault(false)

For the data corruption case above, how about throwing an exception by default and then stating in the exception message that you should use a specified option if you want to ignore it?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think we better can avoid the case that no way to overcome the bucket id corruption. Using existing option to ignore the failure sounds good.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu , @viirya - This sounds reasonable to me. Changed to use spark.sql.files.ignoreCorruptFiles to handle invalid bucket file name case.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

case None =>
_ => true
}

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)

if (filePruning(filePath)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nice improvement!

val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)
} else {
Seq.empty
}
}
}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
exchange.mapChildren(disableBucketWithInterestingPartition(
_, withInterestingPartition, true, withAllowedNode))
case scan: FileSourceScanExec =>
if (isBucketedScanWithoutFilter(scan)) {
if (scan.bucketedScan) {
if (!withInterestingPartition || (withExchange && withAllowedNode)) {
val nonBucketedScan = scan.copy(disableBucketedScan = true)
scan.logicalLink.foreach(nonBucketedScan.setLogicalLink)
Expand Down Expand Up @@ -140,20 +140,13 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
}
}

private def isBucketedScanWithoutFilter(scan: FileSourceScanExec): Boolean = {
// Do not disable bucketed table scan if it has filter pruning,
// because bucketed table scan is still useful here to save CPU/IO cost with
// only reading selected bucket files.
scan.bucketedScan && scan.optionalBucketSet.isEmpty
}

def apply(plan: SparkPlan): SparkPlan = {
lazy val hasBucketedScanWithoutFilter = plan.find {
case scan: FileSourceScanExec => isBucketedScanWithoutFilter(scan)
lazy val hasBucketedScan = plan.find {
case scan: FileSourceScanExec => scan.bucketedScan
case _ => false
}.isDefined

if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScanWithoutFilter) {
if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScan) {
plan
} else {
disableBucketWithInterestingPartition(plan, false, false, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
bucketValues: Seq[Any],
filterCondition: Column,
originalDataFrame: DataFrame): Unit = {
// This test verifies parts of the plan. Disable whole stage codegen.
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
// This test verifies parts of the plan. Disable whole stage codegen,
// automatically bucketed scan, and filter push down for json data source.
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "false",
SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "false") {
val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k")
val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
// Limit: bucket pruning only works when the bucket column has one and only one column
Expand Down Expand Up @@ -148,19 +151,53 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
if (invalidBuckets.nonEmpty) {
fail(s"Buckets ${invalidBuckets.mkString(",")} should have been pruned from:\n$plan")
}

withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
// Bucket pruning should still work when bucketing is disabled

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then do we have a config to turn on/off bucketing file pruning, or it's always applied?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - always applied. I can add one if this makes surprise to existing users/queries, and they can turn off if needed. e.g. spark.sql.sources.bucketing.pruning.enabled?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can keep the previous behavior: when SQLConf.BUCKETING_ENABLED is off, don't do any bucket optimization, including bucket scan and bucket pruning.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - make sense to me. Updated the change.

val planWithBucketDisabled = spark.table("bucketed_table").select("i", "j", "k").filter(filterCondition)
.queryExecution.executedPlan
val fileScanWithBucketDisabled = getFileScan(planWithBucketDisabled)
assert(!fileScanWithBucketDisabled.bucketedScan,
"except no bucketed scan when disabling bucketing but found\n" +
s"$fileScanWithBucketDisabled")

val tableSchema = fileScanWithBucketDisabled.schema
val bucketColumnIndex = tableSchema.fieldIndex(bucketColumnNames.head)
val bucketColumn = tableSchema.toAttributes(bucketColumnIndex)
val bucketColumnType = tableSchema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScanWithBucketDisabled.execute().filter(row => {
// Return rows should have been pruned
val bucketColumnValue = row.get(bucketColumnIndex, bucketColumnType)
val bucketId = BucketingUtils.getBucketIdFromValue(
bucketColumn, numBuckets, bucketColumnValue)
!matchedBuckets.get(bucketId)
}).collect()

if (rowsWithInvalidBuckets.nonEmpty) {
fail(s"Rows ${rowsWithInvalidBuckets.mkString(",")} should have been pruned from:\n" +
s"$planWithBucketDisabled")
}
}
}

val expectedDataFrame = originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")
checkAnswer(
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
expectedDataFrame)

withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
checkAnswer(
spark.table("bucketed_table").select("i", "j", "k").filter(filterCondition)
.orderBy("i", "j", "k"),
expectedDataFrame)
}
}
}

test("read partitioning bucketed tables with bucket pruning filters") {
withTable("bucketed_table") {
val numBuckets = NumBucketsForPruningDF
val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil)
// json does not support predicate push-down, and thus json is used here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not true anymore as json filter push down was added in https://issues.apache.org/jira/browse/SPARK-30648 .

df.write
.format("json")
.partitionBy("i")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ abstract class DisableUnnecessaryBucketedScanSuite
("SELECT i FROM t1", 0, 1),
("SELECT j FROM t1", 0, 0),
// Filter on bucketed column
("SELECT * FROM t1 WHERE i = 1", 1, 1),
("SELECT * FROM t1 WHERE i = 1", 0, 1),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unit test change is expected, as we no longer need to do bucket scan for this kind of query. See related change in DisableUnnecessaryBucketedScan.scala

// Filter on non-bucketed column
("SELECT * FROM t1 WHERE j = 1", 0, 1),
// Join with same buckets
Expand Down