Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ statement
| ALTER TABLE table=multipartIdentifier partitionSpec?
CHANGE COLUMN?
colName=multipartIdentifier colType colPosition? #hiveChangeColumn
| ALTER TABLE table=multipartIdentifier partitionSpec?
REPLACE COLUMNS
'(' columns=qualifiedColTypeWithPositionList ')' #hiveReplaceColumns
| ALTER TABLE multipartIdentifier (partitionSpec)?
SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)? #setTableSerDe
| ALTER TABLE multipartIdentifier (partitionSpec)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,15 @@ trait CheckAnalysis extends PredicateHelper {
}
}

val colsToDelete = mutable.Set.empty[Seq[String]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This causes conflicts when I backport #27584

I think the change in this file should go into 3.0 as well. Logically columns deleted should be skipped when checking name duplication for AddColumn.

@imback82 can you open a PR to backport #27584 with changes in this file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, working on it now!


alter.changes.foreach {
case add: AddColumn =>
checkColumnNotExists("add", add.fieldNames(), table.schema)
// If a column to add is a part of columns to delete, we don't need to check
// if column already exists - applies to REPLACE COLUMNS scenario.
if (!colsToDelete.contains(add.fieldNames())) {
checkColumnNotExists("add", add.fieldNames(), table.schema)
}
val parent = findParentStruct("add", add.fieldNames())
positionArgumentExists(add.position(), parent)
TypeUtils.failWithIntervalType(add.dataType())
Expand Down Expand Up @@ -523,6 +529,10 @@ trait CheckAnalysis extends PredicateHelper {
findField("update", update.fieldNames)
case delete: DeleteColumn =>
findField("delete", delete.fieldNames)
// REPLACE COLUMNS has deletes followed by adds. Remember the deleted columns
// so that add operations do not fail when the columns to add exist and they
// are to be deleted.
colsToDelete += delete.fieldNames
case _ =>
// no validation needed for set and remove property
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
}
createAlterTable(nameParts, catalog, tbl, changes)

case AlterTableReplaceColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(table) =>
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
val deleteChanges = table.schema.fieldNames.map { name =>
TableChange.deleteColumn(Array(name))
}
val addChanges = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
}
deleteChanges ++ addChanges
case None => Seq()
}
createAlterTable(nameParts, catalog, tbl, changes)

case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
val colName = a.column.toArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3024,6 +3024,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
position = Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
}

override def visitHiveReplaceColumns(
ctx: HiveReplaceColumnsContext): LogicalPlan = withOrigin(ctx) {
if (ctx.partitionSpec != null) {
operationNotAllowed("ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS", ctx)
}
AlterTableReplaceColumnsStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.columns.qualifiedColTypeWithPosition.asScala.map { colType =>
if (colType.NULL != null) {
throw new AnalysisException(
"NOT NULL is not supported in Hive-style REPLACE COLUMNS")
}
if (colType.colPosition != null) {
throw new AnalysisException(
"Column position is not supported in Hive-style REPLACE COLUMNS")
}
typedVisit[QualifiedColType](colType)
}
)
}

/**
* Parse a [[AlterTableDropColumnsStatement]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ case class AlterTableAddColumnsStatement(
tableName: Seq[String],
columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement

case class AlterTableReplaceColumnsStatement(
tableName: Seq[String],
columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement

/**
* ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ class DDLParserSuite extends AnalysisTest {
}
}

test("alter table: hive style") {
test("alter table: hive style change column") {
val sql1 = "ALTER TABLE table_name CHANGE COLUMN a.b.c c INT"
val sql2 = "ALTER TABLE table_name CHANGE COLUMN a.b.c c INT COMMENT 'new_comment'"
val sql3 = "ALTER TABLE table_name CHANGE COLUMN a.b.c c INT AFTER other_col"
Expand Down Expand Up @@ -741,6 +741,52 @@ class DDLParserSuite extends AnalysisTest {
intercept("ALTER TABLE table_name PARTITION (a='1') CHANGE COLUMN a.b.c c INT")
}

test("alter table: hive style replace columns") {
val sql1 = "ALTER TABLE table_name REPLACE COLUMNS (x string)"
val sql2 = "ALTER TABLE table_name REPLACE COLUMNS (x string COMMENT 'x1')"
val sql3 = "ALTER TABLE table_name REPLACE COLUMNS (x string COMMENT 'x1', y int)"
val sql4 = "ALTER TABLE table_name REPLACE COLUMNS (x string COMMENT 'x1', y int COMMENT 'y1')"

comparePlans(
parsePlan(sql1),
AlterTableReplaceColumnsStatement(
Seq("table_name"),
Seq(QualifiedColType(Seq("x"), StringType, true, None, None))))

comparePlans(
parsePlan(sql2),
AlterTableReplaceColumnsStatement(
Seq("table_name"),
Seq(QualifiedColType(Seq("x"), StringType, true, Some("x1"), None))))

comparePlans(
parsePlan(sql3),
AlterTableReplaceColumnsStatement(
Seq("table_name"),
Seq(
QualifiedColType(Seq("x"), StringType, true, Some("x1"), None),
QualifiedColType(Seq("y"), IntegerType, true, None, None)
)))

comparePlans(
parsePlan(sql4),
AlterTableReplaceColumnsStatement(
Seq("table_name"),
Seq(
QualifiedColType(Seq("x"), StringType, true, Some("x1"), None),
QualifiedColType(Seq("y"), IntegerType, true, Some("y1"), None)
)))

intercept("ALTER TABLE table_name PARTITION (a='1') REPLACE COLUMNS (x string)",
"Operation not allowed: ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS")

intercept("ALTER TABLE table_name REPLACE COLUMNS (x string NOT NULL)",
"NOT NULL is not supported in Hive-style REPLACE COLUMNS")

intercept("ALTER TABLE table_name REPLACE COLUMNS (x string FIRST)",
"Column position is not supported in Hive-style REPLACE COLUMNS")
}

test("alter table/view: rename table/view") {
comparePlans(
parsePlan("ALTER TABLE a.b.c RENAME TO x.y.z"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,29 @@ class ResolveSessionCatalog(
createAlterTable(nameParts, catalog, tbl, changes)
}

case AlterTableReplaceColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(_: V1Table) =>
throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
case Some(table) =>
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
val deleteChanges = table.schema.fieldNames.map { name =>
TableChange.deleteColumn(Array(name))
}
val addChanges = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
}
deleteChanges ++ addChanges
case None => Seq() // Unresolved table will be handled in CheckAnalysis.
}
createAlterTable(nameParts, catalog, tbl, changes)

case a @ AlterTableAlterColumnStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
loadTable(catalog, tbl.asIdentifier).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,4 +1073,19 @@ trait AlterTableTests extends SharedSparkSession {
assert(updated.properties === withDefaultOwnership(Map("provider" -> v2Format)).asJava)
}
}

test("AlterTable: replace columns") {
val t = s"${catalogAndNamespace}table_name"
withTable(t) {
sql(s"CREATE TABLE $t (col1 int, col2 int COMMENT 'c2') USING $v2Format")
sql(s"ALTER TABLE $t REPLACE COLUMNS (col2 string, col3 int COMMENT 'c3')")

val table = getTableMetadata(t)

assert(table.name === fullTableName(t))
assert(table.schema === StructType(Seq(
StructField("col2", StringType),
StructField("col3", IntegerType).withComment("c3"))))
}
}
}