diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 91beb5e639af..6045e1b31532 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -115,6 +115,8 @@ statement
         (identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)?    #analyze
     | ALTER TABLE tableIdentifier
         ADD COLUMNS '(' columns=colTypeList ')'                        #addTableColumns
+    | ALTER TABLE tableIdentifier
+        REPLACE COLUMNS '(' columns=colTypeList ')'                    #replaceTableColumns
     | ALTER (TABLE | VIEW) from=tableIdentifier
         RENAME TO to=tableIdentifier                                   #renameTable
     | ALTER (TABLE | VIEW) tableIdentifier
@@ -232,7 +234,6 @@ unsupportedHiveNativeCommands
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
-    | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
     | kw1=START kw2=TRANSACTION
     | kw1=COMMIT
     | kw1=ROLLBACK
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 1a145c24d78c..0a0d4ff994a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -113,9 +113,9 @@ trait ExternalCatalog {
   def alterTable(tableDefinition: CatalogTable): Unit
 
   /**
-   * Alter the data schema of a table identified by the provided database and table name. The new
-   * data schema should not have conflict column names with the existing partition columns, and
-   * should still contain all the existing data columns.
+   * Alter the data schema of a table identified by the provided database and table name.
+   * The new data schema should not have conflicting column names with existing partition columns.
+   * The existing data schema will be overwritten with the new data schema provided.
    *
    * @param db Database that table to alter schema for exists in
    * @param table Name of table to alter schema for
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c05f777770f3..a2b918358e11 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -359,9 +359,9 @@ class SessionCatalog(
   }
 
   /**
-   * Alter the data schema of a table identified by the provided table identifier. The new data
-   * schema should not have conflict column names with the existing partition columns, and should
-   * still contain all the existing data columns.
+   * Alter the data schema of a table identified by the provided table identifier.
+   * The new data schema should not have conflicting column names with existing partition columns.
+   * The existing data schema will be overwritten with the new data schema provided.
    *
    * @param identifier TableIdentifier
    * @param newDataSchema Updated data schema to be used for the table
@@ -375,19 +375,6 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(tableIdentifier)
 
-    val catalogTable = externalCatalog.getTable(db, table)
-    val oldDataSchema = catalogTable.dataSchema
-    // not supporting dropping columns yet
-    val nonExistentColumnNames =
-      oldDataSchema.map(_.name).filterNot(columnNameResolved(newDataSchema, _))
-    if (nonExistentColumnNames.nonEmpty) {
-      throw new AnalysisException(
-        s"""
-           |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
-           |not present in the new schema. We don't support dropping columns yet.
-         """.stripMargin)
-    }
-
     externalCatalog.alterTableDataSchema(db, table, newDataSchema)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index 052014ab8674..17fa5b3420c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -88,4 +88,33 @@ private[spark] object SchemaUtils {
         s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
     }
   }
+
+  /**
+   * Checks whether the two provided schemas contain columns with matching names. If they do,
+   * the data types of those columns must also match; otherwise an exception is raised.
+   *
+   * @param schemaOne first schema to compare
+   * @param schemaTwo second schema to compare
+   * @param resolver resolver used to determine if two identifiers are equal
+   */
+  def checkDataTypeMatchesForSameColumnName(
+      schemaOne: StructType, schemaTwo: StructType, resolver: Resolver): Unit = {
+    checkDataTypeMatchesForSameColumnName(schemaOne, schemaTwo, isCaseSensitiveAnalysis(resolver))
+  }
+
+  def checkDataTypeMatchesForSameColumnName(
+      schemaOne: StructType, schemaTwo: StructType, caseSensitiveAnalysis: Boolean): Unit = {
+    for (s1 <- schemaOne; s2 <- schemaTwo) {
+      // scalastyle:off caselocale
+      val schemaOneColumnName = if (caseSensitiveAnalysis) s1.name else s1.name.toLowerCase
+      val schemaTwoColumnName = if (caseSensitiveAnalysis) s2.name else s2.name.toLowerCase
+      // scalastyle:on caselocale
+
+      if (schemaOneColumnName == schemaTwoColumnName && s1.dataType != s2.dataType) {
+        throw new AnalysisException(
+          s"Column `$schemaOneColumnName` type doesn't match between schemas ($s1 <> $s2)")
+      }
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 92f87ea796e8..4b8f44577b86 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -479,11 +479,8 @@ abstract class SessionCatalogSuite extends AnalysisTest {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
-      val e = intercept[AnalysisException] {
-        sessionCatalog.alterTableDataSchema(
-          TableIdentifier("t1", Some("default")), StructType(oldTab.dataSchema.drop(1)))
-      }.getMessage
-      assert(e.contains("We don't support dropping columns yet."))
+      sessionCatalog.alterTableDataSchema(
+        TableIdentifier("t1", Some("default")), StructType(oldTab.dataSchema.drop(1)))
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
index 2f576a4031e9..f6417152f5df 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
@@ -82,4 +82,48 @@ class SchemaUtilsSuite extends SparkFunSuite {
     checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false)
   }
+
+  test("Test checkDataTypeMatchesForSameColumnName") {
+    def compareSchemas(schema1Str: String, schema2Str: String,
+        caseSensitive: Boolean, shouldRaiseException: Boolean): Unit = {
+      val schema1 = StructType.fromDDL(schema1Str)
+      val schema2 = StructType.fromDDL(schema2Str)
+
+      if (shouldRaiseException) {
+        val msg = intercept[AnalysisException] {
+          SchemaUtils.checkDataTypeMatchesForSameColumnName(schema1, schema2, caseSensitive)
+        }.getMessage
+        assert(msg.contains("type doesn't match between schemas"))
+      } else {
+        SchemaUtils.checkDataTypeMatchesForSameColumnName(schema1, schema2, caseSensitive)
+      }
+    }
+
+    // pass when the data type is the same
+    compareSchemas("a int, b string", "a int, B string",
+      caseSensitive = false, shouldRaiseException = false)
+    compareSchemas("a int, b string, B int", "a int, b string, B int",
+      caseSensitive = true, shouldRaiseException = false)
+
+    // fail when there's at least one mismatch
+    compareSchemas("a int, b string", "a string, b string",
+      caseSensitive = false, shouldRaiseException = true)
+    compareSchemas("a int, b string", "a int, b string, B int",
+      caseSensitive = false, shouldRaiseException = true)
+
+    // work as expected when the schema structures differ
+    compareSchemas("a int, b string", "c string, D int, A int",
+      caseSensitive = true, shouldRaiseException = false)
+    compareSchemas("a int, b string", "b string",
+      caseSensitive = false, shouldRaiseException = false)
+    compareSchemas("a int, b string", "B string",
+      caseSensitive = false, shouldRaiseException = false)
+    compareSchemas("a int, b string", "a string",
+      caseSensitive = true, shouldRaiseException = true)
+    compareSchemas("a int, b string", "A string",
+      caseSensitive = false, shouldRaiseException = true)
+    compareSchemas("a int", "a int, A string",
+      caseSensitive = true, shouldRaiseException = false)
+    compareSchemas("b string", "b string, B int",
+      caseSensitive = false, shouldRaiseException = true)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 1f1b41b7c440..9023e04a452f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -677,6 +677,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     )
   }
 
+  /**
+   * Create an [[AlterTableReplaceColumnsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1
+   *   REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...);
+   * }}}
+   */
+  override def visitReplaceTableColumns(
+      ctx: ReplaceTableColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableReplaceColumnsCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitColTypeList(ctx.columns)
+    )
+  }
+
   /**
    * Create an [[AlterTableSetPropertiesCommand]] command.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 64f739fe3596..761b1d5d1a1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,16 +37,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
 import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
-import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
-import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
-import org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.Utils
 
 /**
  * A command to create a table with the same definition of the given existing table.
@@ -181,6 +174,86 @@ case class AlterTableRenameCommand(
 }
 
+abstract class AlterTableAddReplaceColumnsCommandsBase extends RunnableCommand {
+
+  /**
+   * Ensure the columns to add/replace meet the requirements:
+   * - columns to add should not have conflicting names with existing columns
+   * - columns to replace should have distinct names
+   * - if a column to replace already exists in the table, its data type has to match
+   * - column names have to match the given datasource format specifications
+   */
+  protected def verifyColumnsToAddReplace(
+      table: TableIdentifier,
+      catalogTable: CatalogTable,
+      colsToVerify: Seq[StructField]): Unit = {
+
+    SchemaUtils.checkColumnNameDuplication(
+      colsToVerify.map(_.name),
+      "in the table definition of " + table.identifier,
+      conf.caseSensitiveAnalysis)
+
+    SchemaUtils.checkDataTypeMatchesForSameColumnName(
+      StructType(colsToVerify),
+      catalogTable.dataSchema,
+      conf.caseSensitiveAnalysis)
+
+    DDLUtils.checkDataColNames(catalogTable, colsToVerify.map(_.name))
+  }
+
+  /**
+   * The ALTER TABLE [ADD|REPLACE] COLUMNS commands do not support temporary views/tables, views
+   * or external providers. Datasource tables currently support parquet, json and orc;
+   * ADD COLUMNS also supports csv, and REPLACE COLUMNS also supports text.
+   */
+  protected def verifyAlterTableAddReplaceColumn(
+      conf: SQLConf,
+      catalog: SessionCatalog,
+      table: TableIdentifier): CatalogTable = {
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+    if (catalogTable.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"""
+           |ALTER [ADD|REPLACE] COLUMNS do not support views.
+           |You must drop and re-create the views for adding the new columns. Views: $table
+         """.stripMargin)
+    }
+
+    if (DDLUtils.isDatasourceTable(catalogTable)) {
+      DataSource.lookupDataSource(catalogTable.provider.get, conf).
+        getConstructor().newInstance() match {
+        // Hive type is already considered as hive serde table, so the logic will not
+        // come in here.
+        case s if isDatasourceFormatSupported(s.getClass.getCanonicalName) =>
+        case s =>
+          throw new AnalysisException(
+            s"""
+               |ALTER [ADD|REPLACE] COLUMNS do not support datasource table with type $s.
+               |You must drop and re-create the table for adding the new columns. Tables: $table
+             """.stripMargin)
+      }
+    }
+    catalogTable
+  }
+
+  /**
+   * Class name suffixes of the datasource formats supported by the concrete command.
+   */
+  val supportedDatasourceFormats: Seq[String]
+
+  /**
+   * Checks whether the provided datasource class canonical name ends with one of the
+   * supported formats listed in `supportedDatasourceFormats`.
+   */
+  protected def isDatasourceFormatSupported(
+      datasourceClassCanonicalName: String): Boolean = {
+    supportedDatasourceFormats.exists(datasourceClassCanonicalName.endsWith)
+  }
+}
+
 /**
  * A command that add columns to a table
  * The syntax of using this command in SQL is:
 * {{{
@@ -188,13 +261,19 @@ case class AlterTableRenameCommand(
 *   ALTER TABLE table_identifier
 *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
 * }}}
-*/
+ */
 case class AlterTableAddColumnsCommand(
     table: TableIdentifier,
-    colsToAdd: Seq[StructField]) extends RunnableCommand {
+    colsToAdd: Seq[StructField]) extends AlterTableAddReplaceColumnsCommandsBase {
+
+  // Text format doesn't need ADD COLUMNS as the text datasource can have only one column
+  override val supportedDatasourceFormats = Seq("ParquetFileFormat", "OrcFileFormat",
+    "OrcDataSourceV2", "JsonFileFormat", "JsonDataSourceV2", "CSVFileFormat", "CSVDataSourceV2")
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)
+    val catalogTable = verifyAlterTableAddReplaceColumn(
+      sparkSession.sessionState.conf, catalog, table)
 
     try {
       sparkSession.catalog.uncacheTable(table.quotedString)
@@ -204,54 +283,47 @@ case class AlterTableAddColumnsCommand(
     }
     catalog.refreshTable(table)
 
-    SchemaUtils.checkColumnNameDuplication(
-      (colsToAdd ++ catalogTable.schema).map(_.name),
-      "in the table definition of " + table.identifier,
-      conf.caseSensitiveAnalysis)
-    DDLUtils.checkDataColNames(catalogTable, colsToAdd.map(_.name))
+    verifyColumnsToAddReplace(table, catalogTable, colsToAdd ++ catalogTable.schema)
 
     catalog.alterTableDataSchema(table, StructType(catalogTable.dataSchema ++ colsToAdd))
 
     Seq.empty[Row]
   }
+}
 
-  /**
-   * ALTER TABLE ADD COLUMNS command does not support temporary view/table,
-   * view, or datasource table with text, orc formats or external provider.
-   * For datasource table, it currently only supports parquet, json, csv, orc.
-   */
-  private def verifyAlterTableAddColumn(
-      conf: SQLConf,
-      catalog: SessionCatalog,
-      table: TableIdentifier): CatalogTable = {
-    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
-
-    if (catalogTable.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException(
-        s"""
-          |ALTER ADD COLUMNS does not support views.
-          |You must drop and re-create the views for adding the new columns. Views: $table
-         """.stripMargin)
-    }
-
-    if (DDLUtils.isDatasourceTable(catalogTable)) {
-      DataSource.lookupDataSource(catalogTable.provider.get, conf).
-        getConstructor().newInstance() match {
-        // For datasource table, this command can only support the following File format.
-        // TextFileFormat only default to one column "value"
-        // Hive type is already considered as hive serde table, so the logic will not
-        // come in here.
-        case _: CSVFileFormat | _: JsonFileFormat | _: ParquetFileFormat =>
-        case _: JsonDataSourceV2 | _: CSVDataSourceV2 | _: OrcDataSourceV2 =>
-        case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
-        case s =>
-          throw new AnalysisException(
-            s"""
-              |ALTER ADD COLUMNS does not support datasource table with type $s.
-              |You must drop and re-create the table for adding the new columns. Tables: $table
-            """.stripMargin)
-      }
-    }
-    catalogTable
-  }
-}
+/**
+ * A command that replaces columns in a table
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+case class AlterTableReplaceColumnsCommand(
+    table: TableIdentifier,
+    colsToReplace: Seq[StructField]) extends AlterTableAddReplaceColumnsCommandsBase {
+
+  // Csv format is not supported by REPLACE COLUMNS, as the csv datasource is read positionally
+  // and a new replacement column would reference an old replaced column's data
+  override val supportedDatasourceFormats = Seq("ParquetFileFormat", "OrcFileFormat",
+    "OrcDataSourceV2", "JsonFileFormat", "JsonDataSourceV2", "TextFileFormat", "TextDataSourceV2")
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val catalogTable = verifyAlterTableAddReplaceColumn(
+      sparkSession.sessionState.conf, catalog, table)
+
+    try {
+      sparkSession.catalog.uncacheTable(table.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+    }
+    catalog.refreshTable(table)
+
+    verifyColumnsToAddReplace(table, catalogTable, colsToReplace)
+
+    catalog.alterTableDataSchema(table, StructType(colsToReplace))
+
+    Seq.empty[Row]
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index e46ccac9f5b0..a3ab2946bea6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -840,14 +840,6 @@ class DDLParserSuite extends AnalysisTest with SharedSQLContext {
     assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
   }
 
-  test("alter table: replace columns (not allowed)") {
-    assertUnsupported(
-      """
-        |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
-        |COMMENT 'test_comment', new_col2 LONG COMMENT 'test_comment2') RESTRICT
-      """.stripMargin)
-  }
-
   test("show databases") {
     val sql1 = "SHOW DATABASES"
     val sql2 = "SHOW DATABASES LIKE 'defau*'"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b777db750a1b..c436a141b08d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2566,53 +2566,195 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  val supportedNativeFileFormatsForAlterTableAddColumns = Seq("csv", "json", "parquet",
-    "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat",
+  val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "orc", "json", "csv",
+    "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat",
+    "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat",
     "org.apache.spark.sql.execution.datasources.json.JsonFileFormat",
-    "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat")
-
+    "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
 
   supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
-    test(s"alter datasource table add columns - $provider") {
+    test(s"alter table add columns ddl - $provider") {
       testAddColumn(provider)
     }
-  }
-
-  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
-    test(s"alter datasource table add columns - partitioned - $provider") {
+    test(s"alter table add columns ddl - partitioned - $provider") {
       testAddColumnPartitioned(provider)
     }
   }
 
-  test("alter datasource table add columns - text format not supported") {
+  // alter table add columns for the text datasource is not needed:
+  // the text datasource can have only one column
+  test("alter table add columns ddl - text format not supported") {
     withTable("t1") {
       sql("CREATE TABLE t1 (c1 string) USING text")
       val e = intercept[AnalysisException] {
         sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
       }.getMessage
-      assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
+      assert(e.contains("ALTER [ADD|REPLACE] COLUMNS do not support datasource table with type"))
     }
   }
 
-  test("alter table add columns -- not support temp view") {
-    withTempView("tmp_v") {
-      sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
-      val e = intercept[AnalysisException] {
-        sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
-      }
-      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
-    }
-  }
-
-  test("alter table add columns -- not support view") {
-    withView("v1") {
-      sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
-      val e = intercept[AnalysisException] {
-        sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
-      }
-      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
-    }
-  }
+  protected def testReplaceColumn(provider: String): Unit = {
+    withTable("t1") {
+      sql(s"CREATE TABLE t1 (c1 string) USING $provider")
+      sql("INSERT INTO t1 VALUES ('1')")
+      sql("ALTER TABLE t1 REPLACE COLUMNS (c2 int, c3 string)")
+      // c1 is dropped, c2 and c3 are added
+      var e = intercept[AnalysisException] {
+        sql("SELECT c1 from t1")
+      }.getMessage
+      assert(e.contains("cannot resolve '`c1`' given input columns"))
+      sql("INSERT INTO t1 VALUES (2, '3')")
+      checkAnswer(
+        sql("SELECT * FROM t1 where c3 = '3'"),
+        Seq(Row(2, "3"))
+      )
+      // fails to replace c3 as the type doesn't match
+      e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 REPLACE COLUMNS (c2 int, c3 int, c4 string)")
+      }.getMessage
+      assert(e.contains("StructField(c3,IntegerType,true) <> StructField(c3,StringType,true)"))
+      // c1 is added back, c2 is replaced with the same definition
+      sql("ALTER TABLE t1 REPLACE COLUMNS (c1 string, c2 int)")
+      checkAnswer(
+        sql("SELECT c1, c2 FROM t1"),
+        Seq(Row("1", null), Row(null, 2))
+      )
+    }
+  }
+
+  // using a separate function as the text datasource allows only one column of type string
+  protected def testReplaceColumnTextProvider(): Unit = {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 string) USING text")
+      sql("INSERT INTO t1 VALUES ('1')")
+      sql("ALTER TABLE t1 REPLACE COLUMNS (c2 string)")
+      val e = intercept[AnalysisException] {
+        sql("SELECT c1 from t1")
+      }.getMessage
+      assert(e.contains("cannot resolve '`c1`' given input columns"))
+      sql("INSERT INTO t1 VALUES ('2')")
+      checkAnswer(
+        sql("SELECT c2 FROM t1"),
+        Seq(Row("1"), Row("2"))
+      )
+    }
+  }
+
+  protected def testReplaceColumnPartitioned(provider: String): Unit = {
+    withTable("t1") {
+      sql(s"CREATE TABLE t1 (c1 int, c2 int, p int) USING $provider PARTITIONED BY (p)")
+      sql("INSERT INTO t1 PARTITION(p = 1) VALUES (1, 2)")
+      sql("ALTER TABLE t1 REPLACE COLUMNS (c4 int, c3 string, c2 int)")
+      // c1 is dropped, c2 c3 c4 are added
+      var e = intercept[AnalysisException] {
+        sql("SELECT c1 from t1")
+      }.getMessage
+      assert(e.contains("cannot resolve '`c1`' given input columns"))
+      checkAnswer(
+        sql("SELECT c2, c3, c4 FROM t1 WHERE p = 1"),
+        Seq(Row(2, null, null))
+      )
+      sql("INSERT INTO t1 PARTITION(p = 2) VALUES (4, '3', 2)")
+      checkAnswer(
+        sql("SELECT * FROM t1 WHERE p = 2"),
+        Seq(Row(4, "3", 2, 2))
+      )
+      checkAnswer(
+        sql("SELECT * FROM t1 WHERE c3 = '3'"),
+        Seq(Row(4, "3", 2, 2))
+      )
+      // fails to replace c2 as the type doesn't match
+      e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 REPLACE COLUMNS (c3 string, c2 string)")
+      }.getMessage
+      assert(e.contains("StructField(c2,StringType,true) <> StructField(c2,IntegerType,true)"))
+      // c1 is added back, c2 is replaced with itself
+      sql("ALTER TABLE t1 REPLACE COLUMNS (c1 int, c2 int)")
+      checkAnswer(
+        sql("SELECT c1, c2 FROM t1"),
+        Seq(Row(1, 2), Row(null, 2))
+      )
+    }
+  }
+
+  // using a separate function as the text datasource allows only one column of type string
+  protected def testReplaceColumnPartitionedTextProvider(): Unit = {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 string, p int) USING text PARTITIONED BY (p)")
+      sql("INSERT INTO t1 PARTITION(p = 1) VALUES ('1')")
+      sql("ALTER TABLE t1 REPLACE COLUMNS (c2 string)")
+      val e = intercept[AnalysisException] {
+        sql("SELECT c1 from t1")
+      }.getMessage
+      assert(e.contains("cannot resolve '`c1`' given input columns"))
+      checkAnswer(
+        sql("SELECT c2 FROM t1 WHERE p = 1"),
+        Seq(Row("1"))
+      )
+      sql("INSERT INTO t1 PARTITION(p = 2) VALUES ('2')")
+      checkAnswer(
+        sql("SELECT * FROM t1 WHERE p = 2"),
+        Seq(Row("2", 2))
+      )
+      checkAnswer(
+        sql("SELECT * FROM t1"),
+        Seq(Row("1", 1), Row("2", 2))
+      )
+    }
+  }
+
+  val supportedNativeFileFormatsForAlterTableReplaceColumns = Seq("parquet", "orc", "json", "text",
+    "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat",
+    "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat",
+    "org.apache.spark.sql.execution.datasources.json.JsonFileFormat",
+    "org.apache.spark.sql.execution.datasources.text.TextFileFormat")
+
+  supportedNativeFileFormatsForAlterTableReplaceColumns.foreach { provider =>
+    test(s"alter table replace columns ddl - $provider") {
+      if (provider == "text" || provider.endsWith("TextFileFormat")) {
+        testReplaceColumnTextProvider()
+      } else {
+        testReplaceColumn(provider)
+      }
+    }
+    test(s"alter table replace columns ddl - partitioned - $provider") {
+      if (provider == "text" || provider.endsWith("TextFileFormat")) {
+        testReplaceColumnPartitionedTextProvider()
+      } else {
+        testReplaceColumnPartitioned(provider)
+      }
+    }
+  }
+
+  // alter table replace columns for the csv datasource is not supported:
+  // csv is read positionally, so a new column would reference a replaced column's data
+  test("alter table replace columns ddl - csv format not supported") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 string) USING csv")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 REPLACE COLUMNS (c2 int)")
+      }.getMessage
+      assert(e.contains("ALTER [ADD|REPLACE] COLUMNS do not support datasource table with type"))
+    }
+  }
+
+  Seq("ADD", "REPLACE").foreach { addReplaceCmd =>
+    test(s"alter table $addReplaceCmd columns - do not support temp view") {
+      withTempView("tmp_v") {
+        sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
+        val e = intercept[AnalysisException] {
+          sql(s"ALTER TABLE tmp_v $addReplaceCmd COLUMNS (c3 INT)")
+        }
+        assert(e.message.contains("ALTER [ADD|REPLACE] COLUMNS do not support views"))
+      }
+    }
+
+    test(s"alter table $addReplaceCmd columns - do not support view") {
+      withView("v1") {
+        sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
+        val e = intercept[AnalysisException] {
+          sql(s"ALTER TABLE v1 $addReplaceCmd COLUMNS (c3 INT)")
+        }
+        assert(e.message.contains("ALTER [ADD|REPLACE] COLUMNS do not support views"))
+      }
+    }
+  }
 
   test("alter table add columns with existing column name") {
     withTable("t1") {
       sql("CREATE TABLE t1 (c1 int) USING PARQUET")
@@ -2623,6 +2765,18 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("alter table replace columns with duplicated column names") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+      sql("ALTER TABLE t1 REPLACE COLUMNS (c1 int)")
+      assert(spark.table("t1").schema == new StructType().add("c1", IntegerType))
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 REPLACE COLUMNS (c2 string, c2 string)")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s)"))
+    }
+  }
+
   test("create temporary function with if not exists") {
     withUserDefinedFunction("func1" -> true) {
       val sql1 =
@@ -2693,6 +2847,36 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test(s"alter table replace columns with existing column name - caseSensitive $caseSensitive") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+      withTable("t1") {
+        sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+        if (!caseSensitive) {
+          // C1 replaces c1
+          sql("ALTER TABLE t1 REPLACE COLUMNS (C1 integer)")
+          assert(spark.table("t1").schema == new StructType().add("C1", IntegerType))
+          // C1 doesn't replace c1 as the type doesn't match
+          val e = intercept[AnalysisException] {
+            sql("ALTER TABLE t1 REPLACE COLUMNS (C1 string)")
+          }.getMessage
+          assert(e.contains(
+            "StructField(C1,StringType,true) <> StructField(C1,IntegerType,true)"))
+        } else {
+          // c1 replaces c1, C1 is added
+          sql("ALTER TABLE t1 REPLACE COLUMNS (c1 int, C1 string)")
+          assert(spark.table("t1").schema ==
+            new StructType().add("c1", IntegerType).add("C1", StringType))
+          // c1 replaces c1, C1 doesn't replace C1 as the type doesn't match
+          val e = intercept[AnalysisException] {
+            sql("ALTER TABLE t1 REPLACE COLUMNS (c1 int, C1 int)")
+          }.getMessage
+          assert(e.contains(
+            "StructField(C1,IntegerType,true) <> StructField(C1,StringType,true)"))
+        }
+      }
+    }
+  }
+
   test(s"basic DDL using locale tr - caseSensitive $caseSensitive") {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
       withLocale("tr") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 11a219231875..eb013b9a11c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -642,9 +642,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   /**
-   * Alter the data schema of a table identified by the provided database and table name. The new
-   * data schema should not have conflict column names with the existing partition columns, and
-   * should still contain all the existing data columns.
+   * Alter the data schema of a table identified by the provided database and table name.
+   * The new data schema should not have conflicting column names with existing partition columns.
+   * The existing data schema will be overwritten with the new data schema provided.
    */
   override def alterTableDataSchema(
       db: String,
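
For reference, the end-to-end behaviour introduced by this patch is the one exercised by testReplaceColumn in DDLSuite above. A minimal sketch against a parquet table (table and column names are purely illustrative and taken from that test) looks like:

    CREATE TABLE t1 (c1 string) USING parquet;
    INSERT INTO t1 VALUES ('1');
    -- c1 is dropped from the table schema; c2 and c3 are added
    ALTER TABLE t1 REPLACE COLUMNS (c2 int, c3 string);
    -- fails: c3 already exists as string, and a replacement column's data type must match
    ALTER TABLE t1 REPLACE COLUMNS (c2 int, c3 int, c4 string);
    -- c1 is added back, so the previously written value '1' becomes readable again
    ALTER TABLE t1 REPLACE COLUMNS (c1 string, c2 int);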