
Commit 4b35282

ctring authored and cloud-fan committed
[SPARK-50883][SQL] Support altering multiple columns in the same command
### What changes were proposed in this pull request?

We propose the following new syntax for altering multiple columns at the same time:

```
ALTER TABLE table_name ALTER COLUMN
  { { column_identifier | field_name }
    { COMMENT comment
    | { FIRST | AFTER identifier }
    | { SET | DROP } NOT NULL
    | TYPE data_type
    | SET DEFAULT clause
    | DROP DEFAULT } } [, ...]
```

For example:

```
ALTER TABLE test_table ALTER COLUMN
  a COMMENT "new comment",
  b TYPE BIGINT,
  x.y.z FIRST
```

This new syntax is backward compatible with the current syntax. To bound the complexity of the initial support for this syntax, we place the following restrictions:

+ Altering the same column multiple times is not allowed.
+ Altering both a parent and a child column (for nested data types) is not allowed.
+ Altering v1 tables with this new syntax is not allowed.

In terms of implementation, we turn the current `AlterColumn` logical plan into `AlterColumns`, which takes multiple columns with their `AlterColumnSpec`s. All `AlterColumnSpec`s are checked during the analysis phase, so if any one of them is invalid (e.g., a non-existent column or a disallowed type conversion), the entire command fails. The `AlterColumnSpec`s are transformed into `TableChange`s, which are passed to the `TableCatalog::alterTable` method, so the semantics of the new command (atomic vs. non-atomic) depend on the implementation of that method. `V2SessionCatalog::alterTable` currently applies all table changes to the catalog table and then sends them to the catalog in one request; as a result, column changes are by default applied to the catalog (HMS) atomically: either all changes are made or none are.

For example, the command above produces the following plans:

```
== Parsed Logical Plan ==
'AlterColumns [unresolvedfieldname(a), unresolvedfieldname(b), unresolvedfieldname(x, y, z)], [AlterColumnSpec(None,None,Some(new comment),None,None), AlterColumnSpec(Some(LongType),None,None,None,None), AlterColumnSpec(None,None,None,Some(unresolvedfieldposition(FIRST)),None)]
+- 'UnresolvedTable [test_table], ALTER TABLE ... ALTER COLUMN

== Analyzed Logical Plan ==
AlterColumns [resolvedfieldname(StructField(a,IntegerType,true)), resolvedfieldname(StructField(b,IntegerType,true)), resolvedfieldname(x, y, StructField(z,IntegerType,true))], [AlterColumnSpec(None,None,Some(new comment),None,None), AlterColumnSpec(Some(LongType),None,None,None,None), AlterColumnSpec(None,None,None,Some(resolvedfieldposition(FIRST)),None)]
+- ResolvedTable org.apache.spark.sql.delta.catalog.DeltaCatalog6d89c923, default.test_table, DeltaTableV2(...)),Some(default.test_table),None,Map()), [a#163, b#164, x#165]

== Physical Plan ==
AlterTable org.apache.spark.sql.delta.catalog.DeltaCatalog6d89c923, default.test_table, [org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnCommentff58ec42, org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnType7e7c730c, org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnPositionbc842915]
```

### Why are the changes needed?

The current ALTER TABLE ... ALTER COLUMN syntax allows altering only one column at a time. For a large table with many columns, a separate command must be run for each column, which can be slow due to the repeated preprocessing and I/O costs. A syntax that accepts multiple columns lets these costs be shared across the column changes.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49559 from ctring/bulk-alter-column.

Authored-by: Cuong Nguyen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
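To make the restrictions concrete, here is a hypothetical sketch (table and column names follow the example above; the exact behavior is governed by the new analysis checks):

```
-- OK: each column is altered exactly once.
ALTER TABLE test_table ALTER COLUMN a COMMENT "new comment", b TYPE BIGINT;

-- Rejected with NOT_SUPPORTED_CHANGE_SAME_COLUMN: `a` appears twice.
ALTER TABLE test_table ALTER COLUMN a TYPE BIGINT, a COMMENT "new comment";

-- Rejected with NOT_SUPPORTED_CHANGE_SAME_COLUMN: `x` is a parent of `x.y.z`.
ALTER TABLE test_table ALTER COLUMN x COMMENT "parent", x.y.z FIRST;
```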
1 parent 917a9a1 · commit 4b35282

File tree

14 files changed: +547 −291 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions

```
@@ -3962,6 +3962,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "NOT_SUPPORTED_CHANGE_SAME_COLUMN" : {
+    "message" : [
+      "ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing <table>'s column <fieldName> including its nested fields multiple times in the same command."
+    ],
+    "sqlState" : "0A000"
+  },
   "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE" : {
     "message" : [
       "<cmd> is not supported for v2 tables."
```

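With the `<table>` and `<fieldName>` placeholders substituted, the duplicate-column command above would surface roughly as follows (an illustrative sketch, not captured output; the exact rendering depends on the error framework):

```
[NOT_SUPPORTED_CHANGE_SAME_COLUMN] ALTER TABLE ALTER/CHANGE COLUMN is not supported
for changing `default`.`test_table`'s column `a` including its nested fields
multiple times in the same command. SQLSTATE: 0A000
```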
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 9 additions & 2 deletions

```
@@ -212,8 +212,7 @@ statement
     | ALTER (TABLE | VIEW) identifierReference
         UNSET TBLPROPERTIES (IF EXISTS)? propertyList            #unsetTableProperties
     | ALTER TABLE table=identifierReference
-        (ALTER | CHANGE) COLUMN? column=multipartIdentifier
-        alterColumnAction?                                       #alterTableAlterColumn
+        (ALTER | CHANGE) COLUMN? columns=alterColumnSpecList     #alterTableAlterColumn
     | ALTER TABLE table=identifierReference partitionSpec?
         CHANGE COLUMN?
         colName=multipartIdentifier colType colPosition?         #hiveChangeColumn
@@ -1489,6 +1488,14 @@ number
     | MINUS? BIGDECIMAL_LITERAL                                  #bigDecimalLiteral
     ;

+alterColumnSpecList
+    : alterColumnSpec (COMMA alterColumnSpec)*
+    ;
+
+alterColumnSpec
+    : column=multipartIdentifier alterColumnAction?
+    ;
+
 alterColumnAction
     : TYPE dataType
     | commentSpec
```
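Since each `alterColumnSpec` reuses the existing `alterColumnAction` rule, every single-column action composes in one comma-separated statement. A hypothetical example covering the actions (illustrative table and column names):

```
ALTER TABLE t ALTER COLUMN
  a TYPE BIGINT,
  b COMMENT "updated",
  c FIRST,
  d AFTER a,
  e DROP NOT NULL,
  f DROP DEFAULT;
```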

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 22 additions & 17 deletions

```
@@ -3847,24 +3847,29 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       resolved.copyTagsFrom(a)
       resolved

-    case a @ AlterColumn(
-        table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position, _) =>
-      val newDataType = dataType.flatMap { dt =>
-        // Hive style syntax provides the column type, even if it may not have changed.
-        val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
-        if (existing == dt) None else Some(dt)
-      }
-      val newPosition = position map {
-        case u @ UnresolvedFieldPosition(after: After) =>
-          // TODO: since the field name is already resolved, it's more efficient if
-          // `ResolvedFieldName` carries the parent struct and we resolve column position
-          // based on the parent struct, instead of re-resolving the entire column path.
-          val resolved = resolveFieldNames(table, path :+ after.column(), u)
-          ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
-        case u: UnresolvedFieldPosition => ResolvedFieldPosition(u.position)
-        case other => other
+    case a @ AlterColumns(table: ResolvedTable, specs) =>
+      val resolvedSpecs = specs.map {
+        case s @ AlterColumnSpec(ResolvedFieldName(path, field), dataType, _, _, position, _) =>
+          val newDataType = dataType.flatMap { dt =>
+            // Hive style syntax provides the column type, even if it may not have changed.
+            val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
+            if (existing == dt) None else Some(dt)
+          }
+          val newPosition = position map {
+            case u @ UnresolvedFieldPosition(after: After) =>
+              // TODO: since the field name is already resolved, it's more efficient if
+              // `ResolvedFieldName` carries the parent struct and we resolve column
+              // position based on the parent struct, instead of re-resolving the entire
+              // column path.
+              val resolved = resolveFieldNames(table, path :+ after.column(), u)
+              ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
+            case u: UnresolvedFieldPosition => ResolvedFieldPosition(u.position)
+            case other => other
+          }
+          s.copy(newDataType = newDataType, newPosition = newPosition)
+        case spec => spec
       }
-      val resolved = a.copy(dataType = newDataType, position = newPosition)
+      val resolved = a.copy(specs = resolvedSpecs)
       resolved.copyTagsFrom(a)
       resolved
   }
```
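The rule now resolves each spec independently. A minimal, self-contained sketch of that pattern, with hypothetical simplified types standing in for the real Catalyst classes (`Spec` here models `AlterColumnSpec`):

```
// Simplified stand-ins for Catalyst's field-name and spec classes.
sealed trait FieldName
case class UnresolvedName(parts: Seq[String]) extends FieldName
case class ResolvedName(parts: Seq[String], existingType: String) extends FieldName

case class Spec(column: FieldName, newDataType: Option[String])

// Map over every spec: drop a requested type that matches the existing one
// (Hive-style syntax always repeats the type), pass other specs through.
def resolveSpecs(specs: Seq[Spec]): Seq[Spec] = specs.map {
  case s @ Spec(ResolvedName(_, existing), Some(dt)) if dt == existing =>
    s.copy(newDataType = None)
  case other => other
}
```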

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 69 additions & 47 deletions

```
@@ -1622,60 +1622,82 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
       checkColumnNotExists("rename", col.path :+ newName, table.schema)

-    case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _, _) =>
-      val fieldName = col.name.quoted
-      if (a.dataType.isDefined) {
-        val field = CharVarcharUtils.getRawType(col.field.metadata)
-          .map(dt => col.field.copy(dataType = dt))
-          .getOrElse(col.field)
-        val newDataType = a.dataType.get
-        newDataType match {
-          case _: StructType => alter.failAnalysis(
-            "CANNOT_UPDATE_FIELD.STRUCT_TYPE",
-            Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
-          case _: MapType => alter.failAnalysis(
-            "CANNOT_UPDATE_FIELD.MAP_TYPE",
-            Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
-          case _: ArrayType => alter.failAnalysis(
-            "CANNOT_UPDATE_FIELD.ARRAY_TYPE",
-            Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
-          case u: UserDefinedType[_] => alter.failAnalysis(
-            "CANNOT_UPDATE_FIELD.USER_DEFINED_TYPE",
-            Map(
-              "table" -> toSQLId(table.name),
-              "fieldName" -> toSQLId(fieldName),
-              "udtSql" -> toSQLType(u)))
-          case _: CalendarIntervalType | _: AnsiIntervalType => alter.failAnalysis(
-            "CANNOT_UPDATE_FIELD.INTERVAL_TYPE",
-            Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
-          case _ => // update is okay
-        }
-
-        // We don't need to handle nested types here which shall fail before.
-        def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
-          case (CharType(l1), CharType(l2)) => l1 == l2
-          case (CharType(l1), VarcharType(l2)) => l1 <= l2
-          case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
-          case _ => Cast.canUpCast(from, to)
-        }
-        if (!canAlterColumnType(field.dataType, newDataType)) {
+    case AlterColumns(table: ResolvedTable, specs) =>
+      val groupedColumns = specs.groupBy(_.column.name)
+      groupedColumns.collect {
+        case (name, occurrences) if occurrences.length > 1 =>
           alter.failAnalysis(
-            errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+            errorClass = "NOT_SUPPORTED_CHANGE_SAME_COLUMN",
             messageParameters = Map(
               "table" -> toSQLId(table.name),
-              "originName" -> toSQLId(fieldName),
-              "originType" -> toSQLType(field.dataType),
-              "newName" -> toSQLId(fieldName),
-              "newType" -> toSQLType(newDataType)))
-        }
+              "fieldName" -> toSQLId(name)))
       }
-      if (a.nullable.isDefined) {
-        if (!a.nullable.get && col.field.nullable) {
+      groupedColumns.keys.foreach { name =>
+        if (groupedColumns.keys.exists(child => child != name && child.startsWith(name))) {
           alter.failAnalysis(
-            errorClass = "_LEGACY_ERROR_TEMP_2330",
-            messageParameters = Map("fieldName" -> fieldName))
+            errorClass = "NOT_SUPPORTED_CHANGE_SAME_COLUMN",
+            messageParameters = Map(
+              "table" -> toSQLId(table.name),
+              "fieldName" -> toSQLId(name)))
         }
       }
+      specs.foreach {
+        case AlterColumnSpec(col: ResolvedFieldName, dataType, nullable, _, _, _) =>
+          val fieldName = col.name.quoted
+          if (dataType.isDefined) {
+            val field = CharVarcharUtils.getRawType(col.field.metadata)
+              .map(dt => col.field.copy(dataType = dt))
+              .getOrElse(col.field)
+            val newDataType = dataType.get
+            newDataType match {
+              case _: StructType => alter.failAnalysis(
+                "CANNOT_UPDATE_FIELD.STRUCT_TYPE",
+                Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
+              case _: MapType => alter.failAnalysis(
+                "CANNOT_UPDATE_FIELD.MAP_TYPE",
+                Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
+              case _: ArrayType => alter.failAnalysis(
+                "CANNOT_UPDATE_FIELD.ARRAY_TYPE",
+                Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
+              case u: UserDefinedType[_] => alter.failAnalysis(
+                "CANNOT_UPDATE_FIELD.USER_DEFINED_TYPE",
+                Map(
+                  "table" -> toSQLId(table.name),
+                  "fieldName" -> toSQLId(fieldName),
+                  "udtSql" -> toSQLType(u)))
+              case _: CalendarIntervalType | _: AnsiIntervalType => alter.failAnalysis(
+                "CANNOT_UPDATE_FIELD.INTERVAL_TYPE",
+                Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
+              case _ => // update is okay
+            }
+
+            // We don't need to handle nested types here which shall fail before.
+            def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
+              case (CharType(l1), CharType(l2)) => l1 == l2
+              case (CharType(l1), VarcharType(l2)) => l1 <= l2
+              case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
+              case _ => Cast.canUpCast(from, to)
+            }
+            if (!canAlterColumnType(field.dataType, newDataType)) {
+              alter.failAnalysis(
+                errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+                messageParameters = Map(
+                  "table" -> toSQLId(table.name),
+                  "originName" -> toSQLId(fieldName),
+                  "originType" -> toSQLType(field.dataType),
+                  "newName" -> toSQLId(fieldName),
+                  "newType" -> toSQLType(newDataType)))
+            }
+          }
+          if (nullable.isDefined) {
+            if (!nullable.get && col.field.nullable) {
+              alter.failAnalysis(
+                errorClass = "_LEGACY_ERROR_TEMP_2330",
+                messageParameters = Map("fieldName" -> fieldName))
+            }
+          }
+        case _ =>
+      }
     case _ =>
   }
 }
```
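The two new conflict checks compare resolved field paths. A self-contained sketch of the same logic (illustrative function name, not the Catalyst API):

```
// A field path is a Seq[String], e.g. Seq("x", "y", "z") for column x.y.z.
def findConflicts(paths: Seq[Seq[String]]): Set[String] = {
  val grouped = paths.groupBy(identity)
  // Check 1: the same column altered more than once.
  val duplicates = grouped.collect {
    case (name, occurrences) if occurrences.length > 1 => name
  }
  // Check 2: a parent and one of its children altered together.
  // Seq.startsWith is a sequence-prefix test, so Seq("x") is a parent
  // of Seq("x", "y", "z").
  val parentChild = grouped.keys.filter { name =>
    grouped.keys.exists(child => child != name && child.startsWith(name))
  }
  (duplicates ++ parentChild).map(_.mkString(".")).toSet
}

// findConflicts(Seq(Seq("a"), Seq("a")))           == Set("a")
// findConflicts(Seq(Seq("x"), Seq("x", "y", "z"))) == Set("x")
```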

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultStringTypes.scala

Lines changed: 9 additions & 5 deletions

```
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis

 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterViewAs, ColumnDefinition, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V1CreateTablePlan, V2CreateTablePlan}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V1CreateTablePlan, V2CreateTablePlan}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.types.{DataType, StringType}

@@ -93,7 +93,7 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
     StringType(conf.defaultStringType.collationId)

   private def isDDLCommand(plan: LogicalPlan): Boolean = plan exists {
-    case _: AddColumns | _: ReplaceColumns | _: AlterColumn => true
+    case _: AddColumns | _: ReplaceColumns | _: AlterColumns => true
     case _ => isCreateOrAlterPlan(plan)
   }

@@ -115,9 +115,13 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
     case replaceCols: ReplaceColumns =>
       replaceCols.copy(columnsToAdd = replaceColumnTypes(replaceCols.columnsToAdd, newType))

-    case alter: AlterColumn
-        if alter.dataType.isDefined && hasDefaultStringType(alter.dataType.get) =>
-      alter.copy(dataType = Some(replaceDefaultStringType(alter.dataType.get, newType)))
+    case a @ AlterColumns(_, specs: Seq[AlterColumnSpec]) =>
+      val newSpecs = specs.map {
+        case spec if spec.newDataType.isDefined && hasDefaultStringType(spec.newDataType.get) =>
+          spec.copy(newDataType = Some(replaceDefaultStringType(spec.newDataType.get, newType)))
+        case col => col
+      }
+      a.copy(specs = newSpecs)
   }
 }
```
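Under the multi-column syntax this rule rewrites each spec independently, so one statement touching several string columns gets the session's default string type applied per `AlterColumnSpec`. A hypothetical example (assuming a non-default session collation is configured):

```
-- Each STRING below resolves to the session default string type
-- independently, one rewrite per AlterColumnSpec.
ALTER TABLE test_table ALTER COLUMN a TYPE STRING, b TYPE ARRAY<STRING>;
```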
