Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,10 @@ project(':iceberg-nessie') {
testImplementation "org.projectnessie:nessie-jaxrs-testextension"
// Need to "pull in" el-api explicitly :(
testImplementation "jakarta.el:jakarta.el-api"
testImplementation 'org.assertj:assertj-core'

compileOnly "org.apache.hadoop:hadoop-common"
testImplementation "org.apache.avro:avro"

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
Expand Down Expand Up @@ -166,6 +167,12 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {

protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
int numRetries) {
refreshFromMetadataLocation(newLocation, shouldRetry, numRetries,
metadataLocation -> TableMetadataParser.read(io(), metadataLocation));
}

protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
int numRetries, Function<String, TableMetadata> metadataLoader) {
// use null-safe equality check because new tables have a null metadata location
if (!Objects.equal(currentMetadataLocation, newLocation)) {
LOG.info("Refreshing table metadata from new version: {}", newLocation);
Expand All @@ -175,8 +182,7 @@ protected void refreshFromMetadataLocation(String newLocation, Predicate<Excepti
.retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
.throwFailureWhenFinished()
.shouldRetryTest(shouldRetry)
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(io(), metadataLocation)));
.run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation)));
Copy link
Contributor

@rdblue rdblue Dec 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rymurr, I'm just catching up on this PR, sorry to be late.

I'm curious why this uses a Function passed into refresh rather than defining a load metadata method. The function passed in from NessieTableOperations is actually a method reference, so couldn't this just be a call to that method, loadTableMetadata?

    .run(metadataLocation -> loadTableMetadata(metadataLocation));

...

  public TableMetadata loadTableMetadata(String metadataLocation) {
    return TableMetadataParser.read(io(), metadataLocation);
  }


String newUUID = newMetadata.get().uuid();
if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
Expand Down
58 changes: 58 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,64 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
}

/**
* Returns an updated {@link TableMetadata} with the current-snapshot-ID set to the given
* snapshot-ID and the snapshot-log reset to contain only the snapshot with the given snapshot-ID.
*
* @param snapshotId ID of a snapshot that must exist, or {@code -1L} to remove the current snapshot
* and return an empty snapshot log.
* @return {@link TableMetadata} with updated {@link #currentSnapshotId} and {@link #snapshotLog}
*/
public TableMetadata withCurrentSnapshotOnly(long snapshotId) {
if ((currentSnapshotId == -1L && snapshotId == -1L && snapshots.isEmpty()) ||
(currentSnapshotId == snapshotId && snapshots.size() == 1)) {
return this;
}
List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
if (snapshotId != -1L) {
Snapshot snapshot = snapshotsById.get(snapshotId);
Preconditions.checkArgument(snapshot != null, "Non-existent snapshot");
newSnapshotLog.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshotId));
}
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshotId,
snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
}

public TableMetadata withCurrentSchema(int schemaId) {
if (currentSchemaId == schemaId) {
return this;
}
Preconditions.checkArgument(schemasById.containsKey(schemaId), "Non-existent schema");
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schemaId, schemas, defaultSpecId, specs,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
}

public TableMetadata withDefaultSortOrder(int sortOrderId) {
if (defaultSortOrderId == sortOrderId) {
return this;
}
Preconditions.checkArgument(sortOrdersById.containsKey(sortOrderId), "Non-existent sort-order");
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
lastAssignedPartitionId, sortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
}

public TableMetadata withDefaultSpec(int specId) {
if (defaultSpecId == specId) {
return this;
}
Preconditions.checkArgument(specsById.containsKey(specId), "Non-existent partition spec");
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, specId, specs,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
}

private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(partitionSpec.schema())
.withSpecId(partitionSpec.specId());
Expand Down
123 changes: 70 additions & 53 deletions nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,27 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.Tasks;
import org.projectnessie.api.TreeApi;
import org.projectnessie.api.params.EntriesParams;
import org.projectnessie.client.NessieClient;
import org.projectnessie.client.NessieConfigConstants;
import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.http.HttpClientBuilder;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Branch;
import org.projectnessie.model.Contents;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableDelete;
import org.projectnessie.model.ImmutableOperations;
import org.projectnessie.model.ImmutablePut;
import org.projectnessie.model.Operations;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
import org.projectnessie.model.TableReference;
import org.projectnessie.model.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -75,7 +77,7 @@
public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable, SupportsNamespaces, Configurable {
private static final Logger logger = LoggerFactory.getLogger(NessieCatalog.class);
private static final Joiner SLASH = Joiner.on("/");
private NessieClient client;
private NessieApiV1 api;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client is relatively a better variable name at the consumption. clientApi could be an option since the class name is also changing.

private String warehouseLocation;
private Configuration config;
private UpdateableReference reference;
Expand All @@ -95,7 +97,8 @@ public void initialize(String inputName, Map<String, String> options) {
// remove nessie prefix
final Function<String, String> removePrefix = x -> x.replace("nessie.", "");

this.client = NessieClient.builder().fromConfig(x -> options.get(removePrefix.apply(x))).build();
this.api = HttpClientBuilder.builder().fromConfig(x -> options.get(removePrefix.apply(x)))
.build(NessieApiV1.class);

this.warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
if (warehouseLocation == null) {
Expand All @@ -120,12 +123,12 @@ public void initialize(String inputName, Map<String, String> options) {
throw new IllegalStateException("Parameter 'warehouse' not set, Nessie can't store data.");
}
final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF));
this.reference = loadReference(requestedRef);
this.reference = loadReference(requestedRef, null);
}

@Override
public void close() {
client.close();
api.close();
}

@Override
Expand All @@ -135,15 +138,17 @@ public String name() {

@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
TableReference pti = TableReference.parse(tableIdentifier);
TableReference tr = TableReference.parse(tableIdentifier.name());
Preconditions.checkArgument(!tr.hasTimestamp(), "Invalid table name: # is only allowed for hashes (reference by " +
"timestamp is not supported)");
UpdateableReference newReference = this.reference;
if (pti.reference() != null) {
newReference = loadReference(pti.reference());
if (tr.getReference() != null) {
newReference = loadReference(tr.getReference(), tr.getHash());
}
return new NessieTableOperations(
NessieUtil.toKey(pti.tableIdentifier()),
ContentKey.of(org.projectnessie.model.Namespace.of(tableIdentifier.namespace().levels()), tr.getName()),
newReference,
client,
api,
fileIO,
catalogOptions);
}
Expand All @@ -170,23 +175,27 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
return false;
}

Operations contents = ImmutableOperations.builder()
.addOperations(ImmutableDelete.builder().key(NessieUtil.toKey(identifier)).build())
.commitMeta(NessieUtil.buildCommitMetadata(String.format("iceberg delete table '%s'", identifier),
if (purge) {
logger.info("Purging data for table {} was set to true but is ignored", identifier.toString());
}

CommitMultipleOperationsBuilder commitBuilderBase = api.commitMultipleOperations()
.commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg delete table %s", identifier),
catalogOptions))
.build();
.operation(Operation.Delete.of(NessieUtil.toKey(identifier)));

// We try to drop the table. Simple retry after ref update.
boolean threw = true;
try {
Tasks.foreach(contents)
Tasks.foreach(commitBuilderBase)
.retry(5)
.stopRetryOn(NessieNotFoundException.class)
.throwFailureWhenFinished()
.onFailure((c, exception) -> refresh())
.run(c -> {
Branch branch = client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(),
reference.getHash(), c);
.onFailure((o, exception) -> refresh())
.run(commitBuilder -> {
Branch branch = commitBuilder
.branch(reference.getAsBranch())
.commit();
reference.updateReference(branch);
}, BaseNessieClientServerException.class);
threw = false;
Expand Down Expand Up @@ -215,24 +224,22 @@ public void renameTable(TableIdentifier from, TableIdentifier toOriginal) {
throw new AlreadyExistsException("table %s already exists", to.name());
}

Operations contents = ImmutableOperations.builder()
.addOperations(
ImmutablePut.builder().key(NessieUtil.toKey(to)).contents(existingFromTable).build(),
ImmutableDelete.builder().key(NessieUtil.toKey(from)).build())
.commitMeta(NessieUtil.buildCommitMetadata(String.format("iceberg rename table from '%s' to '%s'",
from, to),
catalogOptions))
.build();
CommitMultipleOperationsBuilder operations = api.commitMultipleOperations()
.commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg rename table from '%s' to '%s'",
from, to), catalogOptions))
.operation(Operation.Put.of(NessieUtil.toKey(to), existingFromTable, existingFromTable))
.operation(Operation.Delete.of(NessieUtil.toKey(from)));

try {
Tasks.foreach(contents)
Tasks.foreach(operations)
.retry(5)
.stopRetryOn(NessieNotFoundException.class)
.throwFailureWhenFinished()
.onFailure((c, exception) -> refresh())
.run(c -> {
Branch branch = client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(),
reference.getHash(), c);
.onFailure((o, exception) -> refresh())
.run(ops -> {
Branch branch = ops
.branch(reference.getAsBranch())
.commit();
reference.updateReference(branch);
}, BaseNessieClientServerException.class);
} catch (NessieNotFoundException e) {
Expand Down Expand Up @@ -322,37 +329,46 @@ public Configuration getConf() {
return config;
}

TreeApi getTreeApi() {
return client.getTreeApi();
}

public void refresh() throws NessieNotFoundException {
reference.refresh();
reference.refresh(api);
}

public String currentHash() {
return reference.getHash();
}

@VisibleForTesting
String currentRefName() {
return reference.getName();
}

@VisibleForTesting
FileIO fileIO() {
return fileIO;
}

private IcebergTable table(TableIdentifier tableIdentifier) {
try {
Contents table = client.getContentsApi()
.getContents(NessieUtil.toKey(tableIdentifier), reference.getName(), reference.getHash());
return table.unwrap(IcebergTable.class).orElse(null);
ContentKey key = NessieUtil.toKey(tableIdentifier);
Content table = api.getContent().key(key).reference(reference.getReference()).get().get(key);
return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null;
} catch (NessieNotFoundException e) {
return null;
}
}

private UpdateableReference loadReference(String requestedRef) {
private UpdateableReference loadReference(String requestedRef, String hash) {
try {
Reference ref = requestedRef == null ? client.getTreeApi().getDefaultBranch()
: client.getTreeApi().getReferenceByName(requestedRef);
return new UpdateableReference(ref, client.getTreeApi());
Reference ref = requestedRef == null ? api.getDefaultBranch()
: api.getReference().refName(requestedRef).get();
if (hash != null) {
if (ref instanceof Branch) {
ref = Branch.of(ref.getName(), hash);
} else {
ref = Tag.of(ref.getName(), hash);
}
}
return new UpdateableReference(ref, hash != null);
} catch (NessieNotFoundException ex) {
if (requestedRef != null) {
throw new IllegalArgumentException(String.format(
Expand All @@ -369,8 +385,9 @@ private UpdateableReference loadReference(String requestedRef) {

private Stream<TableIdentifier> tableStream(Namespace namespace) {
try {
return client.getTreeApi()
.getEntries(reference.getName(), EntriesParams.builder().hashOnRef(reference.getHash()).build())
return api.getEntries()
.reference(reference.getReference())
.get()
.getEntries()
.stream()
.filter(NessieUtil.namespacePredicate(namespace))
Expand Down
Loading