diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 140416c8..b16c6432 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -92,6 +92,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu String catalogName; @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") boolean upsert; + @ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true") + boolean createIdentifierFields; @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") String batchSizeWaitName; @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false") @@ -175,7 +177,7 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample throw new RuntimeException("Table '" + tableId + "' not found! 
" + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); } try { - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat); + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(createIdentifierFields), writeFormat); } catch (Exception e){ throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 38533234..cb3014d2 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -76,8 +76,8 @@ public ChangeEventSchema changeEventSchema() { } } - public Schema icebergSchema() { - return changeEventSchema().icebergSchema(); + public Schema icebergSchema(boolean createIdentifierFields) { + return changeEventSchema().icebergSchema(createIdentifierFields); } public String destination() { @@ -312,19 +312,30 @@ private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaN return schemaData; } - private Schema icebergSchema() { + private Schema icebergSchema(boolean createIdentifierFields) { if (this.valueSchema.isNull()) { throw new RuntimeException("Failed to get schema from debezium event, event schema is null"); } IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData(); - if (!eventsAreUnwrapped && keySchema != null) { - // NOTE: events re not unwrapped, align schema with event schema, so then we can scan event and key schemas synchronously + if (!createIdentifierFields) { + LOGGER.warn("Creating identifier fields is disabled, creating table without identifier field!"); + 
icebergSchemaFields(valueSchema, null, schemaData); + } else if (!eventsAreUnwrapped && keySchema != null) { ObjectNode nestedKeySchema = mapper.createObjectNode(); nestedKeySchema.put("type", "struct"); nestedKeySchema.putArray("fields").add(((ObjectNode) keySchema).put("field", "after")); icebergSchemaFields(valueSchema, nestedKeySchema, schemaData); + + if (!schemaData.identifierFieldIds().isEmpty()) { + // While Iceberg supports nested key fields, they cannot be set for nested events (events that are not unwrapped, i.e. without the event flattening SMT) + // because the `before` and `after` fields are not populated consistently. + // For insert events the `before` field is NULL, while for delete events the `after` field is NULL. + // This inconsistency prevents using either field as a reliable key. + throw new DebeziumException("Events are unnested, Identifier fields are not supported for unnested events! " + + "Pleas make sure you are using event flattening SMT! or disable identifier field creation!"); + } } else { icebergSchemaFields(valueSchema, keySchema, schemaData); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 3b1423cf..1b6a7b4f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -15,6 +15,7 @@ import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; import org.apache.iceberg.*; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.BaseTaskWriter; import org.apache.iceberg.io.WriteResult; @@ -46,6 +47,8 @@ public class IcebergTableOperator { String opColumn; @ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = 
"true") boolean allowFieldAddition; + @ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true") + boolean createIdentifierFields; @Inject IcebergTableWriterFactory writerFactory; @@ -156,7 +159,7 @@ public void addToTable(Table icebergTable, List events) { for (Map.Entry> schemaEvents : eventsGroupedBySchema.entrySet()) { // extend table schema if new fields found - applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema()); + applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema(createIdentifierFields)); // add set of events to table addToTablePerSchema(icebergTable, schemaEvents.getValue()); } @@ -172,9 +175,11 @@ public void addToTable(Table icebergTable, List events) { */ private void addToTablePerSchema(Table icebergTable, List events) { // Initialize a task writer to write both INSERT and equality DELETE. + final Schema tableSchema = icebergTable.schema(); try (BaseTaskWriter writer = writerFactory.create(icebergTable)) { for (IcebergChangeEvent e : events) { - writer.write(e.asIcebergRecord(icebergTable.schema())); + final GenericRecord record = e.asIcebergRecord(tableSchema); + writer.write(record); } writer.close(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestUnwrapped.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestUnwrapped.java new file mode 100644 index 00000000..4b29546b --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestUnwrapped.java @@ -0,0 +1,132 @@ +/* + * + * * Copyright memiiso Authors. 
+ * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import com.google.common.collect.Lists; +import io.debezium.server.iceberg.testresources.BaseTest; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.SourceMysqlDB; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author Ismail Simsek + */ +@QuarkusTest +@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) +@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true) +@TestProfile(IcebergChangeConsumerMysqlTestUnwrapped.TestProfile.class) +public class IcebergChangeConsumerMysqlTestUnwrapped extends BaseTest { + + @Test + public void testSimpleUpload() throws Exception { + + // make sure its not unwrapped + assertEquals(IcebergUtil.configIncludesUnwrapSmt(), false); + assertEquals(IcebergChangeEvent.eventsAreUnwrapped, false); + + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + CloseableIterable result = getTableDataV2("testc.inventory.customers"); + printTableData(result); + return Lists.newArrayList(result).size() >= 3; + } catch (Exception e) { + return false; + } + }); + + // test nested data(struct) consumed + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + 
CloseableIterable result = getTableDataV2("testc.inventory.geom"); + return Lists.newArrayList(result).size() >= 3; + } catch (Exception e) { + return false; + } + }); + + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + CloseableIterable d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_table")); + System.out.println(Lists.newArrayList(d)); + return Lists.newArrayList(d).size() == 1; + } catch (Exception e) { + return false; + } + }); + + } + + @Test + public void testDeleteEvents() throws Exception { + + // make sure its not unwrapped + assertEquals(IcebergUtil.configIncludesUnwrapSmt(), false); + assertEquals(IcebergChangeEvent.eventsAreUnwrapped, false); + + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + CloseableIterable result = getTableDataV2("testc.inventory.customers"); + printTableData(result); + return Lists.newArrayList(result).size() >= 4; + } catch (Exception e) { + return false; + } + }); + + SourceMysqlDB.runSQL("ALTER TABLE inventory.addresses DROP FOREIGN KEY addresses_ibfk_1;"); + SourceMysqlDB.runSQL("DELETE FROM inventory.customers where id = 1004 ;"); + + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + CloseableIterable result = getTableDataV2("testc.inventory.customers"); + printTableData(result); + return Lists.newArrayList(result).size() >= 5; + } catch (Exception e) { + return false; + } + }); + } + + + public static class TestProfile implements QuarkusTestProfile { + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("quarkus.profile", "mysql"); + config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector"); + config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); + config.put("debezium.transforms", ","); + config.put("debezium.sink.iceberg.upsert", "false"); + 
config.put("debezium.sink.iceberg.create-identifier-fields", "false"); + return config; + } + + @Override + public String getConfigProfile() { + return "mysql"; + } + } + +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestUnwraapped.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestUnwraapped.java new file mode 100644 index 00000000..f7efd9de --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestUnwraapped.java @@ -0,0 +1,102 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg; + +import com.google.common.collect.Lists; +import io.debezium.server.iceberg.testresources.BaseSparkTest; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.SourcePostgresqlDB; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import jakarta.inject.Inject; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.awaitility.Awaitility; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Integration test that verifies basic reading from PostgreSQL database and 
writing to iceberg destination. + * + * @author Ismail Simsek + */ +@QuarkusTest +@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) +@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) +@TestProfile(IcebergChangeConsumerTestUnwraapped.TestProfile.class) +public class IcebergChangeConsumerTestUnwraapped extends BaseSparkTest { + + @Test + public void testSimpleUpload() { + + // make sure its not unwrapped + assertEquals(IcebergUtil.configIncludesUnwrapSmt(), false); + assertEquals(IcebergChangeEvent.eventsAreUnwrapped, false); + + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.customers"); + ds.show(false); + return ds.count() >= 3; + } catch (Exception e) { + return false; + } + }); + + // test nested data(struct) consumed + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.geom"); + ds.show(false); + return ds.count() >= 3; + } catch (Exception e) { + return false; + } + }); + + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + CloseableIterable d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_table")); + System.out.println(Lists.newArrayList(d)); + return Lists.newArrayList(d).size() == 1; + } catch (Exception e) { + return false; + } + }); + } + + public static class TestProfile implements QuarkusTestProfile { + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("debezium.sink.iceberg.write.format.default", "orc"); + config.put("debezium.sink.iceberg.destination-regexp", "\\d"); + config.put("debezium.source.hstore.handling.mode", "map"); + config.put("debezium.transforms", ","); + config.put("debezium.sink.iceberg.create-identifier-fields", "false"); + return config; + } + } + +} diff --git 
a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java index d8a05fda..0f299e45 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java @@ -52,7 +52,7 @@ class IcebergChangeEventTest { public void testNestedJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", serdeWithSchema.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); System.out.println(schema.toString()); assertEquals(schema.toString(), (""" table { @@ -69,7 +69,7 @@ public void testNestedJsonRecord() { public void testUnwrapJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); GenericRecord record = e.asIcebergRecord(schema); assertEquals("orders", record.getField("__table").toString()); assertEquals(16850, record.getField("order_date")); @@ -95,7 +95,7 @@ public void testNestedArrayJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); assertEquals(schema.toString(), """ table { 1: name: optional string @@ -119,7 +119,7 @@ public void testNestedArrayJsonRecord() { public void testNestedArray2JsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); System.out.println(schema); assertEquals(schema.toString(), """ table { @@ -136,7 +136,7 @@ public void 
testNestedArray2JsonRecord() { public void testNestedGeomJsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null); - Schema schema = e.icebergSchema(); + Schema schema = e.icebergSchema(true); GenericRecord record = e.asIcebergRecord(schema); assertEquals(schema.toString(), """ table { @@ -180,7 +180,7 @@ public void valuePayloadWithSchemaAsJsonNode() { @Test public void testIcebergChangeEventSchemaWithKey() { TestChangeEvent debeziumEvent = TestChangeEvent.ofCompositeKey("destination", 1, "u", "user1", 2L); - Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema(); + Schema schema = debeziumEvent.toIcebergChangeEvent().icebergSchema(true); assertEquals(schema.toString(), """ table { 1: id: required int (id) @@ -212,7 +212,7 @@ public void testIcebergChangeEventSchemaWithKey() { // test when PK is not first two columns! TestChangeEvent debeziumEvent2 = new TestChangeEvent<>(key, val, "test"); - Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema(); + Schema schema2 = debeziumEvent2.toIcebergChangeEvent().icebergSchema(true); assertEquals(schema2.toString(), """ table { 1: first_column: optional string diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java index 23494a32..94434197 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTestUnwrapped.java @@ -47,11 +47,17 @@ public void testIcebergChangeEventSchemaWithNestedKey() throws IOException { String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json")); String val = 
Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json")); TestChangeEvent dbzEvent = new TestChangeEvent<>(key, val, "test"); - Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema(); + + Exception exception = assertThrows(RuntimeException.class, () -> { + dbzEvent.toIcebergChangeEvent().icebergSchema(true); + }); + assertTrue(exception.getMessage().contains("Identifier fields are not supported for unnested events")); + + Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema(false); assertEquals(""" table { 1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int> - 7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int> + 7: after: optional struct<8: order_number: optional int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int> 13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string> 31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long> 35: op: optional string @@ -59,8 +65,27 @@ public void testIcebergChangeEventSchemaWithNestedKey() throws IOException { 37: ts_us: optional long 38: ts_ns: optional long }""", schema.toString()); - assertEquals(Set.of(8), schema.identifierFieldIds()); + assertEquals(Set.of(), 
schema.identifierFieldIds()); } + + @Test + public void testIcebergChangeEventSchemaWithDelete() throws IOException { + + assertFalse(IcebergUtil.configIncludesUnwrapSmt()); + + String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-delete-key-withschema.json")); + String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-delete-val-withschema.json")); + TestChangeEvent dbzEvent = new TestChangeEvent<>(key, val, "test"); + IcebergChangeEvent ie = dbzEvent.toIcebergChangeEvent(); + + Exception exception = assertThrows(RuntimeException.class, () -> { + ie.icebergSchema(true); + }); + assertTrue(exception.getMessage().contains("Identifier fields are not supported for unnested events")); + // print converted event value! + System.out.println(ie.asIcebergRecord(ie.icebergSchema(false))); + } + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 92eb991b..ec74f28f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -48,6 +48,8 @@ public TestConfigSource() { config.put("debezium.sink.iceberg.catalog-name", ICEBERG_CATALOG_NAME); // use hadoop catalog for tests config.put("debezium.sink.iceberg.type", "hadoop"); + // drop tombstones for delete events + config.put("debezium.source.tombstones.on.delete", "false"); // enable disable schema config.put("debezium.format.value.schemas.enable", "true"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java 
b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java index 09f1b86a..1ffb7723 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java @@ -50,7 +50,7 @@ public void testIcebergChangeEventBuilder() { .addField("preferences", "feature1", true) .addField("preferences", "feature2", true) .build(); - Assertions.assertTrue(schema1.sameSchema(t.icebergSchema())); + Assertions.assertTrue(schema1.sameSchema(t.icebergSchema(true))); Schema schema2 = new Schema( optional(1, "id", Types.IntegerType.get()), @@ -68,7 +68,7 @@ public void testIcebergChangeEventBuilder() { .addField("preferences", "feature1", true) .addField("preferences", "feature2", true) .build(); - Assertions.assertTrue(schema2.sameSchema(t.icebergSchema())); + Assertions.assertTrue(schema2.sameSchema(t.icebergSchema(true))); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index ea9b0d1c..ab1eff71 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -57,7 +57,7 @@ class IcebergTableOperatorTest extends BaseSparkTest { public Table createTable(IcebergChangeEvent sampleEvent) { HadoopCatalog icebergCatalog = getIcebergCatalog(); final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination()); - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat); + return 
IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(true), writeFormat); } @Test diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseTest.java index 470a6bdf..810ed4ac 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseTest.java @@ -31,6 +31,14 @@ public CloseableIterable getTableDataV2(String table) { return getTableDataV2("debeziumevents", table); } + public void printTableData(CloseableIterable data) { + System.out.println("======================"); + System.out.println(data.iterator().next().struct()); + System.out.println("======================"); + data.forEach(System.out::println); + System.out.println("======================"); + } + public CloseableIterable getTableDataV2(String catalog, String table) { String tableName = "debeziumcdc_" + table.replace(".", "_"); return getTableDataV2(TableIdentifier.of(catalog, tableName)); diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-delete-key-withschema.json b/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-delete-key-withschema.json new file mode 100644 index 00000000..6191b1ee --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-delete-key-withschema.json @@ -0,0 +1,17 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "id" + } + ], + "optional": false, + "name": "testc.inventory.customers.Key" + }, + "payload": { + "id": 1004 + } +} \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-delete-val-withschema.json 
b/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-delete-val-withschema.json new file mode 100644 index 00000000..c4ac796d --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-delete-val-withschema.json @@ -0,0 +1,242 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "id" + }, + { + "type": "string", + "optional": false, + "field": "first_name" + }, + { + "type": "string", + "optional": false, + "field": "last_name" + }, + { + "type": "string", + "optional": false, + "field": "email" + } + ], + "optional": true, + "name": "testc.inventory.customers.Value", + "field": "before" + }, + { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "id" + }, + { + "type": "string", + "optional": false, + "field": "first_name" + }, + { + "type": "string", + "optional": false, + "field": "last_name" + }, + { + "type": "string", + "optional": false, + "field": "email" + } + ], + "optional": true, + "name": "testc.inventory.customers.Value", + "field": "after" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "version" + }, + { + "type": "string", + "optional": false, + "field": "connector" + }, + { + "type": "string", + "optional": false, + "field": "name" + }, + { + "type": "int64", + "optional": false, + "field": "ts_ms" + }, + { + "type": "string", + "optional": true, + "name": "io.debezium.data.Enum", + "version": 1, + "parameters": { + "allowed": "true,last,false,incremental" + }, + "default": "false", + "field": "snapshot" + }, + { + "type": "string", + "optional": false, + "field": "db" + }, + { + "type": "string", + "optional": true, + "field": "sequence" + }, + { + "type": "int64", + "optional": false, + "field": "ts_us" + }, + { + "type": "int64", + "optional": false, + "field": "ts_ns" + }, + { + "type": "string", + "optional": true, + 
"field": "table" + }, + { + "type": "int64", + "optional": false, + "field": "server_id" + }, + { + "type": "string", + "optional": true, + "field": "gtid" + }, + { + "type": "string", + "optional": false, + "field": "file" + }, + { + "type": "int64", + "optional": false, + "field": "pos" + }, + { + "type": "int32", + "optional": false, + "field": "row" + }, + { + "type": "int64", + "optional": true, + "field": "thread" + }, + { + "type": "string", + "optional": true, + "field": "query" + } + ], + "optional": false, + "name": "io.debezium.connector.mysql.Source", + "field": "source" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "id" + }, + { + "type": "int64", + "optional": false, + "field": "total_order" + }, + { + "type": "int64", + "optional": false, + "field": "data_collection_order" + } + ], + "optional": true, + "name": "event.block", + "version": 1, + "field": "transaction" + }, + { + "type": "string", + "optional": false, + "field": "op" + }, + { + "type": "int64", + "optional": true, + "field": "ts_ms" + }, + { + "type": "int64", + "optional": true, + "field": "ts_us" + }, + { + "type": "int64", + "optional": true, + "field": "ts_ns" + } + ], + "optional": false, + "name": "testc.inventory.customers.Envelope", + "version": 2 + }, + "payload": { + "before": { + "id": 1004, + "first_name": "Anne", + "last_name": "Kretchmar", + "email": "annek@noanswer.org" + }, + "after": null, + "source": { + "version": "2.7.0.Alpha1", + "connector": "mysql", + "name": "testc", + "ts_ms": 1718486007000, + "snapshot": "false", + "db": "inventory", + "sequence": null, + "ts_us": 1718486007000000, + "ts_ns": 1718486007000000000, + "table": "customers", + "server_id": 223344, + "gtid": null, + "file": "mysql-bin.000003", + "pos": 632, + "row": 0, + "thread": 13, + "query": null + }, + "transaction": null, + "op": "d", + "ts_ms": 1718486007972, + "ts_us": 1718486007972986, + "ts_ns": 1718486007972986077 + } +}