diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 08d5ff53bf2e2..4db9550e09b69 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -167,6 +167,9 @@ statement | ALTER TABLE table=multipartIdentifier partitionSpec? CHANGE COLUMN? colName=multipartIdentifier colType colPosition? #hiveChangeColumn + | ALTER TABLE table=multipartIdentifier partitionSpec? + REPLACE COLUMNS + '(' columns=qualifiedColTypeWithPositionList ')' #hiveReplaceColumns | ALTER TABLE multipartIdentifier (partitionSpec)? SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)? #setTableSerDe | ALTER TABLE multipartIdentifier (partitionSpec)? diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 4ec737fd9b70d..d4f9222783020 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -470,9 +470,15 @@ trait CheckAnalysis extends PredicateHelper { } } + val colsToDelete = mutable.Set.empty[Seq[String]] + alter.changes.foreach { case add: AddColumn => - checkColumnNotExists("add", add.fieldNames(), table.schema) + // If a column to add is a part of columns to delete, we don't need to check + // if column already exists - applies to REPLACE COLUMNS scenario. + if (!colsToDelete.contains(add.fieldNames())) { + checkColumnNotExists("add", add.fieldNames(), table.schema) + } val parent = findParentStruct("add", add.fieldNames()) positionArgumentExists(add.position(), parent) TypeUtils.failWithIntervalType(add.dataType()) @@ -523,6 +529,10 @@ trait CheckAnalysis extends PredicateHelper { findField("update", update.fieldNames) case delete: DeleteColumn => findField("delete", delete.fieldNames) + // REPLACE COLUMNS has deletes followed by adds. Remember the deleted columns + // so that add operations do not fail when the columns to add exist and they + // are to be deleted. + colsToDelete += delete.fieldNames case _ => // no validation needed for set and remove property } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 88a3c0a73a10b..96558410d4004 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -44,6 +44,27 @@ class ResolveCatalogs(val catalogManager: CatalogManager) } createAlterTable(nameParts, catalog, tbl, changes) + case AlterTableReplaceColumnsStatement( + nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => + val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match { + case Some(table) => + // REPLACE COLUMNS deletes all the existing columns and adds new columns specified. + val deleteChanges = table.schema.fieldNames.map { name => + TableChange.deleteColumn(Array(name)) + } + val addChanges = cols.map { col => + TableChange.addColumn( + col.name.toArray, + col.dataType, + col.nullable, + col.comment.orNull, + col.position.orNull) + } + deleteChanges ++ addChanges + case None => Seq() + } + createAlterTable(nameParts, catalog, tbl, changes) + case a @ AlterTableAlterColumnStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => val colName = a.column.toArray diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index e9ad84472904d..0bfa0c78a8794 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3024,6 +3024,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging position = Option(ctx.colPosition).map(typedVisit[ColumnPosition])) } + override def visitHiveReplaceColumns( + ctx: HiveReplaceColumnsContext): LogicalPlan = withOrigin(ctx) { + if (ctx.partitionSpec != null) { + operationNotAllowed("ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS", ctx) + } + AlterTableReplaceColumnsStatement( + visitMultipartIdentifier(ctx.multipartIdentifier), + ctx.columns.qualifiedColTypeWithPosition.asScala.map { colType => + if (colType.NULL != null) { + throw new AnalysisException( + "NOT NULL is not supported in Hive-style REPLACE COLUMNS") + } + if (colType.colPosition != null) { + throw new AnalysisException( + "Column position is not supported in Hive-style REPLACE COLUMNS") + } + typedVisit[QualifiedColType](colType) + } + ) + } + /** * Parse a [[AlterTableDropColumnsStatement]] command. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 1e6b67bf78b70..6731214d3842d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -156,6 +156,10 @@ case class AlterTableAddColumnsStatement( tableName: Seq[String], columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement +case class AlterTableReplaceColumnsStatement( + tableName: Seq[String], + columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement + /** * ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 56d52571d1cc3..f09e1410102e2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -698,7 +698,7 @@ class DDLParserSuite extends AnalysisTest { } } - test("alter table: hive style") { + test("alter table: hive style change column") { val sql1 = "ALTER TABLE table_name CHANGE COLUMN a.b.c c INT" val sql2 = "ALTER TABLE table_name CHANGE COLUMN a.b.c c INT COMMENT 'new_comment'" val sql3 = "ALTER TABLE table_name CHANGE COLUMN a.b.c c INT AFTER other_col" @@ -741,6 +741,52 @@ class DDLParserSuite extends AnalysisTest { intercept("ALTER TABLE table_name PARTITION (a='1') CHANGE COLUMN a.b.c c INT") } + test("alter table: hive style replace columns") { + val sql1 = "ALTER TABLE table_name REPLACE COLUMNS (x string)" + val sql2 = "ALTER TABLE table_name REPLACE COLUMNS (x string COMMENT 'x1')" + val sql3 = "ALTER TABLE table_name REPLACE COLUMNS (x string COMMENT 'x1', y int)" + val sql4 = "ALTER TABLE table_name REPLACE COLUMNS (x string COMMENT 'x1', y int COMMENT 'y1')" + + comparePlans( + parsePlan(sql1), + AlterTableReplaceColumnsStatement( + Seq("table_name"), + Seq(QualifiedColType(Seq("x"), StringType, true, None, None)))) + + comparePlans( + parsePlan(sql2), + AlterTableReplaceColumnsStatement( + Seq("table_name"), + Seq(QualifiedColType(Seq("x"), StringType, true, Some("x1"), None)))) + + comparePlans( + parsePlan(sql3), + AlterTableReplaceColumnsStatement( + Seq("table_name"), + Seq( + QualifiedColType(Seq("x"), StringType, true, Some("x1"), None), + QualifiedColType(Seq("y"), IntegerType, true, None, None) + ))) + + comparePlans( + parsePlan(sql4), + AlterTableReplaceColumnsStatement( + Seq("table_name"), + Seq( + QualifiedColType(Seq("x"), StringType, true, Some("x1"), None), + QualifiedColType(Seq("y"), IntegerType, true, Some("y1"), None) + ))) + + intercept("ALTER TABLE table_name PARTITION (a='1') REPLACE COLUMNS (x string)", + "Operation not allowed: ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS") + + intercept("ALTER TABLE table_name REPLACE COLUMNS (x string NOT NULL)", + "NOT NULL is not supported in Hive-style REPLACE COLUMNS") + + intercept("ALTER TABLE table_name REPLACE COLUMNS (x string FIRST)", + "Column position is not supported in Hive-style REPLACE COLUMNS") + } + test("alter table/view: rename table/view") { comparePlans( parsePlan("ALTER TABLE a.b.c RENAME TO x.y.z"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 486e7f1f84b46..a7c00f4d8ac4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -69,6 +69,29 @@ class ResolveSessionCatalog( createAlterTable(nameParts, catalog, tbl, changes) } + case AlterTableReplaceColumnsStatement( + nameParts @ SessionCatalogAndTable(catalog, tbl), cols) => + val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match { + case Some(_: V1Table) => + throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.") + case Some(table) => + // REPLACE COLUMNS deletes all the existing columns and adds new columns specified. + val deleteChanges = table.schema.fieldNames.map { name => + TableChange.deleteColumn(Array(name)) + } + val addChanges = cols.map { col => + TableChange.addColumn( + col.name.toArray, + col.dataType, + col.nullable, + col.comment.orNull, + col.position.orNull) + } + deleteChanges ++ addChanges + case None => Seq() // Unresolved table will be handled in CheckAnalysis. + } + createAlterTable(nameParts, catalog, tbl, changes) + case a @ AlterTableAlterColumnStatement( nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => loadTable(catalog, tbl.asIdentifier).collect { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index 3cdac59c20fc9..7c9c28bff5e88 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -1073,4 +1073,19 @@ trait AlterTableTests extends SharedSparkSession { assert(updated.properties === withDefaultOwnership(Map("provider" -> v2Format)).asJava) } } + + test("AlterTable: replace columns") { + val t = s"${catalogAndNamespace}table_name" + withTable(t) { + sql(s"CREATE TABLE $t (col1 int, col2 int COMMENT 'c2') USING $v2Format") + sql(s"ALTER TABLE $t REPLACE COLUMNS (col2 string, col3 int COMMENT 'c3')") + + val table = getTableMetadata(t) + + assert(table.name === fullTableName(t)) + assert(table.schema === StructType(Seq( + StructField("col2", StringType), + StructField("col3", IntegerType).withComment("c3")))) + } + } }