Skip to content

Commit 20a6043

Browse files
committed
init commit
1 parent c70c38e commit 20a6043

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.datasources
1919

20+
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
2021
import org.apache.spark.sql.catalyst.expressions._
2122
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
2223
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
@@ -59,8 +60,10 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
5960
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
6061
val prunedFsRelation =
6162
fsRelation.copy(location = prunedFileIndex)(sparkSession)
62-
val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
63-
63+
val withStats = logicalRelation.catalogTable.map(_.copy(
64+
stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
65+
val prunedLogicalRelation = logicalRelation.copy(
66+
relation = prunedFsRelation, catalogTable = withStats)
6467
// Keep partition-pruning predicates so that they are visible in physical planning
6568
val filterExpression = filters.reduceLeft(And)
6669
val filter = Filter(filterExpression, prunedLogicalRelation)

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
2525
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
2626
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
2727
import org.apache.spark.sql.hive.test.TestHiveSingleton
28+
import org.apache.spark.sql.internal.SQLConf
2829
import org.apache.spark.sql.test.SQLTestUtils
2930
import org.apache.spark.sql.types.StructType
3031

@@ -66,4 +67,33 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
6667
}
6768
}
6869
}
70+
71+
// Regression test for SPARK-20986: once PruneFileSourcePartitions has narrowed the scan to
// the selected partitions, the size reported by the relation's statistics should shrink too.
test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
  // `tempTbl` is a temporary view, while `partTbl` is created with CREATE TABLE and therefore
  // has to be cleaned up with withTable; withTempView alone would leave it in the catalog.
  withTempView("tempTbl") {
    withTable("partTbl") {
      spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
      sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
      // Write the same 1000 ids into each of the three partitions.
      for (part <- Seq(1, 2, 3)) {
        sql(
          s"""
             |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part')
             |select id from tempTbl
           """.stripMargin)
      }

      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
        val df = sql("SELECT * FROM partTbl where part = 1")
        val query = df.queryExecution.analyzed.analyze

        // Size reported by the single relation in the plan before the optimizer rule runs.
        val sizes1 = query.collect {
          case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
        }
        assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
        assert(sizes1(0) > 5000,
          s"expected > 5000 for test table 'partTbl', got: ${sizes1(0)}")

        // Size reported after running the suite's Optimize executor over the same plan;
        // with only `part = 1` selected it is expected to fall below the threshold.
        val sizes2 = Optimize.execute(query).collect {
          case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
        }
        assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}")
        assert(sizes2(0) < 5000,
          s"expected < 5000 for test table 'partTbl', got: ${sizes2(0)}")
      }
    }
  }
}
6999
}

0 commit comments

Comments
 (0)