diff --git a/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index 40c8de813cbd..d0b228df0a2f 100644 --- a/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -71,6 +71,8 @@ statement | ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform #dropPartitionField | ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH transform (AS name=identifier)? #replacePartitionField | ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering + | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields + | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields ; writeSpec @@ -157,9 +159,13 @@ quotedIdentifier : BACKQUOTED_IDENTIFIER ; +fieldList + : fields+=multipartIdentifier (',' fields+=multipartIdentifier)* + ; + nonReserved : ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE - | DISTRIBUTED | LOCALLY | UNORDERED + | DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET | TRUE | FALSE | MAP ; @@ -174,6 +180,7 @@ DESC: 'DESC'; DISTRIBUTED: 'DISTRIBUTED'; DROP: 'DROP'; FIELD: 'FIELD'; +FIELDS: 'FIELDS'; FIRST: 'FIRST'; LAST: 'LAST'; LOCALLY: 'LOCALLY'; @@ -181,6 +188,8 @@ NULLS: 'NULLS'; ORDERED: 'ORDERED'; PARTITION: 'PARTITION'; REPLACE: 'REPLACE'; +IDENTIFIER_KW: 'IDENTIFIER'; +SET: 'SET'; TABLE: 'TABLE'; UNORDERED: 'UNORDERED'; WITH: 'WITH'; diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 11a8017bb1a5..f11e30feaf7a 100644 --- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -116,7 +116,9 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI normalized.contains("write ordered by") || normalized.contains("write locally ordered by") || normalized.contains("write distributed by") || - normalized.contains("write unordered"))) + normalized.contains("write unordered") || + normalized.contains("set identifier fields") || + normalized.contains("drop identifier fields"))) } protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = { diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 8ecd1f02c8cc..678da9bfc345 100644 --- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -37,11 +37,13 @@ import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParse import 
org.apache.spark.sql.catalyst.plans.logical.AddPartitionField import org.apache.spark.sql.catalyst.plans.logical.CallArgument import org.apache.spark.sql.catalyst.plans.logical.CallStatement +import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.NamedArgument import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField +import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.trees.Origin @@ -85,7 +87,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS /** - * Create an CHANGE PARTITION FIELD logical command. + * Create a REPLACE PARTITION FIELD logical command. */ override def visitReplacePartitionField(ctx: ReplacePartitionFieldContext): ReplacePartitionField = withOrigin(ctx) { ReplacePartitionField( @@ -95,6 +97,24 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS Option(ctx.name).map(_.getText)) } + /** + * Create a SET IDENTIFIER FIELDS logical command. + */ + override def visitSetIdentifierFields(ctx: SetIdentifierFieldsContext): SetIdentifierFields = withOrigin(ctx) { + SetIdentifierFields( + typedVisit[Seq[String]](ctx.multipartIdentifier), + ctx.fieldList.fields.asScala.map(_.getText)) + } + + /** + * Create a DROP IDENTIFIER FIELDS logical command. + */ + override def visitDropIdentifierFields(ctx: DropIdentifierFieldsContext): DropIdentifierFields = withOrigin(ctx) { + DropIdentifierFields( + typedVisit[Seq[String]](ctx.multipartIdentifier), + ctx.fieldList.fields.asScala.map(_.getText)) + } + /** * Create a [[SetWriteDistributionAndOrdering]] for changing the write distribution and ordering. */ diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala new file mode 100644 index 000000000000..115af1586d7a --- /dev/null +++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class DropIdentifierFields( + table: Seq[String], + fields: Seq[String]) extends Command { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"DropIdentifierFields ${table.quoted} (${fields.quoted})" + } +} diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala new file mode 100644 index 000000000000..2e9a34b87204 --- /dev/null +++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class SetIdentifierFields( + table: Seq[String], + fields: Seq[String]) extends Command { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"SetIdentifierFields ${table.quoted} (${fields.quoted})" + } +} diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala new file mode 100644 index 000000000000..525ed77437a5 --- /dev/null +++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions +import org.apache.iceberg.relocated.com.google.common.collect.Sets +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.TableCatalog + +case class DropIdentifierFieldsExec( + catalog: TableCatalog, + ident: Identifier, + fields: Seq[String]) extends V2CommandExec { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + val schema = iceberg.table.schema + val identifierFieldNames = Sets.newHashSet(schema.identifierFieldNames) + + for (name <- fields) { + Preconditions.checkArgument(schema.findField(name) != null, + "Cannot complete drop identifier fields operation: field %s not found", name) + Preconditions.checkArgument(identifierFieldNames.contains(name), + "Cannot complete drop identifier fields operation: %s is not an identifier field", name) + identifierFieldNames.remove(name) + } + + iceberg.table.updateSchema() + .setIdentifierFields(identifierFieldNames) + .commit() + case table => + throw new UnsupportedOperationException(s"Cannot drop identifier fields in non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"DropIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})" + } +} diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index d5901a8446ce..6f0361fdb150 100644 --- a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField import org.apache.spark.sql.catalyst.plans.logical.Call +import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilter import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinalityCheck @@ -40,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.MergeInto import org.apache.spark.sql.catalyst.plans.logical.ReplaceData import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField +import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableCatalog @@ -66,6 +68,12 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy { case ReplacePartitionField(IcebergCatalogAndIdentifier(catalog, ident), transformFrom, transformTo, 
name) => ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil + case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) => + SetIdentifierFieldsExec(catalog, ident, fields) :: Nil + + case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) => + DropIdentifierFieldsExec(catalog, ident, fields) :: Nil + case SetWriteDistributionAndOrdering( IcebergCatalogAndIdentifier(catalog, ident), distributionMode, ordering) => SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, ordering) :: Nil diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala new file mode 100644 index 000000000000..7fad2dc016d2 --- /dev/null +++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.TableCatalog + +case class SetIdentifierFieldsExec( + catalog: TableCatalog, + ident: Identifier, + fields: Seq[String]) extends V2CommandExec { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + iceberg.table.updateSchema() + .setIdentifierFields(scala.collection.JavaConverters.seqAsJavaList(fields)) + .commit() + case table => + throw new UnsupportedOperationException(s"Cannot set identifier fields in non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"SetIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})" + } +} diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java new file mode 100644 index 000000000000..ac12953d0a7e --- /dev/null +++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.extensions; + +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestAlterTableSchema extends SparkExtensionsTestBase { + public TestAlterTableSchema(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testSetIdentifierFields() { + sql("CREATE TABLE %s (id bigint NOT NULL, " + + "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL) USING iceberg", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertTrue("Table should start without identifier", table.schema().identifierFieldIds().isEmpty()); + + sql("ALTER TABLE %s SET IDENTIFIER FIELDS id", tableName); + table.refresh(); + Assert.assertEquals("Should have new identifier field", + Sets.newHashSet(table.schema().findField("id").fieldId()), + table.schema().identifierFieldIds()); + + sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName); + table.refresh(); + Assert.assertEquals("Should have new identifier field", + Sets.newHashSet( + table.schema().findField("id").fieldId(), + table.schema().findField("location.lon").fieldId()), + table.schema().identifierFieldIds()); + + sql("ALTER TABLE %s SET IDENTIFIER FIELDS location.lon", tableName); + table.refresh(); + Assert.assertEquals("Should have new identifier field", + Sets.newHashSet(table.schema().findField("location.lon").fieldId()), + table.schema().identifierFieldIds()); + } + + @Test + public void testSetInvalidIdentifierFields() { + sql("CREATE TABLE %s (id bigint NOT NULL, id2 bigint) USING iceberg", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertTrue("Table should start without identifier", table.schema().identifierFieldIds().isEmpty()); + AssertHelpers.assertThrows("should not allow setting unknown fields", + IllegalArgumentException.class, + "not found in current schema or added columns", + () -> sql("ALTER TABLE %s SET IDENTIFIER FIELDS unknown", tableName)); + + AssertHelpers.assertThrows("should not allow setting optional fields", + IllegalArgumentException.class, + "not a required field", + () -> sql("ALTER TABLE %s SET IDENTIFIER FIELDS id2", tableName)); + } + + @Test + public void testDropIdentifierFields() { + sql("CREATE TABLE %s (id bigint NOT NULL, " + + "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL) USING iceberg", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertTrue("Table should start without identifier", table.schema().identifierFieldIds().isEmpty()); + + sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName); + table.refresh(); + Assert.assertEquals("Should have new identifier fields", + Sets.newHashSet( + table.schema().findField("id").fieldId(), + table.schema().findField("location.lon").fieldId()), + table.schema().identifierFieldIds()); + + sql("ALTER TABLE %s DROP IDENTIFIER FIELDS id", tableName); + table.refresh(); + Assert.assertEquals("Should have removed identifier field", + Sets.newHashSet(table.schema().findField("location.lon").fieldId()), + table.schema().identifierFieldIds()); + + sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName); + table.refresh(); + Assert.assertEquals("Should have new identifier fields", + Sets.newHashSet( + table.schema().findField("id").fieldId(), + table.schema().findField("location.lon").fieldId()), + table.schema().identifierFieldIds()); + + sql("ALTER TABLE %s DROP IDENTIFIER FIELDS id, location.lon", tableName); + table.refresh(); + Assert.assertEquals("Should have no identifier field", + Sets.newHashSet(), + table.schema().identifierFieldIds()); + } + + @Test + public void testDropInvalidIdentifierFields() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string NOT NULL, " + + "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL) USING iceberg", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertTrue("Table should start without identifier", table.schema().identifierFieldIds().isEmpty()); + AssertHelpers.assertThrows("should not allow dropping unknown fields", + IllegalArgumentException.class, + "field unknown not found", + () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS unknown", tableName)); + + sql("ALTER TABLE %s SET IDENTIFIER FIELDS id", tableName); + AssertHelpers.assertThrows("should not allow dropping a field that is not an identifier", + IllegalArgumentException.class, + "data is not an identifier field", + () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS data", tableName)); + + AssertHelpers.assertThrows("should not allow dropping a nested field that is not an identifier", + IllegalArgumentException.class, + "location.lon is not an identifier field", + () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS location.lon", tableName)); + } +}
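
For reference, the DDL this patch adds parses as follows. This is an illustrative sketch only: the table name db.sample and the columns id and location.lon are placeholders, not part of the change.

    ALTER TABLE db.sample SET IDENTIFIER FIELDS id, location.lon
    ALTER TABLE db.sample DROP IDENTIFIER FIELDS location.lon

SET IDENTIFIER FIELDS replaces the table's identifier-field set with the listed columns (each must be an existing required field), while DROP IDENTIFIER FIELDS removes only the listed columns from the current set and fails if a column is not found or is not currently an identifier field.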