Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ buildscript {

plugins {
id 'nebula.dependency-recommender' version '9.0.2'
id 'org.projectnessie' version '0.5.1'
}

try {
Expand Down Expand Up @@ -127,6 +126,7 @@ subprojects {
compile 'com.github.stephenc.findbugs:findbugs-annotations'

testCompile 'org.junit.vintage:junit-vintage-engine'
testCompile 'org.junit.jupiter:junit-jupiter-engine'
testCompile 'org.junit.jupiter:junit-jupiter'
testCompile 'org.slf4j:slf4j-simple'
testCompile 'org.mockito:mockito-core'
Expand Down Expand Up @@ -1266,19 +1266,18 @@ project(':iceberg-pig') {
}

project(':iceberg-nessie') {
apply plugin: 'org.projectnessie'
test {
useJUnitPlatform()
}

dependencies {
compile project(':iceberg-core')
compile project(path: ':iceberg-bundled-guava', configuration: 'shadow')
compile "org.projectnessie:nessie-client"

// dependency version "recommendation" via nebula.dependency-recommender don't work here "immediately",
// so pull the Quarkus + Nessie versions from the root-project
def quarkusVersion = rootProject.dependencyRecommendations.getRecommendedVersion("io.quarkus", "quarkus-bom")
def nessieVersion = rootProject.dependencyRecommendations.getRecommendedVersion("org.projectnessie", "nessie-quarkus")
nessieQuarkusRuntime(enforcedPlatform("io.quarkus:quarkus-bom:${quarkusVersion}"))
nessieQuarkusServer "org.projectnessie:nessie-quarkus:${nessieVersion}"
testImplementation "org.projectnessie:nessie-jaxrs-testextension"
// Need to "pull in" el-api explicitly :(
testImplementation "jakarta.el:jakarta.el-api"

compileOnly "org.apache.hadoop:hadoop-common"

Expand Down
26 changes: 19 additions & 7 deletions nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iceberg.nessie;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,6 +36,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand All @@ -46,11 +46,14 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.Tasks;
import org.projectnessie.api.TreeApi;
import org.projectnessie.api.params.EntriesParams;
import org.projectnessie.client.NessieClient;
import org.projectnessie.client.NessieConfigConstants;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Branch;
import org.projectnessie.model.Contents;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableDelete;
Expand Down Expand Up @@ -182,8 +185,9 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
.throwFailureWhenFinished()
.onFailure((c, exception) -> refresh())
.run(c -> {
client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(), reference.getHash(), c);
refresh(); // note: updated to reference.updateReference() with Nessie 0.6
Branch branch = client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(),
reference.getHash(), c);
reference.updateReference(branch);
}, BaseNessieClientServerException.class);
threw = false;
} catch (NessieConflictException e) {
Expand Down Expand Up @@ -225,8 +229,9 @@ public void renameTable(TableIdentifier from, TableIdentifier toOriginal) {
.throwFailureWhenFinished()
.onFailure((c, exception) -> refresh())
.run(c -> {
client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(), reference.getHash(), c);
refresh();
Branch branch = client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(),
reference.getHash(), c);
reference.updateReference(branch);
}, BaseNessieClientServerException.class);
} catch (NessieNotFoundException e) {
// important note: the NotFoundException refers to the ref only. If a table was not found it would imply that the
Expand All @@ -236,6 +241,12 @@ public void renameTable(TableIdentifier from, TableIdentifier toOriginal) {
throw new RuntimeException("Failed to drop table as ref is no longer valid.", e);
} catch (BaseNessieClientServerException e) {
throw new CommitFailedException(e, "Failed to rename table: the current reference is not up to date.");
} catch (HttpClientException ex) {
// Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant
// to catch all kinds of network errors (e.g. connection reset). Network code implementation
// details and all kinds of network devices can induce unexpected behavior. So better be
// safe than sorry.
throw new CommitStateUnknownException(ex);
}
// Intentionally just "throw through" Nessie's HttpClientException here and do not "special case"
// just the "timeout" variant to propagate all kinds of network errors (e.g. connection reset).
Expand Down Expand Up @@ -324,7 +335,8 @@ String currentRefName() {

private IcebergTable table(TableIdentifier tableIdentifier) {
try {
Contents table = client.getContentsApi().getContents(NessieUtil.toKey(tableIdentifier), reference.getHash());
Contents table = client.getContentsApi()
.getContents(NessieUtil.toKey(tableIdentifier), reference.getName(), reference.getHash());
return table.unwrap(IcebergTable.class).orElse(null);
} catch (NessieNotFoundException e) {
return null;
Expand Down Expand Up @@ -353,7 +365,7 @@ private UpdateableReference loadReference(String requestedRef) {
private Stream<TableIdentifier> tableStream(Namespace namespace) {
try {
return client.getTreeApi()
.getEntries(reference.getHash(), null, null, Collections.emptyList())
.getEntries(reference.getName(), EntriesParams.builder().hashOnRef(reference.getHash()).build())
.getEntries()
.stream()
.filter(NessieUtil.namespacePredicate(namespace))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Branch;
import org.projectnessie.model.Contents;
import org.projectnessie.model.ContentsKey;
import org.projectnessie.model.IcebergTable;
Expand Down Expand Up @@ -80,7 +81,7 @@ protected void doRefresh() {
}
String metadataLocation = null;
try {
Contents contents = client.getContentsApi().getContents(key, reference.getHash());
Contents contents = client.getContentsApi().getContents(key, reference.getName(), reference.getHash());
this.table = contents.unwrap(IcebergTable.class)
.orElseThrow(() ->
new IllegalStateException("Cannot refresh iceberg table: " +
Expand All @@ -102,10 +103,17 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {

boolean delete = true;
try {
IcebergTable newTable = ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build();
Operations op = ImmutableOperations.builder().addOperations(Operation.Put.of(key, newTable))
ImmutableIcebergTable.Builder newTable = ImmutableIcebergTable.builder();
if (table != null) {
newTable.from(table);
}
newTable.metadataLocation(newMetadataLocation);

Operations op = ImmutableOperations.builder().addOperations(Operation.Put.of(key, newTable.build()))
.commitMeta(NessieUtil.buildCommitMetadata("iceberg commit", catalogOptions)).build();
client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(), reference.getHash(), op);
Branch branch = client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(),
reference.getHash(), op);
reference.updateReference(branch);

delete = false;
} catch (NessieConflictException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.nessie;

import java.util.Objects;
import org.projectnessie.api.TreeApi;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Branch;
Expand All @@ -44,6 +45,11 @@ public boolean refresh() throws NessieNotFoundException {
return !oldReference.equals(reference);
}

public void updateReference(Reference ref) {
Objects.requireNonNull(ref);
this.reference = ref;
}

public boolean isBranch() {
return reference instanceof Branch;
}
Expand Down
24 changes: 14 additions & 10 deletions nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.nessie;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -33,15 +34,16 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.StructType;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.projectnessie.api.ContentsApi;
import org.projectnessie.api.TreeApi;
import org.projectnessie.client.NessieClient;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.jaxrs.NessieJaxRsExtension;
import org.projectnessie.model.Branch;
import org.projectnessie.model.Reference;
import org.slf4j.Logger;
Expand All @@ -50,11 +52,13 @@
import static org.apache.iceberg.types.Types.NestedField.required;

public abstract class BaseTestIceberg {
@RegisterExtension
static NessieJaxRsExtension server = new NessieJaxRsExtension();

private static final Logger LOGGER = LoggerFactory.getLogger(BaseTestIceberg.class);

@Rule
public TemporaryFolder temp = new TemporaryFolder();
@TempDir
public Path temp;

protected NessieCatalog catalog;
protected NessieClient client;
Expand All @@ -79,10 +83,10 @@ private void resetData() throws NessieConflictException, NessieNotFoundException
tree.createReference(Branch.of("main", null));
}

@Before
@BeforeEach
public void beforeEach() throws IOException {
String port = System.getProperty("quarkus.http.test-port", "19120");
uri = String.format("http://localhost:%s/api/v1", port);
uri = server.getURI().toString();
this.client = NessieClient.builder().withUri(uri).build();
tree = client.getTreeApi();
contents = client.getContentsApi();
Expand All @@ -105,7 +109,7 @@ NessieCatalog initCatalog(String ref) {
newCatalog.initialize("nessie", ImmutableMap.of("ref", ref,
CatalogProperties.URI, uri,
"auth_type", "NONE",
CatalogProperties.WAREHOUSE_LOCATION, temp.getRoot().toURI().toString()
CatalogProperties.WAREHOUSE_LOCATION, temp.toUri().toString()
));
return newCatalog;
}
Expand Down Expand Up @@ -136,7 +140,7 @@ void createBranch(String name, String hash) throws NessieNotFoundException, Ness
tree.createReference(Branch.of(name, hash));
}

@After
@AfterEach
public void afterEach() throws Exception {
catalog.close();
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.projectnessie.model.CommitMeta;

public class NessieUtilTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;

Expand All @@ -39,7 +39,7 @@ public TestBranchVisibility() {
super("main");
}

@Before
@BeforeEach
public void before() throws NessieNotFoundException, NessieConflictException {
createTable(tableIdentifier1, 1); // table 1
createTable(tableIdentifier2, 1); // table 2
Expand All @@ -48,7 +48,7 @@ public void before() throws NessieNotFoundException, NessieConflictException {
testCatalog = initCatalog("test");
}

@After
@AfterEach
public void after() throws NessieNotFoundException, NessieConflictException {
catalog.dropTable(tableIdentifier1);
catalog.dropTable(tableIdentifier2);
Expand Down Expand Up @@ -79,31 +79,34 @@ public void testUpdateCatalogs() {
}

@Test
public void testCatalogOnReference() throws NessieNotFoundException {
public void testCatalogOnReference() {
updateSchema(catalog, tableIdentifier1);
updateSchema(testCatalog, tableIdentifier2);
String mainHash = tree.getReferenceByName("main").getHash();

// catalog created with ref points to same catalog as above
NessieCatalog refCatalog = initCatalog("test");
testCatalogEquality(refCatalog, testCatalog, true, true);

// catalog created with hash points to same catalog as above
NessieCatalog refHashCatalog = initCatalog(mainHash);
NessieCatalog refHashCatalog = initCatalog("main");
testCatalogEquality(refHashCatalog, catalog, true, true);
}

@Test
public void testCatalogWithTableNames() throws NessieNotFoundException {
public void testCatalogWithTableNames() {
updateSchema(testCatalog, tableIdentifier2);
String mainHash = tree.getReferenceByName("main").getHash();

String mainName = "main";

// asking for table@branch gives expected regardless of catalog
Assertions.assertThat(metadataLocation(catalog, TableIdentifier.of("test-ns", "table1@test")))
.isEqualTo(metadataLocation(testCatalog, tableIdentifier1));

// asking for table@branch#hash gives expected regardless of catalog
Assertions.assertThat(metadataLocation(catalog, TableIdentifier.of("test-ns", "table1@" + mainHash)))
// Asking for table@branch gives expected regardless of catalog.
// Earlier versions used "table1@" + tree.getReferenceByName("main").getHash() before, but since
// Nessie 0.8.2 the branch name became mandatory and specifying a hash within a branch is not
// possible.
Assertions.assertThat(metadataLocation(catalog, TableIdentifier.of("test-ns", "table1@" + mainName)))
.isEqualTo(metadataLocation(testCatalog, tableIdentifier1));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.jupiter.api.Test;

public class TestNamespace extends BaseTestIceberg {
private static final String BRANCH = "test-namespace";
Expand Down
Loading