diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java index 71e6f324380f..37c12c662e08 100644 --- a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java @@ -57,6 +57,13 @@ public abstract class ViewCatalogTests viewOps.commit(current, sparkUpdate)) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Cannot commit"); + + View updatedView = catalog().loadView(identifier); + ViewVersion viewVersion = updatedView.currentVersion(); + assertThat(viewVersion.versionId()).isEqualTo(2); + assertThat(updatedView.versions()).hasSize(2); + assertThat(updatedView.version(1)) + .isEqualTo( + ImmutableViewVersion.builder() + .timestampMillis(updatedView.version(1).timestampMillis()) + .versionId(1) + .schemaId(0) + .summary(updatedView.version(1).summary()) + .defaultNamespace(identifier.namespace()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select id, data from ns.tbl") + .dialect("trino") + .build()) + .build()); + assertThat(updatedView.schemas().get(updatedView.version(1).schemaId()).sameSchema(SCHEMA)); + + assertThat(updatedView.version(2)) + .isEqualTo( + ImmutableViewVersion.builder() + .timestampMillis(updatedView.version(2).timestampMillis()) + .versionId(2) + .schemaId(1) + .summary(updatedView.version(2).summary()) + .defaultNamespace(identifier.namespace()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select id, data, extra from ns.tbl") + .dialect("trino") + .build()) + .build()); + assertThat( + updatedView + .schemas() + .get(updatedView.version(2).schemaId()) + .sameSchema(SCHEMA_WITH_EXTRA_COL)); + } + } + + @Test + public void concurrentIdenticalUpdates() { + TableIdentifier identifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(identifier.namespace()); + } + + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + + View view = + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("spark", "select id, data from ns.tbl") + .create(); + + assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue(); + + ReplaceViewVersion replaceViewVersion = + view.replaceVersion() + .withQuery("spark", "select id, data, extra from ns.tbl") + .withSchema(SCHEMA_WITH_EXTRA_COL) + .withDefaultNamespace(identifier.namespace()); + + ViewOperations viewOps = ((BaseView) view).operations(); + ViewMetadata current = viewOps.current(); + + // simulate a concurrent update with identical changes (expects idempotent behavior) + ViewMetadata update1 = ((ViewVersionReplace) replaceViewVersion).internalApply(); + ViewMetadata update2 = ((ViewVersionReplace) replaceViewVersion).internalApply(); + + viewOps.commit(current, update1); + + if (supportsServerSideRetry()) { + // retry should succeed and the changes should be applied + viewOps.commit(current, update2); + } else { + assertThatThrownBy(() -> viewOps.commit(current, update2)) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Cannot commit"); + } + + View updatedView = catalog().loadView(identifier); + + // idempotency: verify the latest version matches either of the + // desired schema and sql string, no matter the type of catalog + assertThat(updatedView.schema().sameSchema(SCHEMA_WITH_EXTRA_COL)); + assertThat( + ((SQLViewRepresentation) updatedView.currentVersion().representations().get(0)).sql()) + .isEqualTo("select id, data, extra from ns.tbl"); + } + @Test public void testSqlForMultipleDialects() { TableIdentifier identifier = TableIdentifier.of("ns", "view");