Commit ecab485

[SPARK-28139][SQL] Add v2 ALTER TABLE implementation.
1 parent fe75ff8 commit ecab485

File tree: 11 files changed (+1024, -32 lines)

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java

Lines changed: 14 additions & 5 deletions
@@ -227,14 +227,18 @@ public String property() {
     }
   }
 
+  interface ColumnChange extends TableChange {
+    String[] fieldNames();
+  }
+
   /**
    * A TableChange to add a field.
    * <p>
    * If the field already exists, the change must result in an {@link IllegalArgumentException}.
    * If the new field is nested and its parent does not exist or is not a struct, the change must
    * result in an {@link IllegalArgumentException}.
    */
-  final class AddColumn implements TableChange {
+  final class AddColumn implements ColumnChange {
     private final String[] fieldNames;
     private final DataType dataType;
     private final boolean isNullable;
@@ -247,6 +251,7 @@ private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, St
       this.comment = comment;
     }
 
+    @Override
     public String[] fieldNames() {
       return fieldNames;
     }
@@ -272,7 +277,7 @@ public String comment() {
    * <p>
    * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
    */
-  final class RenameColumn implements TableChange {
+  final class RenameColumn implements ColumnChange {
     private final String[] fieldNames;
     private final String newName;
 
@@ -281,6 +286,7 @@ private RenameColumn(String[] fieldNames, String newName) {
       this.newName = newName;
     }
 
+    @Override
     public String[] fieldNames() {
       return fieldNames;
     }
@@ -297,7 +303,7 @@ public String newName() {
    * <p>
    * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
    */
-  final class UpdateColumnType implements TableChange {
+  final class UpdateColumnType implements ColumnChange {
     private final String[] fieldNames;
     private final DataType newDataType;
     private final boolean isNullable;
@@ -308,6 +314,7 @@ private UpdateColumnType(String[] fieldNames, DataType newDataType, boolean isNu
       this.isNullable = isNullable;
     }
 
+    @Override
     public String[] fieldNames() {
       return fieldNames;
     }
@@ -328,7 +335,7 @@ public boolean isNullable() {
    * <p>
    * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
    */
-  final class UpdateColumnComment implements TableChange {
+  final class UpdateColumnComment implements ColumnChange {
     private final String[] fieldNames;
     private final String newComment;
 
@@ -337,6 +344,7 @@ private UpdateColumnComment(String[] fieldNames, String newComment) {
       this.newComment = newComment;
     }
 
+    @Override
     public String[] fieldNames() {
       return fieldNames;
     }
@@ -351,13 +359,14 @@ public String newComment() {
    * <p>
    * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
    */
-  final class DeleteColumn implements TableChange {
+  final class DeleteColumn implements ColumnChange {
     private final String[] fieldNames;
 
     private DeleteColumn(String[] fieldNames) {
       this.fieldNames = fieldNames;
     }
 
+    @Override
     public String[] fieldNames() {
       return fieldNames;
     }
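
Note: the new ColumnChange interface gives all five column-level changes a shared supertype that exposes the target field path. A minimal Scala sketch of consuming it (package names as of this commit; later Spark releases moved these classes under org.apache.spark.sql.connector.catalog):

import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.catalog.v2.TableChange.ColumnChange

// With the shared interface, a caller that only needs the affected field
// path can handle every column change in one case instead of five.
def describe(change: TableChange): String = change match {
  case col: ColumnChange => s"column change on field ${col.fieldNames.mkString(".")}"
  case other => s"table-level change: ${other.getClass.getSimpleName}"
}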

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala

Lines changed: 40 additions & 11 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
 import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.sources.v2.Table
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 
 object CatalogV2Util {
   import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
@@ -132,16 +132,45 @@
     val pos = struct.getFieldIndex(fieldNames.head)
         .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}"))
     val field = struct.fields(pos)
-    val replacement: Option[StructField] = if (fieldNames.tail.isEmpty) {
-      update(field)
-    } else {
-      field.dataType match {
-        case nestedStruct: StructType =>
-          val updatedType: StructType = replace(nestedStruct, fieldNames.tail, update)
-          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
-        case _ =>
-          throw new IllegalArgumentException(s"Not a struct: ${fieldNames.head}")
-      }
+    val replacement: Option[StructField] = (fieldNames.tail, field.dataType) match {
+      case (Seq(), _) =>
+        update(field)
+
+      case (names, struct: StructType) =>
+        val updatedType: StructType = replace(struct, names, update)
+        Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+
+      case (Seq("key"), map @ MapType(keyType, _, _)) =>
+        val updated = update(StructField("key", keyType, nullable = false))
+            .getOrElse(throw new IllegalArgumentException(s"Cannot delete map key"))
+        Some(field.copy(dataType = map.copy(keyType = updated.dataType)))
+
+      case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, _)) =>
+        Some(field.copy(dataType = map.copy(keyType = replace(keyStruct, names, update))))
+
+      case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
+        val updated = update(StructField("value", mapValueType, nullable = isNullable))
+            .getOrElse(throw new IllegalArgumentException(s"Cannot delete map value"))
+        Some(field.copy(dataType = map.copy(
+          valueType = updated.dataType,
+          valueContainsNull = updated.nullable)))
+
+      case (Seq("value", names @ _*), map @ MapType(_, valueStruct: StructType, _)) =>
+        Some(field.copy(dataType = map.copy(valueType = replace(valueStruct, names, update))))
+
+      case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
+        val updated = update(StructField("element", elementType, nullable = isNullable))
+            .getOrElse(throw new IllegalArgumentException(s"Cannot delete array element"))
+        Some(field.copy(dataType = array.copy(
+          elementType = updated.dataType,
+          containsNull = updated.nullable)))
+
+      case (Seq("element", names @ _*), array @ ArrayType(elementStruct: StructType, _)) =>
+        Some(field.copy(dataType = array.copy(elementType = replace(elementStruct, names, update))))
+
+      case (names, dataType) =>
+        throw new IllegalArgumentException(
+          s"Cannot find field: ${names.head} in ${dataType.simpleString}")
     }
 
     val newFields = struct.fields.zipWithIndex.flatMap {
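
Note: the rewritten replace helper generalizes nested updates beyond structs. A field-name path descends into structs by field name, and into maps and arrays through the reserved segments "key", "value", and "element". The stand-alone lookup below illustrates that path convention; it is a sketch for this page, not the helper itself:

import org.apache.spark.sql.types._

// Resolve a field path the same way the new match arms do: structs by field
// name, maps via "key"/"value", arrays via "element".
def find(dt: DataType, path: Seq[String]): Option[DataType] = (path, dt) match {
  case (Seq(), _) => Some(dt)
  case (name +: rest, s: StructType) =>
    s.fields.find(_.name == name).flatMap(f => find(f.dataType, rest))
  case ("key" +: rest, m: MapType) => find(m.keyType, rest)
  case ("value" +: rest, m: MapType) => find(m.valueType, rest)
  case ("element" +: rest, a: ArrayType) => find(a.elementType, rest)
  case _ => None
}

val schema = StructType(Seq(
  StructField("tags", MapType(StringType,
    StructType(Seq(StructField("weight", IntegerType)))))))

val weightType = find(schema, Seq("tags", "value", "weight"))  // Some(IntegerType)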

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 83 additions & 1 deletion
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog, TableChange}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
@@ -163,6 +164,7 @@ class Analyzer(
       new SubstituteUnresolvedOrdinals(conf)),
     Batch("Resolution", fixedPoint,
       ResolveTableValuedFunctions ::
+      ResolveAlterTable ::
       ResolveTables ::
       ResolveRelations ::
       ResolveReferences ::
@@ -777,6 +779,86 @@
     }
   }
 
+  /**
+   * Resolve ALTER TABLE statements that use a DSv2 catalog.
+   *
+   * This rule converts unresolved ALTER TABLE statements to v2 when a v2 catalog is responsible
+   * for the table identifier. A v2 catalog is responsible for an identifier when the identifier
+   * has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and
+   * the table identifier does not include a catalog.
+   */
+  object ResolveAlterTable extends Rule[LogicalPlan] {
+    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+    override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case alter @ AlterTableAddColumnsStatement(
+          CatalogObjectIdentifier(Some(v2Catalog), ident), cols) =>
+        val changes = cols.map { col =>
+          TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
+        }
+
+        AlterTable(
+          v2Catalog.asTableCatalog, ident,
+          UnresolvedRelation(alter.tableName),
+          changes)
+
+      case alter @ AlterTableAlterColumnStatement(
+          CatalogObjectIdentifier(Some(v2Catalog), ident), colName, dataType, comment) =>
+        val typeChange = dataType.map { newDataType =>
+          TableChange.updateColumnType(colName.toArray, newDataType, true)
+        }
+
+        val commentChange = comment.map { newComment =>
+          TableChange.updateColumnComment(colName.toArray, newComment)
+        }
+
+        AlterTable(
+          v2Catalog.asTableCatalog, ident,
+          UnresolvedRelation(alter.tableName),
+          typeChange.toSeq ++ commentChange.toSeq)
+
+      case alter @ AlterTableRenameColumnStatement(
+          CatalogObjectIdentifier(Some(v2Catalog), ident), col, newName) =>
+        AlterTable(
+          v2Catalog.asTableCatalog, ident,
+          UnresolvedRelation(alter.tableName),
+          Seq(TableChange.renameColumn(col.toArray, newName)))
+
+      case alter @ AlterTableDropColumnsStatement(
+          CatalogObjectIdentifier(Some(v2Catalog), ident), cols) =>
+        val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
+        AlterTable(
+          v2Catalog.asTableCatalog, ident,
+          UnresolvedRelation(alter.tableName),
+          changes)
+
+      case alter @ AlterTableSetPropertiesStatement(
+          CatalogObjectIdentifier(Some(v2Catalog), ident), props) =>
+        val changes = props.map {
+          case (key, value) =>
+            TableChange.setProperty(key, value)
+        }
+
+        AlterTable(
+          v2Catalog.asTableCatalog, ident,
+          UnresolvedRelation(alter.tableName),
+          changes.toSeq)
+
+      case alter @ AlterTableUnsetPropertiesStatement(
+          CatalogObjectIdentifier(Some(v2Catalog), ident), keys, _) =>
+        AlterTable(
+          v2Catalog.asTableCatalog, ident,
+          UnresolvedRelation(alter.tableName),
+          keys.map(key => TableChange.removeProperty(key)))
+
+      case alter @ AlterTableSetLocationStatement(
+          CatalogObjectIdentifier(Some(v2Catalog), ident), newLoc) =>
+        AlterTable(
+          v2Catalog.asTableCatalog, ident,
+          UnresolvedRelation(alter.tableName),
+          Seq(TableChange.setProperty("location", newLoc)))
+    }
+  }
+
   /**
    * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
    * a logical plan node's children.
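
Note: each case above maps one parsed ALTER TABLE statement onto the TableChange factory methods from TableChange.java. For example, assuming a v2 catalog named prod_catalog and a hypothetical table, a statement along the lines of

  ALTER TABLE prod_catalog.db.tbl ALTER COLUMN point.x TYPE bigint

would be rewritten into an AlterTable node carrying roughly:

import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.types.LongType

// The multipart column name becomes a field path; the rule passes true for
// nullability, matching the code above.
val changes = Seq(TableChange.updateColumnType(Array("point", "x"), LongType, true))

SET LOCATION takes the same route, mapped onto the reserved "location" table property.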

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 37 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -353,6 +354,42 @@ trait CheckAnalysis extends PredicateHelper {
           case _ =>
         }
 
+      case alter: AlterTable if alter.childrenResolved =>
+        val table = alter.table
+        def findField(operation: String, fieldName: Array[String]): StructField = {
+          // include collections because structs nested in maps and arrays may be altered
+          val field = table.schema.findNestedField(fieldName, includeCollections = true)
+          if (field.isEmpty) {
+            throw new AnalysisException(
+              s"Cannot $operation missing field in ${table.name} schema: ${fieldName.quoted}")
+          }
+          field.get
+        }
+
+        alter.changes.foreach {
+          case add: AddColumn =>
+            val parent = add.fieldNames.init
+            if (parent.nonEmpty) {
+              findField("add to", parent)
+            }
+          case update: UpdateColumnType =>
+            val field = findField("update", update.fieldNames)
+            if (!Cast.canUpCast(field.dataType, update.newDataType)) {
+              throw new AnalysisException(
+                s"Cannot update ${table.name} field ${update.fieldNames}: " +
+                  s"${field.dataType.simpleString} cannot be cast to " +
+                  s"${update.newDataType.simpleString}")
+            }
+          case rename: RenameColumn =>
+            findField("rename", rename.fieldNames)
+          case update: UpdateColumnComment =>
+            findField("update", update.fieldNames)
+          case delete: DeleteColumn =>
+            findField("delete", delete.fieldNames)
+          case _ =>
+            // no validation needed for set and remove property
+        }
+
       case _ => // Fallbacks to the following checks
     }
 
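
Note: the UpdateColumnType branch only admits type changes that Cast.canUpCast accepts, i.e. safe widenings. A sketch of that gate (Cast is catalyst-internal API, shown here purely for illustration):

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types.{IntegerType, LongType}

// Widening int -> bigint is a lossless up-cast, so the ALTER passes analysis.
Cast.canUpCast(IntegerType, LongType)   // true
// Narrowing bigint -> int could lose data, so the ALTER is rejected.
Cast.canUpCast(LongType, IntegerType)   // false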

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 4 additions & 1 deletion
@@ -40,12 +40,15 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
  *
  * @param multipartIdentifier table name
  */
-case class UnresolvedRelation(multipartIdentifier: Seq[String]) extends LeafNode {
+case class UnresolvedRelation(
+    multipartIdentifier: Seq[String]) extends LeafNode with NamedRelation {
   import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
 
   /** Returns a `.` separated name for this relation. */
   def tableName: String = multipartIdentifier.quoted
 
+  override def name: String = tableName
+
   override def output: Seq[Attribute] = Nil
 
   override lazy val resolved = false
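
Note: mixing NamedRelation into UnresolvedRelation lets the plan report a table name before resolution, which the AlterTable analysis errors above interpolate. A quick sketch (hypothetical identifier):

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

// name delegates to tableName, the `.`-joined multipart identifier.
val name = UnresolvedRelation(Seq("prod_catalog", "db", "tbl")).name  // "prod_catalog.db.tbl"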

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 36 additions & 1 deletion
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
+import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, ColumnChange}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.AliasIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
@@ -507,6 +508,40 @@ case class DropTable(
     ident: Identifier,
     ifExists: Boolean) extends Command
 
+/**
+ * Alter a table.
+ */
+case class AlterTable(
+    catalog: TableCatalog,
+    ident: Identifier,
+    table: NamedRelation,
+    changes: Seq[TableChange]) extends Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
+  override lazy val resolved: Boolean = {
+    changes.forall {
+      case add: AddColumn =>
+        add.fieldNames match {
+          case Array(_) =>
+            // a top-level field can always be added
+            true
+          case _ =>
+            // the parent field must exist
+            table.schema.findNestedField(add.fieldNames.init, includeCollections = true).isDefined
+        }
+
+      case colChange: ColumnChange =>
+        // the column that will be changed must exist
+        table.schema.findNestedField(colChange.fieldNames, includeCollections = true).isDefined
+
+      case _ =>
+        // property changes require no resolution checks
+        true
+    }
+  }
+}
+
 
 /**
  * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
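
Note: the resolved flag treats the three kinds of changes differently. A short sketch using the factory methods from TableChange.java (field names are hypothetical):

import org.apache.spark.sql.catalog.v2.TableChange
import org.apache.spark.sql.types.DoubleType

// A top-level column can always be added, so this never blocks resolution.
val topLevel = TableChange.addColumn(Array("z"), DoubleType, true, null)
// A nested add resolves only once the parent "point" exists in the child schema.
val nested = TableChange.addColumn(Array("point", "z"), DoubleType, true, null)
// Property changes never gate on the schema.
val property = TableChange.setProperty("comment", "updated by ALTER TABLE")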
