From 84742f45061fc7189a09f48c5ca2a862880a69b9 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Sun, 31 Jan 2021 22:18:48 -0800 Subject: [PATCH 01/10] Decouple bucket scan and bucket filter pruning for data source v1 --- .../sql/execution/DataSourceScanExec.scala | 34 +++++++++----- .../DisableUnnecessaryBucketedScan.scala | 15 ++----- .../spark/sql/sources/BucketedReadSuite.scala | 45 +++++++++++++++++-- .../DisableUnnecessaryBucketedScanSuite.scala | 2 +- 4 files changed, 70 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 02fb73ed30680..05f7d8b34c617 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -591,20 +591,34 @@ case class FileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + val filePruning: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) => + filePath => bucketSet.get(BucketingUtils.getBucketId(filePath.getName) + .getOrElse(sys.error(s"Invalid bucket file $filePath"))) + case None => + _ => true + } + val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => // getPath() is very expensive so we only want to call it once in this block: val filePath = file.getPath - val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( - sparkSession = relation.sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable, - maxSplitBytes = maxSplitBytes, - partitionValues = partition.values - ) + + if (filePruning(filePath)) { + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, relation.options, filePath) + PartitionedFileUtil.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + } else { + Seq.empty + } } }.sortBy(_.length)(implicitly[Ordering[Long]].reverse) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala index 6b195b3b49f09..98bcab2a839af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala @@ -98,7 +98,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] { exchange.mapChildren(disableBucketWithInterestingPartition( _, withInterestingPartition, true, withAllowedNode)) case scan: FileSourceScanExec => - if (isBucketedScanWithoutFilter(scan)) { + if (scan.bucketedScan) { if (!withInterestingPartition || (withExchange && withAllowedNode)) { val nonBucketedScan = scan.copy(disableBucketedScan = true) scan.logicalLink.foreach(nonBucketedScan.setLogicalLink) @@ -140,20 +140,13 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] { } } - private def isBucketedScanWithoutFilter(scan: FileSourceScanExec): Boolean = { - // Do not disable bucketed table scan if it has filter pruning, - // because bucketed table scan is still useful here to save CPU/IO cost with - // only reading selected bucket files. - scan.bucketedScan && scan.optionalBucketSet.isEmpty - } - def apply(plan: SparkPlan): SparkPlan = { - lazy val hasBucketedScanWithoutFilter = plan.find { - case scan: FileSourceScanExec => isBucketedScanWithoutFilter(scan) + lazy val hasBucketedScan = plan.find { + case scan: FileSourceScanExec => scan.bucketedScan case _ => false }.isDefined - if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScanWithoutFilter) { + if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScan) { plan } else { disableBucketWithInterestingPartition(plan, false, false, true) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 9dcc0cfda93f1..bf5254107107d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -117,8 +117,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti bucketValues: Seq[Any], filterCondition: Column, originalDataFrame: DataFrame): Unit = { - // This test verifies parts of the plan. Disable whole stage codegen. - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + // This test verifies parts of the plan. Disable whole stage codegen, + // automatically bucketed scan, and filter push down for json data source. + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", + SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "false", + SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "false") { val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k") val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec // Limit: bucket pruning only works when the bucket column has one and only one column @@ -148,11 +151,46 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti if (invalidBuckets.nonEmpty) { fail(s"Buckets ${invalidBuckets.mkString(",")} should have been pruned from:\n$plan") } + + withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { + // Bucket pruning should still work when bucketing is disabled + val planWithBucketDisabled = spark.table("bucketed_table").select("i", "j", "k").filter(filterCondition) + .queryExecution.executedPlan + val fileScanWithBucketDisabled = getFileScan(planWithBucketDisabled) + assert(!fileScanWithBucketDisabled.bucketedScan, + "except no bucketed scan when disabling bucketing but found\n" + + s"$fileScanWithBucketDisabled") + + val tableSchema = fileScanWithBucketDisabled.schema + val bucketColumnIndex = tableSchema.fieldIndex(bucketColumnNames.head) + val bucketColumn = tableSchema.toAttributes(bucketColumnIndex) + val bucketColumnType = tableSchema.apply(bucketColumnIndex).dataType + val rowsWithInvalidBuckets = fileScanWithBucketDisabled.execute().filter(row => { + // Return rows should have been pruned + val bucketColumnValue = row.get(bucketColumnIndex, bucketColumnType) + val bucketId = BucketingUtils.getBucketIdFromValue( + bucketColumn, numBuckets, bucketColumnValue) + !matchedBuckets.get(bucketId) + }).collect() + + if (rowsWithInvalidBuckets.nonEmpty) { + fail(s"Rows ${rowsWithInvalidBuckets.mkString(",")} should have been pruned from:\n" + + s"$planWithBucketDisabled") + } + } } + val expectedDataFrame = originalDataFrame.filter(filterCondition).orderBy("i", "j", "k") checkAnswer( bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), - originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) + expectedDataFrame) + + withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { + checkAnswer( + spark.table("bucketed_table").select("i", "j", "k").filter(filterCondition) + .orderBy("i", "j", "k"), + expectedDataFrame) + } } } @@ -160,7 +198,6 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti withTable("bucketed_table") { val numBuckets = NumBucketsForPruningDF val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) - // json does not support predicate push-down, and thus json is used here df.write .format("json") .partitionBy("i") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala index 179cdeb976391..1a19824a31555 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala @@ -95,7 +95,7 @@ abstract class DisableUnnecessaryBucketedScanSuite ("SELECT i FROM t1", 0, 1), ("SELECT j FROM t1", 0, 0), // Filter on bucketed column - ("SELECT * FROM t1 WHERE i = 1", 1, 1), + ("SELECT * FROM t1 WHERE i = 1", 0, 1), // Filter on non-bucketed column ("SELECT * FROM t1 WHERE j = 1", 0, 1), // Join with same buckets From 3b72a6b00482b2e390ce66ab9823aabf3232a257 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Sun, 31 Jan 2021 22:21:42 -0800 Subject: [PATCH 02/10] Remove unnecessary comment --- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 05f7d8b34c617..a67b3bdd87cb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -545,7 +545,6 @@ case class FileSourceScanExec( .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) } - // TODO(SPARK-32985): Decouple bucket filter pruning and bucketed table scan val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { val bucketSet = optionalBucketSet.get filesGroupedToBuckets.filter { From cd90f0a77fcf9e741ac4dd85ad3b678dde06ea13 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Sun, 31 Jan 2021 22:57:24 -0800 Subject: [PATCH 03/10] Fix style --- .../org/apache/spark/sql/sources/BucketedReadSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index bf5254107107d..473565f3c4813 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -154,8 +154,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { // Bucket pruning should still work when bucketing is disabled - val planWithBucketDisabled = spark.table("bucketed_table").select("i", "j", "k").filter(filterCondition) - .queryExecution.executedPlan + val planWithBucketDisabled = spark.table("bucketed_table").select("i", "j", "k") + .filter(filterCondition).queryExecution.executedPlan val fileScanWithBucketDisabled = getFileScan(planWithBucketDisabled) assert(!fileScanWithBucketDisabled.bucketedScan, "except no bucketed scan when disabling bucketing but found\n" + From e63a8c3f5799351fd902bfab706740ad241f68ed Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Mon, 1 Feb 2021 13:54:15 -0800 Subject: [PATCH 04/10] Address all comments --- .../sql/execution/DataSourceScanExec.scala | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index a67b3bdd87cb1..666ac72f03236 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -542,7 +542,7 @@ case class FileSourceScanExec( }.groupBy { f => BucketingUtils .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}")) } val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { @@ -591,10 +591,24 @@ case class FileSourceScanExec( s"open cost is considered as scanning $openCostInBytes bytes.") // Filter files with bucket pruning if possible - val filePruning: Path => Boolean = optionalBucketSet match { + val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + val canPrune: Path => Boolean = optionalBucketSet match { case Some(bucketSet) => - filePath => bucketSet.get(BucketingUtils.getBucketId(filePath.getName) - .getOrElse(sys.error(s"Invalid bucket file $filePath"))) + filePath => { + BucketingUtils.getBucketId(filePath.getName) match { + case Some(id) => bucketSet.get(id) + case None => + if (ignoreCorruptFiles) { + // If ignoring corrupt file, do not prune when bucket file name is invalid + true + } else { + throw new IllegalStateException( + s"Invalid bucket file $filePath when doing bucket pruning. " + + s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable exception " + + "and read the file.") + } + } + } case None => _ => true } @@ -604,7 +618,7 @@ case class FileSourceScanExec( // getPath() is very expensive so we only want to call it once in this block: val filePath = file.getPath - if (filePruning(filePath)) { + if (canPrune(filePath)) { val isSplitable = relation.fileFormat.isSplitable( relation.sparkSession, relation.options, filePath) PartitionedFileUtil.splitFiles( From 3d348a6e58d70175ca35831c8567b41fa25762a7 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Mon, 1 Feb 2021 20:41:33 -0800 Subject: [PATCH 05/10] Address all comments --- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 666ac72f03236..4c639e4697eb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -591,7 +591,7 @@ case class FileSourceScanExec( s"open cost is considered as scanning $openCostInBytes bytes.") // Filter files with bucket pruning if possible - val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles + lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles val canPrune: Path => Boolean = optionalBucketSet match { case Some(bucketSet) => filePath => { @@ -604,8 +604,8 @@ case class FileSourceScanExec( } else { throw new IllegalStateException( s"Invalid bucket file $filePath when doing bucket pruning. " + - s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to disable exception " + - "and read the file.") + s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to ignore exception " + + "and read the file.") } } } From 9a6999d507073bcb56e1fe6266102fcfed6c884c Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Mon, 1 Feb 2021 22:32:46 -0800 Subject: [PATCH 06/10] Change naming again --- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 4c639e4697eb1..f67170bf2fe4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -592,7 +592,7 @@ case class FileSourceScanExec( // Filter files with bucket pruning if possible lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles - val canPrune: Path => Boolean = optionalBucketSet match { + val shouldProcess: Path => Boolean = optionalBucketSet match { case Some(bucketSet) => filePath => { BucketingUtils.getBucketId(filePath.getName) match { @@ -618,7 +618,7 @@ case class FileSourceScanExec( // getPath() is very expensive so we only want to call it once in this block: val filePath = file.getPath - if (canPrune(filePath)) { + if (shouldProcess(filePath)) { val isSplitable = relation.fileFormat.isSplitable( relation.sparkSession, relation.options, filePath) PartitionedFileUtil.splitFiles( From 7d2f84978266929d0d7903dae163cf54de5106d5 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 3 Feb 2021 13:18:58 -0800 Subject: [PATCH 07/10] Follow existing behavior to not prune file if name is invalid --- .../spark/sql/execution/DataSourceScanExec.scala | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index f67170bf2fe4d..264f7498351c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -591,22 +591,14 @@ case class FileSourceScanExec( s"open cost is considered as scanning $openCostInBytes bytes.") // Filter files with bucket pruning if possible - lazy val ignoreCorruptFiles = fsRelation.sparkSession.sessionState.conf.ignoreCorruptFiles val shouldProcess: Path => Boolean = optionalBucketSet match { case Some(bucketSet) => filePath => { BucketingUtils.getBucketId(filePath.getName) match { case Some(id) => bucketSet.get(id) case None => - if (ignoreCorruptFiles) { - // If ignoring corrupt file, do not prune when bucket file name is invalid - true - } else { - throw new IllegalStateException( - s"Invalid bucket file $filePath when doing bucket pruning. " + - s"Enable ${SQLConf.IGNORE_CORRUPT_FILES.key} to ignore exception " + - "and read the file.") - } + // Do not prune the file if bucket file name is invalid + true } } case None => From 820c164568c73f41a25c8d9e40192752737fdb05 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 3 Feb 2021 21:20:22 -0800 Subject: [PATCH 08/10] Address comment to use spark.sql.sources.bucketing.enabled to turn off pruning --- .../sql/execution/DataSourceScanExec.scala | 3 ++- .../spark/sql/sources/BucketedReadSuite.scala | 20 +++++++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 264f7498351c9..3ed414b179e39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -591,8 +591,9 @@ case class FileSourceScanExec( s"open cost is considered as scanning $openCostInBytes bytes.") // Filter files with bucket pruning if possible + lazy val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled val shouldProcess: Path => Boolean = optionalBucketSet match { - case Some(bucketSet) => + case Some(bucketSet) if bucketingEnabled => filePath => { BucketingUtils.getBucketId(filePath.getName) match { case Some(id) => bucketSet.get(id) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 473565f3c4813..0063da2fff134 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -152,20 +152,18 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti fail(s"Buckets ${invalidBuckets.mkString(",")} should have been pruned from:\n$plan") } - withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { - // Bucket pruning should still work when bucketing is disabled - val planWithBucketDisabled = spark.table("bucketed_table").select("i", "j", "k") + withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") { + // Bucket pruning should still work without bucketed scan + val planWithoutBucketedScan = spark.table("bucketed_table").select("i", "j", "k") .filter(filterCondition).queryExecution.executedPlan - val fileScanWithBucketDisabled = getFileScan(planWithBucketDisabled) - assert(!fileScanWithBucketDisabled.bucketedScan, - "except no bucketed scan when disabling bucketing but found\n" + - s"$fileScanWithBucketDisabled") + val fileScan = getFileScan(planWithoutBucketedScan) + assert(!fileScan.bucketedScan, s"except no bucketed scan but found\n$fileScan") - val tableSchema = fileScanWithBucketDisabled.schema + val tableSchema = fileScan.schema val bucketColumnIndex = tableSchema.fieldIndex(bucketColumnNames.head) val bucketColumn = tableSchema.toAttributes(bucketColumnIndex) val bucketColumnType = tableSchema.apply(bucketColumnIndex).dataType - val rowsWithInvalidBuckets = fileScanWithBucketDisabled.execute().filter(row => { + val rowsWithInvalidBuckets = fileScan.execute().filter(row => { // Return rows should have been pruned val bucketColumnValue = row.get(bucketColumnIndex, bucketColumnType) val bucketId = BucketingUtils.getBucketIdFromValue( @@ -175,7 +173,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti if (rowsWithInvalidBuckets.nonEmpty) { fail(s"Rows ${rowsWithInvalidBuckets.mkString(",")} should have been pruned from:\n" + - s"$planWithBucketDisabled") + s"$planWithoutBucketedScan") } } } @@ -185,7 +183,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), expectedDataFrame) - withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { + withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") { checkAnswer( spark.table("bucketed_table").select("i", "j", "k").filter(filterCondition) .orderBy("i", "j", "k"), From 066d5a4709d64e16d9f5226a55df13527a432010 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 3 Feb 2021 21:50:22 -0800 Subject: [PATCH 09/10] Refine test and fix scala compilation error --- .../sql/execution/DataSourceScanExec.scala | 2 +- .../spark/sql/sources/BucketedReadSuite.scala | 17 +++++++---------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 3ed414b179e39..5de8ddec5e8b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -602,7 +602,7 @@ case class FileSourceScanExec( true } } - case None => + case _ => _ => true } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 0063da2fff134..d0f569cf675f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -122,7 +122,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "false", SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "false") { - val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k") + val bucketedDataFrame = spark.table("bucketed_table") val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec // Limit: bucket pruning only works when the bucket column has one and only one column assert(bucketColumnNames.length == 1) @@ -154,15 +154,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") { // Bucket pruning should still work without bucketed scan - val planWithoutBucketedScan = spark.table("bucketed_table").select("i", "j", "k") - .filter(filterCondition).queryExecution.executedPlan + val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition) + .queryExecution.executedPlan val fileScan = getFileScan(planWithoutBucketedScan) assert(!fileScan.bucketedScan, s"except no bucketed scan but found\n$fileScan") - val tableSchema = fileScan.schema - val bucketColumnIndex = tableSchema.fieldIndex(bucketColumnNames.head) - val bucketColumn = tableSchema.toAttributes(bucketColumnIndex) - val bucketColumnType = tableSchema.apply(bucketColumnIndex).dataType + val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType val rowsWithInvalidBuckets = fileScan.execute().filter(row => { // Return rows should have been pruned val bucketColumnValue = row.get(bucketColumnIndex, bucketColumnType) @@ -179,14 +176,14 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } val expectedDataFrame = originalDataFrame.filter(filterCondition).orderBy("i", "j", "k") + .select("i", "j", "k") checkAnswer( - bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), + bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k").select("i", "j", "k"), expectedDataFrame) withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") { checkAnswer( - spark.table("bucketed_table").select("i", "j", "k").filter(filterCondition) - .orderBy("i", "j", "k"), + bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k").select("i", "j", "k"), expectedDataFrame) } } From 03120af6b31af89dbc9fb9aad05045e98d52c699 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 4 Feb 2021 20:42:42 -0800 Subject: [PATCH 10/10] Remove lazy val --- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 5de8ddec5e8b2..38e63d425bb21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -591,7 +591,7 @@ case class FileSourceScanExec( s"open cost is considered as scanning $openCostInBytes bytes.") // Filter files with bucket pruning if possible - lazy val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled + val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled val shouldProcess: Path => Boolean = optionalBucketSet match { case Some(bucketSet) if bucketingEnabled => filePath => {