diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index ef070057a3da..d5f4fd4eaa86 100644 --- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -75,6 +75,7 @@ statement | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields | ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag + | ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier #dropBranch ; createReplaceTagClause diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 946c10d193bf..8c93295813c3 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -213,7 +213,8 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI normalized.contains("create branch") || normalized.contains("replace branch") || normalized.contains("create tag") || - normalized.contains("replace tag") + normalized.contains("replace tag") || + normalized.contains("drop branch") } protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = { diff --git 
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index b02897aa7664..94317288dfd7 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.logical.CallArgument import org.apache.spark.sql.catalyst.plans.logical.CallStatement import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag +import org.apache.spark.sql.catalyst.plans.logical.DropBranch import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -161,6 +162,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS ifNotExists) } + /** + * Create a DROP BRANCH logical command. + */ + override def visitDropBranch(ctx: DropBranchContext): DropBranch = withOrigin(ctx) { + DropBranch(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null) + } + /** * Create an REPLACE PARTITION FIELD logical command. 
*/ diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala index 24d6bd3d9123..2a22484499cf 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala @@ -21,9 +21,12 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.expressions.Attribute -case class CreateOrReplaceBranch(table: Seq[String], branch: String, - branchOptions: BranchOptions, replace: Boolean, ifNotExists: Boolean) - extends LeafCommand { +case class CreateOrReplaceBranch( + table: Seq[String], + branch: String, + branchOptions: BranchOptions, + replace: Boolean, + ifNotExists: Boolean) extends LeafCommand { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropBranch.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropBranch.scala new file mode 100644 index 000000000000..bee0b0fae688 --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropBranch.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class DropBranch(table: Seq[String], branch: String, ifExists: Boolean) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"DropBranch branch: ${branch} for table: ${table.quoted}" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropBranchExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropBranchExec.scala new file mode 100644 index 000000000000..ff8f1820099a --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropBranchExec.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.TableCatalog + +case class DropBranchExec( + catalog: TableCatalog, + ident: Identifier, + branch: String, + ifExists: Boolean) extends LeafV2CommandExec { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + val ref = iceberg.table().refs().get(branch) + if (ref != null || !ifExists) { + iceberg.table().manageSnapshots().removeBranch(branch).commit() + } + + case table => + throw new UnsupportedOperationException(s"Cannot drop branch on non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"DropBranch branch: ${branch} for table: ${ident.quoted}" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 6c7ededf88bb..f996e5918e04 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ 
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Call import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable +import org.apache.spark.sql.catalyst.plans.logical.DropBranch import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -70,6 +71,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) => CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil + case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) => + DropBranchExec(catalog, ident, branch, ifExists) :: Nil + case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) => DropPartitionFieldExec(catalog, ident, transform) :: Nil diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java similarity index 79% rename from spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java rename to spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 2cee091ae4a6..280c75f8c6c4 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -33,10 +33,21 @@ import 
org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runners.Parameterized; -public class TestCreateBranch extends SparkExtensionsTestBase { +public class TestBranchDDL extends SparkExtensionsTestBase { + + @Before + public void before() { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") public static Object[][] parameters() { @@ -49,18 +60,13 @@ public static Object[][] parameters() { }; } - public TestCreateBranch(String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - - @After - public void removeTable() { - sql("DROP TABLE IF EXISTS %s", tableName); + public TestBranchDDL(String catalog, String implementation, Map properties) { + super(catalog, implementation, properties); } @Test public void testCreateBranch() throws NoSuchTableException { - Table table = createDefaultTableAndInsert2Row(); + Table table = insertRows(); long snapshotId = table.currentSnapshot().snapshotId(); String branchName = "b1"; Integer minSnapshotsToKeep = 2; @@ -79,13 +85,13 @@ public void testCreateBranch() throws NoSuchTableException { AssertHelpers.assertThrows( "Cannot create an existing branch", IllegalArgumentException.class, - "already exists", + "Ref b1 already exists", () -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName)); } @Test public void testCreateBranchUseDefaultConfig() throws NoSuchTableException { - Table table = createDefaultTableAndInsert2Row(); + Table table = insertRows(); String branchName = "b1"; sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName); table.refresh(); @@ -99,7 +105,7 @@ public void testCreateBranchUseDefaultConfig() throws 
NoSuchTableException { @Test public void testCreateBranchUseCustomMinSnapshotsToKeep() throws NoSuchTableException { Integer minSnapshotsToKeep = 2; - Table table = createDefaultTableAndInsert2Row(); + Table table = insertRows(); String branchName = "b1"; sql( "ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS", @@ -115,7 +121,7 @@ public void testCreateBranchUseCustomMinSnapshotsToKeep() throws NoSuchTableExce @Test public void testCreateBranchUseCustomMaxSnapshotAge() throws NoSuchTableException { long maxSnapshotAge = 2L; - Table table = createDefaultTableAndInsert2Row(); + Table table = insertRows(); String branchName = "b1"; sql( "ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d DAYS", @@ -131,7 +137,7 @@ public void testCreateBranchUseCustomMaxSnapshotAge() throws NoSuchTableExceptio @Test public void testCreateBranchIfNotExists() throws NoSuchTableException { long maxSnapshotAge = 2L; - Table table = createDefaultTableAndInsert2Row(); + Table table = insertRows(); String branchName = "b1"; sql( "ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d DAYS", @@ -151,7 +157,7 @@ public void testCreateBranchUseCustomMinSnapshotsToKeepAndMaxSnapshotAge() throws NoSuchTableException { Integer minSnapshotsToKeep = 2; long maxSnapshotAge = 2L; - Table table = createDefaultTableAndInsert2Row(); + Table table = insertRows(); String branchName = "b1"; sql( "ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS %d DAYS", @@ -174,7 +180,7 @@ public void testCreateBranchUseCustomMinSnapshotsToKeepAndMaxSnapshotAge() @Test public void testCreateBranchUseCustomMaxRefAge() throws NoSuchTableException { long maxRefAge = 10L; - Table table = createDefaultTableAndInsert2Row(); + Table table = insertRows(); String branchName = "b1"; sql("ALTER TABLE %s CREATE BRANCH %s RETAIN %d DAYS", tableName, branchName, maxRefAge); table.refresh(); @@ -206,9 +212,55 @@ public void testCreateBranchUseCustomMaxRefAge() throws 
NoSuchTableException { tableName, branchName, maxRefAge)); } - private Table createDefaultTableAndInsert2Row() throws NoSuchTableException { - sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + @Test + public void testDropBranch() throws NoSuchTableException { + insertRows(); + + Table table = validationCatalog.loadTable(tableIdent); + String branchName = "b1"; + table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit(); + SnapshotRef ref = table.refs().get(branchName); + Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId()); + + sql("ALTER TABLE %s DROP BRANCH %s", tableName, branchName); + table.refresh(); + + ref = table.refs().get(branchName); + Assert.assertNull(ref); + } + + @Test + public void testDropBranchDoesNotExist() { + AssertHelpers.assertThrows( + "Cannot perform drop branch on branch which does not exist", + IllegalArgumentException.class, + "Branch does not exist: nonExistingBranch", + () -> sql("ALTER TABLE %s DROP BRANCH %s", tableName, "nonExistingBranch")); + } + + @Test + public void testDropMainBranchFails() { + AssertHelpers.assertThrows( + "Cannot drop the main branch", + IllegalArgumentException.class, + "Cannot remove main branch", + () -> sql("ALTER TABLE %s DROP BRANCH main", tableName)); + } + + @Test + public void testDropBranchIfExists() { + String branchName = "nonExistingBranch"; + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertNull(table.refs().get(branchName)); + + sql("ALTER TABLE %s DROP BRANCH IF EXISTS %s", tableName, branchName); + table.refresh(); + + SnapshotRef ref = table.refs().get(branchName); + Assert.assertNull(ref); + } + private Table insertRows() throws NoSuchTableException { List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); Dataset df = spark.createDataFrame(records, SimpleRecord.class);