DynamicCommitter.java
@@ -37,12 +37,14 @@
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotAncestryValidator;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -55,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,9 +144,13 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
commitRequestMap.entrySet()) {
Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
+      Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
+      Iterable<Snapshot> ancestors =
+          latestSnapshot != null
+              ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
+              : List.of();
long maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(
-              table, last.jobId(), last.operatorId(), entry.getKey().branch());
+          getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());
// Mark the already committed FilesCommittable(s) as finished
entry
.getValue()
@@ -160,12 +167,11 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
}

private static long getMaxCommittedCheckpointId(
-      Table table, String flinkJobId, String operatorId, String branch) {
-    Snapshot snapshot = table.snapshot(branch);
+      Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

-    while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
+    for (Snapshot ancestor : ancestors) {
+      Map<String, String> summary = ancestor.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
String snapshotOperatorId = summary.get(OPERATOR_ID);
if (flinkJobId.equals(snapshotFlinkJobId)
@@ -176,9 +182,6 @@ private static long getMaxCommittedCheckpointId(
break;
}
}

-      Long parentSnapshotId = snapshot.parentId();
-      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
}

return lastCommittedCheckpointId;
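
Note on the refactor above: getMaxCommittedCheckpointId no longer walks parent pointers itself. It scans whatever ancestor iterable the caller supplies, so commit() and the new MaxCommittedCheckpointIdValidator (below) share one implementation. A minimal sketch of how a caller drives it, assuming commit()'s table, branch, jobId, and operatorId are in scope:

    // SnapshotUtil.ancestorsOf lazily follows parent ids, the same traversal the
    // removed while-loop did by hand; table::snapshot resolves each parent id.
    Snapshot tip = table.snapshot(branch); // branch head; null for an empty branch
    Iterable<Snapshot> ancestors =
        tip != null ? SnapshotUtil.ancestorsOf(tip.snapshotId(), table::snapshot) : List.of();
    long maxCommitted = getMaxCommittedCheckpointId(ancestors, jobId, operatorId);

The scan still breaks at the first snapshot written by the same job and operator: ancestors are visited newest first, so that snapshot carries the highest committed checkpoint id.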
@@ -347,6 +350,36 @@ private void commitDeltaTxn(
}
}

+  private static class MaxCommittedCheckpointMismatchException extends ValidationException {
+    private MaxCommittedCheckpointMismatchException() {
+      super("Table already contains staged changes.");
+    }
+  }
+
+  private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
+    private final long stagedCheckpointId;
+    private final String flinkJobId;
+    private final String flinkOperatorId;
+
+    private MaxCommittedCheckpointIdValidator(
+        long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+      this.stagedCheckpointId = stagedCheckpointId;
+      this.flinkJobId = flinkJobId;
+      this.flinkOperatorId = flinkOperatorId;
+    }
+
+    @Override
+    public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+      long maxCommittedCheckpointId =
+          getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
+      if (maxCommittedCheckpointId >= stagedCheckpointId) {
+        throw new MaxCommittedCheckpointMismatchException();
+      }
+
+      return true;
+    }
+  }

@VisibleForTesting
void commitOperation(
Table table,
@@ -372,9 +405,25 @@ void commitOperation(
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);
operation.toBranch(branch);
+    operation.validateWith(
+        new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));

long startNano = System.nanoTime();
-    operation.commit(); // abort is automatically called if this fails.
+    try {
+      operation.commit(); // abort is automatically called if this fails.
+    } catch (MaxCommittedCheckpointMismatchException e) {
+      LOG.info(
+          "Skipping commit operation {} because the {} branch of the {} table already contains changes for checkpoint {}."
+              + " This can occur when a failure prevents the committer from receiving confirmation of a"
+              + " successful commit, causing the Flink job to retry committing the same set of changes.",
+          description,
+          branch,
+          table.name(),
+          checkpointId,
+          e);
+      return;
+    }

long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
"Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
TestDynamicCommitter.java
@@ -41,6 +41,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -59,6 +60,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

class TestDynamicCommitter {

@@ -669,11 +672,15 @@ void testTableBranchAtomicCommitWithFailures() throws Exception {
assertThatThrownBy(commitExecutable);
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();

-    // Second fail during commit
+    // Second fail before table update
assertThatThrownBy(commitExecutable);
-    assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+    assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();

-    // Third fail after commit
+    // Third fail after table update
+    assertThatThrownBy(commitExecutable);
+    assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
+
+    // Fourth fail after commit
assertThatThrownBy(commitExecutable);
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
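
With duringCommit split into beforeCommitOperation and afterCommitOperation, this test now injects four failures instead of three, distinguishing a crash just before the table update from one just after it. The hook order driven by CommitHookEnabledDynamicCommitter (bottom of this file) is:

    // beforeCommit(requests)   - before any commit work starts
    // beforeCommitOperation()  - immediately before the Iceberg table update
    //   <snapshot commit>
    // afterCommitOperation()   - table updated, but commit() has not yet returned
    // afterCommit()            - after the whole commit() call

The third point, failing between the table update and the acknowledgement, is exactly the window the new duplicate-commit validation covers.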

@@ -812,37 +819,128 @@ void testReplacePartitions() throws Exception {
.build());
}

+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 1;
+    final String branch = SnapshotRef.MAIN_BRANCH;
+
+    WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 2));
+    byte[] manifest =
+        aggregator.writeToManifest(
+            writeTarget,
+            Sets.newHashSet(
+                new DynamicWriteResult(
+                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
+            checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+    Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
+
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+    CommitHook commitHook =
+        new TestDynamicIcebergSink.DuplicateCommitHook(
+            () ->
+                new DynamicCommitter(
+                    CATALOG_EXTENSION.catalog(),
+                    Map.of(),
+                    overwriteMode,
+                    workerPoolSize,
+                    sinkId,
+                    committerMetrics));
+
+    DynamicCommitter mainCommitter =
+        new CommitHookEnabledDynamicCommitter(
+            commitHook,
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    mainCommitter.commit(commitRequests);
+
+    // Only one commit should succeed
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(
+            ImmutableMap.<String, String>builder()
+                .put("added-data-files", "1")
+                .put("added-records", "42")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId))
+                .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "0")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "0")
+                .put("total-records", "42")
+                .build());
+  }
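
The test above relies on TestDynamicIcebergSink.DuplicateCommitHook, whose implementation is not shown in this diff; judging by its use here, it appears to build a second DynamicCommitter from the supplied factory and replay the same commit requests, so one of the two attempts trips the ancestry validator. That is why exactly one snapshot is asserted for both overwriteMode values.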

interface CommitHook extends Serializable {
-    void beforeCommit();
+    default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}

-    void duringCommit();
+    default void beforeCommitOperation() {}

-    void afterCommit();
+    default void afterCommitOperation() {}
+
+    default void afterCommit() {}
}

static class FailBeforeAndAfterCommit implements CommitHook {

static boolean failedBeforeCommit;
-    static boolean failedDuringCommit;
+    static boolean failedBeforeCommitOperation;
+    static boolean failedAfterCommitOperation;
static boolean failedAfterCommit;

FailBeforeAndAfterCommit() {
reset();
}

@Override
-    public void beforeCommit() {
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
if (!failedBeforeCommit) {
failedBeforeCommit = true;
throw new RuntimeException("Failing before commit");
}
}

@Override
-    public void duringCommit() {
-      if (!failedDuringCommit) {
-        failedDuringCommit = true;
-        throw new RuntimeException("Failing during commit");
+    public void beforeCommitOperation() {
+      if (!failedBeforeCommitOperation) {
+        failedBeforeCommitOperation = true;
+        throw new RuntimeException("Failing before commit operation");
}
}

+    @Override
+    public void afterCommitOperation() {
+      if (!failedAfterCommitOperation) {
+        failedAfterCommitOperation = true;
+        throw new RuntimeException("Failing after commit operation");
+      }
+    }

@@ -856,7 +954,8 @@ public void afterCommit() {

static void reset() {
failedBeforeCommit = false;
-      failedDuringCommit = false;
+      failedBeforeCommitOperation = false;
+      failedAfterCommitOperation = false;
failedAfterCommit = false;
}
}
@@ -880,7 +979,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
@Override
public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
throws IOException, InterruptedException {
-      commitHook.beforeCommit();
+      commitHook.beforeCommit(commitRequests);
super.commit(commitRequests);
commitHook.afterCommit();
}
@@ -895,9 +994,10 @@ void commitOperation(
String newFlinkJobId,
String operatorId,
long checkpointId) {
+      commitHook.beforeCommitOperation();
super.commitOperation(
table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
-      commitHook.duringCommit();
+      commitHook.afterCommitOperation();
}
}
}
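
Since CommitHook's methods are now default no-ops, a hook only overrides the phases it cares about. A minimal sketch of a custom hook under that contract (hypothetical class, not part of this change):

    // Hypothetical hook: counts how often the table-update phase actually runs,
    // e.g. to assert that a duplicate commit was skipped rather than re-applied.
    static class CountingCommitHook implements CommitHook {
      static int commitOperations = 0;

      @Override
      public void afterCommitOperation() {
        commitOperations++;
      }
    }

It plugs into CommitHookEnabledDynamicCommitter the same way the tests above pass FailBeforeAndAfterCommit.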