
Commit 9f55860

wangyum authored and gatorsmile committed
[SPARK-24937][SQL] Datasource partition table should load empty static partitions
## What changes were proposed in this pull request?

How to reproduce:

```sql
spark-sql> CREATE TABLE tbl AS SELECT 1;
spark-sql> CREATE TABLE tbl1 (c1 BIGINT, day STRING, hour STRING)
         > USING parquet
         > PARTITIONED BY (day, hour);
spark-sql> INSERT INTO TABLE tbl1 PARTITION (day = '2018-07-25', hour='01') SELECT * FROM tbl WHERE 1=0;
spark-sql> SHOW PARTITIONS tbl1;
spark-sql> CREATE TABLE tbl2 (c1 BIGINT)
         > PARTITIONED BY (day STRING, hour STRING);
spark-sql> INSERT INTO TABLE tbl2 PARTITION (day = '2018-07-25', hour='01') SELECT * FROM tbl where 1=0;
spark-sql> SHOW PARTITIONS tbl2;
day=2018-07-25/hour=01
spark-sql>
```

`SHOW PARTITIONS tbl1` returns nothing, while the equivalent Hive table `tbl2` reports the empty static partition. This causes two problems:

1. Users will be confused about whether the partition data of `tbl1` was generated.
2. The behavior is inconsistent with Hive tables.

This PR fixes these issues.

## How was this patch tested?

Unit tests.

Author: Yuming Wang <[email protected]>

Closes #21883 from wangyum/SPARK-24937.
1 parent f5113ea commit 9f55860
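For readers who want to verify the behavior, here is a hedged sketch of the reproduction against the `SparkSession` API instead of the `spark-sql` shell; the table names follow the commit message above, and the final `SHOW PARTITIONS` output reflects the post-fix behavior this PR describes.

```scala
// Sketch of the reproduction from the commit message, via SparkSession.
// Assumes a session with a metastore configured (e.g. enableHiveSupport()).
spark.sql("CREATE TABLE tbl AS SELECT 1")
spark.sql(
  """CREATE TABLE tbl1 (c1 BIGINT, day STRING, hour STRING)
    |USING parquet
    |PARTITIONED BY (day, hour)""".stripMargin)
spark.sql(
  "INSERT INTO TABLE tbl1 PARTITION (day = '2018-07-25', hour = '01') " +
    "SELECT * FROM tbl WHERE 1 = 0")

// Before this change, SHOW PARTITIONS returned no rows for the datasource
// table; with the fix, the empty static partition is registered, matching
// the Hive-table (tbl2) behavior shown above.
spark.sql("SHOW PARTITIONS tbl1").show(false)
// +----------------------+
// |partition             |
// +----------------------+
// |day=2018-07-25/hour=01|
// +----------------------+
```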

File tree

3 files changed: +76 −2 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 9 additions & 1 deletion
```diff
@@ -171,7 +171,15 @@ case class InsertIntoHadoopFsRelationCommand(
 
 
       // update metastore partition metadata
-      refreshUpdatedPartitions(updatedPartitionPaths)
+      if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
+        && partitionColumns.length == staticPartitions.size) {
+        // Avoid the case where an empty static partition can't be loaded into a datasource table.
+        val staticPathFragment =
+          PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
+        refreshUpdatedPartitions(Set(staticPathFragment))
+      } else {
+        refreshUpdatedPartitions(updatedPartitionPaths)
+      }
 
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
```
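To make the trigger condition of the new branch concrete, the following is a self-contained sketch, using a hypothetical `partitionsToRefresh` helper and plain `Map`/`Seq` stand-ins for the command's `staticPartitions` and `partitionColumns`; unlike the real code, it builds the path fragment by hand and skips the Hive-style path escaping that `PartitioningUtils.getPathFragment` performs.

```scala
// Standalone sketch (not the actual command code): decide which partition
// paths to refresh in the metastore after an insert.
object RefreshDecisionSketch {
  def partitionsToRefresh(
      updatedPartitionPaths: Set[String],       // paths actually written by the job
      staticPartitions: Map[String, String],    // static part of the PARTITION clause
      partitionColumnNames: Seq[String]): Set[String] = {
    if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty &&
        partitionColumnNames.length == staticPartitions.size) {
      // Nothing was written, but every partition column is static:
      // still register the (empty) static partition.
      Set(partitionColumnNames.map(c => s"$c=${staticPartitions(c)}").mkString("/"))
    } else {
      updatedPartitionPaths
    }
  }

  def main(args: Array[String]): Unit = {
    // INSERT ... PARTITION (day='2018-07-25', hour='01') ... WHERE 1 = 0
    println(partitionsToRefresh(
      Set.empty,
      Map("day" -> "2018-07-25", "hour" -> "01"),
      Seq("day", "hour")))
    // Set(day=2018-07-25/hour=01)
  }
}
```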

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 5 additions & 1 deletion
```diff
@@ -30,7 +30,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -284,6 +284,10 @@ object PartitioningUtils {
     }.mkString("/")
   }
 
+  def getPathFragment(spec: TablePartitionSpec, partitionColumns: Seq[Attribute]): String = {
+    getPathFragment(spec, StructType.fromAttributes(partitionColumns))
+  }
+
   /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
    * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
```
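The new overload simply adapts a `Seq[Attribute]` to the existing `StructType`-based `getPathFragment`. As a rough illustration (a hypothetical `pathFragmentSketch`, not the Spark implementation), the result is one `col=value` segment per partition column, joined by `/`, with column order taken from the partition schema; the real method additionally escapes names and values for use in paths.

```scala
// Simplified stand-in for PartitioningUtils.getPathFragment: one
// "col=value" segment per partition column, in schema order. The real
// implementation also escapes names/values for path safety; omitted here.
def pathFragmentSketch(spec: Map[String, String], columnNames: Seq[String]): String =
  columnNames.map(c => s"$c=${spec(c)}").mkString("/")

// Example: the spec's own ordering does not matter, schema order wins.
// pathFragmentSketch(Map("hour" -> "01", "day" -> "2018-07-25"), Seq("day", "hour"))
// == "day=2018-07-25/hour=01"
```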

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 62 additions & 0 deletions
```diff
@@ -2249,6 +2249,68 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("Partition table should load empty static partitions") {
+    // All static partitions
+    withTable("t", "t1", "t2") {
+      withTempPath { dir =>
+        spark.sql("CREATE TABLE t(a int) USING parquet")
+        spark.sql("CREATE TABLE t1(a int, c string, b string) " +
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
+
+        // datasource table
+        validateStaticPartitionTable("t1")
+
+        // hive table
+        if (isUsingHiveMetastore) {
+          spark.sql("CREATE TABLE t2(a int) " +
+            s"PARTITIONED BY(c string, b string) LOCATION '${dir.toURI}'")
+          validateStaticPartitionTable("t2")
+        }
+
+        def validateStaticPartitionTable(tableName: String): Unit = {
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          spark.sql(
+            s"INSERT INTO TABLE $tableName PARTITION(b='b', c='c') SELECT * FROM t WHERE 1 = 0")
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 1)
+          assert(new File(dir, "c=c/b=b").exists())
+          checkAnswer(spark.table(tableName), Nil)
+        }
+      }
+    }
+
+    // Partial dynamic partitions
+    withTable("t", "t1", "t2") {
+      withTempPath { dir =>
+        spark.sql("CREATE TABLE t(a int) USING parquet")
+        spark.sql("CREATE TABLE t1(a int, b string, c string) " +
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
+
+        // datasource table
+        validatePartialStaticPartitionTable("t1")
+
+        // hive table
+        if (isUsingHiveMetastore) {
+          spark.sql("CREATE TABLE t2(a int) " +
+            s"PARTITIONED BY(c string, b string) LOCATION '${dir.toURI}'")
+          validatePartialStaticPartitionTable("t2")
+        }
+
+        def validatePartialStaticPartitionTable(tableName: String): Unit = {
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          spark.sql(
+            s"INSERT INTO TABLE $tableName PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          assert(!new File(dir, "c=c/b=b").exists())
+          checkAnswer(spark.table(tableName), Nil)
+        }
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
     val tcName = if (shouldDelete) "non-existing" else "existed"
     test(s"CTAS for external data source table with a $tcName location") {
```
