From b5e3d0545def79db8a2df1b5e0ecf7b04b41362e Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 11 Jun 2024 19:49:54 +0200 Subject: [PATCH] Fail deduplication if event key is null --- .../server/iceberg/IcebergChangeEvent.java | 4 +-- .../tableoperator/IcebergTableOperator.java | 11 +++++-- .../IcebergTableOperatorTest.java | 32 ++++++++++++++++--- .../IcebergChangeEventBuilder.java | 2 +- 4 files changed, 39 insertions(+), 10 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index c9187e96..ebe8b70d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -53,7 +53,7 @@ public IcebergChangeEvent(String destination, byte[] valueData, byte[] keyData) } public JsonNode key() { - if (key == null) { + if (key == null && keyData != null) { key = keyDeserializer.deserialize(destination, keyData); } @@ -61,7 +61,7 @@ public JsonNode key() { } public JsonNode value() { - if (value == null) { + if (value == null && valueData != null) { value = valDeserializer.deserialize(destination, valueData); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 295e486f..8974668a 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -56,15 +56,20 @@ protected List deduplicateBatch(List eve ConcurrentHashMap deduplicatedEvents = new ConcurrentHashMap<>(); - events.forEach(e -> - // deduplicate using key(PK) + events.forEach(e -> { + if (e.key() == null || e.key().isNull()) { + throw new DebeziumException("Cannot deduplicate data with null key! destination:'" + e.destination() + "' event: '" + e.value().toString() + "'"); + } + + // deduplicate using key(PK) deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> { if (this.compareByTsThenOp(oldValue.value(), newValue.value()) <= 0) { return newValue; } else { return oldValue; } - }) + }); + } ); return new ArrayList<>(deduplicatedEvents.values()); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index 871d3909..ea9b0d1c 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -8,6 +8,7 @@ package io.debezium.server.iceberg.tableoperator; +import io.debezium.DebeziumException; import io.debezium.server.iceberg.IcebergChangeEvent; import io.debezium.server.iceberg.IcebergUtil; import io.debezium.server.iceberg.testresources.BaseSparkTest; @@ -15,10 +16,6 @@ import io.debezium.server.iceberg.testresources.S3Minio; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; - -import java.util.ArrayList; -import java.util.List; - import jakarta.inject.Inject; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; @@ -27,8 +24,13 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -148,5 +150,27 @@ public void testDeduplicateBatch() throws Exception { List dedups2 = icebergTableOperator.deduplicateBatch(records2); Assertions.assertEquals(1, dedups2.size()); Assertions.assertEquals("u", dedups2.get(0).value().get("__op").asText("x")); + + // deduplicating wth null key should fail! + IcebergChangeEvent e31 = new IcebergChangeEventBuilder() + .destination("destination") + .addField("id", 3) + .addField("__op", "r") + .addField("__source_ts_ms", 1L) + .build(); + IcebergChangeEvent e32 = new IcebergChangeEventBuilder() + .destination("destination") + .addField("id", 3) + .addField("__op", "u") + .addField("__source_ts_ms", 1L) + .build(); + + List records3 = List.of(e31, e32); + DebeziumException thrown = assertThrows(DebeziumException.class, + () -> { + icebergTableOperator.deduplicateBatch(records3); + }); + + Assertions.assertTrue(thrown.getMessage().contains("Cannot deduplicate data with null key!")); } } \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/IcebergChangeEventBuilder.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/IcebergChangeEventBuilder.java index 36b2766e..99d5b4fa 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/IcebergChangeEventBuilder.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/IcebergChangeEventBuilder.java @@ -112,7 +112,7 @@ public IcebergChangeEvent build() { "} ").getBytes(StandardCharsets.UTF_8), ("{" + "\"schema\":" + this.keySchema() + "," + - "\"payload\":" + keyPayload.toString() + + "\"payload\":" + (keyPayload.isEmpty() ? "null" : keyPayload.toString()) + "} ").getBytes(StandardCharsets.UTF_8) ); }