diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 8c25a45c..9c749953 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -77,13 +77,23 @@ public ChangeEventSchema changeEventSchema() { } public Schema icebergSchema() { - return changeEventSchema().icebergSchema(); + return changeEventSchema().icebergSchema(this.isUnwrapped()); } public String destination() { return destination; } + public boolean isUnwrapped() { + return !( + this.value().has("after") && + this.value().has("source") && + this.value().has("before") && + this.value().get("after").isObject() && + this.value().get("source").isObject() + ); + } + public GenericRecord asIcebergRecord(Schema schema) { return asIcebergRecord(schema.asStruct(), value()); } @@ -266,7 +276,27 @@ public int hashCode() { return Objects.hash(valueSchema, keySchema); } - private Schema icebergSchema() { + private static JsonNode getNodeFieldsArray(JsonNode node) { + if (node != null && !node.isNull() && node.has("fields") && node.get("fields").isArray()) { + return node.get("fields"); + } + + return mapper.createObjectNode(); + } + + private static JsonNode findNodeFieldByName(String fieldName, JsonNode node) { + + for (JsonNode field : getNodeFieldsArray(node)) { + + if (Objects.equals(field.get("field").textValue(), fieldName)) { + return field; + } + } + + return null; + } + + private Schema icebergSchema(boolean isUnwrapped) { if (this.valueSchema.isNull()) { throw new RuntimeException("Failed to get schema from debezium event, event schema is null"); @@ -313,14 +343,13 @@ private Schema icebergSchema() { private static List icebergSchemaFields(JsonNode schemaNode) { List schemaColumns = new ArrayList<>(); AtomicReference fieldId = new AtomicReference<>(1); - if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) { - LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); - schemaNode.get("fields").forEach(field -> { - Map.Entry df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get()); - fieldId.set(df.getKey() + 1); - schemaColumns.add(df.getValue()); - }); + LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); + for (JsonNode field : getNodeFieldsArray(schemaNode)) { + Map.Entry df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get()); + fieldId.set(df.getKey() + 1); + schemaColumns.add(df.getValue()); } + return schemaColumns; } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java index df91802e..0f979b2f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java @@ -50,8 +50,14 @@ public void testNestedJsonRecord() { serdeWithSchema.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(); System.out.println(schema.toString()); - assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " + - "4:")); + assertEquals(schema.toString(), (""" + table { + 1: before: optional struct<2: id: optional int, 3: first_name: optional string, 4: last_name: optional string, 5: email: optional string> + 6: after: optional struct<7: id: optional int, 8: first_name: optional string, 9: last_name: optional string, 10: email: optional string> + 11: source: optional struct<12: version: optional string, 13: connector: optional string, 14: name: optional string, 15: ts_ms: optional long, 16: snapshot: optional boolean, 17: db: optional string, 18: table: optional string, 19: server_id: optional long, 20: gtid: optional string, 21: file: optional string, 22: pos: optional long, 23: row: optional int, 24: thread: optional long, 25: query: optional string> + 26: op: optional string + 27: ts_ms: optional long + }""")); } @Test @@ -62,6 +68,19 @@ public void testUnwrapJsonRecord() { GenericRecord record = e.asIcebergRecord(schema); assertEquals("orders", record.getField("__table").toString()); assertEquals(16850, record.getField("order_date")); + assertEquals(schema.toString(), """ + table { + 1: id: optional int + 2: order_date: optional int + 3: purchaser: optional int + 4: quantity: optional int + 5: product_id: optional int + 6: __op: optional string + 7: __table: optional string + 8: __lsn: optional long + 9: __source_ts_ms: optional timestamptz + 10: __deleted: optional string + }"""); System.out.println(schema); System.out.println(record); } @@ -72,10 +91,19 @@ public void testNestedArrayJsonRecord() { unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(); + assertEquals(schema.toString(), """ + table { + 1: name: optional string + 2: pay_by_quarter: optional list + 5: schedule: optional list + 8: __op: optional string + 9: __table: optional string + 10: __source_ts_ms: optional timestamptz + 11: __db: optional string + 12: __deleted: optional string + }"""); System.out.println(schema); System.out.println(schema.asStruct()); - assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list, 5: schedule: optional list, 8:")); - System.out.println(schema.asStruct()); System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType()); System.out.println(schema.findField("schedule").type().asListType().elementType()); assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int"); @@ -92,7 +120,14 @@ public void testNestedArray2JsonRecord() { Schema schema = e.icebergSchema(); System.out.println(schema.asStruct()); System.out.println(schema); - assertTrue(schema.asStruct().toString().contains("20: tableChanges: optional list + 17: databaseName: optional string + 18: schemaName: optional string + 19: ddl: optional string + 20: tableChanges: optional list, 29: columns: optional list>>>> + }"""); System.out.println(schema.findField("tableChanges")); System.out.println(schema.findField("tableChanges").type().asListType().elementType()); //GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); @@ -107,7 +142,17 @@ public void testNestedGeomJsonRecord() { GenericRecord record = e.asIcebergRecord(schema); //System.out.println(schema); //System.out.println(record); - assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>")); + assertEquals(schema.toString(), """ + table { + 1: id: optional int + 2: g: optional struct<3: wkb: optional string, 4: srid: optional int> + 5: h: optional struct<6: wkb: optional string, 7: srid: optional int> + 8: __op: optional string + 9: __table: optional string + 10: __source_ts_ms: optional timestamptz + 11: __db: optional string + 12: __deleted: optional string + }"""); GenericRecord g = (GenericRecord) record.getField("g"); GenericRecord h = (GenericRecord) record.getField("h"); assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));