diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 603d53a295609..5d09d120e486e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -370,21 +370,26 @@ case class FileSourceScanExec(
         "DataFilters" -> seqToString(dataFilters),
         "Location" -> locationDesc)
 
-    // TODO(SPARK-32986): Add bucketed scan info in explain output of FileSourceScanExec
-    if (bucketedScan) {
-      relation.bucketSpec.map { spec =>
+    relation.bucketSpec.map { spec =>
+      val bucketedKey = "Bucketed"
+      if (bucketedScan) {
         val numSelectedBuckets = optionalBucketSet.map { b =>
           b.cardinality()
         } getOrElse {
           spec.numBuckets
         }
-        metadata + ("SelectedBucketsCount" ->
-          (s"$numSelectedBuckets out of ${spec.numBuckets}" +
+        metadata ++ Map(
+          bucketedKey -> "true",
+          "SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}" +
             optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
-      } getOrElse {
-        metadata
+      } else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) {
+        metadata + (bucketedKey -> "false (disabled by configuration)")
+      } else if (disableBucketedScan) {
+        metadata + (bucketedKey -> "false (disabled by query planner)")
+      } else {
+        metadata + (bucketedKey -> "false (bucket column(s) not read)")
       }
-    } else {
+    } getOrElse {
       metadata
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 396d227218ab8..688ded0c3eb39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -677,6 +677,33 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       }
     }
   }
+
+  test("SPARK-32986: Bucketed scan info should be a part of explain string") {
+    withTable("t1", "t2") {
+      Seq((1, 2), (2, 3)).toDF("i", "j").write.bucketBy(8, "i").saveAsTable("t1")
+      Seq(2, 3).toDF("i").write.bucketBy(8, "i").saveAsTable("t2")
+      val df1 = spark.table("t1")
+      val df2 = spark.table("t2")
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+        checkKeywordsExistsInExplain(
+          df1.join(df2, df1("i") === df2("i")),
+          "Bucketed: true")
+      }
+
+      withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
+        checkKeywordsExistsInExplain(
+          df1.join(df2, df1("i") === df2("i")),
+          "Bucketed: false (disabled by configuration)")
+      }
+
+      checkKeywordsExistsInExplain(df1, "Bucketed: false (disabled by query planner)")
+
+      checkKeywordsExistsInExplain(
+        df1.select("j"),
+        "Bucketed: false (bucket column(s) not read)")
+    }
+  }
 }
 
 case class ExplainSingleData(id: Int)
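
For reviewers who want to observe the new output locally, here is a minimal spark-shell sketch, not part of the patch: the table name t and the queries are illustrative, and it assumes a build that includes this change.

// Illustrative only: create a table bucketed by column i (names chosen for this sketch).
spark.range(10).selectExpr("id AS i", "id AS j")
  .write.bucketBy(8, "i").saveAsTable("t")

// Grouping on the bucket column keeps the bucketed scan useful, so the
// scan node in the formatted plan should report "Bucketed: true".
spark.table("t").groupBy("i").count().explain("formatted")

// Reading only the non-bucket column should instead report
// "Bucketed: false (bucket column(s) not read)".
spark.table("t").select("j").explain("formatted")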