diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index a5cd3382..12c73309 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -76,7 +76,11 @@ public static List getIcebergSchema(JsonNode eventSchema, Str case "array": JsonNode items = jsonSchemaFieldNode.get("items"); if (items != null && items.has("type")) { - PrimitiveType item = IcebergUtil.getIcebergFieldType(items.get("type").textValue()); + String listItemType = items.get("type").textValue(); + if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) { + throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName); + } + PrimitiveType item = IcebergUtil.getIcebergFieldType(listItemType); schemaColumns.add(Types.NestedField.optional( columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); //throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!"); @@ -104,9 +108,9 @@ public static List getIcebergSchema(JsonNode eventSchema, Str public static boolean hasSchema(JsonNode jsonNode) { return jsonNode != null - && jsonNode.has("schema") - && jsonNode.get("schema").has("fields") - && jsonNode.get("schema").get("fields").isArray(); + && jsonNode.has("schema") + && jsonNode.get("schema").has("fields") + && jsonNode.get("schema").get("fields").isArray(); } public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) { @@ -158,7 +162,7 @@ static Object jsonToGenericRecordVal(Types.NestedField field, try { val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue()); } catch (IOException e) { - LOGGER.error("Failed converting '" + field.name() + "' binary value to iceberg record", e); + LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e); throw new RuntimeException("Failed Processing Event!", e); } break; @@ -197,11 +201,11 @@ public static Map getConfigSubset(Config config, String prefix) } public static List getIcebergFieldsFromEventSchema(byte[] eventVal) { - - if(eventVal == null){ + + if (eventVal == null) { return new ArrayList<>(); } - + try { JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal); if (IcebergUtil.hasSchema(jsonEvent)) { @@ -219,7 +223,7 @@ public static Schema getSchema(List tableColumns, List keyColumns) { Set identifierFieldIds = new HashSet<>(); - + for (Types.NestedField ic : keyColumns) { boolean found = false; @@ -240,7 +244,7 @@ public static Schema getSchema(List tableColumns, } } - + return new Schema(tableColumns, identifierFieldIds); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index bf615825..2a7f0594 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -30,6 +30,7 @@ class TestIcebergUtil { final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json"); final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json"); final String unwrapWithArraySchema = Testing.Files.readResourceAsString("json/serde-with-array.json"); + final String unwrapWithArraySchema2 = Testing.Files.readResourceAsString("json/serde-with-array2.json"); @Test public void testNestedJsonRecord() throws JsonProcessingException { @@ -56,12 +57,34 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException { List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); Schema schema = new Schema(schemaFields); assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list, 4: schedule: optional list, 6:")); - //System.out.println(schema.asStruct()); + System.out.println(schema.asStruct()); + System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType()); + System.out.println(schema.findField("schedule").type().asListType().elementType()); + assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(),"int"); + assertEquals(schema.findField("schedule").type().asListType().elementType().toString(),"string"); GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); //System.out.println(record); assertTrue( record.toString().contains("[10000, 10001, 10002, 10003]")); } + @Test + public void testNestedArray2JsonRecord() throws JsonProcessingException { + JsonNode jsonData = new ObjectMapper().readTree(unwrapWithArraySchema2); + JsonNode jsonPayload = jsonData.get("payload"); + JsonNode jsonSchema = jsonData.get("schema"); + + assertThrows(RuntimeException.class, () -> { + List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); + Schema schema = new Schema(schemaFields); + System.out.println(schema.asStruct()); + System.out.println(schema); + System.out.println(schema.findField("tableChanges")); + System.out.println(schema.findField("tableChanges").type().asListType().elementType()); + }); + //GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); + //System.out.println(record); + } + @Test public void testNestedGeomJsonRecord() throws JsonProcessingException { JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema); diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-with-array2.json b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-array2.json new file mode 100644 index 00000000..c4b194db --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-array2.json @@ -0,0 +1,252 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "version" + }, + { + "type": "string", + "optional": false, + "field": "connector" + }, + { + "type": "string", + "optional": false, + "field": "name" + }, + { + "type": "int64", + "optional": false, + "field": "ts_ms" + }, + { + "type": "string", + "optional": true, + "name": "io.debezium.data.Enum", + "version": 1, + "parameters": { + "allowed": "true,last,false" + }, + "default": "false", + "field": "snapshot" + }, + { + "type": "string", + "optional": false, + "field": "db" + }, + { + "type": "string", + "optional": true, + "field": "sequence" + }, + { + "type": "string", + "optional": true, + "field": "table" + }, + { + "type": "int64", + "optional": false, + "field": "server_id" + }, + { + "type": "string", + "optional": true, + "field": "gtid" + }, + { + "type": "string", + "optional": false, + "field": "file" + }, + { + "type": "int64", + "optional": false, + "field": "pos" + }, + { + "type": "int32", + "optional": false, + "field": "row" + }, + { + "type": "int64", + "optional": true, + "field": "thread" + }, + { + "type": "string", + "optional": true, + "field": "query" + } + ], + "optional": false, + "name": "io.debezium.connector.mysql.Source", + "field": "source" + }, + { + "type": "string", + "optional": true, + "field": "databaseName" + }, + { + "type": "string", + "optional": true, + "field": "schemaName" + }, + { + "type": "string", + "optional": true, + "field": "ddl" + }, + { + "type": "array", + "items": { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "type" + }, + { + "type": "string", + "optional": false, + "field": "id" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": true, + "field": "defaultCharsetName" + }, + { + "type": "array", + "items": { + "type": "string", + "optional": false + }, + "optional": true, + "field": "primaryKeyColumnNames" + }, + { + "type": "array", + "items": { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "name" + }, + { + "type": "int32", + "optional": false, + "field": "jdbcType" + }, + { + "type": "int32", + "optional": true, + "field": "nativeType" + }, + { + "type": "string", + "optional": false, + "field": "typeName" + }, + { + "type": "string", + "optional": true, + "field": "typeExpression" + }, + { + "type": "string", + "optional": true, + "field": "charsetName" + }, + { + "type": "int32", + "optional": true, + "field": "length" + }, + { + "type": "int32", + "optional": true, + "field": "scale" + }, + { + "type": "int32", + "optional": false, + "field": "position" + }, + { + "type": "boolean", + "optional": true, + "field": "optional" + }, + { + "type": "boolean", + "optional": true, + "field": "autoIncremented" + }, + { + "type": "boolean", + "optional": true, + "field": "generated" + } + ], + "optional": false, + "name": "io.debezium.connector.schema.Column" + }, + "optional": false, + "field": "columns" + } + ], + "optional": false, + "name": "io.debezium.connector.schema.Table", + "field": "table" + } + ], + "optional": false, + "name": "io.debezium.connector.schema.Change" + }, + "optional": false, + "field": "tableChanges" + } + ], + "optional": false, + "name": "io.debezium.connector.mysql.SchemaChangeValue" + }, + "payload": { + "source": { + "version": "1.7.0.Final", + "connector": "mysql", + "name": "testc", + "ts_ms": 1638187055631, + "snapshot": "true", + "db": "inventory", + "sequence": null, + "table": "geom", + "server_id": 0, + "gtid": null, + "file": "mysql-bin.000003", + "pos": 154, + "row": 0, + "thread": null, + "query": null + }, + "databaseName": "inventory", + "schemaName": null, + "ddl": "CREATE TABLE `geom` (\n `id` int(11) NOT NULL AUTO_INCREMENT,\n `g` geometry NOT NULL,\n `h` geometry DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=latin1", + "tableChanges": [] + } +}