From c62bd7ba1cb1cd090b242e5a92aa60c357054924 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 7 Nov 2025 10:22:25 -0800 Subject: [PATCH 1/4] Kafka Connect: validate offsets for refreshed table state on commit --- .../iceberg/connect/channel/Coordinator.java | 81 +++++++++++++++---- .../connect/channel/ChannelTestBase.java | 3 +- .../connect/channel/TestCoordinator.java | 42 ++++++++++ 3 files changed, 108 insertions(+), 18 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index 30ae5f33c71e..eca8f8f8254c 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -41,6 +42,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotAncestryValidator; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -52,9 +54,10 @@ import org.apache.iceberg.connect.events.StartCommit; import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import 
org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.connect.errors.ConnectException; @@ -251,7 +254,8 @@ private void commitToTable( } else { String taskId = String.format("%s-%s", config.connectorName(), config.taskId()); if (deleteFiles.isEmpty()) { - AppendFiles appendOp = table.newAppend(); + AppendFiles appendOp = + table.newAppend().validateWith(offsetValidator(tableIdentifier, committedOffsets)); if (branch != null) { appendOp.toBranch(branch); } @@ -264,7 +268,8 @@ private void commitToTable( dataFiles.forEach(appendOp::appendFile); appendOp.commit(); } else { - RowDelta deltaOp = table.newRowDelta(); + RowDelta deltaOp = + table.newRowDelta().validateWith(offsetValidator(tableIdentifier, committedOffsets)); if (branch != null) { deltaOp.toBranch(branch); } @@ -296,6 +301,32 @@ private void commitToTable( } } + private SnapshotAncestryValidator offsetValidator( + TableIdentifier tableIdentifier, Map expectedOffsets) { + + return new SnapshotAncestryValidator() { + private Map lastCommittedOffsets; + + @Override + public Boolean apply(Iterable baseSnapshots) { + lastCommittedOffsets = lastCommittedOffsets(baseSnapshots); + + if (expectedOffsets.isEmpty() && lastCommittedOffsets.isEmpty()) { + return true; // there are no stored offsets, so assume we're starting with new offsets + } + + return expectedOffsets.equals(lastCommittedOffsets); + } + + @Override + public String errorMessage() { + return String.format( + "Latest offsets do not match expected offsets for this commit. 
Table: %s, Expected: %s, Last Committed: %s", + tableIdentifier, expectedOffsets, lastCommittedOffsets); + } + }; + } + private Predicate distinctByKey(Function keyExtractor) { Map seen = Maps.newConcurrentMap(); return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; @@ -310,21 +341,37 @@ private Snapshot latestSnapshot(Table table, String branch) { private Map lastCommittedOffsetsForTable(Table table, String branch) { Snapshot snapshot = latestSnapshot(table, branch); - while (snapshot != null) { - Map summary = snapshot.summary(); - String value = summary.get(snapshotOffsetsProp); - if (value != null) { - TypeReference> typeRef = new TypeReference>() {}; - try { - return MAPPER.readValue(value, typeRef); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - Long parentSnapshotId = snapshot.parentId(); - snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; + + if (snapshot == null) { + return Map.of(); + } + + Iterable branchAncestry = + SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot); + return lastCommittedOffsets(branchAncestry); + } + + private Map lastCommittedOffsets(Iterable snapshots) { + return Streams.stream(snapshots) + .filter(Objects::nonNull) + .filter(snapshot -> snapshot.summary().containsKey(snapshotOffsetsProp)) + .map(snapshot -> snapshot.summary().get(snapshotOffsetsProp)) + .map(this::parseOffsets) + .findFirst() + .orElseGet(Map::of); + } + + private Map parseOffsets(String value) { + if (value == null) { + return Map.of(); + } + + TypeReference> typeRef = new TypeReference>() {}; + try { + return MAPPER.readValue(value, typeRef); + } catch (IOException e) { + throw new UncheckedIOException(e); } - return ImmutableMap.of(); } void terminate() { diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java index 
117a5258d3d2..63bde7c0c3ff 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/ChannelTestBase.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.IOException; @@ -92,7 +93,7 @@ private InMemoryCatalog initInMemoryCatalog() { @BeforeEach @SuppressWarnings("deprecation") public void before() { - catalog = initInMemoryCatalog(); + catalog = spy(initInMemoryCatalog()); catalog.createNamespace(NAMESPACE); table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index 4b1a878e5692..c9c5b4edfb29 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -32,6 +32,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.connect.events.AvroUtil; import org.apache.iceberg.connect.events.CommitComplete; import org.apache.iceberg.connect.events.CommitToTable; @@ -253,4 +254,45 @@ public void testCoordinatorCommittedOffsetMerging() { assertThat(table.currentSnapshot().summary()) .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3,\"1\":7}"); } + + @Test + public void testCoordinatorCommittedOffsetValidation() { + // This test demonstrates that the Coordinator's validateAndCommit method + // prevents commits when another independent commit has updated 
the offsets + // during the commit process + + // Set the initial offsets + table + .newAppend() + .appendFile(EventTestUtil.createDataFile()) + .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":1}") + .commit(); + + Table frozenTable = catalog.loadTable(TABLE_IDENTIFIER); + + // return the original table state on the first load, so that the update will happen + // during the commit refresh + when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(frozenTable).thenCallRealMethod(); + + // Independently update the offsets + table + .newAppend() + .appendFile(EventTestUtil.createDataFile()) + .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}") + .commit(); + + table.refresh(); + assertThat(table.snapshots()).hasSize(2); + Snapshot firstSnapshot = table.currentSnapshot(); + assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); + + // Trigger commit to the table + coordinatorTest( + ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now()); + + // Assert that the table was not updated and offsets remain + table.refresh(); + assertThat(table.snapshots()).hasSize(2); + assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); + } } From a1972d1a0b855e2d53696cf52e562d3abd3fde7e Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 7 Nov 2025 13:50:51 -0800 Subject: [PATCH 2/4] Address comments --- .../org/apache/iceberg/connect/channel/Coordinator.java | 4 ---- .../apache/iceberg/connect/channel/TestCoordinator.java | 8 ++++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index eca8f8f8254c..3a7fecfafdb1 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ 
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -311,10 +311,6 @@ private SnapshotAncestryValidator offsetValidator( public Boolean apply(Iterable baseSnapshots) { lastCommittedOffsets = lastCommittedOffsets(baseSnapshots); - if (expectedOffsets.isEmpty() && lastCommittedOffsets.isEmpty()) { - return true; // there are no stored offsets, so assume we're starting with new offsets - } - return expectedOffsets.equals(lastCommittedOffsets); } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index c9c5b4edfb29..2fd1092ad07a 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -18,10 +18,6 @@ */ package org.apache.iceberg.connect.channel; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import java.time.OffsetDateTime; import java.util.List; import java.util.UUID; @@ -51,6 +47,10 @@ import org.apache.kafka.connect.sink.SinkTaskContext; import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestCoordinator extends ChannelTestBase { @Test From 65c81e6b67d39575e0bd32ea581f8b221d28add8 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 7 Nov 2025 13:55:29 -0800 Subject: [PATCH 3/4] Spotless --- .../apache/iceberg/connect/channel/TestCoordinator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index 2fd1092ad07a..c9c5b4edfb29 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -18,6 +18,10 @@ */ package org.apache.iceberg.connect.channel; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.time.OffsetDateTime; import java.util.List; import java.util.UUID; @@ -47,10 +51,6 @@ import org.apache.kafka.connect.sink.SinkTaskContext; import org.junit.jupiter.api.Test; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class TestCoordinator extends ChannelTestBase { @Test From d02db030753d2fd057b6c6e98c928e2913e151d9 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 7 Nov 2025 15:45:40 -0800 Subject: [PATCH 4/4] Address comments --- .../java/org/apache/iceberg/connect/channel/Coordinator.java | 2 +- .../org/apache/iceberg/connect/channel/TestCoordinator.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index 3a7fecfafdb1..5c516311bd4c 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -317,7 +317,7 @@ public Boolean apply(Iterable baseSnapshots) { @Override public String errorMessage() { return String.format( - "Latest offsets do not match expected offsets for this commit. 
Table: %s, Expected: %s, Last Committed: %s", + "Cannot commit to %s, stale offsets: Expected: %s Committed: %s", tableIdentifier, expectedOffsets, lastCommittedOffsets); } }; diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index c9c5b4edfb29..60a085781171 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -281,7 +281,6 @@ public void testCoordinatorCommittedOffsetValidation() { .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}") .commit(); - table.refresh(); assertThat(table.snapshots()).hasSize(2); Snapshot firstSnapshot = table.currentSnapshot(); assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");