diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
index bd4fa17d50b3..20127abd7332 100644
--- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java
@@ -101,17 +101,6 @@ default OverwriteFiles deleteFiles(
    */
   OverwriteFiles validateAddedFilesMatchOverwriteFilter();
 
-  /**
-   * Set the snapshot ID used in any reads for this operation.
-   *
-   * <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
-   * ancestor snapshots through the table's initial snapshot are validated.
-   *
-   * @param snapshotId a snapshot ID
-   * @return this for method chaining
-   */
-  OverwriteFiles validateFromSnapshot(long snapshotId);
-
   /**
    * Enables or disables case sensitive expression binding for validations that accept expressions.
    *
diff --git a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
index 7e8ab65304c5..c153e3fd54a2 100644
--- a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
+++ b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java
@@ -55,22 +55,6 @@ public interface ReplacePartitions extends SnapshotUpdate<ReplacePartitions> {
    */
   ReplacePartitions validateAppendOnly();
 
-  /**
-   * Set the snapshot ID used in validations for this operation.
-   *
-   * <p>All validations will check changes after this snapshot ID. If this is not called, validation
-   * will occur from the beginning of the table's history.
-   *
-   * <p>This method should be called before this operation is committed. If a concurrent operation
-   * committed a data or delta file or removed a data file after the given snapshot ID that might
-   * contain rows matching a partition marked for deletion, validation will detect this and fail.
-   *
-   * @param snapshotId a snapshot ID, it should be set to when this operation started to read the
-   *     table.
-   * @return this for method chaining
-   */
-  ReplacePartitions validateFromSnapshot(long snapshotId);
-
   /**
    * Enables validation that deletes that happened concurrently do not conflict with this commit's
    * operation.
diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java b/api/src/main/java/org/apache/iceberg/RewriteFiles.java
index 2ed6368905ce..f16efb7d7f5c 100644
--- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -173,15 +173,4 @@ RewriteFiles rewriteFiles(
       Set<DeleteFile> deleteFilesToReplace,
       Set<DataFile> dataFilesToAdd,
       Set<DeleteFile> deleteFilesToAdd);
-
-  /**
-   * Set the snapshot ID used in any reads for this operation.
-   *
-   * <p>Validations will check changes after this snapshot ID. If this is not called, all ancestor
-   * snapshots through the table's initial snapshot are validated.
-   *
-   * @param snapshotId a snapshot ID
-   * @return this for method chaining
-   */
-  RewriteFiles validateFromSnapshot(long snapshotId);
 }
diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java
index f5f10b70f9ce..6dffd59bbc46 100644
--- a/api/src/main/java/org/apache/iceberg/RowDelta.java
+++ b/api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -68,17 +68,6 @@ default RowDelta removeDeletes(DeleteFile deletes) {
         getClass().getName() + " does not implement removeDeletes");
   }
 
-  /**
-   * Set the snapshot ID used in any reads for this operation.
-   *
-   * <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
-   * ancestor snapshots through the table's initial snapshot are validated.
-   *
-   * @param snapshotId a snapshot ID
-   * @return this for method chaining
-   */
-  RowDelta validateFromSnapshot(long snapshotId);
-
   /**
    * Enables or disables case sensitive expression binding for validations that accept expressions.
    *
diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index cc6b02dee474..739a8eb735f0 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -71,4 +71,32 @@ default ThisT toBranch(String branch) {
         "Cannot commit to branch %s: %s does not support branch commits",
         branch, this.getClass().getName()));
   }
+
+  /**
+   * Enables snapshot validation with a user-provided function, which must throw a {@link
+   * org.apache.iceberg.exceptions.ValidationException} on validation failures.
+   *
+   * <p>Clients can use this method to validate summary and other metadata of parent snapshots.
+   *
+   * @param snapshotValidator a user function to validate parent snapshots
+   * @return this for method chaining
+   */
+  default ThisT validateWith(Consumer<Snapshot> snapshotValidator) {
+    throw new UnsupportedOperationException(
+        getClass().getName() + " does not implement validateWith");
+  }
+
+  /**
+   * Set the snapshot ID used in any reads for this operation.
+   *
+   * <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
+   * ancestor snapshots through the table's initial snapshot are validated.
+   *
+   * @param snapshotId a snapshot ID
+   * @return this for method chaining
+   */
+  default ThisT validateFromSnapshot(long snapshotId) {
+    throw new UnsupportedOperationException(
+        getClass().getName() + " does not implement validateFromSnapshot");
+  }
 }
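
Note (not part of the patch): the two new defaults are designed to compose. A minimal usage sketch, assuming a catalog, table, and data file are already in scope — the table name and summary key are hypothetical:

```java
// Record the snapshot the writer read from, then ask Iceberg to re-run the validator over
// every snapshot committed after that point when this operation commits.
Table table = catalog.loadTable(TableIdentifier.of("db", "events")); // hypothetical table
long readSnapshotId = table.currentSnapshot().snapshotId();

table
    .newAppend()
    .appendFile(dataFile) // assumed to be built elsewhere
    .validateFromSnapshot(readSnapshotId) // only newer snapshots reach the validator
    .validateWith(
        snapshot -> {
          // throwing ValidationException aborts the commit
          if (snapshot.summary().containsKey("my-app.exclusive-lock")) {
            throw new ValidationException(
                "Conflicting snapshot %s committed concurrently", snapshot.snapshotId());
          }
        })
    .commit();
```
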
diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index fb8c309d0bce..7e4b8c8a553a 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -32,7 +32,6 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
     implements OverwriteFiles {
   private final DataFileSet deletedDataFiles = DataFileSet.create();
   private boolean validateAddedFilesMatchOverwriteFilter = false;
-  private Long startingSnapshotId = null;
   private Expression conflictDetectionFilter = null;
   private boolean validateNewDataFiles = false;
   private boolean validateNewDeletes = false;
@@ -98,12 +97,6 @@ public OverwriteFiles validateAddedFilesMatchOverwriteFilter() {
     return this;
   }
 
-  @Override
-  public OverwriteFiles validateFromSnapshot(long snapshotId) {
-    this.startingSnapshotId = snapshotId;
-    return this;
-  }
-
   @Override
   public OverwriteFiles conflictDetectionFilter(Expression newConflictDetectionFilter) {
     Preconditions.checkArgument(
@@ -134,6 +127,8 @@ public BaseOverwriteFiles toBranch(String branch) {
 
   @Override
   protected void validate(TableMetadata base, Snapshot parent) {
+    super.validate(base, parent);
+
     if (validateAddedFilesMatchOverwriteFilter) {
       PartitionSpec spec = dataSpec();
       Expression rowFilter = rowFilter();
@@ -161,19 +156,19 @@ protected void validate(TableMetadata base, Snapshot parent) {
     }
 
     if (validateNewDataFiles) {
-      validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), parent);
+      validateAddedDataFiles(base, startingSnapshotId(), dataConflictDetectionFilter(), parent);
     }
 
     if (validateNewDeletes) {
       if (rowFilter() != Expressions.alwaysFalse()) {
         Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter();
-        validateNoNewDeleteFiles(base, startingSnapshotId, filter, parent);
-        validateDeletedDataFiles(base, startingSnapshotId, filter, parent);
+        validateNoNewDeleteFiles(base, startingSnapshotId(), filter, parent);
+        validateDeletedDataFiles(base, startingSnapshotId(), filter, parent);
       }
 
       if (!deletedDataFiles.isEmpty()) {
         validateNoNewDeletesForDataFiles(
-            base, startingSnapshotId, conflictDetectionFilter, deletedDataFiles, parent);
+            base, startingSnapshotId(), conflictDetectionFilter, deletedDataFiles, parent);
       }
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
index 892257b51b0c..8c3e6bf73c1b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -27,7 +27,6 @@
 class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
   private final DataFileSet replacedDataFiles = DataFileSet.create();
-  private Long startingSnapshotId = null;
 
   BaseRewriteFiles(String tableName, TableOperations ops) {
     super(tableName, ops);
   }
@@ -119,12 +118,6 @@ public RewriteFiles rewriteFiles(
     return this;
   }
 
-  @Override
-  public RewriteFiles validateFromSnapshot(long snapshotId) {
-    this.startingSnapshotId = snapshotId;
-    return this;
-  }
-
   @Override
   public BaseRewriteFiles toBranch(String branch) {
     targetBranch(branch);
@@ -133,11 +126,13 @@ public BaseRewriteFiles toBranch(String branch) {
 
   @Override
   protected void validate(TableMetadata base, Snapshot parent) {
+    super.validate(base, parent);
+
     validateReplacedAndAddedFiles();
 
     if (!replacedDataFiles.isEmpty()) {
       // if there are replaced data files, there cannot be any new row-level deletes for those data
       // files
-      validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, parent);
+      validateNoNewDeletesForDataFiles(base, startingSnapshotId(), replacedDataFiles, parent);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
index b819d03dd5f8..d5a34572be0f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -29,7 +29,6 @@
 import org.apache.iceberg.util.SnapshotUtil;
 
 public class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta {
-  private Long startingSnapshotId = null; // check all versions by default
   private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
   private final DataFileSet removedDataFiles = DataFileSet.create();
   private boolean validateDeletes = false;
@@ -80,12 +79,6 @@ public RowDelta removeDeletes(DeleteFile deletes) {
     return this;
   }
 
-  @Override
-  public RowDelta validateFromSnapshot(long snapshotId) {
-    this.startingSnapshotId = snapshotId;
-    return this;
-  }
-
   @Override
   public RowDelta validateDeletedFiles() {
     this.validateDeletes = true;
@@ -126,18 +119,20 @@ public RowDelta toBranch(String branch) {
   @Override
   protected void validate(TableMetadata base, Snapshot parent) {
+    super.validate(base, parent);
+
     if (parent != null) {
-      if (startingSnapshotId != null) {
+      if (startingSnapshotId() != null) {
         Preconditions.checkArgument(
-            SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot),
+            SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId(), base::snapshot),
             "Snapshot %s is not an ancestor of %s",
-            startingSnapshotId,
+            startingSnapshotId(),
             parent.snapshotId());
       }
 
       if (!referencedDataFiles.isEmpty()) {
         validateDataFilesExist(
             base,
-            startingSnapshotId,
+            startingSnapshotId(),
             referencedDataFiles,
             !validateDeletes,
             conflictDetectionFilter,
@@ -149,23 +144,23 @@ protected void validate(TableMetadata base, Snapshot parent) {
       }
 
       if (validateNewDataFiles) {
-        validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
+        validateAddedDataFiles(base, startingSnapshotId(), conflictDetectionFilter, parent);
       }
 
       if (validateNewDeleteFiles) {
         // validate that explicitly deleted files have not had added deletes
         if (!removedDataFiles.isEmpty()) {
           validateNoNewDeletesForDataFiles(
-              base, startingSnapshotId, conflictDetectionFilter, removedDataFiles, parent);
+              base, startingSnapshotId(), conflictDetectionFilter, removedDataFiles, parent);
         }
 
         // validate that previous deletes do not conflict with added deletes
-        validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
+        validateNoNewDeleteFiles(base, startingSnapshotId(), conflictDetectionFilter, parent);
       }
 
       validateNoConflictingFileAndPositionDeletes();
 
-      validateAddedDVs(base, startingSnapshotId, conflictDetectionFilter, parent);
+      validateAddedDVs(base, startingSnapshotId(), conflictDetectionFilter, parent);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java
index 439524deaf24..a5c50c113cb2 100644
--- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java
+++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java
@@ -156,6 +156,8 @@ public Object updateEvent() {
 
   @Override
   protected void validate(TableMetadata base, Snapshot snapshot) {
+    super.validate(base, snapshot);
+
     // this is only called after apply() passes off to super, but check fast-forward status just in
     // case
     if (!isFastForward(base)) {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index d11f466434ec..24b06ba7ff7f 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -48,6 +48,7 @@
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import javax.annotation.Nullable;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.events.CreateSnapshotEvent;
@@ -117,6 +118,8 @@ public void accept(String file) {
   private TableMetadata base;
   private boolean stageOnly = false;
   private Consumer<String> deleteFunc = defaultDelete;
+  @Nullable private Long startingSnapshotId = null; // check all versions by default
+  @Nullable private Consumer<Snapshot> snapshotValidator = null;
   private ExecutorService workerPool;
   private String targetBranch = SnapshotRef.MAIN_BRANCH;
 
@@ -211,6 +214,23 @@ public ThisT deleteWith(Consumer<String> deleteCallback) {
     return self();
   }
 
+  @Override
+  public ThisT validateWith(Consumer<Snapshot> validator) {
+    this.snapshotValidator = validator;
+    return self();
+  }
+
+  @Override
+  public ThisT validateFromSnapshot(long startSnapshotId) {
+    this.startingSnapshotId = startSnapshotId;
+    return self();
+  }
+
+  @Nullable
+  protected Long startingSnapshotId() {
+    return this.startingSnapshotId;
+  }
+
   /**
    * Clean up any uncommitted manifests that were created.
    *
@@ -238,7 +258,13 @@ public ThisT deleteWith(Consumer<String> deleteCallback) {
    * @param currentMetadata current table metadata to validate
    * @param snapshot ending snapshot on the lineage which is being validated
    */
-  protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
+  protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {
+    if (snapshotValidator != null && snapshot != null) {
+      SnapshotUtil.ancestorsBetween(
+              snapshot.snapshotId(), startingSnapshotId, currentMetadata::snapshot)
+          .forEach(snapshotValidator);
+    }
+  }
 
   /**
    * Apply the update's changes to the given metadata and snapshot. Return the new manifest list.
diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
index 81621164e4af..4818ebce3ed7 100644
--- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java
+++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -70,6 +70,8 @@ public StreamingDelete toBranch(String branch) {
 
   @Override
   protected void validate(TableMetadata base, Snapshot parent) {
+    super.validate(base, parent);
+
     if (validateFilesToDeleteExist) {
       failMissingDeletePaths();
     }
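
Note (not part of the patch): the default implementation above visits ancestors newest-first and stops before the configured starting snapshot, so the from-snapshot itself is never re-validated — the tests below rely on exactly this. A sketch of the equivalent traversal, assuming a lineage s1 <- s2 <- s3 with s3 current:

```java
// With validateFromSnapshot(s1.snapshotId()), the default validate(...) behaves like this
// loop: it visits s3 and s2, stops before s1, and any ValidationException thrown by the
// validator fails the commit.
for (Snapshot ancestor :
    SnapshotUtil.ancestorsBetween(s3.snapshotId(), s1.snapshotId(), metadata::snapshot)) {
  snapshotValidator.accept(ancestor);
}
```
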
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index c3e238e3bc93..ede3e0dd9e9f 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -19,10 +19,19 @@
 package org.apache.iceberg;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
 
-public class TestSnapshotProducer {
+public class TestSnapshotProducer extends TestBase {
 
   @Test
   public void testManifestFileGroupSize() {
@@ -69,6 +78,95 @@ public void testManifestFileGroupSize() {
         "Must limit parallelism to avoid tiny manifests");
   }
 
+  @TestTemplate
+  void testCommitValidationFailsOnExistingWapIdInSnapshotHistory() {
+    String stagedWapId = "12345";
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .set(SnapshotSummary.STAGED_WAP_ID_PROP, stagedWapId)
+        .commit();
+
+    String validationExceptionMessage =
+        String.format("Duplicate %s: %s", SnapshotSummary.STAGED_WAP_ID_PROP, stagedWapId);
+    TestingSnapshotProducer producer =
+        new TestingSnapshotProducer(table.ops())
+            .validateWith(new WapIdValidator(stagedWapId, validationExceptionMessage));
+
+    assertThatThrownBy(producer::commit)
+        .isInstanceOf(ValidationException.class)
+        .hasMessage(validationExceptionMessage);
+  }
+
+  @TestTemplate
+  void testCommitValidationStartsFromConfiguredSnapshot() {
+    String stagedWapId = "12345";
+    table
+        .newFastAppend()
+        .appendFile(FILE_A)
+        .set(SnapshotSummary.STAGED_WAP_ID_PROP, stagedWapId)
+        .commit();
+
+    TestingSnapshotProducer producer =
+        new TestingSnapshotProducer(table.ops())
+            .validateFromSnapshot(table.currentSnapshot().snapshotId())
+            .validateWith(new WapIdValidator(stagedWapId, "empty"));
+
+    assertThatNoException().isThrownBy(producer::commit);
+    assertThat(Iterables.size(table.snapshots())).isEqualTo(2);
+  }
+
+  private static class WapIdValidator implements Consumer<Snapshot> {
+    private final String stagedWapId;
+    private final String validationErrorMessage;
+
+    private WapIdValidator(String stagedWapId, String validationErrorMessage) {
+      this.stagedWapId = stagedWapId;
+      this.validationErrorMessage = validationErrorMessage;
+    }
+
+    @Override
+    public void accept(Snapshot snapshot) {
+      if (stagedWapId.equals(snapshot.summary().get(SnapshotSummary.STAGED_WAP_ID_PROP))) {
+        throw new ValidationException(validationErrorMessage);
+      }
+    }
+  }
+
+  private static class TestingSnapshotProducer extends SnapshotProducer<TestingSnapshotProducer> {
+    private TestingSnapshotProducer(TableOperations ops) {
+      super(ops);
+    }
+
+    @Override
+    protected TestingSnapshotProducer self() {
+      return this;
+    }
+
+    @Override
+    protected void cleanUncommitted(Set<ManifestFile> committed) {}
+
+    @Override
+    protected String operation() {
+      return "";
+    }
+
+    @Override
+    protected List<ManifestFile> apply(TableMetadata metadataToUpdate, Snapshot snapshot) {
+      return List.of();
+    }
+
+    @Override
+    protected Map<String, String> summary() {
+      return Map.of();
+    }
+
+    @Override
+    public TestingSnapshotProducer set(String property, String value) {
+      return this;
+    }
+  }
+
   private void assertManifestWriterCount(
       int workerPoolSize, int fileCount, int expectedManifestWriterCount, String errMsg) {
     int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize, fileCount);
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index 847ab9224c13..bc58d89cbcc1 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -40,6 +41,7 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -51,6 +53,7 @@
 import org.apache.iceberg.connect.events.StartCommit;
 import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -240,33 +243,38 @@ private void commitToTable(
     if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
       LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
     } else {
+      SnapshotUpdate<?> operation;
       if (deleteFiles.isEmpty()) {
         AppendFiles appendOp = table.newAppend();
         dataFiles.forEach(appendOp::appendFile);
-        appendOp.commit();
+        operation = appendOp;
       } else {
         RowDelta deltaOp = table.newRowDelta();
         dataFiles.forEach(deltaOp::addRows);
         deleteFiles.forEach(deltaOp::addDeletes);
-        deltaOp.commit();
+        operation = deltaOp;
+      }
+
+      if (branch != null) {
+        operation.toBranch(branch);
+      }
+
+      operation.set(snapshotOffsetsProp, offsetsJson);
+      operation.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
+      if (validThroughTs != null) {
+        operation.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
       }
 
+      // validate that no concurrent commit has changed the stored offsets since they were read
+      operation.validateWith(
+          new OffsetValidator(tableIdentifier, snapshotOffsetsProp, committedOffsets));
+      Snapshot latestSnapshot = latestSnapshot(table, branch);
+      if (latestSnapshot != null) {
+        operation.validateFromSnapshot(latestSnapshot.snapshotId());
+      }
+
+      operation.commit();
+
       Long snapshotId = latestSnapshot(table, branch).snapshotId();
       Event event =
           new Event(
@@ -284,6 +292,37 @@ private void commitToTable(
     }
   }
 
+  private static class OffsetValidator implements Consumer<Snapshot> {
+    private final TableIdentifier tableIdentifier;
+    private final String snapshotOffsetsProp;
+    private final Map<Integer, Long> expectedOffsets;
+
+    private OffsetValidator(
+        TableIdentifier tableIdentifier,
+        String snapshotOffsetsProp,
+        Map<Integer, Long> expectedOffsets) {
+      this.tableIdentifier = tableIdentifier;
+      this.snapshotOffsetsProp = snapshotOffsetsProp;
+      this.expectedOffsets = expectedOffsets;
+    }
+
+    @Override
+    public void accept(Snapshot snapshot) {
+      Map<Integer, Long> lastCommittedOffsets =
+          extractLastCommittedOffsets(snapshot, snapshotOffsetsProp);
+      if (expectedOffsets.isEmpty() && lastCommittedOffsets == null) {
+        return; // there are no stored offsets, so assume we're starting with new offsets
+      }
+
+      ValidationException.check(
+          expectedOffsets.equals(lastCommittedOffsets),
+          "Latest offsets do not match expected offsets for this commit. Table: %s, Expected: %s, Last Committed: %s",
+          tableIdentifier,
+          expectedOffsets,
+          lastCommittedOffsets);
+    }
+  }
+
   private <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
     Map<Object, Boolean> seen = Maps.newConcurrentMap();
     return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
   }
@@ -299,22 +338,32 @@ private Snapshot latestSnapshot(Table table, String branch) {
 
   private Map<Integer, Long> lastCommittedOffsetsForTable(Table table, String branch) {
     Snapshot snapshot = latestSnapshot(table, branch);
     while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
-      String value = summary.get(snapshotOffsetsProp);
-      if (value != null) {
-        TypeReference<Map<Integer, Long>> typeRef = new TypeReference<Map<Integer, Long>>() {};
-        try {
-          return MAPPER.readValue(value, typeRef);
-        } catch (IOException e) {
-          throw new UncheckedIOException(e);
-        }
+      Map<Integer, Long> offsets = extractLastCommittedOffsets(snapshot, snapshotOffsetsProp);
+      if (offsets != null) {
+        return offsets;
       }
+
       Long parentSnapshotId = snapshot.parentId();
      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
     }
     return ImmutableMap.of();
   }
 
+  private static Map<Integer, Long> extractLastCommittedOffsets(
+      Snapshot snapshot, String snapshotOffsetsProp) {
+    String snapshotOffsets = snapshot.summary().get(snapshotOffsetsProp);
+    if (snapshotOffsets == null) {
+      return null;
+    }
+
+    TypeReference<Map<Integer, Long>> typeRef = new TypeReference<Map<Integer, Long>>() {};
+    try {
+      return MAPPER.readValue(snapshotOffsets, typeRef);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
   void terminate() {
     this.terminated = true;
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
index 117a5258d3d2..63bde7c0c3ff 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
@@ -23,6 +23,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -92,7 +93,7 @@ private InMemoryCatalog initInMemoryCatalog() {
 
   @BeforeEach
   @SuppressWarnings("deprecation")
   public void before() {
-    catalog = initInMemoryCatalog();
+    catalog = spy(initInMemoryCatalog());
     catalog.createNamespace(NAMESPACE);
     table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
index 44ef43877cb1..ad100f00761c 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.connect.events.AvroUtil;
 import org.apache.iceberg.connect.events.CommitComplete;
 import org.apache.iceberg.connect.events.CommitToTable;
@@ -229,4 +230,45 @@ public void testCoordinatorRunning() {
     sourceConsumer.rebalance(ImmutableList.of(tp1));
     assertThat(mockIcebergSinkTask.isCoordinatorRunning()).isFalse();
   }
+
+  @Test
+  public void testCoordinatorCommittedOffsetValidation() {
+    // this test demonstrates that the coordinator's offset validation (registered through
+    // validateWith) prevents a commit when another, independent commit has updated the
+    // offsets while the commit was in flight
+
+    // set the initial offsets
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":1}")
+        .commit();
+
+    Table frozenTable = catalog.loadTable(TABLE_IDENTIFIER);
+
+    // return the original table state on the first load, so that the update will happen
+    // during the commit refresh
+    when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(frozenTable).thenCallRealMethod();
+
+    // independently update the offsets
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}")
+        .commit();
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);
+    Snapshot latestSnapshot = table.currentSnapshot();
+    assertThat(latestSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
+
+    // trigger a commit to the table; offset validation should fail it
+    coordinatorTest(
+        ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now());
+
+    // assert that the table was not updated and the independently committed offsets remain
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);
+    assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
+  }
 }
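
Note (not part of the patch): a condensed view of how the connector half wires the new hooks together in Coordinator#commitToTable — the names come from the diff above, and the sequencing comments are illustrative:

```java
// offsets read before staging the commit
Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);
// ... data files and delete files are staged on `operation` ...
operation.validateWith(
    new OffsetValidator(tableIdentifier, snapshotOffsetsProp, committedOffsets));
Snapshot latest = latestSnapshot(table, branch);
if (latest != null) {
  operation.validateFromSnapshot(latest.snapshotId()); // re-check only newer snapshots
}
// if a concurrent commit changed the stored offsets in the meantime, OffsetValidator sees
// that snapshot during commit() and throws ValidationException, aborting this stale commit
operation.commit();
```
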