Commit 3c2ba9f

[SPARK-19305][SQL] partitioned table should always put partition columns at the end of table schema

## What changes were proposed in this pull request?

For data source tables, we always reorder the specified table schema, or the output of the query in CTAS, to put partition columns at the end. e.g. `CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)` will create a table with schema `<a, c, d, b>`.

Hive serde tables didn't have this problem before, because their CREATE TABLE syntax specifies the data schema and the partition schema individually. However, after we unified the CREATE TABLE syntax, Hive serde tables also need the reordering. This PR puts the reorder logic in an analyzer rule, which works with both data source tables and Hive serde tables.

## How was this patch tested?

new regression test

Author: Wenchen Fan <[email protected]>

Closes #16655 from cloud-fan/schema.
Parent: f174cdc · Commit: 3c2ba9f
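To see the new behavior end to end, here is a minimal spark-shell sketch (the table name `t` is throwaway; the assertion mirrors the new regression test below):

```scala
spark.sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
// Partition columns d and b are moved to the end of the table schema.
assert(spark.table("t").schema.fieldNames.toSeq == Seq("a", "c", "d", "b"))
```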

File tree (2 files changed: +72 −18)
  • sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
  • sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 42 additions & 18 deletions
```diff
@@ -199,31 +199,55 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     // * can't use all table columns as partition columns.
     // * partition columns' type must be AtomicType.
     // * sort columns' type must be orderable.
+    // * reorder table schema or output of query plan, to put partition columns at the end.
     case c @ CreateTable(tableDesc, _, query) =>
-      val analyzedQuery = query.map { q =>
-        // Analyze the query in CTAS and then we can do the normalization and checking.
-        val qe = sparkSession.sessionState.executePlan(q)
+      if (query.isDefined) {
+        assert(tableDesc.schema.isEmpty,
+          "Schema may not be specified in a Create Table As Select (CTAS) statement")
+
+        val qe = sparkSession.sessionState.executePlan(query.get)
         qe.assertAnalyzed()
-        qe.analyzed
-      }
-      val schema = if (analyzedQuery.isDefined) {
-        analyzedQuery.get.schema
-      } else {
-        tableDesc.schema
-      }
+        val analyzedQuery = qe.analyzed
+
+        val normalizedTable = normalizeCatalogTable(analyzedQuery.schema, tableDesc)
+
+        val output = analyzedQuery.output
+        val partitionAttrs = normalizedTable.partitionColumnNames.map { partCol =>
+          output.find(_.name == partCol).get
+        }
+        val newOutput = output.filterNot(partitionAttrs.contains) ++ partitionAttrs
+        val reorderedQuery = if (newOutput == output) {
+          analyzedQuery
+        } else {
+          Project(newOutput, analyzedQuery)
+        }
 
-      val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
-        schema.map(_.name)
+        c.copy(tableDesc = normalizedTable, query = Some(reorderedQuery))
       } else {
-        schema.map(_.name.toLowerCase)
+        val normalizedTable = normalizeCatalogTable(tableDesc.schema, tableDesc)
+
+        val partitionSchema = normalizedTable.partitionColumnNames.map { partCol =>
+          normalizedTable.schema.find(_.name == partCol).get
+        }
+
+        val reorderedSchema =
+          StructType(normalizedTable.schema.filterNot(partitionSchema.contains) ++ partitionSchema)
+
+        c.copy(tableDesc = normalizedTable.copy(schema = reorderedSchema))
       }
-      checkDuplication(columnNames, "table definition of " + tableDesc.identifier)
+  }
 
-      val normalizedTable = tableDesc.copy(
-        partitionColumnNames = normalizePartitionColumns(schema, tableDesc),
-        bucketSpec = normalizeBucketSpec(schema, tableDesc))
+  private def normalizeCatalogTable(schema: StructType, table: CatalogTable): CatalogTable = {
+    val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      schema.map(_.name)
+    } else {
+      schema.map(_.name.toLowerCase)
+    }
+    checkDuplication(columnNames, "table definition of " + table.identifier)
 
-      c.copy(tableDesc = normalizedTable, query = analyzedQuery)
+    table.copy(
+      partitionColumnNames = normalizePartitionColumns(schema, table),
+      bucketSpec = normalizeBucketSpec(schema, table))
   }
 
   private def normalizePartitionColumns(schema: StructType, table: CatalogTable): Seq[String] = {
```
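The non-CTAS branch of the rule boils down to a stable reordering of the schema fields. Here is a self-contained sketch of just that step, using only `StructType` from spark-catalyst (the field names and types are made up for illustration):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(
  StructField("a", IntegerType), StructField("b", IntegerType),
  StructField("c", IntegerType), StructField("d", IntegerType)))
val partitionColumnNames = Seq("d", "b")

// Same shape as the rule: look up each partition field by name, then append
// them after the remaining data columns, preserving the PARTITIONED BY order.
val partitionFields = partitionColumnNames.map(p => schema.find(_.name == p).get)
val reordered = StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)

assert(reordered.fieldNames.toSeq == Seq("a", "c", "d", "b"))
```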

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 30 additions & 0 deletions
```diff
@@ -1384,4 +1384,34 @@ class HiveDDLSuite
       assert(e2.message.contains("Hive data source can only be used with tables"))
     }
   }
+
+  test("partitioned table should always put partition columns at the end of table schema") {
+    def getTableColumns(tblName: String): Seq[String] = {
+      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
+    }
+
+    withTable("t", "t1", "t2", "t3", "t4") {
+      sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
+      assert(getTableColumns("t") == Seq("a", "c", "d", "b"))
+
+      sql("CREATE TABLE t1 USING parquet PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
+      assert(getTableColumns("t1") == Seq("a", "c", "d", "b"))
+
+      Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.partitionBy("d", "b").saveAsTable("t2")
+      assert(getTableColumns("t2") == Seq("a", "c", "d", "b"))
+
+      withTempPath { path =>
+        val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
+        Seq(1 -> 1).toDF("a", "c").write.save(dataPath)
+
+        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.getCanonicalPath}'")
+        assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
+      }
+
+      sql("CREATE TABLE t4(a int, b int, c int, d int) USING hive PARTITIONED BY (d, b)")
+      assert(getTableColumns("t4") == Seq("a", "c", "d", "b"))
+
+      // TODO: add test for creating partitioned hive serde table as select, once we support it.
+    }
+  }
 }
```
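The `t3` case relies on partition discovery rather than DDL: the data columns come from the files and the partition columns from the directory layout, with the discovered columns appended at the end. A spark-shell sketch of the same effect without a table (the `/tmp/part_demo` path is made up for illustration):

```scala
// Write only the data columns a, c under a d=1/b=1 directory layout.
Seq(1 -> 1).toDF("a", "c").write.parquet("/tmp/part_demo/d=1/b=1")

// Partition discovery on the parent path appends the discovered d and b.
assert(spark.read.parquet("/tmp/part_demo").schema.fieldNames.toSeq == Seq("a", "c", "d", "b"))
```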
