diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala index 08230afb5a3f..6457875b15a4 100644 --- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala +++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.iceberg.relocated.com.google.common.base.Preconditions import org.apache.iceberg.spark.source.SparkTable import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -41,10 +42,17 @@ case class CreateOrReplaceBranchExec( override protected def run(): Seq[InternalRow] = { catalog.loadTable(ident) match { case iceberg: SparkTable => - val snapshotId = branchOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId()) + val snapshotId: java.lang.Long = branchOptions.snapshotId + .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId())) + .map(java.lang.Long.valueOf) + .orNull + + Preconditions.checkArgument(snapshotId != null, + "Cannot complete create or replace branch operation on %s, main has no snapshot", ident) + val manageSnapshots = iceberg.table().manageSnapshots() if (!replace) { - val ref = iceberg.table().refs().get(branch); + val ref = iceberg.table().refs().get(branch) if (ref != null && ifNotExists) { return Nil } @@ -76,6 +84,6 @@ case class CreateOrReplaceBranchExec( } override def simpleString(maxFields: Int): String = { - s"CreateOrReplace branch: ${branch} for table: ${ident.quoted}" + s"CreateOrReplace branch: $branch for table: ${ident.quoted}" } } diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala index d41f9f03ff4c..7ca193d1b156 100644 --- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala +++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.iceberg.relocated.com.google.common.base.Preconditions import org.apache.iceberg.spark.source.SparkTable import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -40,10 +41,17 @@ case class CreateOrReplaceTagExec( override protected def run(): Seq[InternalRow] = { catalog.loadTable(ident) match { case iceberg: SparkTable => - val snapshotId = tagOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId()) + val snapshotId: java.lang.Long = tagOptions.snapshotId + .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId())) + .map(java.lang.Long.valueOf) + .orNull + + Preconditions.checkArgument(snapshotId != null, + "Cannot complete create or replace tag operation on %s, main has no snapshot", ident) + val manageSnapshot = iceberg.table.manageSnapshots() if (!replace) { - val ref = iceberg.table().refs().get(tag); + val ref = iceberg.table().refs().get(tag) if (ref != null && ifNotExists) { return Nil } @@ -67,6 +75,6 @@ case class CreateOrReplaceTagExec( } override def simpleString(maxFields: Int): String = { - s"Create tag: ${tag} for table: ${ident.quoted}" + s"Create tag: $tag for table: ${ident.quoted}" } } diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index cc60be55ba0c..5dd8c751863a 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -31,6 +31,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -89,6 +90,15 @@ public void testCreateBranch() throws NoSuchTableException { () -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName)); } + @Test + public void testCreateBranchOnEmptyTable() { + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot complete create or replace branch operation on %s, main has no snapshot", + tableName); + } + @Test public void testCreateBranchUseDefaultConfig() throws NoSuchTableException { Table table = insertRows(); diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java index 25efaaf766ea..ec3148de6cb5 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java @@ -33,6 +33,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -119,6 +120,15 @@ public void testCreateTagWithRetain() throws NoSuchTableException { tableName, tagName, firstSnapshotId, maxRefAge)); } + @Test + public void testCreateTagOnEmptyTable() { + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "abc")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot complete create or replace tag operation on %s, main has no snapshot", + tableName); + } + @Test public void testCreateTagUseDefaultConfig() throws NoSuchTableException { Table table = insertRows();