
Commit 8cd0234

xingchaozh authored and GitHub Enterprise committed
[CARMEL-6135] Report Selected Buckets Number and Tasks Count when Bucket Scan Disabled by Planner (#1034)
1 parent 038befb commit 8cd0234

File tree

1 file changed (+13, -10 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 13 additions & 10 deletions
@@ -521,22 +521,22 @@ case class FileSourceScanExec(
 
     val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
       val bucketedKey = "Bucketed"
-      if (bucketedScan) {
-        val numSelectedBuckets = optionalBucketSet.map { b =>
-          b.cardinality()
-        } getOrElse {
-          spec.numBuckets
-        }
-        metadata ++ Map(
-          bucketedKey -> "true",
-          "SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}"))
+      val withBucketedScanStatus = if (bucketedScan) {
+        metadata + (bucketedKey -> "true")
       } else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) {
         metadata + (bucketedKey -> "false (disabled by configuration)")
       } else if (disableBucketedScan) {
         metadata + (bucketedKey -> "false (disabled by query planner)")
       } else {
         metadata + (bucketedKey -> "false (bucket column(s) not read)")
       }
+      val numSelectedBuckets = optionalBucketSet.map { b =>
+        b.cardinality()
+      } getOrElse {
+        spec.numBuckets
+      }
+      withBucketedScanStatus + ("SelectedBucketsCount" ->
+        s"$numSelectedBuckets out of ${spec.numBuckets}")
     } getOrElse {
       metadata
     }
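For reference, the hunk above reorders the computation rather than changing the metadata keys: the "Bucketed" status string is decided first, and "SelectedBucketsCount" is then appended unconditionally, so the selected-buckets number is reported even when the bucketed scan is disabled by configuration or by the query planner. The following is a minimal, self-contained Scala sketch of that pattern only; BucketSpecLike and the selectedBuckets: Option[Int] parameter are simplified stand-ins for Spark's BucketSpec and optionalBucketSet.map(_.cardinality()), not the real classes.

object SelectedBucketsMetadataSketch {
  // Stand-in for org.apache.spark.sql.catalyst.catalog.BucketSpec (illustrative only).
  final case class BucketSpecLike(numBuckets: Int)

  def bucketMetadata(
      metadata: Map[String, String],
      spec: BucketSpecLike,
      selectedBuckets: Option[Int],   // stand-in for optionalBucketSet.map(_.cardinality())
      bucketedScan: Boolean,
      bucketingEnabled: Boolean,
      disableBucketedScan: Boolean): Map[String, String] = {
    val bucketedKey = "Bucketed"
    // Step 1: pick the "Bucketed" status string; exactly one branch applies.
    val withBucketedScanStatus = if (bucketedScan) {
      metadata + (bucketedKey -> "true")
    } else if (!bucketingEnabled) {
      metadata + (bucketedKey -> "false (disabled by configuration)")
    } else if (disableBucketedScan) {
      metadata + (bucketedKey -> "false (disabled by query planner)")
    } else {
      metadata + (bucketedKey -> "false (bucket column(s) not read)")
    }
    // Step 2: always report the selected-buckets count, regardless of the status above.
    val numSelectedBuckets = selectedBuckets.getOrElse(spec.numBuckets)
    withBucketedScanStatus +
      ("SelectedBucketsCount" -> s"$numSelectedBuckets out of ${spec.numBuckets}")
  }

  def main(args: Array[String]): Unit = {
    // Example: bucket scan disabled by the planner, 5 of 8 buckets survive pruning.
    println(bucketMetadata(Map.empty, BucketSpecLike(8), Some(5),
      bucketedScan = false, bucketingEnabled = true, disableBucketedScan = true))
    // Map(Bucketed -> false (disabled by query planner), SelectedBucketsCount -> 5 out of 8)
  }
}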
@@ -634,6 +634,7 @@ case class FileSourceScanExec(
     val filesSize = sqlContext.queryLoadLimitationManager.
       checkScanSize(groupId, readRDD, bucketedScan, cancelQuery = applyIndexPrune)
     driverMetrics("filesSize") = filesSize
+    driverMetrics("readRDDPartitionsNumber") = readRDD.getNumPartitions
     readRDD
   }
 
@@ -696,7 +697,9 @@ case class FileSourceScanExec(
       SQLMetrics.createMetric(sparkContext, "local files replace time (ms)"),
     "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read"),
     "pruningTime" ->
-      SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time")
+      SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"),
+    "readRDDPartitionsNumber" -> SQLMetrics.createMetric(sparkContext,
+      "the number of partitions of read RDD")
   ) ++ {
     // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
     // it for each batch.
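The last two hunks register and populate a driver-side metric: RDD.getNumPartitions is plain driver-side metadata, so the partition count of the read RDD can be recorded as soon as the RDD is built, and the new SQLMetrics entry makes it visible alongside the other scan metrics. A minimal, hedged sketch of that idea outside of FileSourceScanExec follows; the local SparkSession, app name, and parallelize data are illustrative assumptions, not part of the commit.

import org.apache.spark.sql.SparkSession

object ReadRddPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("read-rdd-partitions-sketch")
      .getOrCreate()
    try {
      // Stand-in for the scan's readRDD; 8 partitions requested explicitly.
      val readRDD = spark.sparkContext.parallelize(1 to 100, numSlices = 8)
      // Mirrors driverMetrics("readRDDPartitionsNumber") = readRDD.getNumPartitions:
      // the value is available on the driver without triggering a job.
      val readRDDPartitionsNumber = readRDD.getNumPartitions
      println(s"readRDDPartitionsNumber = $readRDDPartitionsNumber") // prints 8
    } finally {
      spark.stop()
    }
  }
}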
