diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ae65f29c32264..80a7d4efe4e52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1787,6 +1787,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING =
+    buildConf("spark.sql.legacy.bucketedTableScan.outputOrdering")
+      .internal()
+      .doc("When true, the bucketed table scan will list files during planning to figure out the " +
+        "output ordering, which is expensive and may make the planning quite slow.")
+      .booleanConf
+      .createWithDefault(false)
+
   val ARITHMETIC_OPERATIONS_FAIL_ON_OVERFLOW =
     buildConf("spark.sql.arithmeticOperations.failOnOverFlow")
       .doc("If it is set to true, all arithmetic operations on non-decimal fields throw an " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 984f4d3474b03..b76cd9fc07b47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -187,6 +188,9 @@ case class FileSourceScanExec(
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
+    if (relation.partitionSchemaOption.isDefined) {
+      driverMetrics("numPartitions") = ret.length
+    }
     driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
     val timeTakenMs = NANOSECONDS.toMillis(
       (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
@@ -237,8 +241,12 @@ case class FileSourceScanExec(
       val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
       val sortColumns =
         spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+      val shouldCalculateSortOrder =
+        conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
+          sortColumns.nonEmpty &&
+          !hasPartitionsAvailableAtRunTime
 
-      val sortOrder = if (sortColumns.nonEmpty && !hasPartitionsAvailableAtRunTime) {
+      val sortOrder = if (shouldCalculateSortOrder) {
         // In case of bucketing, its possible to have multiple files belonging to the
         // same bucket in a given relation. Each of these files are locally sorted
         // but those files combined together are not globally sorted. Given that,
@@ -287,12 +295,6 @@ case class FileSourceScanExec(
       "PushedFilters" -> seqToString(pushedDownFilters),
       "DataFilters" -> seqToString(dataFilters),
       "Location" -> locationDesc)
-    val withOptPartitionCount = if (relation.partitionSchemaOption.isDefined &&
-      !hasPartitionsAvailableAtRunTime) {
-      metadata + ("PartitionCount" -> selectedPartitions.size.toString)
-    } else {
-      metadata
-    }
 
     val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
       val numSelectedBuckets = optionalBucketSet.map { b =>
@@ -300,10 +302,10 @@ case class FileSourceScanExec(
       } getOrElse {
         spec.numBuckets
       }
-      withOptPartitionCount + ("SelectedBucketsCount" ->
+      metadata + ("SelectedBucketsCount" ->
         s"$numSelectedBuckets out of ${spec.numBuckets}")
     } getOrElse {
-      withOptPartitionCount
+      metadata
     }
 
     withSelectedBucketsCount
@@ -346,6 +348,12 @@ case class FileSourceScanExec(
     } else {
       None
     }
+  } ++ {
+    if (relation.partitionSchemaOption.isDefined) {
+      Some("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"))
+    } else {
+      None
+    }
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index d52a78033e6cd..c3edec39979ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -49,6 +49,16 @@ class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedS
 abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sessionState.conf.setConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
+  }
+
+  protected override def afterAll(): Unit = {
+    spark.sessionState.conf.unsetConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING)
+    super.afterAll()
+  }
+
   private val maxI = 5
   private val maxJ = 13
   private lazy val df = (0 until 50).map(i => (i % maxI, i % maxJ, i.toString)).toDF("i", "j", "k")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index d413dfb2b2dc5..68ccee5e6623a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -207,4 +207,29 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       }
     }
   }
+
+  test("SPARK-28595: explain should not trigger partition listing") {
+    Seq(true, false).foreach { legacyBucketedScan =>
+      withSQLConf(
+        SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING.key -> legacyBucketedScan.toString) {
+        HiveCatalogMetrics.reset()
+        withTable("t") {
+          sql(
+            """
+              |CREATE TABLE t USING json
+              |PARTITIONED BY (j)
+              |CLUSTERED BY (i) SORTED BY (i) INTO 4 BUCKETS
+              |AS SELECT 1 i, 2 j
+            """.stripMargin)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount == 0)
+          spark.table("t").sort($"i").explain()
+          if (legacyBucketedScan) {
+            assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount > 0)
+          } else {
+            assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount == 0)
+          }
+        }
+      }
+    }
+  }
 }
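
For reviewers who want to try the patch by hand, here is a minimal sketch of how the new flag changes EXPLAIN behavior. It is not part of the patch: it assumes a bucketed, partitioned table `t` like the one created by the CTAS statement in the test above, and a local `SparkSession`.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a table "t" bucketed and sorted by i, partitioned by j,
// already exists (see the CTAS statement in the test above).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SPARK-28595 demo")
  .getOrCreate()
import spark.implicits._

// New default (false): planning does not list files for the bucketed scan,
// so EXPLAIN stays cheap and no partitions are fetched from the metastore.
spark.conf.set("spark.sql.legacy.bucketedTableScan.outputOrdering", "false")
spark.table("t").sort($"i").explain()

// Legacy behavior (true): planning lists files to compute the scan's output
// ordering, which can avoid an extra Sort in the plan but may make planning
// quite slow on tables with many files.
spark.conf.set("spark.sql.legacy.bucketedTableScan.outputOrdering", "true")
spark.table("t").sort($"i").explain()
```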
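The patch also adds a `numPartitions` driver metric to `FileSourceScanExec`, reported as "number of partitions read". The metric shows up in the SQL tab of the web UI; the lookup below is only an assumed illustration of one way to read it programmatically, continuing from the session in the previous sketch.

```scala
import org.apache.spark.sql.execution.FileSourceScanExec

// Continuing from the previous sketch: same SparkSession and implicits,
// same hypothetical table "t" partitioned by column j.
val df = spark.table("t").where($"j" === 2)
df.collect() // run the query so the driver-side metrics are posted

// Walk the executed plan and print the new metric for every file scan node.
df.queryExecution.executedPlan.foreach {
  case scan: FileSourceScanExec =>
    scan.metrics.get("numPartitions").foreach { m =>
      println(s"number of partitions read: ${m.value}")
    }
  case _ => // other physical nodes carry no such metric
}
```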