Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1787,6 +1787,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// SPARK-28595: legacy escape hatch restoring the old behavior in which a bucketed
// table scan lists its files at planning time to compute the output ordering.
// Internal and default-false, so the expensive file listing is skipped unless a
// user explicitly opts back in.
val LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING =
buildConf("spark.sql.legacy.bucketedTableScan.outputOrdering")
.internal()
.doc("When true, the bucketed table scan will list files during planning to figure out the " +
"output ordering, which is expensive and may make the planning quite slow.")
.booleanConf
.createWithDefault(false)

val ARITHMETIC_OPERATIONS_FAIL_ON_OVERFLOW =
buildConf("spark.sql.arithmeticOperations.failOnOverFlow")
.doc("If it is set to true, all arithmetic operations on non-decimal fields throw an " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -187,6 +188,9 @@ case class FileSourceScanExec(
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
if (relation.partitionSchemaOption.isDefined) {
driverMetrics("numPartitions") = ret.length
}
driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
val timeTakenMs = NANOSECONDS.toMillis(
(System.nanoTime() - startTime) + optimizerMetadataTimeNs)
Expand Down Expand Up @@ -237,8 +241,12 @@ case class FileSourceScanExec(
val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
val sortColumns =
spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
val shouldCalculateSortOrder =
conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
sortColumns.nonEmpty &&
!hasPartitionsAvailableAtRunTime

val sortOrder = if (sortColumns.nonEmpty && !hasPartitionsAvailableAtRunTime) {
val sortOrder = if (shouldCalculateSortOrder) {
// In case of bucketing, its possible to have multiple files belonging to the
// same bucket in a given relation. Each of these files are locally sorted
// but those files combined together are not globally sorted. Given that,
Expand Down Expand Up @@ -287,23 +295,17 @@ case class FileSourceScanExec(
"PushedFilters" -> seqToString(pushedDownFilters),
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)
val withOptPartitionCount = if (relation.partitionSchemaOption.isDefined &&
!hasPartitionsAvailableAtRunTime) {
metadata + ("PartitionCount" -> selectedPartitions.size.toString)
} else {
metadata
}

val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
val numSelectedBuckets = optionalBucketSet.map { b =>
b.cardinality()
} getOrElse {
spec.numBuckets
}
withOptPartitionCount + ("SelectedBucketsCount" ->
metadata + ("SelectedBucketsCount" ->
s"$numSelectedBuckets out of ${spec.numBuckets}")
} getOrElse {
withOptPartitionCount
metadata
}

withSelectedBucketsCount
Expand Down Expand Up @@ -346,6 +348,12 @@ case class FileSourceScanExec(
} else {
None
}
} ++ {
if (relation.partitionSchemaOption.isDefined) {
Some("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"))
} else {
None
}
}

protected override def doExecute(): RDD[InternalRow] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedS
abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
import testImplicits._

// Enable the legacy output-ordering behavior for every test in this suite, since
// the bucketed-read assertions exercised here depend on the scan computing its
// sort order at planning time. Set on the shared SparkSession; afterAll unsets
// it so the session is left with default configs for other suites.
protected override def beforeAll(): Unit = {
super.beforeAll()
spark.sessionState.conf.setConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
}

protected override def afterAll(): Unit = {
spark.sessionState.conf.unsetConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do a "store and recover the old conf" instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the test, we assume that every test suite will keep the shared SparkSession clean after tests are run. So the old conf should be the default conf here, and we only need to call unsetConf to restore the default config.

super.afterAll()
}

// Moduli for the i and j columns of the test DataFrame below.
private val maxI = 5
private val maxJ = 13
// 50-row fixture: i cycles through 0..4, j cycles through 0..12, and k is the
// row index rendered as a string.
private lazy val df = (0 until 50).map(i => (i % maxI, i % maxJ, i.toString)).toDF("i", "j", "k")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.spark.sql.hive.execution

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
Expand Down Expand Up @@ -207,4 +207,29 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
}
}

// SPARK-28595: running EXPLAIN on a partitioned, bucketed table must not fetch
// partition metadata unless the legacy conf is on — only the legacy path lists
// files (and therefore partitions) during planning to compute the sort order.
// Exercised in both conf states to pin down the contrast.
test("SPARK-28595: explain should not trigger partition listing") {
Seq(true, false).foreach { legacyBucketedScan =>
withSQLConf(
SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING.key -> legacyBucketedScan.toString) {
// Reset catalog metrics so the partition-fetch counts below start from zero.
HiveCatalogMetrics.reset()
withTable("t") {
sql(
"""
|CREATE TABLE t USING json
|PARTITIONED BY (j)
|CLUSTERED BY (i) SORTED BY (i) INTO 4 BUCKETS
|AS SELECT 1 i, 2 j
""".stripMargin)
// Creating the table itself must not have touched partition metadata.
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount == 0)
// The sort on the bucket column makes the planner consider the scan's
// output ordering, which is what may trigger the partition listing.
spark.table("t").sort($"i").explain()
if (legacyBucketedScan) {
// Legacy mode lists files during planning, so partitions get fetched.
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount > 0)
} else {
// New default: planning stays metadata-free for EXPLAIN.
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount == 0)
}
}
}
}
}
}