diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 45fe8f30..a5cd3382 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -18,6 +18,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types; import org.eclipse.microprofile.config.Config; import org.slf4j.Logger; @@ -35,6 +36,33 @@ public static List getIcebergSchema(JsonNode eventSchema) { return getIcebergSchema(eventSchema, "", 0); } + public static PrimitiveType getIcebergFieldType(String fieldType) { + switch (fieldType) { + case "int8": + case "int16": + case "int32": // int 4 bytes + return Types.IntegerType.get(); + case "int64": // long 8 bytes + return Types.LongType.get(); + case "float8": + case "float16": + case "float32": // float is represented in 32 bits, + return Types.FloatType.get(); + case "float64": // double is represented in 64 bits + return Types.DoubleType.get(); + case "boolean": + return Types.BooleanType.get(); + case "string": + return Types.StringType.get(); + case "bytes": + return Types.BinaryType.get(); + default: + // default to String type + return Types.StringType.get(); + //throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!"); + } + } + public static List getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) { List schemaColumns = new ArrayList<>(); String schemaType = eventSchema.get("type").textValue(); @@ -45,35 +73,17 @@ public static List getIcebergSchema(JsonNode eventSchema, Str String fieldType = jsonSchemaFieldNode.get("type").textValue(); LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType); switch (fieldType) { - case "int8": - case "int16": - case "int32": // int 4 bytes - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.IntegerType.get())); - break; - case "int64": // long 8 bytes - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.LongType.get())); - break; - case "float8": - case "float16": - case "float32": // float is represented in 32 bits, - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.FloatType.get())); - break; - case "float64": // double is represented in 64 bits - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.DoubleType.get())); - break; - case "boolean": - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.BooleanType.get())); - break; - case "string": - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get())); - break; - case "bytes": - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.BinaryType.get())); - break; case "array": - throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!"); - //schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional())); - //break; + JsonNode items = jsonSchemaFieldNode.get("items"); + if (items != null && items.has("type")) { + PrimitiveType item = IcebergUtil.getIcebergFieldType(items.get("type").textValue()); + schemaColumns.add(Types.NestedField.optional( + columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); + //throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!"); + } else { + throw new RuntimeException("Unexpected Array type for field " + fieldName); + } + break; case "map": throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!"); //schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get())); @@ -84,9 +94,8 @@ public static List getIcebergSchema(JsonNode eventSchema, Str schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema))); columnId += subSchema.size(); break; - default: - // default to String type - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get())); + default: //primitive types + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, IcebergUtil.getIcebergFieldType(fieldType))); break; } } @@ -154,7 +163,7 @@ static Object jsonToGenericRecordVal(Types.NestedField field, } break; case LIST: - val = jsonObjectMapper.convertValue(node, List.class); + val = jsonObjectMapper.convertValue(node, ArrayList.class); break; case MAP: val = jsonObjectMapper.convertValue(node, Map.class); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index 40e72109..6eb10725 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -61,9 +61,8 @@ public ConfigSource() { config.put("debezium.source.offset.flush.interval.ms", "60000"); config.put("debezium.source.database.server.name", "testc"); config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); - config.put("debezium.source.table.whitelist", "inventory.*"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); - config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); + config.put("debezium.source.table.whitelist", "inventory.*"); config.put("debezium.source.include.schema.changes", "false"); config.put("quarkus.log.level", "INFO"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java index a951bf99..1873ddbe 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java @@ -21,6 +21,7 @@ public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("quarkus.profile", "mysql"); config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector"); + config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); return config; } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index c394c0bc..8e879800 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -111,6 +111,39 @@ public void testConsumingVariousDataTypes() throws Exception { }); } + @Test + public void testConsumingArrayDataType() throws Exception { + String sql = " DROP TABLE IF EXISTS inventory.array_data;\n" + + " CREATE TABLE IF NOT EXISTS inventory.array_data (\n" + + " name text,\n" + + " pay_by_quarter integer[],\n" + + " schedule text[][]\n" + + " );\n" + + " INSERT INTO inventory.array_data\n" + + " VALUES " + + "('Carol2',\n" + + " ARRAY[20000, 25000, 25000, 25000],\n" + + " ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n"+ + "('Bill',\n" + + " '{10000, 10000, 10000, 10000}',\n" + + " '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" + + " ('Carol1',\n" + + " '{20000, 25000, 25000, 25000}',\n" + + " '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" + + ";"; + SourcePostgresqlDB.runSQL(sql); + + Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> { + try { + Dataset df = getTableData("testc.inventory.array_data"); + df.show(false); + return df.count() >= 3; + } catch (Exception e) { + return false; + } + }); + } + @Test public void testSchemaChanges() throws Exception { // TEST add new columns, drop not null constraint diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index 4670edac..bf615825 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -29,6 +29,7 @@ class TestIcebergUtil { final String serdeWithSchema = Testing.Files.readResourceAsString("json/serde-with-schema.json"); final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json"); final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json"); + final String unwrapWithArraySchema = Testing.Files.readResourceAsString("json/serde-with-array.json"); @Test public void testNestedJsonRecord() throws JsonProcessingException { @@ -47,6 +48,20 @@ public void testUnwrapJsonRecord() throws IOException { assertEquals(16850, record.getField("order_date")); } + @Test + public void testNestedArrayJsonRecord() throws JsonProcessingException { + JsonNode jsonData = new ObjectMapper().readTree(unwrapWithArraySchema); + JsonNode jsonPayload = jsonData.get("payload"); + JsonNode jsonSchema = jsonData.get("schema"); + List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); + Schema schema = new Schema(schemaFields); + assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list, 4: schedule: optional list, 6:")); + //System.out.println(schema.asStruct()); + GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); + //System.out.println(record); + assertTrue( record.toString().contains("[10000, 10001, 10002, 10003]")); + } + @Test public void testNestedGeomJsonRecord() throws JsonProcessingException { JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema); diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-with-array.json b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-array.json new file mode 100644 index 00000000..cc96c0a0 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-array.json @@ -0,0 +1,75 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": true, + "field": "name" + }, + { + "type": "array", + "items": { + "type": "int32", + "optional": true + }, + "optional": true, + "field": "pay_by_quarter" + }, + { + "type": "array", + "items": { + "type": "string", + "optional": true + }, + "optional": true, + "field": "schedule" + }, + { + "type": "string", + "optional": true, + "field": "__op" + }, + { + "type": "string", + "optional": true, + "field": "__table" + }, + { + "type": "int64", + "optional": true, + "field": "__source_ts_ms" + }, + { + "type": "string", + "optional": true, + "field": "__db" + }, + { + "type": "string", + "optional": true, + "field": "__deleted" + } + ], + "optional": false, + "name": "testc.inventory.array_data.Value" + }, + "payload": { + "name": "Bill", + "pay_by_quarter": [ + 10000, + 10001, + 10002, + 10003 + ], + "schedule": [ + "[Ljava.lang.String;@508917a0", + "[Ljava.lang.String;@7412bd2" + ], + "__op": "c", + "__table": "array_data", + "__source_ts_ms": 1638128893618, + "__db": "postgres", + "__deleted": "false" + } +}