diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index d5f4fd4eaa86..216a25e2638b 100644 --- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -76,6 +76,7 @@ statement | ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag | ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier #dropBranch + | ALTER TABLE multipartIdentifier DROP TAG (IF EXISTS)? identifier #dropTag ; createReplaceTagClause diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 8c93295813c3..7c17ea667e0b 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -214,7 +214,8 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI normalized.contains("replace branch") || normalized.contains("create tag") || normalized.contains("replace tag") || - normalized.contains("drop branch") + normalized.contains("drop branch") || + normalized.contains("drop tag") } protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 94317288dfd7..f758cb08fd3d 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag import org.apache.spark.sql.catalyst.plans.logical.DropBranch import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField +import org.apache.spark.sql.catalyst.plans.logical.DropTag import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.NamedArgument import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument @@ -169,6 +170,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS DropBranch(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null) } + /** + * Create an DROP TAG logical command. + */ + override def visitDropTag(ctx: DropTagContext): DropTag = withOrigin(ctx) { + DropTag(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null) + } + /** * Create an REPLACE PARTITION FIELD logical command. */ diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala new file mode 100644 index 000000000000..7e4b38e74d2f --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class DropTag(table: Seq[String], tag: String, ifExists: Boolean) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"DropTag tag: ${tag} for table: ${table.quoted}" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala new file mode 100644 index 000000000000..8df88765a986 --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.TableCatalog + +case class DropTagExec( + catalog: TableCatalog, + ident: Identifier, + tag: String, + ifExists: Boolean) extends LeafV2CommandExec { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + val ref = iceberg.table().refs().get(tag) + if (ref != null || !ifExists) { + iceberg.table().manageSnapshots().removeTag(tag).commit() + } + + case table => + throw new UnsupportedOperationException(s"Cannot drop tag on non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"DropTag tag: ${tag} for table: ${ident.quoted}" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index f996e5918e04..ae582c958c47 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable import org.apache.spark.sql.catalyst.plans.logical.DropBranch import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField +import org.apache.spark.sql.catalyst.plans.logical.DropTag import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.MergeRows import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode @@ -74,6 +75,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) => DropBranchExec(catalog, ident, branch, ifExists) :: Nil + case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) => + DropTagExec(catalog, ident, tag, ifExists) :: Nil + case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) => DropPartitionFieldExec(catalog, ident, transform) :: Nil diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 280c75f8c6c4..cc60be55ba0c 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -238,6 +238,28 @@ public void testDropBranchDoesNotExist() { () -> sql("ALTER TABLE %s DROP BRANCH %s", tableName, "nonExistingBranch")); } + @Test + public void testDropBranchFailsForTag() throws NoSuchTableException { + String tagName = "b1"; + Table table = insertRows(); + table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit(); + + AssertHelpers.assertThrows( + "Cannot perform drop branch on tag", + IllegalArgumentException.class, + "Ref b1 is a tag not a branch", + () -> sql("ALTER TABLE %s DROP BRANCH %s", tableName, tagName)); + } + + @Test + public void testDropBranchNonConformingName() { + AssertHelpers.assertThrows( + "Non-conforming branch name", + IcebergParseException.class, + "mismatched input '123'", + () -> sql("ALTER TABLE %s DROP BRANCH %s", tableName, "123")); + } + @Test public void testDropMainBranchFails() { AssertHelpers.assertThrows( diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java index def040c15e59..25efaaf766ea 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java @@ -85,8 +85,10 @@ public void testCreateTagWithRetain() throws NoSuchTableException { tableName, tagName, firstSnapshotId, maxRefAge, timeUnit); table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(firstSnapshotId, ref.snapshotId()); Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", firstSnapshotId, ref.snapshotId()); + Assert.assertEquals( + "The tag needs to have the correct max ref age.", TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); } @@ -132,8 +134,10 @@ public void testCreateTagUseDefaultConfig() throws NoSuchTableException { sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName); table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(snapshotId, ref.snapshotId()); - Assert.assertNull(ref.maxRefAgeMs()); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", snapshotId, ref.snapshotId()); + Assert.assertNull( + "The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs()); AssertHelpers.assertThrows( "Cannot create an exist tag", @@ -156,8 +160,10 @@ public void testCreateTagUseDefaultConfig() throws NoSuchTableException { sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId); table.refresh(); ref = table.refs().get(tagName); - Assert.assertEquals(snapshotId, ref.snapshotId()); - Assert.assertNull(ref.maxRefAgeMs()); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", snapshotId, ref.snapshotId()); + Assert.assertNull( + "The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs()); } @Test @@ -170,8 +176,14 @@ public void testCreateTagIfNotExists() throws NoSuchTableException { table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId()); - Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxRefAgeMs().longValue()); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", + table.currentSnapshot().snapshotId(), + ref.snapshotId()); + Assert.assertEquals( + "The tag needs to have the correct max ref age.", + TimeUnit.DAYS.toMillis(maxSnapshotAge), + ref.maxRefAgeMs().longValue()); } @Test @@ -208,8 +220,12 @@ public void testReplaceTag() throws NoSuchTableException { sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName, second); table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(second, ref.snapshotId()); - Assert.assertEquals(expectedMaxRefAgeMs, ref.maxRefAgeMs().longValue()); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", second, ref.snapshotId()); + Assert.assertEquals( + "The tag needs to have the correct max ref age.", + expectedMaxRefAgeMs, + ref.maxRefAgeMs().longValue()); } @Test @@ -243,9 +259,12 @@ public void testReplaceTagWithRetain() throws NoSuchTableException { table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(second, ref.snapshotId()); Assert.assertEquals( - TimeUnit.valueOf(timeUnit).toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); + "The tag needs to point to a specific snapshot id.", second, ref.snapshotId()); + Assert.assertEquals( + "The tag needs to have the correct max ref age.", + TimeUnit.valueOf(timeUnit).toMillis(maxRefAge), + ref.maxRefAgeMs().longValue()); } } @@ -261,7 +280,78 @@ public void testCreateOrReplace() throws NoSuchTableException { sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, first); table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(first, ref.snapshotId()); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", first, ref.snapshotId()); + } + + @Test + public void testDropTag() throws NoSuchTableException { + insertRows(); + Table table = validationCatalog.loadTable(tableIdent); + String tagName = "t1"; + table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", + table.currentSnapshot().snapshotId(), + ref.snapshotId()); + + sql("ALTER TABLE %s DROP TAG %s", tableName, tagName); + table.refresh(); + ref = table.refs().get(tagName); + Assert.assertNull("The tag needs to be dropped.", ref); + } + + @Test + public void testDropTagNonConformingName() { + AssertHelpers.assertThrows( + "Non-conforming tag name", + IcebergParseException.class, + "mismatched input '123'", + () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "123")); + } + + @Test + public void testDropTagDoesNotExist() { + AssertHelpers.assertThrows( + "Cannot perform drop tag on tag which does not exist", + IllegalArgumentException.class, + "Tag does not exist: nonExistingTag", + () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "nonExistingTag")); + } + + @Test + public void testDropTagFailesForBranch() throws NoSuchTableException { + String branchName = "b1"; + Table table = insertRows(); + table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit(); + + AssertHelpers.assertThrows( + "Cannot perform drop tag on branch", + IllegalArgumentException.class, + "Ref b1 is a branch not a tag", + () -> sql("ALTER TABLE %s DROP TAG %s", tableName, branchName)); + } + + @Test + public void testDropTagIfExists() throws NoSuchTableException { + String tagName = "nonExistingTag"; + Table table = insertRows(); + Assert.assertNull("The tag does not exists.", table.refs().get(tagName)); + + sql("ALTER TABLE %s DROP TAG IF EXISTS %s", tableName, tagName); + table.refresh(); + Assert.assertNull("The tag still does not exist.", table.refs().get(tagName)); + + table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit(); + Assert.assertEquals( + "The tag has been created successfully.", + table.currentSnapshot().snapshotId(), + table.refs().get(tagName).snapshotId()); + + sql("ALTER TABLE %s DROP TAG IF EXISTS %s", tableName, tagName); + table.refresh(); + Assert.assertNull("The tag needs to be dropped.", table.refs().get(tagName)); } private Table insertRows() throws NoSuchTableException {