From a087ca87852a603fbdb004030792c76bbbe5aa5b Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 16 Jun 2024 10:00:13 +0200 Subject: [PATCH] Test consumer without event flattening --- ...ebergChangeConsumerMysqlTestUnwrapped.java | 2 +- .../IcebergChangeConsumerTestUnwraapped.java | 1 + .../iceberg/IcebergChangeEventTest.java | 14 ++++++------- .../IcebergChangeEventTestUnwrapped.java | 20 +++++++++++++++---- .../IcebergChangeEventBuilderTest.java | 4 ++-- .../IcebergTableOperatorTest.java | 2 +- 6 files changed, 28 insertions(+), 15 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestUnwrapped.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestUnwrapped.java index 4ba7b289..4b29546b 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestUnwrapped.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestUnwrapped.java @@ -119,7 +119,7 @@ public Map getConfigOverrides() { config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); config.put("debezium.transforms", ","); config.put("debezium.sink.iceberg.upsert", "false"); - config.put("debezium.sink.iceberg.upsert-keep-deletes", "true"); + config.put("debezium.sink.iceberg.create-identifier-fields", "false"); return config; } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestUnwraapped.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestUnwraapped.java index dcf3088a..f7efd9de 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestUnwraapped.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestUnwraapped.java @@ -94,6 +94,7 @@ public Map getConfigOverrides() { config.put("debezium.sink.iceberg.destination-regexp", "\\d"); config.put("debezium.source.hstore.handling.mode", "map"); config.put("debezium.transforms", ","); + config.put("debezium.sink.iceberg.create-identifier-fields", "false"); return config; } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java index d8a05fda..0f299e45 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java @@ -52,7 +52,7 @@ class IcebergChangeEventTest { public void testNestedJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", serdeWithSchema.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); System.out.println(schema.toString()); assertEquals(schema.toString(), (""" table { @@ -69,7 +69,7 @@ public void testNestedJsonRecord() { public void testUnwrapJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); GenericRecord record = e.asIcebergRecord(schema); assertEquals("orders", record.getField("__table").toString()); assertEquals(16850, record.getField("order_date")); @@ -95,7 +95,7 @@ public void testNestedArrayJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); assertEquals(schema.toString(), """ table { 1: name: optional string @@ -119,7 +119,7 @@ public void testNestedArrayJsonRecord() { public void testNestedArray2JsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); System.out.println(schema); assertEquals(schema.toString(), """ table { @@ -136,7 +136,7 @@ public void testNestedArray2JsonRecord() { public void testNestedGeomJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); GenericRecord record = e.asIcebergRecord(schema); assertEquals(schema.toString(), """ table { @@ -180,7 +180,7 @@ public void valuePayloadWithSchemaAsJsonNode() { @Test public void testIcebergChangeEventSchemaWithKey() { TestChangeEvent debeziumEvent = TestChangeEvent.ofCompositeKey("destination", 1, "u", "user1", 2L); - Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema(); + Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema(true); assertEquals(schema.toString(), """ table { 1: id: required int (id) @@ -212,7 +212,7 @@ public void testIcebergChangeEventSchemaWithKey() { // test when PK is not first two columns! TestChangeEvent debeziumEvent2 = new TestChangeEvent<>(key, val, "test"); - Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema(); + Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema(true); assertEquals(schema2.toString(), """ table { 1: first_column: optional string diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java index 8a553a68..9fda8ef6 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java @@ -47,11 +47,17 @@ public void testIcebergChangeEventSchemaWithNestedKey() throws IOException { String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json")); String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json")); TestChangeEvent dbzEvent = new TestChangeEvent<>(key, val, "test"); - Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema(); + + Exception exception = assertThrows(RuntimeException.class, () -> { + dbzEvent.toIcebergChangeEvent().icebergSchema(true); + }); + assertTrue(exception.getMessage().contains("Identifier fields are not supported for untested events")); + + Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema(false); assertEquals(""" table { 1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int> - 7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int> + 7: after: optional struct<8: order_number: optional int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int> 13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string> 31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long> 35: op: optional string @@ -59,7 +65,7 @@ public void testIcebergChangeEventSchemaWithNestedKey() throws IOException { 37: ts_us: optional long 38: ts_ns: optional long }""", schema.toString()); - assertEquals(Set.of(8), schema.identifierFieldIds()); + assertEquals(Set.of(), schema.identifierFieldIds()); } @Test @@ -71,7 +77,13 @@ public void testIcebergChangeEventSchemaWithDelete() throws IOException { String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-delete-val-withschema.json")); TestChangeEvent dbzEvent = new TestChangeEvent<>(key, val, "test"); IcebergChangeEvent ie = dbzEvent.toIcebergChangeEvent(); - System.out.println(ie.asIcebergRecord(ie.icebergSchema())); + + Exception exception = assertThrows(RuntimeException.class, () -> { + ie.icebergSchema(true); + }); + assertTrue(exception.getMessage().contains("Identifier fields are not supported for untested events")); + // print converted event value! + System.out.println(ie.asIcebergRecord(ie.icebergSchema(false))); } public static class TestProfile implements QuarkusTestProfile { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java index 09f1b86a..1ffb7723 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java @@ -50,7 +50,7 @@ public void testIcebergChangeEventBuilder() { .addField("preferences", "feature1", true) .addField("preferences", "feature2", true) .build(); - Assertions.assertTrue(schema1.sameSchema(t.icebergSchema())); + Assertions.assertTrue(schema1.sameSchema(t.icebergSchema(true))); Schema schema2 = new Schema( optional(1, "id", Types.IntegerType.get()), @@ -68,7 +68,7 @@ public void testIcebergChangeEventBuilder() { .addField("preferences", "feature1", true) .addField("preferences", "feature2", true) .build(); - Assertions.assertTrue(schema2.sameSchema(t.icebergSchema())); + Assertions.assertTrue(schema2.sameSchema(t.icebergSchema(true))); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index ea9b0d1c..ab1eff71 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -57,7 +57,7 @@ class IcebergTableOperatorTest extends BaseSparkTest { public Table createTable(IcebergChangeEvent sampleEvent) { HadoopCatalog icebergCatalog = getIcebergCatalog(); final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination()); - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat); + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(true), writeFormat); } @Test