Skip to content

Commit

Permalink
Fail deduplication if event key is null (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 11, 2024
1 parent fcc6e49 commit a47eb1d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public IcebergChangeEvent(String destination, byte[] valueData, byte[] keyData)
}

public JsonNode key() {
if (key == null) {
if (key == null && keyData != null) {
key = keyDeserializer.deserialize(destination, keyData);
}

return key;
}

public JsonNode value() {
if (value == null) {
if (value == null && valueData != null) {
value = valDeserializer.deserialize(destination, valueData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,20 @@ protected List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> eve

ConcurrentHashMap<JsonNode, IcebergChangeEvent> deduplicatedEvents = new ConcurrentHashMap<>();

events.forEach(e ->
// deduplicate using key(PK)
events.forEach(e -> {
if (e.key() == null || e.key().isNull()) {
throw new DebeziumException("Cannot deduplicate data with null key! destination:'" + e.destination() + "' event: '" + e.value().toString() + "'");
}

// deduplicate using key(PK)
deduplicatedEvents.merge(e.key(), e, (oldValue, newValue) -> {
if (this.compareByTsThenOp(oldValue.value(), newValue.value()) <= 0) {
return newValue;
} else {
return oldValue;
}
})
});
}
);

return new ArrayList<>(deduplicatedEvents.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@

package io.debezium.server.iceberg.tableoperator;

import io.debezium.DebeziumException;
import io.debezium.server.iceberg.IcebergChangeEvent;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;

import java.util.ArrayList;
import java.util.List;

import jakarta.inject.Inject;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
Expand All @@ -27,8 +24,13 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.junit.jupiter.api.Assertions.assertThrows;


/**
Expand Down Expand Up @@ -148,5 +150,27 @@ public void testDeduplicateBatch() throws Exception {
List<IcebergChangeEvent> dedups2 = icebergTableOperator.deduplicateBatch(records2);
Assertions.assertEquals(1, dedups2.size());
Assertions.assertEquals("u", dedups2.get(0).value().get("__op").asText("x"));

// deduplicating wth null key should fail!
IcebergChangeEvent e31 = new IcebergChangeEventBuilder()
.destination("destination")
.addField("id", 3)
.addField("__op", "r")
.addField("__source_ts_ms", 1L)
.build();
IcebergChangeEvent e32 = new IcebergChangeEventBuilder()
.destination("destination")
.addField("id", 3)
.addField("__op", "u")
.addField("__source_ts_ms", 1L)
.build();

List<IcebergChangeEvent> records3 = List.of(e31, e32);
DebeziumException thrown = assertThrows(DebeziumException.class,
() -> {
icebergTableOperator.deduplicateBatch(records3);
});

Assertions.assertTrue(thrown.getMessage().contains("Cannot deduplicate data with null key!"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public IcebergChangeEvent build() {
"} ").getBytes(StandardCharsets.UTF_8),
("{" +
"\"schema\":" + this.keySchema() + "," +
"\"payload\":" + keyPayload.toString() +
"\"payload\":" + (keyPayload.isEmpty() ? "null" : keyPayload.toString()) +
"} ").getBytes(StandardCharsets.UTF_8)
);
}
Expand Down

0 comments on commit a47eb1d

Please sign in to comment.