Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 13 additions & 36 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
Expand Down Expand Up @@ -65,7 +65,6 @@ enum TransactionType {
private final TransactionTable transactionTable;
private final TableOperations transactionOps;
private final List<PendingUpdate> updates;
private final Set<Long> intermediateSnapshotIds;
private final Set<String> deletedFiles =
Sets.newHashSet(); // keep track of files deleted in the most recent commit
private final Consumer<String> enqueueDelete = deletedFiles::add;
Expand All @@ -92,7 +91,6 @@ enum TransactionType {
this.current = start;
this.transactionOps = new TransactionTableOperations();
this.updates = Lists.newArrayList();
this.intermediateSnapshotIds = Sets.newHashSet();
this.base = ops.current();
this.type = type;
this.hasLastOpCommitted = true;
Expand Down Expand Up @@ -395,9 +393,8 @@ private void commitSimpleTransaction() {
return;
}

// this is always set to the latest commit attempt's snapshot id.
AtomicLong currentSnapshotId = new AtomicLong(-1L);

Set<Long> startingSnapshots =
base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
try {
Tasks.foreach(ops)
.retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
Expand All @@ -411,11 +408,6 @@ private void commitSimpleTransaction() {
underlyingOps -> {
applyUpdates(underlyingOps);

if (current.currentSnapshot() != null) {
currentSnapshotId.set(current.currentSnapshot().snapshotId());
}

// fix up the snapshot log, which should not contain intermediate snapshots
underlyingOps.commit(base, current);
});

Expand All @@ -433,17 +425,18 @@ private void commitSimpleTransaction() {
// the commit succeeded

try {
if (currentSnapshotId.get() != -1) {
intermediateSnapshotIds.add(currentSnapshotId.get());
// clean up the data files that were deleted by each operation. first, get the list of
// committed manifests to ensure that no committed manifest is deleted.
// A manifest could be deleted in one successful operation commit, but reused in another
// successful commit of that operation if the whole transaction is retried.
Set<Long> newSnapshots = Sets.newHashSet();
for (Snapshot snapshot : current.snapshots()) {
if (!startingSnapshots.contains(snapshot.snapshotId())) {
newSnapshots.add(snapshot.snapshotId());
}
}

// clean up the data files that were deleted by each operation. first, get the list of
// committed manifests to
// ensure that no committed manifest is deleted. a manifest could be deleted in one successful
// operation
// commit, but reused in another successful commit of that operation if the whole transaction
// is retried.
Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
Set<String> committedFiles = committedFiles(ops, newSnapshots);
if (committedFiles != null) {
// delete all of the files that were deleted in the most recent set of operation commits
Tasks.foreach(deletedFiles)
Expand Down Expand Up @@ -520,15 +513,6 @@ private static Set<String> committedFiles(TableOperations ops, Set<Long> snapsho
return committedFiles;
}

private static Long currentId(TableMetadata meta) {
if (meta != null) {
if (meta.currentSnapshot() != null) {
return meta.currentSnapshot().snapshotId();
}
}
return null;
}

public class TransactionTableOperations implements TableOperations {
private TableOperations tempOps = ops.temp(current);

Expand All @@ -550,13 +534,6 @@ public void commit(TableMetadata underlyingBase, TableMetadata metadata) {
throw new CommitFailedException("Table metadata refresh is required");
}

// track the intermediate snapshot ids for rewriting the snapshot log
// an id is intermediate if it isn't the base snapshot id and it is replaced by a new current
Long oldId = currentId(current);
if (oldId != null && !oldId.equals(currentId(metadata)) && !oldId.equals(currentId(base))) {
intermediateSnapshotIds.add(oldId);
}

BaseTransaction.this.current = metadata;

this.tempOps = ops.temp(metadata);
Expand Down