Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ public abstract class ViewCatalogTests<C extends ViewCatalog & SupportsNamespace
private static final Schema OTHER_SCHEMA =
new Schema(7, required(1, "some_id", Types.IntegerType.get()));

private static final Schema SCHEMA_WITH_EXTRA_COL =
new Schema(
9,
required(3, "id", Types.IntegerType.get(), "unique ID"),
required(4, "data", Types.StringType.get()),
required(8, "extra", Types.StringType.get()));

protected abstract C catalog();

protected abstract Catalog tableCatalog();
Expand Down Expand Up @@ -1845,6 +1852,209 @@ public void concurrentReplaceViewVersion() {
}
}

@Test
public void concurrentUpdateViewSchema() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for other reviewers: this test makes sense to me and also verifies what was fixed by #14434. Removing the fix from #14434 makes this test fail, so it's good to have this test here

TableIdentifier identifier = TableIdentifier.of("ns", "view");

if (requiresNamespaceCreate()) {
catalog().createNamespace(identifier.namespace());
}

assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();

View view =
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("trino", "select id, data from ns.tbl")
.withProperty(ViewProperties.REPLACE_DROP_DIALECT_ALLOWED, "true")
.create();

assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();

ReplaceViewVersion replaceViewVersionOne =
view.replaceVersion()
.withQuery("trino", "select id, data, extra from ns.tbl")
.withSchema(SCHEMA_WITH_EXTRA_COL)
.withDefaultNamespace(identifier.namespace());

ReplaceViewVersion replaceViewVersionTwo =
view.replaceVersion()
.withQuery("spark", "select count(id, data) from ns.tbl")
.withSchema(OTHER_SCHEMA)
.withDefaultNamespace(identifier.namespace());

// simulate a concurrent replace of the view version with 2 different schema updates
ViewOperations viewOps = ((BaseView) view).operations();
ViewMetadata current = viewOps.current();

ViewMetadata trinoUpdate = ((ViewVersionReplace) replaceViewVersionOne).internalApply();
ViewMetadata sparkUpdate = ((ViewVersionReplace) replaceViewVersionTwo).internalApply();

viewOps.commit(current, trinoUpdate);

if (supportsServerSideRetry()) {
// retry should succeed and the changes should be applied
viewOps.commit(current, sparkUpdate);

View updatedView = catalog().loadView(identifier);
ViewVersion viewVersion = updatedView.currentVersion();
assertThat(viewVersion.versionId()).isEqualTo(3);
assertThat(updatedView.versions()).hasSize(3);
assertThat(updatedView.version(1))
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(1).timestampMillis())
.versionId(1)
.schemaId(0)
.summary(updatedView.version(1).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
ImmutableSQLViewRepresentation.builder()
.sql("select id, data from ns.tbl")
.dialect("trino")
.build())
.build());
assertThat(updatedView.schemas().get(updatedView.version(1).schemaId()).sameSchema(SCHEMA));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the assertion isn't doing anything. Also to be consistent with other checks in this class, can you please update it to assertThat(updatedView.schemas().get(updatedView.version(1).schemaId()).asStruct()) .isEqualTo(SCHEMA.asStruct());


assertThat(updatedView.version(2))
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(2).timestampMillis())
.versionId(2)
.schemaId(1)
.summary(updatedView.version(2).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
ImmutableSQLViewRepresentation.builder()
.sql("select id, data, extra from ns.tbl")
.dialect("trino")
.build())
.build());
assertThat(
updatedView
.schemas()
.get(updatedView.version(2).schemaId())
.sameSchema(SCHEMA_WITH_EXTRA_COL));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update this check and other checks as well


assertThat(updatedView.version(3))
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(3).timestampMillis())
.versionId(3)
.schemaId(2)
.summary(updatedView.version(3).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
ImmutableSQLViewRepresentation.builder()
.sql("select count(id, data) from ns.tbl")
.dialect("spark")
.build())
.build());
assertThat(
updatedView.schemas().get(updatedView.version(3).schemaId()).sameSchema(OTHER_SCHEMA));
} else {
assertThatThrownBy(() -> viewOps.commit(current, sparkUpdate))
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("Cannot commit");

View updatedView = catalog().loadView(identifier);
ViewVersion viewVersion = updatedView.currentVersion();
assertThat(viewVersion.versionId()).isEqualTo(2);
assertThat(updatedView.versions()).hasSize(2);
assertThat(updatedView.version(1))
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(1).timestampMillis())
.versionId(1)
.schemaId(0)
.summary(updatedView.version(1).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
ImmutableSQLViewRepresentation.builder()
.sql("select id, data from ns.tbl")
.dialect("trino")
.build())
.build());
assertThat(updatedView.schemas().get(updatedView.version(1).schemaId()).sameSchema(SCHEMA));

assertThat(updatedView.version(2))
.isEqualTo(
ImmutableViewVersion.builder()
.timestampMillis(updatedView.version(2).timestampMillis())
.versionId(2)
.schemaId(1)
.summary(updatedView.version(2).summary())
.defaultNamespace(identifier.namespace())
.addRepresentations(
ImmutableSQLViewRepresentation.builder()
.sql("select id, data, extra from ns.tbl")
.dialect("trino")
.build())
.build());
assertThat(
updatedView
.schemas()
.get(updatedView.version(2).schemaId())
.sameSchema(SCHEMA_WITH_EXTRA_COL));
}
}

@Test
public void concurrentIdenticalUpdates() {
TableIdentifier identifier = TableIdentifier.of("ns", "view");

if (requiresNamespaceCreate()) {
catalog().createNamespace(identifier.namespace());
}

assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();

View view =
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("spark", "select id, data from ns.tbl")
.create();

assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();

ReplaceViewVersion replaceViewVersion =
view.replaceVersion()
.withQuery("spark", "select id, data, extra from ns.tbl")
.withSchema(SCHEMA_WITH_EXTRA_COL)
.withDefaultNamespace(identifier.namespace());

ViewOperations viewOps = ((BaseView) view).operations();
ViewMetadata current = viewOps.current();

// simulate a concurrent update with identical changes (expects idempotent behavior)
ViewMetadata update1 = ((ViewVersionReplace) replaceViewVersion).internalApply();
ViewMetadata update2 = ((ViewVersionReplace) replaceViewVersion).internalApply();

viewOps.commit(current, update1);

if (supportsServerSideRetry()) {
// retry should succeed and the changes should be applied
viewOps.commit(current, update2);
} else {
assertThatThrownBy(() -> viewOps.commit(current, update2))
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("Cannot commit");
}

View updatedView = catalog().loadView(identifier);

// idempotency: verify the latest version matches either of the
// desired schema and sql string, no matter the type of catalog
assertThat(updatedView.schema().sameSchema(SCHEMA_WITH_EXTRA_COL));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assertThat(updatedView.schema().sameSchema(SCHEMA_WITH_EXTRA_COL));
assertThat(updatedView.schema().asStruct()).isEqualTo(SCHEMA_WITH_EXTRA_COL.asStruct());

assertThat(
((SQLViewRepresentation) updatedView.currentVersion().representations().get(0)).sql())
.isEqualTo("select id, data, extra from ns.tbl");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might also want to check the history() and the versions() here


@Test
public void testSqlForMultipleDialects() {
TableIdentifier identifier = TableIdentifier.of("ns", "view");
Expand Down
Loading