16 changes: 16 additions & 0 deletions api/src/main/java/org/apache/iceberg/ReplacePartitions.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg;

import java.util.function.Consumer;

/**
* API for overwriting files in a table by partition.
*
@@ -71,6 +73,20 @@ public interface ReplacePartitions extends SnapshotUpdate<ReplacePartitions> {
*/
ReplacePartitions validateFromSnapshot(long snapshotId);

/**
* Enables snapshot validation with a user-provided function, which must throw an exception on
* validation failures.
*
* <p>Clients can use this method to validate summary and other metadata of parent snapshots.
*
* @param snapshotValidator a user function to validate parent snapshots
* @return this for method chaining
*/
default ReplacePartitions validateSnapshot(Consumer<Snapshot> snapshotValidator) {
throw new UnsupportedOperationException(
getClass().getName() + " does not implement validateSnapshot");
}

/**
* Enables validation that deletes that happened concurrently do not conflict with this commit's
* operation.
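Usage note: combined with validateFromSnapshot, the hook above runs against every parent snapshot committed after the operation started. A minimal caller sketch; the Table handle, the "pipeline.id" summary key, and the pipeline name are illustrative assumptions, not part of this change:

import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;

class ReplacePartitionsValidationExample {
  // Reject the overwrite if a concurrent snapshot came from another writer.
  static void overwriteWithValidation(Table table, long startingSnapshotId) {
    ReplacePartitions overwrite =
        table
            .newReplacePartitions()
            .validateFromSnapshot(startingSnapshotId)
            .validateSnapshot(
                (Snapshot parent) ->
                    ValidationException.check(
                        "my-pipeline".equals(parent.summary().get("pipeline.id")), // assumed key
                        "Snapshot %s was written by an unexpected concurrent writer",
                        parent.snapshotId()));
    // ... add the replacement data files, then commit; the validator runs inside commit():
    overwrite.commit();
  }
}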
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg;

import java.util.function.Consumer;
import org.apache.iceberg.expressions.Expression;

/**
@@ -79,6 +80,20 @@ default RowDelta removeDeletes(DeleteFile deletes) {
*/
RowDelta validateFromSnapshot(long snapshotId);

/**
* Enables snapshot validation with a user-provided function, which must throw an exception on
* validation failures.
*
* <p>Clients can use this method to validate summary and other metadata of parent snapshots.
*
* @param snapshotValidator a user function to validate parent snapshots
* @return this for method chaining
*/
default RowDelta validateSnapshot(Consumer<Snapshot> snapshotValidator) {
throw new UnsupportedOperationException(
getClass().getName() + " does not implement validateSnapshot");
}

/**
* Enables or disables case sensitive expression binding for validations that accept expressions.
*
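The interface default throws UnsupportedOperationException, so a validator is only honored by implementations that override this method (BaseRowDelta below). A short sketch, assuming the writer never issues replace snapshots itself and the table already has a current snapshot:

import java.util.function.Consumer;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;

class RowDeltaValidationExample {
  static RowDelta newValidatedRowDelta(Table table) {
    // Fail the commit if any concurrent parent snapshot is a partition replace.
    Consumer<Snapshot> noReplaces =
        parent ->
            ValidationException.check(
                !DataOperations.REPLACE.equals(parent.operation()),
                "Unexpected replace snapshot %s",
                parent.snapshotId());
    return table
        .newRowDelta()
        .validateFromSnapshot(table.currentSnapshot().snapshotId()) // assumes non-empty table
        .validateSnapshot(noReplaces);
  }
}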
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -19,6 +19,8 @@
package org.apache.iceberg;

import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.util.PartitionSet;
@@ -28,6 +30,7 @@ public class BaseReplacePartitions extends MergingSnapshotProducer<ReplacePartit

private final PartitionSet replacedPartitions;
private Long startingSnapshotId;
@Nullable private Consumer<Snapshot> snapshotValidator = null;
private boolean validateConflictingData = false;
private boolean validateConflictingDeletes = false;

@@ -67,6 +70,12 @@ public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) {
return this;
}

@Override
public ReplacePartitions validateSnapshot(Consumer<Snapshot> validator) {
this.snapshotValidator = validator;
return this;
}

@Override
public ReplacePartitions validateNoConflictingDeletes() {
this.validateConflictingDeletes = true;
@@ -87,6 +96,10 @@ public BaseReplacePartitions toBranch(String branch) {

@Override
public void validate(TableMetadata currentMetadata, Snapshot parent) {
if (snapshotValidator != null) {
validateSnapshots(snapshotValidator, currentMetadata, startingSnapshotId, parent);
}

if (validateConflictingData) {
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -19,7 +19,9 @@
package org.apache.iceberg;

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -30,6 +32,7 @@

public class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta {
private Long startingSnapshotId = null; // check all versions by default
@Nullable private Consumer<Snapshot> snapshotValidator = null;
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
private final DataFileSet removedDataFiles = DataFileSet.create();
private boolean validateDeletes = false;
@@ -86,6 +89,12 @@ public RowDelta validateFromSnapshot(long snapshotId) {
return this;
}

@Override
public RowDelta validateSnapshot(Consumer<Snapshot> validator) {
this.snapshotValidator = validator;
return this;
}

@Override
public RowDelta validateDeletedFiles() {
this.validateDeletes = true;
@@ -144,6 +153,10 @@ protected void validate(TableMetadata base, Snapshot parent) {
parent);
}

if (snapshotValidator != null) {
validateSnapshots(snapshotValidator, base, startingSnapshotId, parent);
}

if (validateDeletes) {
failMissingDeletePaths();
}
23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -30,6 +30,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.ValidationException;
@@ -674,6 +676,27 @@ protected void validateDeletedDataFiles(
}
}

/**
* Validates parent snapshots with a user-provided function.
*
* @param validator the validation function
* @param base table metadata used to look up ancestor snapshots
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param parent ending snapshot on the branch being validated
*/
protected void validateSnapshots(
Consumer<Snapshot> validator,
TableMetadata base,
@Nullable Long startingSnapshotId,
@Nullable Snapshot parent) {
if (parent == null) {
return;
}

SnapshotUtil.ancestorsBetween(parent.snapshotId(), startingSnapshotId, base::snapshot)
.forEach(validator);
}

/**
* Returns an iterable of files matching a filter that have been added to the table since a starting
* snapshot.
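validateSnapshots walks the branch from parent back toward startingSnapshotId and hands each ancestor to the validator. An equivalent plain-loop rendering, assuming SnapshotUtil.ancestorsBetween yields the ancestors of parent down to, but not including, the starting snapshot (and all ancestors when it is null):

// Illustrative traversal only; the real method delegates to SnapshotUtil.
Snapshot current = parent;
while (current != null
    && (startingSnapshotId == null || current.snapshotId() != startingSnapshotId)) {
  validator.accept(current);          // throws to fail the commit
  Long parentId = current.parentId(); // null once the first table snapshot is reached
  current = parentId != null ? base.snapshot(parentId) : null;
}

The hunks below are from the Flink dynamic sink committer, which builds its validator on this hook.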
@@ -28,6 +28,8 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -40,6 +42,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -158,26 +161,36 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
private static long getMaxCommittedCheckpointId(
Table table, String flinkJobId, String operatorId, String branch) {
Snapshot snapshot = table.snapshot(branch);
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

while (snapshot != null) {
Map<String, String> summary = snapshot.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
String snapshotOperatorId = summary.get(OPERATOR_ID);
if (flinkJobId.equals(snapshotFlinkJobId)
&& (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
if (value != null) {
lastCommittedCheckpointId = Long.parseLong(value);
break;
}
@Nullable
Long committedCheckpointId = extractCommittedCheckpointId(snapshot, flinkJobId, operatorId);
if (committedCheckpointId != null) {
return committedCheckpointId;
}

Long parentSnapshotId = snapshot.parentId();
snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
}

return lastCommittedCheckpointId;
return INITIAL_CHECKPOINT_ID;
}

@Nullable
private static Long extractCommittedCheckpointId(
Snapshot snapshot, String flinkJobId, String operatorId) {
Map<String, String> summary = snapshot.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
String snapshotOperatorId = summary.get(OPERATOR_ID);
if (flinkJobId.equals(snapshotFlinkJobId)
&& (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
if (value != null) {
return Long.parseLong(value);
}
}

return null;
}
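For reference, extractCommittedCheckpointId only matches snapshots whose summary carries this job's id (and, when present, this operator's id). An illustrative summary; the key strings are assumed values of the FLINK_JOB_ID, OPERATOR_ID, and MAX_COMMITTED_CHECKPOINT_ID constants:

// Hypothetical summary of a snapshot committed by this job/operator:
Map<String, String> summary =
    Map.of(
        "flink.job-id", "job-1", // assumed key names, illustrative values
        "flink.operator-id", "op-1",
        "flink.max-committed-checkpoint-id", "17");
// extractCommittedCheckpointId(snapshot, "job-1", "op-1") would return 17L.
// A summary from another job, or one missing the checkpoint key, yields null,
// and getMaxCommittedCheckpointId keeps walking parent snapshots.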

/**
@@ -276,7 +289,17 @@ private void replacePartitions(
String operatorId) {
// Iceberg tables are unsorted, so the order of the appended data does not matter.
// Hence, we commit everything in one snapshot.
ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
long checkpointId = pendingResults.lastKey();
ReplacePartitions dynamicOverwrite =
table
.newReplacePartitions()
.scanManifestsWith(workerPool)
.validateSnapshot(
new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
@Nullable Snapshot latestSnapshot = table.snapshot(branch);
if (latestSnapshot != null) {
dynamicOverwrite = dynamicOverwrite.validateFromSnapshot(latestSnapshot.snapshotId());
}

for (List<WriteResult> writeResults : pendingResults.values()) {
for (WriteResult result : writeResults) {
Expand All @@ -292,7 +315,7 @@ private void replacePartitions(
"dynamic partition overwrite",
newFlinkJobId,
operatorId,
pendingResults.lastKey());
checkpointId);
}

private void commitDeltaTxn(
Expand All @@ -306,7 +329,17 @@ private void commitDeltaTxn(
long checkpointId = e.getKey();
List<WriteResult> writeResults = e.getValue();

RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
RowDelta rowDelta =
table
.newRowDelta()
.scanManifestsWith(workerPool)
.validateSnapshot(
new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
@Nullable Snapshot latestSnapshot = table.snapshot(branch);
if (latestSnapshot != null) {
rowDelta = rowDelta.validateFromSnapshot(latestSnapshot.snapshotId());
}

for (WriteResult result : writeResults) {
// Row delta validations are not needed for streaming changes that write equality deletes.
// Equality deletes are applied to data in all previous sequence numbers, so retries may
@@ -329,6 +362,39 @@
}
}

static class MaxCommittedCheckpointIdValidator implements Consumer<Snapshot> {
private final long stagedCheckpointId;
private final String flinkJobId;
private final String flinkOperatorId;

MaxCommittedCheckpointIdValidator(
long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
this.stagedCheckpointId = stagedCheckpointId;
this.flinkJobId = flinkJobId;
this.flinkOperatorId = flinkOperatorId;
}

@Override
public void accept(Snapshot snapshot) {
@Nullable
Long checkpointId = extractCommittedCheckpointId(snapshot, flinkJobId, flinkOperatorId);
if (checkpointId == null) {
return;
}

ValidationException.check(
checkpointId < stagedCheckpointId,
"The new parent snapshot '%s' has '%s': '%s' >= '%s' of the currently staged committable."
+ "\nThis can happen, for example, when using the REST catalog: if the previous commit request failed"
+ " in the Flink client but succeeded on the server after the Flink job decided to retry it with the new request."
+ "\nFlink should retry this exception, and the committer should skip the duplicate request during the next retry.",
snapshot.snapshotId(),
MAX_COMMITTED_CHECKPOINT_ID,
checkpointId,
stagedCheckpointId);
}
}
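A hedged test sketch of the validator's contract, assuming Mockito and the summary key names shown earlier: a parent snapshot that already records a checkpoint id at or above the staged one must fail validation, so Flink retries and the committer skips the duplicate.

// Same-package test sketch; Snapshot is mocked because it has no simple stub.
Snapshot parent = org.mockito.Mockito.mock(Snapshot.class);
org.mockito.Mockito.when(parent.summary())
    .thenReturn(
        java.util.Map.of(
            "flink.job-id", "job-1", // assumed key names
            "flink.operator-id", "op-1",
            "flink.max-committed-checkpoint-id", "18"));

MaxCommittedCheckpointIdValidator validator =
    new MaxCommittedCheckpointIdValidator(18L, "job-1", "op-1");

// 18 >= 18: the committable was already committed upstream, so this call
// throws ValidationException; a recorded id of 17 or lower would pass.
validator.accept(parent);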

@VisibleForTesting
void commitOperation(
Table table,