Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.projectnessie.client.NessieClient;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.Contents;
import org.projectnessie.model.ContentsKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableCommitMeta.Builder;
import org.projectnessie.model.ImmutableIcebergTable;
import org.projectnessie.model.ImmutableOperations;
import org.projectnessie.model.Operation;
Expand Down Expand Up @@ -92,22 +95,33 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {

String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);

boolean threw = true;
boolean delete = true;
try {
IcebergTable newTable = ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build();
Builder cm = CommitMeta.builder().message("iceberg commit");
String appId = applicationId();
if (appId != null) {
cm.putProperties("spark.app.id", appId);
}
Operations op = ImmutableOperations.builder().addOperations(Operation.Put.of(key, newTable))
.commitMeta(CommitMeta.fromMessage(String.format("iceberg commit%s", applicationId())))
.build();
.commitMeta(cm.build()).build();
client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(), reference.getHash(), op);

threw = false;
delete = false;
} catch (NessieConflictException ex) {
throw new CommitFailedException(ex, "Commit failed: Reference hash is out of date. " +
"Update the reference %s and try again", reference.getName());
} catch (HttpClientException ex) {
// Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant
// to catch all kinds of network errors (e.g. connection reset). Network code implementation
// details and all kinds of network devices can induce unexpected behavior. So better be
// safe than sorry.
delete = false;
throw new CommitStateUnknownException(ex);
} catch (NessieNotFoundException ex) {
throw new RuntimeException(String.format("Commit failed: Reference %s no longer exist", reference.getName()), ex);
} finally {
if (threw) {
if (delete) {
io().deleteFile(newMetadataLocation);
}
}
Expand Down Expand Up @@ -137,7 +151,7 @@ private String applicationId() {
}

}
return appId == null ? "" : ("\nspark.app.id= " + appId);
return appId;
}

}