diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java index b8786f259a9c..b1ffda156011 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java @@ -94,7 +94,11 @@ public void write(RowData row) throws IOException { writer.delete(row); break; case DELETE: - writer.delete(row); + if (upsert) { + writer.deleteKey(keyProjection.wrap(row)); + } else { + writer.delete(row); + } break; default: diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java index 916c337bee6f..ccc3c0f23df7 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java @@ -324,6 +324,24 @@ public void testChangeLogOnDataKey() throws Exception { expectedRecords); } + @Test + public void testUpsertOnlyDeletesOnDataKey() throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa")), + ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb"))); + + List> expectedRecords = + ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of()); + + testChangeLogs( + ImmutableList.of("data"), + row -> row.getField(ROW_DATA_POS), + true, + elementsPerCheckpoint, + expectedRecords); + } + @Test public void testChangeLogOnIdDataKey() throws Exception { List> elementsPerCheckpoint = diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java index b8786f259a9c..b1ffda156011 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java @@ -94,7 +94,11 @@ public void write(RowData row) throws IOException { writer.delete(row); break; case DELETE: - writer.delete(row); + if (upsert) { + writer.deleteKey(keyProjection.wrap(row)); + } else { + writer.delete(row); + } break; default: diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java index 916c337bee6f..5d8a56ebbfe6 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java @@ -353,6 +353,24 @@ public void testChangeLogOnIdDataKey() throws Exception { expectedRecords); } + @Test + public void testUpsertOnlyDeletesOnDataKey() throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa")), + ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb"))); + + List> expectedRecords = + ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of()); + + testChangeLogs( + ImmutableList.of("data"), + row -> row.getField(ROW_DATA_POS), + true, + elementsPerCheckpoint, + expectedRecords); + } + @Test public void testChangeLogOnSameKey() throws Exception { List> elementsPerCheckpoint =