diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java index 596f72bb592d..b006846566ba 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java @@ -40,6 +40,7 @@ import org.projectnessie.client.NessieConfigConstants; import org.projectnessie.client.api.CommitMultipleOperationsBuilder; import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.client.api.OnReferenceBuilder; import org.projectnessie.client.http.HttpClientException; import org.projectnessie.error.BaseNessieClientServerException; import org.projectnessie.error.NessieConflictException; @@ -132,7 +133,7 @@ private UpdateableReference loadReference(String requestedRef, String hash) { public List listTables(Namespace namespace) { try { - return api.getEntries().reference(getRef().getReference()).get().getEntries().stream() + return withReference(api.getEntries()).get().getEntries().stream() .filter(namespacePredicate(namespace)) .filter(e -> Content.Type.ICEBERG_TABLE == e.getType()) .map(this::toIdentifier) @@ -168,7 +169,7 @@ private TableIdentifier toIdentifier(EntriesResponse.Entry entry) { public IcebergTable table(TableIdentifier tableIdentifier) { try { ContentKey key = NessieUtil.toKey(tableIdentifier); - Content table = api.getContent().key(key).reference(getRef().getReference()).get().get(key); + Content table = withReference(api.getContent().key(key)).get().get(key); return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null; } catch (NessieNotFoundException e) { return null; @@ -178,11 +179,11 @@ public IcebergTable table(TableIdentifier tableIdentifier) { public void createNamespace(Namespace namespace, Map metadata) { try { getRef().checkMutable(); - getApi() - .createNamespace() - .reference(getRef().getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .properties(metadata) + withReference( + getApi() + .createNamespace() + .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + .properties(metadata)) .create(); refresh(); } catch (NessieNamespaceAlreadyExistsException e) { @@ -199,10 +200,10 @@ namespace, getRef().getName()), public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { try { GetNamespacesResponse response = - getApi() - .getMultipleNamespaces() - .reference(getRef().getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + withReference( + getApi() + .getMultipleNamespaces() + .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))) .get(); return response.getNamespaces().stream() .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) @@ -219,10 +220,10 @@ namespace, getRef().getName()), public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { try { getRef().checkMutable(); - getApi() - .deleteNamespace() - .reference(getRef().getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + withReference( + getApi() + .deleteNamespace() + .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))) .delete(); refresh(); return true; @@ -244,10 +245,10 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { try { - return getApi() - .getNamespace() - .reference(getRef().getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + return withReference( + getApi() + .getNamespace() + .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))) .get() .getProperties(); } catch (NessieNamespaceNotFoundException e) { @@ -263,11 +264,11 @@ namespace, getRef().getName()), public boolean setProperties(Namespace namespace, Map properties) { try { - getApi() - .updateProperties() - .reference(getRef().getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .updateProperties(properties) + withReference( + getApi() + .updateProperties() + .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + .updateProperties(properties)) .update(); refresh(); // always successful, otherwise an exception is thrown @@ -285,11 +286,11 @@ namespace, getRef().getName()), public boolean removeProperties(Namespace namespace, Set properties) { try { - getApi() - .updateProperties() - .reference(getRef().getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .removeProperties(properties) + withReference( + getApi() + .updateProperties() + .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + .removeProperties(properties)) .update(); refresh(); // always successful, otherwise an exception is thrown @@ -335,7 +336,7 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { .onFailure((o, exception) -> refresh()) .run( ops -> { - Branch branch = ops.branch(getRef().getAsBranch()).commit(); + Branch branch = ops.branch((Branch) getRef().getReference()).commit(); getRef().updateReference(branch); }, BaseNessieClientServerException.class); @@ -402,7 +403,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { .onFailure((o, exception) -> refresh()) .run( commitBuilder -> { - Branch branch = commitBuilder.branch(getRef().getAsBranch()).commit(); + Branch branch = commitBuilder.branch((Branch) getRef().getReference()).commit(); getRef().updateReference(branch); }, BaseNessieClientServerException.class); @@ -431,7 +432,7 @@ public void commitTable( updateableReference.checkMutable(); - Branch current = updateableReference.getAsBranch(); + Branch current = (Branch) updateableReference.getReference(); Branch expectedHead = current; if (base != null) { String metadataCommitId = @@ -491,6 +492,16 @@ private boolean isSnapshotOperation(TableMetadata base, TableMetadata metadata) || snapshot.snapshotId() != base.currentSnapshot().snapshotId()); } + private > T withReference(T builder) { + UpdateableReference ref = getRef(); + if (!ref.isMutable()) { + builder.reference(ref.getReference()); + } else { + builder.refName(ref.getName()); + } + return builder; + } + private String buildCommitMsg(TableMetadata base, TableMetadata metadata, String tableName) { if (isSnapshotOperation(base, metadata)) { return String.format( diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java index 7e49457981bf..28ef7fe7c22b 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/UpdateableReference.java @@ -52,21 +52,10 @@ public void updateReference(Reference ref) { this.reference = Preconditions.checkNotNull(ref, "ref is null"); } - public boolean isBranch() { - return reference instanceof Branch; - } - public String getHash() { return reference.getHash(); } - public Branch getAsBranch() { - if (!isBranch()) { - throw new IllegalArgumentException("Reference is not a branch"); - } - return (Branch) reference; - } - public Reference getReference() { return reference; } @@ -79,4 +68,8 @@ public void checkMutable() { public String getName() { return reference.getName(); } + + public boolean isMutable() { + return mutable; + } } diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java new file mode 100644 index 000000000000..8da96c224d51 --- /dev/null +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.nessie; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.net.URI; +import java.util.AbstractMap; +import java.util.Collections; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.projectnessie.client.ext.NessieClientFactory; +import org.projectnessie.client.ext.NessieClientUri; + +public class TestMultipleClients extends BaseTestIceberg { + + private static final String BRANCH = "multiple-clients-test"; + private static final Schema schema = + new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields()); + + public TestMultipleClients() { + super(BRANCH); + } + + // another client that connects to the same nessie server. + NessieCatalog anotherCatalog; + + @Override + @BeforeEach + public void beforeEach(NessieClientFactory clientFactory, @NessieClientUri URI nessieUri) + throws IOException { + super.beforeEach(clientFactory, nessieUri); + anotherCatalog = initCatalog(branch); + } + + @AfterEach + public void afterEach() throws Exception { + anotherCatalog.close(); + } + + @Test + public void testListNamespaces() { + catalog.createNamespace(Namespace.of("db1"), Collections.emptyMap()); + Assertions.assertThat(catalog.listNamespaces()).containsExactlyInAnyOrder(Namespace.of("db1")); + + // another client creates a namespace with the same nessie server + anotherCatalog.createNamespace(Namespace.of("db2"), Collections.emptyMap()); + Assertions.assertThat(anotherCatalog.listNamespaces()) + .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2")); + + Assertions.assertThat(catalog.listNamespaces()) + .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2")); + } + + @Test + public void testLoadNamespaceMetadata() { + catalog.createNamespace(Namespace.of("namespace1"), Collections.emptyMap()); + Assertions.assertThat(catalog.listNamespaces()) + .containsExactlyInAnyOrder(Namespace.of("namespace1")); + + // another client adds a metadata to the same namespace + anotherCatalog.setProperties(Namespace.of("namespace1"), Collections.singletonMap("k1", "v1")); + AbstractMap.SimpleEntry entry = new AbstractMap.SimpleEntry<>("k1", "v1"); + Assertions.assertThat(anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1"))) + .containsExactly(entry); + + Assertions.assertThat(catalog.loadNamespaceMetadata(Namespace.of("namespace1"))) + .containsExactly(entry); + } + + @Test + public void testListTables() { + catalog.createTable(TableIdentifier.parse("foo.tbl1"), schema); + Assertions.assertThat(catalog.listTables(Namespace.of("foo"))) + .containsExactlyInAnyOrder(TableIdentifier.parse("foo.tbl1")); + + // another client creates a table with the same nessie server + anotherCatalog.createTable(TableIdentifier.parse("foo.tbl2"), schema); + Assertions.assertThat(anotherCatalog.listTables(Namespace.of("foo"))) + .containsExactlyInAnyOrder( + TableIdentifier.parse("foo.tbl1"), TableIdentifier.parse("foo.tbl2")); + + Assertions.assertThat(catalog.listTables(Namespace.of("foo"))) + .containsExactlyInAnyOrder( + TableIdentifier.parse("foo.tbl1"), TableIdentifier.parse("foo.tbl2")); + } + + @Test + public void testCommits() { + TableIdentifier identifier = TableIdentifier.parse("foo.tbl1"); + catalog.createTable(identifier, schema); + Table tableFromCatalog = catalog.loadTable(identifier); + tableFromCatalog.updateSchema().addColumn("x1", Types.LongType.get()).commit(); + + Table tableFromAnotherCatalog = anotherCatalog.loadTable(identifier); + tableFromAnotherCatalog.updateSchema().addColumn("x2", Types.LongType.get()).commit(); + + tableFromCatalog.updateSchema().addColumn("x3", Types.LongType.get()).commit(); + tableFromAnotherCatalog.updateSchema().addColumn("x4", Types.LongType.get()).commit(); + + Assertions.assertThat(catalog.loadTable(identifier).schema().columns()).hasSize(5); + Assertions.assertThat(anotherCatalog.loadTable(identifier).schema().columns()).hasSize(5); + } + + @Test + public void testConcurrentCommitsWithRefresh() { + TableIdentifier identifier = TableIdentifier.parse("foo.tbl1"); + catalog.createTable(identifier, schema); + + String hashBefore = catalog.currentHash(); + + TableOperations ops1 = catalog.newTableOps(identifier); + TableMetadata metadata1 = + TableMetadata.buildFrom(ops1.current()).setProperties(ImmutableMap.of("k1", "v1")).build(); + + // commit should succeed + TableOperations ops2 = catalog.newTableOps(identifier); + TableMetadata metadata2 = + TableMetadata.buildFrom(ops2.current()).setProperties(ImmutableMap.of("k2", "v2")).build(); + ops2.commit(ops2.current(), metadata2); + + // refresh the catalog's client. + String hashAfter = catalog.currentHash(); + Assertions.assertThat(hashBefore).isNotEqualTo(hashAfter); + + // client refresh should not affect the ongoing commits (commit should still fail due staleness) + Assertions.assertThatThrownBy(() -> ops1.commit(ops1.current(), metadata1)) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining( + "Cannot commit: Reference hash is out of date. Update the reference 'multiple-clients-test' and try again"); + } +}