diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index 30ae5f33c71e..5c516311bd4c 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -27,6 +27,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -41,6 +42,7 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotAncestryValidator;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -52,9 +54,10 @@
 import org.apache.iceberg.connect.events.StartCommit;
 import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -251,7 +254,8 @@ private void commitToTable(
     } else {
       String taskId = String.format("%s-%s", config.connectorName(), config.taskId());
       if (deleteFiles.isEmpty()) {
-        AppendFiles appendOp = table.newAppend();
+        AppendFiles appendOp =
+            table.newAppend().validateWith(offsetValidator(tableIdentifier, committedOffsets));
         if (branch != null) {
           appendOp.toBranch(branch);
         }
@@ -264,7 +268,8 @@
         dataFiles.forEach(appendOp::appendFile);
         appendOp.commit();
       } else {
-        RowDelta deltaOp = table.newRowDelta();
+        RowDelta deltaOp =
+            table.newRowDelta().validateWith(offsetValidator(tableIdentifier, committedOffsets));
         if (branch != null) {
           deltaOp.toBranch(branch);
         }
@@ -296,6 +301,28 @@
     }
   }
 
+  private SnapshotAncestryValidator offsetValidator(
+      TableIdentifier tableIdentifier, Map<Integer, Long> expectedOffsets) {
+
+    return new SnapshotAncestryValidator() {
+      private Map<Integer, Long> lastCommittedOffsets;
+
+      @Override
+      public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+        lastCommittedOffsets = lastCommittedOffsets(baseSnapshots);
+
+        return expectedOffsets.equals(lastCommittedOffsets);
+      }
+
+      @Override
+      public String errorMessage() {
+        return String.format(
+            "Cannot commit to %s, stale offsets: Expected: %s Committed: %s",
+            tableIdentifier, expectedOffsets, lastCommittedOffsets);
+      }
+    };
+  }
+
   private <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
     Map<Object, Boolean> seen = Maps.newConcurrentMap();
     return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
@@ -310,21 +337,37 @@ private Snapshot latestSnapshot(Table table, String branch) {
 
   private Map<Integer, Long> lastCommittedOffsetsForTable(Table table, String branch) {
     Snapshot snapshot = latestSnapshot(table, branch);
-    while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
-      String value = summary.get(snapshotOffsetsProp);
-      if (value != null) {
-        TypeReference<Map<Integer, Long>> typeRef = new TypeReference<Map<Integer, Long>>() {};
-        try {
-          return MAPPER.readValue(value, typeRef);
-        } catch (IOException e) {
-          throw new UncheckedIOException(e);
-        }
-      }
-      Long parentSnapshotId = snapshot.parentId();
-      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
+
+    if (snapshot == null) {
+      return Map.of();
+    }
+
+    Iterable<Snapshot> branchAncestry =
+        SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
+    return lastCommittedOffsets(branchAncestry);
+  }
+
+  private Map<Integer, Long> lastCommittedOffsets(Iterable<Snapshot> snapshots) {
+    return Streams.stream(snapshots)
+        .filter(Objects::nonNull)
+        .filter(snapshot -> snapshot.summary().containsKey(snapshotOffsetsProp))
+        .map(snapshot -> snapshot.summary().get(snapshotOffsetsProp))
+        .map(this::parseOffsets)
+        .findFirst()
+        .orElseGet(Map::of);
+  }
+
+  private Map<Integer, Long> parseOffsets(String value) {
+    if (value == null) {
+      return Map.of();
+    }
+
+    TypeReference<Map<Integer, Long>> typeRef = new TypeReference<Map<Integer, Long>>() {};
+    try {
+      return MAPPER.readValue(value, typeRef);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
     }
-    return ImmutableMap.of();
   }
 
   void terminate() {
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
index 117a5258d3d2..63bde7c0c3ff 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java
@@ -23,6 +23,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -92,7 +93,7 @@ private InMemoryCatalog initInMemoryCatalog() {
   @BeforeEach
   @SuppressWarnings("deprecation")
   public void before() {
-    catalog = initInMemoryCatalog();
+    catalog = spy(initInMemoryCatalog());
     catalog.createNamespace(NAMESPACE);
     table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
 
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
index 4b1a878e5692..60a085781171 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.connect.events.AvroUtil;
 import org.apache.iceberg.connect.events.CommitComplete;
 import org.apache.iceberg.connect.events.CommitToTable;
@@ -253,4 +254,44 @@ public void testCoordinatorCommittedOffsetMerging() {
     assertThat(table.currentSnapshot().summary())
         .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3,\"1\":7}");
   }
+
+  @Test
+  public void testCoordinatorCommittedOffsetValidation() {
+    // This test demonstrates that the offset validator the Coordinator attaches via
+    // validateWith(...) prevents commits when another independent commit has updated
+    // the offsets during the commit process
+
+    // Set the initial offsets
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":1}")
+        .commit();
+
+    Table frozenTable = catalog.loadTable(TABLE_IDENTIFIER);
+
+    // return the original table state on the first load, so that the update will happen
+    // during the commit refresh
+    when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(frozenTable).thenCallRealMethod();
+
+    // Independently update the offsets
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}")
+        .commit();
+
+    assertThat(table.snapshots()).hasSize(2);
+    Snapshot firstSnapshot = table.currentSnapshot();
+    assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
+
+    // Trigger commit to the table
+    coordinatorTest(
+        ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now());
+
+    // Assert that the table was not updated and offsets remain
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);
+    assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
+  }
 }
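
Note on the validator hook: the patch above imports org.apache.iceberg.SnapshotAncestryValidator and calls validateWith(...) on AppendFiles and RowDelta, but neither is defined in this diff and neither should be read as an existing public Iceberg API; they would have to come from a companion core change. Purely from the call sites and the anonymous implementation in offsetValidator(...), the interface would look roughly like the sketch below. Treat the signatures, the ancestry ordering, and the failure behavior as assumptions, not as the actual API.

// Hypothetical sketch only: inferred from how Coordinator uses the type above,
// not taken from an actual Iceberg interface definition.
package org.apache.iceberg;

public interface SnapshotAncestryValidator {

  // Called with the ancestry of the snapshot the pending operation is based on
  // (assumed newest-first, matching the findFirst() usage in lastCommittedOffsets);
  // returns true if the operation is still safe to commit.
  Boolean apply(Iterable<Snapshot> baseSnapshots);

  // Human-readable reason reported when apply(...) returns false and the commit is rejected.
  String errorMessage();
}

Under the same assumption, validateWith(...) would re-run the validator against the refreshed base ancestry on every commit attempt, including retries, so a concurrent commit that advances the stored offsets makes apply(...) return false and the Coordinator's commit fails with errorMessage() instead of re-writing stale offsets.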