Skip to content

Commit 2585d2b

Browse files
author
Andrew Or
committed
[SPARK-15279][SQL] Catch conflicting SerDe when creating table
## What changes were proposed in this pull request? The user may do something like: ``` CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS PARQUET CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS ... SERDE 'myserde' CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ORC CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ... SERDE 'myserde' ``` None of these should be allowed because the SerDe's conflict. As of this patch: - `ROW FORMAT DELIMITED` is only compatible with `TEXTFILE` - `ROW FORMAT SERDE` is only compatible with `TEXTFILE`, `RCFILE` and `SEQUENCEFILE` ## How was this patch tested? New tests in `DDLCommandSuite`. Author: Andrew Or <[email protected]> Closes #13068 from andrewor14/row-format-conflict.
1 parent 07c36a2 commit 2585d2b

File tree

4 files changed

+129
-33
lines changed

4 files changed

+129
-33
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,8 @@ createFileFormat
267267
;
268268

269269
fileFormat
270-
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat
271-
| identifier #genericFileFormat
270+
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING #tableFileFormat
271+
| identifier #genericFileFormat
272272
;
273273

274274
storageHandler

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -796,14 +796,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
796796
*
797797
* Expected format:
798798
* {{{
799-
* CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
800-
* [(col1 data_type [COMMENT col_comment], ...)]
799+
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
800+
* [(col1[:] data_type [COMMENT col_comment], ...)]
801801
* [COMMENT table_comment]
802-
* [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
803-
* [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
804-
* [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
802+
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
805803
* [ROW FORMAT row_format]
806-
* [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
804+
* [STORED AS file_format]
807805
* [LOCATION path]
808806
* [TBLPROPERTIES (property_name=property_value, ...)]
809807
* [AS select_statement];
@@ -849,6 +847,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
849847
compressed = false,
850848
serdeProperties = Map())
851849
}
850+
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
852851
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
853852
.getOrElse(EmptyStorageFormat)
854853
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
@@ -905,6 +904,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
905904

906905
/**
907906
* Create a [[CatalogStorageFormat]] for creating tables.
907+
*
908+
* Format: STORED AS ...
908909
*/
909910
override def visitCreateFileFormat(
910911
ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
@@ -932,9 +933,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
932933
ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
933934
EmptyStorageFormat.copy(
934935
inputFormat = Option(string(ctx.inFmt)),
935-
outputFormat = Option(string(ctx.outFmt)),
936-
serde = Option(ctx.serdeCls).map(string)
937-
)
936+
outputFormat = Option(string(ctx.outFmt)))
938937
}
939938

940939
/**
@@ -1018,6 +1017,49 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
10181017
EmptyStorageFormat.copy(serdeProperties = entries.toMap)
10191018
}
10201019

1020+
/**
1021+
* Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
1022+
* and STORED AS.
1023+
*
1024+
* The following are allowed. Anything else is not:
1025+
* ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
1026+
* ROW FORMAT DELIMITED ... STORED AS TEXTFILE
1027+
* ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
1028+
*/
1029+
private def validateRowFormatFileFormat(
1030+
rowFormatCtx: RowFormatContext,
1031+
createFileFormatCtx: CreateFileFormatContext,
1032+
parentCtx: ParserRuleContext): Unit = {
1033+
if (rowFormatCtx == null || createFileFormatCtx == null) {
1034+
return
1035+
}
1036+
(rowFormatCtx, createFileFormatCtx.fileFormat) match {
1037+
case (_, ffTable: TableFileFormatContext) => // OK
1038+
case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
1039+
ffGeneric.identifier.getText.toLowerCase match {
1040+
case ("sequencefile" | "textfile" | "rcfile") => // OK
1041+
case fmt =>
1042+
throw operationNotAllowed(
1043+
s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde",
1044+
parentCtx)
1045+
}
1046+
case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
1047+
ffGeneric.identifier.getText.toLowerCase match {
1048+
case "textfile" => // OK
1049+
case fmt => throw operationNotAllowed(
1050+
s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx)
1051+
}
1052+
case _ =>
1053+
// should never happen
1054+
def str(ctx: ParserRuleContext): String = {
1055+
(0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ")
1056+
}
1057+
throw operationNotAllowed(
1058+
s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}",
1059+
parentCtx)
1060+
}
1061+
}
1062+
10211063
/**
10221064
* Create or replace a view. This creates a [[CreateViewCommand]] command.
10231065
*

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.command
1919

20+
import scala.reflect.{classTag, ClassTag}
21+
2022
import org.apache.spark.sql.catalyst.TableIdentifier
2123
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
2224
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
@@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
2527
import org.apache.spark.sql.catalyst.plans.logical.Project
2628
import org.apache.spark.sql.execution.SparkSqlParser
2729
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
28-
import org.apache.spark.sql.internal.SQLConf
30+
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
2931
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
3032

33+
3134
// TODO: merge this with DDLSuite (SPARK-14441)
3235
class DDLCommandSuite extends PlanTest {
3336
private val parser = new SparkSqlParser(new SQLConf)
@@ -40,6 +43,15 @@ class DDLCommandSuite extends PlanTest {
4043
containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) }
4144
}
4245

46+
private def parseAs[T: ClassTag](query: String): T = {
47+
parser.parsePlan(query) match {
48+
case t: T => t
49+
case other =>
50+
fail(s"Expected to parse ${classTag[T].runtimeClass} from query," +
51+
s"got ${other.getClass.getName}: $query")
52+
}
53+
}
54+
4355
test("create database") {
4456
val sql =
4557
"""
@@ -225,19 +237,69 @@ class DDLCommandSuite extends PlanTest {
225237
comparePlans(parsed4, expected4)
226238
}
227239

240+
test("create table - row format and table file format") {
241+
val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
242+
val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
243+
val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
244+
val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat"
245+
246+
// No conflicting serdes here, OK
247+
val parsed1 = parseAs[CreateTableCommand](query1)
248+
assert(parsed1.table.storage.serde == Some("anything"))
249+
assert(parsed1.table.storage.inputFormat == Some("inputfmt"))
250+
assert(parsed1.table.storage.outputFormat == Some("outputfmt"))
251+
val parsed2 = parseAs[CreateTableCommand](query2)
252+
assert(parsed2.table.storage.serde.isEmpty)
253+
assert(parsed2.table.storage.inputFormat == Some("inputfmt"))
254+
assert(parsed2.table.storage.outputFormat == Some("outputfmt"))
255+
}
256+
257+
test("create table - row format serde and generic file format") {
258+
val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
259+
val supportedSources = Set("sequencefile", "rcfile", "textfile")
260+
261+
allSources.foreach { s =>
262+
val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
263+
if (supportedSources.contains(s)) {
264+
val ct = parseAs[CreateTableCommand](query)
265+
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
266+
assert(hiveSerde.isDefined)
267+
assert(ct.table.storage.serde == Some("anything"))
268+
assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
269+
assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
270+
} else {
271+
assertUnsupported(query, Seq("row format serde", "incompatible", s))
272+
}
273+
}
274+
}
275+
276+
test("create table - row format delimited and generic file format") {
277+
val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
278+
val supportedSources = Set("textfile")
279+
280+
allSources.foreach { s =>
281+
val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
282+
if (supportedSources.contains(s)) {
283+
val ct = parseAs[CreateTableCommand](query)
284+
val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
285+
assert(hiveSerde.isDefined)
286+
assert(ct.table.storage.serde == hiveSerde.get.serde)
287+
assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
288+
assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
289+
} else {
290+
assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s))
291+
}
292+
}
293+
}
294+
228295
test("create external table - location must be specified") {
229296
assertUnsupported(
230297
sql = "CREATE EXTERNAL TABLE my_tab",
231298
containsThesePhrases = Seq("create external table", "location"))
232299
val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
233-
parser.parsePlan(query) match {
234-
case ct: CreateTableCommand =>
235-
assert(ct.table.tableType == CatalogTableType.EXTERNAL)
236-
assert(ct.table.storage.locationUri == Some("/something/anything"))
237-
case other =>
238-
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
239-
s"got ${other.getClass.getName}: $query")
240-
}
300+
val ct = parseAs[CreateTableCommand](query)
301+
assert(ct.table.tableType == CatalogTableType.EXTERNAL)
302+
assert(ct.table.storage.locationUri == Some("/something/anything"))
241303
}
242304

243305
test("create table - property values must be set") {
@@ -252,14 +314,9 @@ class DDLCommandSuite extends PlanTest {
252314

253315
test("create table - location implies external") {
254316
val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
255-
parser.parsePlan(query) match {
256-
case ct: CreateTableCommand =>
257-
assert(ct.table.tableType == CatalogTableType.EXTERNAL)
258-
assert(ct.table.storage.locationUri == Some("/something/anything"))
259-
case other =>
260-
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
261-
s"got ${other.getClass.getName}: $query")
262-
}
317+
val ct = parseAs[CreateTableCommand](query)
318+
assert(ct.table.tableType == CatalogTableType.EXTERNAL)
319+
assert(ct.table.storage.locationUri == Some("/something/anything"))
263320
}
264321

265322
test("create table using - with partitioned by") {
@@ -551,8 +608,7 @@ class DDLCommandSuite extends PlanTest {
551608

552609
test("alter table: set file format (not allowed)") {
553610
assertUnsupported(
554-
"ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
555-
"OUTPUTFORMAT 'test' SERDE 'test'")
611+
"ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' OUTPUTFORMAT 'test'")
556612
assertUnsupported(
557613
"ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
558614
"SET FILEFORMAT PARQUET")

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest {
6161
|country STRING COMMENT 'country of origination')
6262
|COMMENT 'This is the staging page view table'
6363
|PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
64-
|ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
64+
|STORED AS RCFILE
6565
|LOCATION '/user/external/page_view'
6666
|TBLPROPERTIES ('p1'='v1', 'p2'='v2')
6767
|AS SELECT * FROM src""".stripMargin
@@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest {
8888
assert(desc.partitionColumns ==
8989
CatalogColumn("dt", "string", comment = Some("date type")) ::
9090
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
91-
assert(desc.storage.serdeProperties ==
92-
Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), (serdeConstants.FIELD_DELIM, "\u002C")))
9391
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
9492
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
9593
assert(desc.storage.serde ==

0 commit comments

Comments
 (0)