diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java index 0f979b2f..7eddfa21 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.debezium.serde.DebeziumSerdes; +import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder; import jakarta.inject.Inject; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; @@ -22,6 +23,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; +import java.util.Set; import static org.junit.jupiter.api.Assertions.*; @@ -58,6 +60,7 @@ public void testNestedJsonRecord() { 26: op: optional string 27: ts_ms: optional long }""")); + assertEquals(schema.identifierFieldIds(), Set.of()); } @Test @@ -81,6 +84,8 @@ public void testUnwrapJsonRecord() { 9: __source_ts_ms: optional timestamptz 10: __deleted: optional string }"""); + + assertEquals(schema.identifierFieldIds(), Set.of()); System.out.println(schema); System.out.println(record); } @@ -102,6 +107,7 @@ public void testNestedArrayJsonRecord() { 11: __db: optional string 12: __deleted: optional string }"""); + assertEquals(schema.identifierFieldIds(), Set.of()); System.out.println(schema); System.out.println(schema.asStruct()); System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType()); @@ -128,6 +134,7 @@ public void testNestedArray2JsonRecord() { 19: ddl: optional string 20: tableChanges: optional list, 29: columns: optional list>>>> }"""); + assertEquals(schema.identifierFieldIds(), Set.of()); System.out.println(schema.findField("tableChanges")); System.out.println(schema.findField("tableChanges").type().asListType().elementType()); //GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); @@ -153,6 +160,7 @@ public void testNestedGeomJsonRecord() { 11: __db: optional string 12: __deleted: optional string }"""); + assertEquals(schema.identifierFieldIds(), Set.of()); GenericRecord g = (GenericRecord) record.getField("g"); GenericRecord h = (GenericRecord) record.getField("h"); assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass())); @@ -180,6 +188,55 @@ public void valuePayloadWithSchemaAsJsonNode() { JsonNode deserializedSchema = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes()); System.out.println(deserializedSchema); assertFalse(deserializedSchema.has("schema")); + + } + + @Test + public void testIcebergChangeEventSchemaWithKey() { + TestChangeEvent debeziumEvent = TestChangeEvent.ofCompositeKey("destination", 1, "u", "user1", 2L); + Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema(); + assertEquals(schema.toString(), """ + table { + 1: id: required int (id) + 2: first_name: required string (id) + 3: __op: optional string + 4: __source_ts_ms: optional timestamptz + 5: __deleted: optional boolean + }"""); + assertEquals(schema.identifierFieldIds(), Set.of(1, 2)); + + + final IcebergChangeEvent t = new IcebergChangeEventBuilder() + .destination("test") + .addField("first_column", "dummy-value") + .addKeyField("id", 1) + .addKeyField("first_name", "Marx") + .addField("__op", "c") + .addField("__source_ts_ms", 0L) + .addField("__deleted", false) + .build(); + final String key = "{" + + "\"schema\":" + t.changeEventSchema().keySchema() + "," + + "\"payload\":" + t.key() + + "} "; + final String val = "{" + + "\"schema\":" + t.changeEventSchema().valueSchema() + "," + + "\"payload\":" + t.value() + + "} "; + + // test when PK is not first two columns! + TestChangeEvent debeziumEvent2 = new TestChangeEvent<>(key, val, "test"); + Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema(); + assertEquals(schema2.toString(), """ + table { + 1: first_column: optional string + 2: id: required int (id) + 3: first_name: required string (id) + 4: __op: optional string + 5: __source_ts_ms: optional timestamptz + 6: __deleted: optional boolean + }"""); + assertEquals(schema2.identifierFieldIds(), Set.of(2, 3)); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java index dd2ea415..fd0ff303 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java @@ -14,10 +14,11 @@ import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder; import io.debezium.server.iceberg.testresources.TestUtil; +import java.nio.charset.StandardCharsets; import java.time.Instant; /** - * helper class used to generate test customer change events + * helper class used to generate test debezium change events * * @author Ismail Simsek */ @@ -37,6 +38,18 @@ public TestChangeEvent(V value) { this(null, value, null); } + public byte[] getKeyBytes() { + return this.key.toString().getBytes(StandardCharsets.UTF_8); + } + + public byte[] getValueBytes() { + return this.value.toString().getBytes(StandardCharsets.UTF_8); + } + + public IcebergChangeEvent toIcebergChangeEvent() { + return new IcebergChangeEvent(this.destination(), this.getValueBytes(), this.getKeyBytes()); + } + public static TestChangeEvent of(String destination, Integer id, String operation, String name, Long epoch) { final IcebergChangeEvent t = new IcebergChangeEventBuilder()