diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 8507fb1c9f80..e8547f11d345 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -110,6 +110,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS val branchRetention = branchOptionsContext.flatMap(branchOptions => Option(branchOptions.refRetain())) val branchRefAgeMs = branchRetention.map(retain => TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong)) + val create = createOrReplaceBranchClause.CREATE() != null val replace = ctx.createReplaceBranchClause().REPLACE() != null val ifNotExists = createOrReplaceBranchClause.EXISTS() != null @@ -124,6 +125,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS typedVisit[Seq[String]](ctx.multipartIdentifier), branchName.getText, branchOptions, + create, replace, ifNotExists) } diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala index f555a9a98a06..06ee4cf1625c 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala @@ -25,6 +25,7 @@ case class CreateOrReplaceBranch( table: Seq[String], branch: String, branchOptions: BranchOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends Command { diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala index 9b378cf84e25..651b5c62e159 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala @@ -32,6 +32,7 @@ case class CreateOrReplaceBranchExec( ident: Identifier, branch: String, branchOptions: BranchOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends V2CommandExec { @@ -51,15 +52,18 @@ case class CreateOrReplaceBranchExec( "Cannot complete create or replace branch operation on %s, main has no snapshot", ident) val manageSnapshots = iceberg.table().manageSnapshots() - if (!replace) { - val ref = iceberg.table().refs().get(branch) - if (ref != null && ifNotExists) { + val refExists = null != iceberg.table().refs().get(branch) + + if (create && replace && !refExists) { + manageSnapshots.createBranch(branch, snapshotId) + } else if (replace) { + manageSnapshots.replaceBranch(branch, snapshotId) + } else { + if (refExists && ifNotExists) { return Nil } manageSnapshots.createBranch(branch, snapshotId) - } else { - manageSnapshots.replaceBranch(branch, snapshotId) } if (branchOptions.numSnapshots.nonEmpty) { diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index fdf8cb56d869..a86c84c6416c 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -71,8 +71,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy { ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil case CreateOrReplaceBranch( - IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) => - CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil + IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, create, replace, ifNotExists) => + CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, create, replace, ifNotExists) :: Nil case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) => DropBranchExec(catalog, ident, branch, ifExists) :: Nil diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 76e92f624f1a..f23dc58af60b 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -511,6 +513,71 @@ public void testCreateOrReplace() throws NoSuchTableException { Assert.assertEquals(first, ref.snapshotId()); } + @Test + public void createOrReplace() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String branchName = "b1"; + insertRows(); + long second = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch(branchName, second).commit(); + + sql( + "ALTER TABLE %s CREATE OR REPLACE BRANCH %s AS OF VERSION %d", + tableName, branchName, first); + table.refresh(); + assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second); + } + + @Test + public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException { + Table table = insertRows(); + String branchName = "b1"; + insertRows(); + long snapshotId = table.currentSnapshot().snapshotId(); + + sql( + "ALTER TABLE %s CREATE OR REPLACE BRANCH %s AS OF VERSION %d", + tableName, branchName, snapshotId); + table.refresh(); + assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(snapshotId); + } + + @Test + public void replaceBranch() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String branchName = "b1"; + long expectedMaxRefAgeMs = 1000; + table + .manageSnapshots() + .createBranch(branchName, first) + .setMaxRefAgeMs(branchName, expectedMaxRefAgeMs) + .commit(); + + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + sql("ALTER TABLE %s REPLACE BRANCH %s AS OF VERSION %d", tableName, branchName, second); + table.refresh(); + SnapshotRef ref = table.refs().get(branchName); + assertThat(ref.snapshotId()).isEqualTo(second); + assertThat(ref.maxRefAgeMs()).isEqualTo(expectedMaxRefAgeMs); + } + + @Test + public void replaceBranchDoesNotExist() throws NoSuchTableException { + Table table = insertRows(); + + Assertions.assertThatThrownBy( + () -> + sql( + "ALTER TABLE %s REPLACE BRANCH %s AS OF VERSION %d", + tableName, "someBranch", table.currentSnapshot().snapshotId())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Branch does not exist: someBranch"); + } + private Table insertRows() throws NoSuchTableException { List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index f758cb08fd3d..2e438de2b8cd 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -116,6 +116,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS val branchRetention = branchOptionsContext.flatMap(branchOptions => Option(branchOptions.refRetain())) val branchRefAgeMs = branchRetention.map(retain => TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong)) + val create = createOrReplaceBranchClause.CREATE() != null val replace = ctx.createReplaceBranchClause().REPLACE() != null val ifNotExists = createOrReplaceBranchClause.EXISTS() != null @@ -130,6 +131,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS typedVisit[Seq[String]](ctx.multipartIdentifier), branchName.getText, branchOptions, + create, replace, ifNotExists) } @@ -153,12 +155,14 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS tagRefAgeMs ) + val create = createTagClause.CREATE() != null val replace = createTagClause.REPLACE() != null val ifNotExists = createTagClause.EXISTS() != null CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier), tagName, tagOptions, + create, replace, ifNotExists) } diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala index 2a22484499cf..b7981a3c7a0d 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala @@ -25,6 +25,7 @@ case class CreateOrReplaceBranch( table: Seq[String], branch: String, branchOptions: BranchOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends LeafCommand { diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala index e48f7d8ed04c..6e7db84a90fb 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala @@ -25,6 +25,7 @@ case class CreateOrReplaceTag( table: Seq[String], tag: String, tagOptions: TagOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends LeafCommand { diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala index 8552ab132f45..142ed1357135 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala @@ -32,6 +32,7 @@ case class CreateOrReplaceBranchExec( ident: Identifier, branch: String, branchOptions: BranchOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends LeafV2CommandExec { @@ -51,15 +52,18 @@ case class CreateOrReplaceBranchExec( "Cannot complete create or replace branch operation on %s, main has no snapshot", ident) val manageSnapshots = iceberg.table().manageSnapshots() - if (!replace) { - val ref = iceberg.table().refs().get(branch) - if (ref != null && ifNotExists) { + val refExists = null != iceberg.table().refs().get(branch) + + if (create && replace && !refExists) { + manageSnapshots.createBranch(branch, snapshotId) + } else if (replace) { + manageSnapshots.replaceBranch(branch, snapshotId) + } else { + if (refExists && ifNotExists) { return Nil } manageSnapshots.createBranch(branch, snapshotId) - } else { - manageSnapshots.replaceBranch(branch, snapshotId) } if (branchOptions.numSnapshots.nonEmpty) { diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala index 7ca193d1b156..372cd7548632 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala @@ -31,6 +31,7 @@ case class CreateOrReplaceTagExec( ident: Identifier, tag: String, tagOptions: TagOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends LeafV2CommandExec { @@ -50,15 +51,18 @@ case class CreateOrReplaceTagExec( "Cannot complete create or replace tag operation on %s, main has no snapshot", ident) val manageSnapshot = iceberg.table.manageSnapshots() - if (!replace) { - val ref = iceberg.table().refs().get(tag) - if (ref != null && ifNotExists) { + val refExists = null != iceberg.table().refs().get(tag) + + if (create && replace && !refExists) { + manageSnapshot.createTag(tag, snapshotId) + } else if (replace) { + manageSnapshot.replaceTag(tag, snapshotId) + } else { + if (refExists && ifNotExists) { return Nil } manageSnapshot.createTag(tag, snapshotId) - } else { - manageSnapshot.replaceTag(tag, snapshotId) } if (tagOptions.snapshotRefRetain.nonEmpty) { diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 326574bf25e4..0a27d49287f2 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -72,11 +72,12 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil case CreateOrReplaceBranch( - IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) => - CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil + IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, create, replace, ifNotExists) => + CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, create, replace, ifNotExists) :: Nil - case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) => - CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil + case CreateOrReplaceTag( + IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, create, replace, ifNotExists) => + CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace, ifNotExists) :: Nil case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) => DropBranchExec(catalog, ident, branch, ifExists) :: Nil diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 55a2a1c142b6..2c3cbac02820 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -512,6 +514,71 @@ public void testCreateOrReplace() throws NoSuchTableException { Assert.assertEquals(first, ref.snapshotId()); } + @Test + public void createOrReplace() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String branchName = "b1"; + insertRows(); + long second = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch(branchName, second).commit(); + + sql( + "ALTER TABLE %s CREATE OR REPLACE BRANCH %s AS OF VERSION %d", + tableName, branchName, first); + table.refresh(); + assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second); + } + + @Test + public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException { + Table table = insertRows(); + String branchName = "b1"; + insertRows(); + long snapshotId = table.currentSnapshot().snapshotId(); + + sql( + "ALTER TABLE %s CREATE OR REPLACE BRANCH %s AS OF VERSION %d", + tableName, branchName, snapshotId); + table.refresh(); + assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(snapshotId); + } + + @Test + public void replaceBranch() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String branchName = "b1"; + long expectedMaxRefAgeMs = 1000; + table + .manageSnapshots() + .createBranch(branchName, first) + .setMaxRefAgeMs(branchName, expectedMaxRefAgeMs) + .commit(); + + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + sql("ALTER TABLE %s REPLACE BRANCH %s AS OF VERSION %d", tableName, branchName, second); + table.refresh(); + SnapshotRef ref = table.refs().get(branchName); + assertThat(ref.snapshotId()).isEqualTo(second); + assertThat(ref.maxRefAgeMs()).isEqualTo(expectedMaxRefAgeMs); + } + + @Test + public void replaceBranchDoesNotExist() throws NoSuchTableException { + Table table = insertRows(); + + Assertions.assertThatThrownBy( + () -> + sql( + "ALTER TABLE %s REPLACE BRANCH %s AS OF VERSION %d", + tableName, "someBranch", table.currentSnapshot().snapshotId())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Branch does not exist: someBranch"); + } + private Table insertRows() throws NoSuchTableException { List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java index ec3148de6cb5..866a965e33e6 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.List; import java.util.Locale; import java.util.Map; @@ -364,6 +366,18 @@ public void testDropTagIfExists() throws NoSuchTableException { Assert.assertNull("The tag needs to be dropped.", table.refs().get(tagName)); } + @Test + public void createOrReplaceWithNonExistingTag() throws NoSuchTableException { + Table table = insertRows(); + String tagName = "t1"; + insertRows(); + long snapshotId = table.currentSnapshot().snapshotId(); + + sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId); + table.refresh(); + assertThat(table.refs().get(tagName).snapshotId()).isEqualTo(snapshotId); + } + private Table insertRows() throws NoSuchTableException { List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index f758cb08fd3d..2e438de2b8cd 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -116,6 +116,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS val branchRetention = branchOptionsContext.flatMap(branchOptions => Option(branchOptions.refRetain())) val branchRefAgeMs = branchRetention.map(retain => TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong)) + val create = createOrReplaceBranchClause.CREATE() != null val replace = ctx.createReplaceBranchClause().REPLACE() != null val ifNotExists = createOrReplaceBranchClause.EXISTS() != null @@ -130,6 +131,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS typedVisit[Seq[String]](ctx.multipartIdentifier), branchName.getText, branchOptions, + create, replace, ifNotExists) } @@ -153,12 +155,14 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS tagRefAgeMs ) + val create = createTagClause.CREATE() != null val replace = createTagClause.REPLACE() != null val ifNotExists = createTagClause.EXISTS() != null CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier), tagName, tagOptions, + create, replace, ifNotExists) } diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala index 2a22484499cf..b7981a3c7a0d 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceBranch.scala @@ -25,6 +25,7 @@ case class CreateOrReplaceBranch( table: Seq[String], branch: String, branchOptions: BranchOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends LeafCommand { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala index e48f7d8ed04c..6e7db84a90fb 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala @@ -25,6 +25,7 @@ case class CreateOrReplaceTag( table: Seq[String], tag: String, tagOptions: TagOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends LeafCommand { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala index 6457875b15a4..d4328d4b9227 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala @@ -32,6 +32,7 @@ case class CreateOrReplaceBranchExec( ident: Identifier, branch: String, branchOptions: BranchOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends LeafV2CommandExec { @@ -51,15 +52,18 @@ case class CreateOrReplaceBranchExec( "Cannot complete create or replace branch operation on %s, main has no snapshot", ident) val manageSnapshots = iceberg.table().manageSnapshots() - if (!replace) { - val ref = iceberg.table().refs().get(branch) - if (ref != null && ifNotExists) { + val refExists = null != iceberg.table().refs().get(branch) + + if (create && replace && !refExists) { + manageSnapshots.createBranch(branch, snapshotId) + } else if (replace) { + manageSnapshots.replaceBranch(branch, snapshotId) + } else { + if (refExists && ifNotExists) { return Nil } manageSnapshots.createBranch(branch, snapshotId) - } else { - manageSnapshots.replaceBranch(branch, snapshotId) } if (branchOptions.numSnapshots.nonEmpty) { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala index 7ca193d1b156..372cd7548632 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala @@ -31,6 +31,7 @@ case class CreateOrReplaceTagExec( ident: Identifier, tag: String, tagOptions: TagOptions, + create: Boolean, replace: Boolean, ifNotExists: Boolean) extends LeafV2CommandExec { @@ -50,15 +51,18 @@ case class CreateOrReplaceTagExec( "Cannot complete create or replace tag operation on %s, main has no snapshot", ident) val manageSnapshot = iceberg.table.manageSnapshots() - if (!replace) { - val ref = iceberg.table().refs().get(tag) - if (ref != null && ifNotExists) { + val refExists = null != iceberg.table().refs().get(tag) + + if (create && replace && !refExists) { + manageSnapshot.createTag(tag, snapshotId) + } else if (replace) { + manageSnapshot.replaceTag(tag, snapshotId) + } else { + if (refExists && ifNotExists) { return Nil } manageSnapshot.createTag(tag, snapshotId) - } else { - manageSnapshot.replaceTag(tag, snapshotId) } if (tagOptions.snapshotRefRetain.nonEmpty) { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index b35e3a8d99ba..14e59b7f988e 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -66,11 +66,12 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi AddPartitionFieldExec(catalog, ident, transform, name) :: Nil case CreateOrReplaceBranch( - IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) => - CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil + IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, create, replace, ifNotExists) => + CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, create, replace, ifNotExists) :: Nil - case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) => - CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil + case CreateOrReplaceTag( + IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, create, replace, ifNotExists) => + CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace, ifNotExists) :: Nil case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) => DropBranchExec(catalog, ident, branch, ifExists) :: Nil diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 5dd8c751863a..fcc124dee5b9 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -292,6 +294,71 @@ public void testDropBranchIfExists() { Assert.assertNull(ref); } + @Test + public void createOrReplace() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String branchName = "b1"; + insertRows(); + long second = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch(branchName, second).commit(); + + sql( + "ALTER TABLE %s CREATE OR REPLACE BRANCH %s AS OF VERSION %d", + tableName, branchName, first); + table.refresh(); + assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second); + } + + @Test + public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException { + Table table = insertRows(); + String branchName = "b1"; + insertRows(); + long snapshotId = table.currentSnapshot().snapshotId(); + + sql( + "ALTER TABLE %s CREATE OR REPLACE BRANCH %s AS OF VERSION %d", + tableName, branchName, snapshotId); + table.refresh(); + assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(snapshotId); + } + + @Test + public void replaceBranch() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String branchName = "b1"; + long expectedMaxRefAgeMs = 1000; + table + .manageSnapshots() + .createBranch(branchName, first) + .setMaxRefAgeMs(branchName, expectedMaxRefAgeMs) + .commit(); + + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + sql("ALTER TABLE %s REPLACE BRANCH %s AS OF VERSION %d", tableName, branchName, second); + table.refresh(); + SnapshotRef ref = table.refs().get(branchName); + assertThat(ref.snapshotId()).isEqualTo(second); + assertThat(ref.maxRefAgeMs()).isEqualTo(expectedMaxRefAgeMs); + } + + @Test + public void replaceBranchDoesNotExist() throws NoSuchTableException { + Table table = insertRows(); + + Assertions.assertThatThrownBy( + () -> + sql( + "ALTER TABLE %s REPLACE BRANCH %s AS OF VERSION %d", + tableName, "someBranch", table.currentSnapshot().snapshotId())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Branch does not exist: someBranch"); + } + private Table insertRows() throws NoSuchTableException { List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java index ec3148de6cb5..866a965e33e6 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.List; import java.util.Locale; import java.util.Map; @@ -364,6 +366,18 @@ public void testDropTagIfExists() throws NoSuchTableException { Assert.assertNull("The tag needs to be dropped.", table.refs().get(tagName)); } + @Test + public void createOrReplaceWithNonExistingTag() throws NoSuchTableException { + Table table = insertRows(); + String tagName = "t1"; + insertRows(); + long snapshotId = table.currentSnapshot().snapshotId(); + + sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId); + table.refresh(); + assertThat(table.refs().get(tagName).snapshotId()).isEqualTo(snapshotId); + } + private Table insertRows() throws NoSuchTableException { List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));