Commit 3bb8cd9

peter-toth authored and cloud-fan committed
[SPARK-36568][SQL] Better FileScan statistics estimation
### What changes were proposed in this pull request?

This PR modifies `FileScan.estimateStatistics()` to take the read schema into account.

### Why are the changes needed?

`V2ScanRelationPushDown` can column-prune `DataSourceV2ScanRelation`s and change the read schema of `Scan` operations. The better statistics returned by `FileScan.estimateStatistics()` can mean better query plans; for example, with this change the broadcast issue in SPARK-36568 can be avoided.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new unit test.

Closes #33825 from peter-toth/SPARK-36568-scan-statistics-estimation.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 159ff9f · commit 3bb8cd9
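A smaller `sizeInBytes` estimate matters mostly for join planning: once a pruned scan's estimate drops below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), the planner can choose a broadcast hash join over a sort-merge join. A minimal sketch of that scenario, with hypothetical paths and column names (only the mechanism comes from this PR):

```scala
// Hypothetical illustration of the SPARK-36568 scenario.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

val wide = spark.read.orc("/tmp/wide_table")   // hypothetical: many columns on disk
val facts = spark.read.orc("/tmp/facts_table") // hypothetical second table

// V2ScanRelationPushDown prunes the read schema of `wide` down to `id`.
// With this change, estimateStatistics() scales the file size down by the
// pruned row width, so the scan may now fall under the broadcast threshold.
facts.join(wide.select("id"), "id").explain()
```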

File tree

6 files changed (+35, −4 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 6 additions & 1 deletion

```diff
@@ -49,6 +49,8 @@ trait FileScan extends Scan
 
   def fileIndex: PartitioningAwareFileIndex
 
+  def dataSchema: StructType
+
   /**
    * Returns the required data schema
    */
@@ -187,7 +189,10 @@ trait FileScan extends Scan
     new Statistics {
       override def sizeInBytes(): OptionalLong = {
         val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
-        val size = (compressionFactor * fileIndex.sizeInBytes).toLong
+        val size = (compressionFactor * fileIndex.sizeInBytes /
+          (dataSchema.defaultSize + fileIndex.partitionSchema.defaultSize) *
+          (readDataSchema.defaultSize + readPartitionSchema.defaultSize)).toLong
+
         OptionalLong.of(size)
       }
```
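The new estimate pro-rates the files' on-disk size by the ratio of the read schema's default row width to the full schema's (data columns plus partition columns). A worked example with hypothetical numbers:

```scala
// Hypothetical numbers illustrating the new sizeInBytes() arithmetic.
// Assume three LongType data columns (defaultSize 8 each, 24 in total) plus
// one LongType partition column (8), with column pruning leaving a single
// LongType data column and no partition columns.
val compressionFactor = 1.0 // spark.sql.sources.fileCompressionFactor
val fileBytes = 32000L      // fileIndex.sizeInBytes
val fullRowWidth = 24 + 8   // dataSchema + partitionSchema default sizes
val readRowWidth = 8 + 0    // readDataSchema + readPartitionSchema default sizes

val size = (compressionFactor * fileBytes / fullRowWidth * readRowWidth).toLong
// size == 8000L: a quarter of the old estimate, matching 8 of 32 bytes read per row
```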

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -33,6 +33,7 @@ import org.apache.spark.util.SerializableConfiguration
 case class TextScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
+    dataSchema: StructType,
     readDataSchema: StructType,
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -33,6 +33,6 @@ case class TextScanBuilder(
   extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
 
   override def build(): Scan = {
-    TextScan(sparkSession, fileIndex, readDataSchema(), readPartitionSchema(), options)
+    TextScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options)
   }
 }
```
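TextScan was the odd one out here: as the FileScanSuite change below shows, the other file-based scans (JsonScan, for example) already receive `dataSchema`, so only the text source needed a new constructor parameter for the shared estimate to work.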

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 22 additions & 0 deletions

```diff
@@ -731,6 +731,28 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-36568: FileScan statistics estimation takes read schema into account") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempDir { dir =>
+        spark.range(1000).map(x => (x / 100, x, x)).toDF("k", "v1", "v2").
+          write.partitionBy("k").mode(SaveMode.Overwrite).orc(dir.toString)
+        val dfAll = spark.read.orc(dir.toString)
+        val dfK = dfAll.select("k")
+        val dfV1 = dfAll.select("v1")
+        val dfV2 = dfAll.select("v2")
+        val dfV1V2 = dfAll.select("v1", "v2")
+
+        def sizeInBytes(df: DataFrame): BigInt = df.queryExecution.optimizedPlan.stats.sizeInBytes
+
+        assert(sizeInBytes(dfAll) === BigInt(getLocalDirSize(dir)))
+        assert(sizeInBytes(dfK) < sizeInBytes(dfAll))
+        assert(sizeInBytes(dfV1) < sizeInBytes(dfAll))
+        assert(sizeInBytes(dfV2) === sizeInBytes(dfV1))
+        assert(sizeInBytes(dfV1V2) < sizeInBytes(dfAll))
+      }
+    }
+  }
+
   test("File source v2: support partition pruning") {
     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       allFileBasedDataSources.foreach { format =>
```
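The same effect can be observed interactively. A minimal sketch, assuming ORC is read through the V2 code path (the test clears `spark.sql.sources.useV1SourceList` for exactly this reason) and a hypothetical `/tmp` location:

```scala
// Route ORC reads through the V2 source so FileScan statistics apply.
spark.conf.set("spark.sql.sources.useV1SourceList", "")

val path = "/tmp/scan-stats-demo" // hypothetical location
spark.range(1000).selectExpr("id AS v1", "id AS v2").write.mode("overwrite").orc(path)

val df = spark.read.orc(path)
// Estimate for the full schema, derived from the files' size on disk.
println(df.queryExecution.optimizedPlan.stats.sizeInBytes)
// After pruning to one of two LongType columns, the estimate is roughly halved.
println(df.select("v1").queryExecution.optimizedPlan.stats.sizeInBytes)
```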

sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -367,7 +367,7 @@ class FileScanSuite extends FileScanSuiteBase {
       (s, fi, ds, rds, rps, f, o, pf, df) => JsonScan(s, fi, ds, rds, rps, o, f, pf, df),
       Seq.empty),
     ("TextScan",
-      (s, fi, _, rds, rps, _, o, pf, df) => TextScan(s, fi, rds, rps, o, pf, df),
+      (s, fi, ds, rds, rps, _, o, pf, df) => TextScan(s, fi, ds, rds, rps, o, pf, df),
       Seq("dataSchema", "pushedFilters")))
 
   run(scanBuilders)
```

sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala

Lines changed: 4 additions & 1 deletion

```diff
@@ -22,6 +22,7 @@ import java.net.URI
 import java.nio.file.Files
 import java.util.{Locale, UUID}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.control.NonFatal
@@ -459,7 +460,9 @@ private[sql] trait SQLTestUtilsBase
    */
   def getLocalDirSize(file: File): Long = {
     assert(file.isDirectory)
-    file.listFiles.filter(f => DataSourceUtils.isDataFile(f.getName)).map(_.length).sum
+    Files.walk(file.toPath).iterator().asScala
+      .filter(p => Files.isRegularFile(p) && DataSourceUtils.isDataFile(p.getFileName.toString))
+      .map(_.toFile.length).sum
   }
 }
```

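The `getLocalDirSize` rewrite is what makes the new test's first assertion possible: the test writes output partitioned by `k`, so the data files end up in `k=...` subdirectories that the old flat `file.listFiles` scan never visited, while `Files.walk` traverses the whole tree (still skipping non-data files such as `_SUCCESS` via `DataSourceUtils.isDataFile`).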