From 12c4643957d405562c8dd8c4e5280791b644f520 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 15 Jun 2024 16:17:33 +0200 Subject: [PATCH] Allow nested fields to be set as identifier --- .../server/iceberg/IcebergChangeEvent.java | 62 +++-- .../iceberg/IcebergChangeEventSchemaData.java | 4 +- .../tableoperator/BaseDeltaTaskWriter.java | 10 +- .../IcebergChangeEventSchemaDataTest.java | 19 +- .../iceberg/IcebergChangeEventTest.java | 38 +-- .../serde-unnested-order-key-withschema.json | 17 ++ .../serde-unnested-order-val-withschema.json | 257 ++++++++++++++++++ 7 files changed, 354 insertions(+), 53 deletions(-) create mode 100644 debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-order-key-withschema.json create mode 100644 debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-order-val-withschema.json diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 7487cfe6..e1016de9 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.debezium.DebeziumException; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; @@ -234,14 +235,15 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode switch (fieldType) { case "struct": int rootStructId = schemaData.nextFieldId().getAndIncrement(); - IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId(); + final IcebergChangeEventSchemaData subSchemaData = 
schemaData.copyKeepIdentifierFieldIdsAndNextFieldId(); for (JsonNode subFieldSchema : fieldSchema.get("fields")) { String subFieldName = subFieldSchema.get("field").textValue(); JsonNode equivalentNestedKeyField = findNodeFieldByName(subFieldName, keySchemaNode); debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData, equivalentNestedKeyField); } // create it as struct, nested type - schemaData.fields().add(Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subSchemaData.fields()))); + final Types.NestedField structField = Types.NestedField.of(rootStructId, !isPkField, fieldName, Types.StructType.of(subSchemaData.fields())); + schemaData.fields().add(structField); return schemaData; case "map": if (isPkField) { @@ -250,12 +252,12 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode int rootMapId = schemaData.nextFieldId().getAndIncrement(); int keyFieldId = schemaData.nextFieldId().getAndIncrement(); int valFieldId = schemaData.nextFieldId().getAndIncrement(); - IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId(); + final IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId(); debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData, null); schemaData.nextFieldId().incrementAndGet(); - IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId(); + final IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId(); debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData, null); - Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type()); + final Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type()); 
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField)); return schemaData; @@ -264,14 +266,14 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode throw new DebeziumException("Cannot set array field '" + fieldName + "' as a identifier field, array types are not supported as an identifier field!"); } int rootArrayId = schemaData.nextFieldId().getAndIncrement(); - IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId(); + final IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId(); debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData, null); - Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type()); + final Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type()); schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField)); return schemaData; default: // its primitive field - Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType)); + final Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType)); schemaData.fields().add(field); if (isPkField) schemaData.identifierFieldIds().add(field.fieldId()); return schemaData; @@ -303,13 +305,38 @@ private static JsonNode findNodeFieldByName(String fieldName, JsonNode node) { return null; } + /*** + * Converts debezium event fields to iceberg equivalent and returns list of iceberg fields. 
+ * @param schemaNode + * @return + */ + private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode, IcebergChangeEventSchemaData schemaData) { + LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); + for (JsonNode field : getNodeFieldsArray(schemaNode)) { + String fieldName = field.get("field").textValue(); + JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode); + debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode); + } + + return schemaData; + } + private Schema icebergSchema(boolean isUnwrapped) { if (this.valueSchema.isNull()) { throw new RuntimeException("Failed to get schema from debezium event, event schema is null"); } - final IcebergChangeEventSchemaData schemaData = icebergSchemaFields(valueSchema, keySchema); + IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData(); + if (!isUnwrapped && keySchema != null) { + // NOTE: events are not unwrapped, align key schema with event schema, so that we can scan event and key schemas synchronously + ObjectNode nestedKeySchema = mapper.createObjectNode(); + nestedKeySchema.put("type", "struct"); + nestedKeySchema.putArray("fields").add(((ObjectNode) keySchema).put("field", "after")); + icebergSchemaFields(valueSchema, nestedKeySchema, schemaData); + } else { + icebergSchemaFields(valueSchema, keySchema, schemaData); + } if (schemaData.fields().isEmpty()) { throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!"); @@ -320,23 +347,6 @@ private Schema icebergSchema(boolean isUnwrapped) { } - /*** - * Converts debezium event fields to iceberg equivalent and returns list of iceberg fields. 
- * @param schemaNode - * @return - */ - private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode) { - IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData(); - LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); - for (JsonNode field : getNodeFieldsArray(schemaNode)) { - String fieldName = field.get("field").textValue(); - JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode); - debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode); - } - - return schemaData; - } - private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) { switch (fieldType) { case "int8": diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java index 00196ca9..6b42ebda 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java @@ -20,8 +20,8 @@ public IcebergChangeEventSchemaData() { this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(1)); } - public IcebergChangeEventSchemaData copyKeepNextFieldId() { - return new IcebergChangeEventSchemaData(new ArrayList<>(), new HashSet<>(), this.nextFieldId); + public IcebergChangeEventSchemaData copyKeepIdentifierFieldIdsAndNextFieldId() { + return new IcebergChangeEventSchemaData(new ArrayList<>(), this.identifierFieldIds, this.nextFieldId); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java index 6718516e..484bbd10 100644 --- 
a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java @@ -1,8 +1,6 @@ package io.debezium.server.iceberg.tableoperator; -import java.io.IOException; -import java.util.List; - +import com.google.common.collect.Sets; import org.apache.iceberg.*; import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.Record; @@ -10,9 +8,11 @@ import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; +import java.io.IOException; +import java.util.List; + abstract class BaseDeltaTaskWriter extends BaseTaskWriter { private final Schema schema; @@ -50,11 +50,13 @@ InternalRecordWrapper wrapper() { @Override public void write(Record row) throws IOException { RowDataDeltaWriter writer = route(row); + // @TODO __op field should not be hardcoded! when unwrapped it is __op, when not it is op if (upsert && !row.getField("__op").equals("c")) {// anything which not an insert is upsert writer.delete(row); } // if its deleted row and upsertKeepDeletes = true then add deleted record to target table // else deleted records are deleted from target table + // @TODO __op field should not be hardcoded! 
when unwrapped it is __op, when not it is op if ( upsertKeepDeletes || !(row.getField("__op").equals("d")))// anything which not an insert is upsert diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java index dd1f44a6..8fa42fc2 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java @@ -2,20 +2,31 @@ import org.junit.jupiter.api.Test; +import java.util.Set; + import static org.junit.jupiter.api.Assertions.assertEquals; class IcebergChangeEventSchemaDataTest { @Test - void nextFieldId() { + void testIcebergChangeEventSchemaDataBehaviourAndCloning() { IcebergChangeEventSchemaData test = new IcebergChangeEventSchemaData(5); - test.identifierFieldIds().add(1); + test.identifierFieldIds().add(3); assertEquals(6, test.nextFieldId().incrementAndGet()); + assertEquals(Set.of(3), test.identifierFieldIds()); - IcebergChangeEventSchemaData testSubschemaField = test.copyKeepNextFieldId(); - testSubschemaField.nextFieldId().incrementAndGet(); + // test cloning and then changing nextFieldId is persisting + IcebergChangeEventSchemaData copy = test.copyKeepIdentifierFieldIdsAndNextFieldId(); + assertEquals(6, test.nextFieldId().get()); + copy.nextFieldId().incrementAndGet(); assertEquals(7, test.nextFieldId().get()); + + // test cloning and then changing identifier fields is persisting + assertEquals(Set.of(3), copy.identifierFieldIds()); + copy.identifierFieldIds().add(7); + assertEquals(Set.of(3, 7), test.identifierFieldIds()); + } } \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java 
b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java index 7eddfa21..4512b272 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java @@ -86,8 +86,6 @@ public void testUnwrapJsonRecord() { }"""); assertEquals(schema.identifierFieldIds(), Set.of()); - System.out.println(schema); - System.out.println(record); } @Test @@ -108,10 +106,6 @@ public void testNestedArrayJsonRecord() { 12: __deleted: optional string }"""); assertEquals(schema.identifierFieldIds(), Set.of()); - System.out.println(schema); - System.out.println(schema.asStruct()); - System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType()); - System.out.println(schema.findField("schedule").type().asListType().elementType()); assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int"); assertEquals(schema.findField("schedule").type().asListType().elementType().toString(), "string"); GenericRecord record = e.asIcebergRecord(schema); @@ -124,7 +118,6 @@ public void testNestedArray2JsonRecord() { IcebergChangeEvent e = new IcebergChangeEvent("test", unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(); - System.out.println(schema.asStruct()); System.out.println(schema); assertEquals(schema.toString(), """ table { @@ -135,10 +128,6 @@ public void testNestedArray2JsonRecord() { 20: tableChanges: optional list, 29: columns: optional list>>>> }"""); assertEquals(schema.identifierFieldIds(), Set.of()); - System.out.println(schema.findField("tableChanges")); - System.out.println(schema.findField("tableChanges").type().asListType().elementType()); - //GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); - //System.out.println(record); } @Test @@ -147,8 +136,6 @@ public 
void testNestedGeomJsonRecord() { unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(); GenericRecord record = e.asIcebergRecord(schema); - //System.out.println(schema); - //System.out.println(record); assertEquals(schema.toString(), """ table { 1: id: optional int @@ -175,18 +162,15 @@ public void valuePayloadWithSchemaAsJsonNode() { final Serde valueSerde = DebeziumSerdes.payloadJson(JsonNode.class); valueSerde.configure(Collections.emptyMap(), false); JsonNode deserializedData = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes()); - System.out.println(deserializedData.getClass().getSimpleName()); - System.out.println(deserializedData.has("payload")); assertEquals(deserializedData.getClass().getSimpleName(), "ObjectNode"); - System.out.println(deserializedData); assertTrue(deserializedData.has("after")); assertTrue(deserializedData.has("op")); assertTrue(deserializedData.has("before")); assertFalse(deserializedData.has("schema")); + assertFalse(deserializedData.has("payload")); valueSerde.configure(Collections.singletonMap("from.field", "schema"), false); JsonNode deserializedSchema = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes()); - System.out.println(deserializedSchema); assertFalse(deserializedSchema.has("schema")); } @@ -240,4 +224,24 @@ public void testIcebergChangeEventSchemaWithKey() { } + @Test + public void testIcebergChangeEventSchemaWithNestedKey() throws IOException { + String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json")); + String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json")); + TestChangeEvent dbzEvent = new TestChangeEvent<>(key, val, "test"); + Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema(); + assertEquals(""" + table { + 1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: 
quantity: optional int, 6: product_id: optional int> + 7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int> + 13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string> + 31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long> + 35: op: optional string + 36: ts_ms: optional long + 37: ts_us: optional long + 38: ts_ns: optional long + }""", schema.toString()); + assertEquals(Set.of(8), schema.identifierFieldIds()); + } + } diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-order-key-withschema.json b/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-order-key-withschema.json new file mode 100644 index 00000000..cdbe0a3d --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-order-key-withschema.json @@ -0,0 +1,17 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "order_number" + } + ], + "optional": false, + "name": "testc.inventory.orders.Key" + }, + "payload": { + "order_number": 10004 + } +} \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-order-val-withschema.json b/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-order-val-withschema.json new file mode 100644 index 00000000..41bd538c --- /dev/null +++ 
b/debezium-server-iceberg-sink/src/test/resources/json/serde-unnested-order-val-withschema.json @@ -0,0 +1,257 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "order_number" + }, + { + "type": "int32", + "optional": false, + "name": "io.debezium.time.Date", + "version": 1, + "field": "order_date" + }, + { + "type": "int32", + "optional": false, + "field": "purchaser" + }, + { + "type": "int32", + "optional": false, + "field": "quantity" + }, + { + "type": "int32", + "optional": false, + "field": "product_id" + } + ], + "optional": true, + "name": "testc.inventory.orders.Value", + "field": "before" + }, + { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "order_number" + }, + { + "type": "int32", + "optional": false, + "name": "io.debezium.time.Date", + "version": 1, + "field": "order_date" + }, + { + "type": "int32", + "optional": false, + "field": "purchaser" + }, + { + "type": "int32", + "optional": false, + "field": "quantity" + }, + { + "type": "int32", + "optional": false, + "field": "product_id" + } + ], + "optional": true, + "name": "testc.inventory.orders.Value", + "field": "after" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "version" + }, + { + "type": "string", + "optional": false, + "field": "connector" + }, + { + "type": "string", + "optional": false, + "field": "name" + }, + { + "type": "int64", + "optional": false, + "field": "ts_ms" + }, + { + "type": "string", + "optional": true, + "name": "io.debezium.data.Enum", + "version": 1, + "parameters": { + "allowed": "true,last,false,incremental" + }, + "default": "false", + "field": "snapshot" + }, + { + "type": "string", + "optional": false, + "field": "db" + }, + { + "type": "string", + "optional": true, + "field": "sequence" + }, + { + "type": "int64", + "optional": false, + "field": "ts_us" + }, + { + "type": 
"int64", + "optional": false, + "field": "ts_ns" + }, + { + "type": "string", + "optional": true, + "field": "table" + }, + { + "type": "int64", + "optional": false, + "field": "server_id" + }, + { + "type": "string", + "optional": true, + "field": "gtid" + }, + { + "type": "string", + "optional": false, + "field": "file" + }, + { + "type": "int64", + "optional": false, + "field": "pos" + }, + { + "type": "int32", + "optional": false, + "field": "row" + }, + { + "type": "int64", + "optional": true, + "field": "thread" + }, + { + "type": "string", + "optional": true, + "field": "query" + } + ], + "optional": false, + "name": "io.debezium.connector.mysql.Source", + "field": "source" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "id" + }, + { + "type": "int64", + "optional": false, + "field": "total_order" + }, + { + "type": "int64", + "optional": false, + "field": "data_collection_order" + } + ], + "optional": true, + "name": "event.block", + "version": 1, + "field": "transaction" + }, + { + "type": "string", + "optional": false, + "field": "op" + }, + { + "type": "int64", + "optional": true, + "field": "ts_ms" + }, + { + "type": "int64", + "optional": true, + "field": "ts_us" + }, + { + "type": "int64", + "optional": true, + "field": "ts_ns" + } + ], + "optional": false, + "name": "testc.inventory.orders.Envelope", + "version": 2 + }, + "payload": { + "before": null, + "after": { + "order_number": 10004, + "order_date": 16852, + "purchaser": 1003, + "quantity": 1, + "product_id": 107 + }, + "source": { + "version": "2.7.0.Alpha1", + "connector": "mysql", + "name": "testc", + "ts_ms": 1718461481000, + "snapshot": "last_in_data_collection", + "db": "inventory", + "sequence": null, + "ts_us": 1718461481000000, + "ts_ns": 1718461481000000000, + "table": "orders", + "server_id": 0, + "gtid": null, + "file": "mysql-bin.000003", + "pos": 824, + "row": 0, + "thread": null, + "query": null + }, + "transaction": null, + 
"op": "r", + "ts_ms": 1718461481941, + "ts_us": 1718461481941184, + "ts_ns": 1718461481941184957 + } +}