diff --git a/README.md b/README.md index f2c1c288..4bf3278c 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ This project adds iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It could be used to replicate database CDC changes to Iceberg table (Cloud Storage, HDFS) in realtime, without requiring Spark, Kafka or Streaming platform. +More detail available in [Debezium Iceberg Consumer Documentation](docs/DOCS.md) + ![Debezium Iceberg](docs/images/debezium-iceberg.png) # Contributing diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 50ce9e33..c17ec16a 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -33,7 +33,7 @@ public class IcebergUtil { public static List getIcebergSchema(JsonNode eventSchema) { LOGGER.debug(eventSchema.toString()); - return getIcebergSchema(eventSchema, "", -1); + return getIcebergSchema(eventSchema, "", 0); } public static List getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) { @@ -80,13 +80,11 @@ public static List getIcebergSchema(JsonNode eventSchema, Str //schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get())); //break; case "struct": - throw new RuntimeException("Field:'" + fieldName + "' has nested data type, " + - "nested data types are not supported by consumer"); -// //recursive call -// Schema subSchema = SchemaUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, ++columnId); -// schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema.columns()))); -// columnId += subSchema.columns().size(); -// break; + // create it as struct, nested type + List subSchema = IcebergUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId); + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema))); + columnId += subSchema.size(); + break; default: // default to String type schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get())); @@ -122,8 +120,8 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonN return GenericRecord.create(tableFields).copy(mappedResult); } - private static Object jsonToGenericRecordVal(Types.NestedField field, - JsonNode node) { + static Object jsonToGenericRecordVal(Types.NestedField field, + JsonNode node) { LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); final Object val; switch (field.type().typeId()) { @@ -160,14 +158,10 @@ private static Object jsonToGenericRecordVal(Types.NestedField field, val = jsonObjectMapper.convertValue(node, Map.class); break; case STRUCT: - throw new RuntimeException("Cannot process recursive records!"); - // Disabled because cannot recursively process StructType/NestedField -// // recursive call to get nested data/record -// Types.StructType nestedField = Types.StructType.of(field); -// GenericRecord r = getIcebergRecord(schema, nestedField, node.get(field.name())); -// return r); -// // throw new RuntimeException("Cannot process recursive record!"); -// break; + // create it as struct, nested type + // recursive call to get nested data/record + val = getIcebergRecord(field.type().asStructType(), node); + break; default: // default to String type val = node.asText(null); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index 4ea5ebde..f1faa470 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -38,7 +38,7 @@ public ConfigSource() { // ==== configure batch behaviour/size ==== // Positive integer value that specifies the maximum size of each batch of events that should be processed during // each iteration of this connector. Defaults to 2048. - config.put("debezium.source.max.batch.size", "2"); + config.put("debezium.source.max.batch.size", "1255"); // Positive integer value that specifies the number of milliseconds the connector should wait for new change // events to appear before it starts processing a batch of events. Defaults to 1000 milliseconds, or 1 second. config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds! @@ -68,8 +68,7 @@ public ConfigSource() { config.put("debezium.source.offset.flush.interval.ms", "60000"); config.put("debezium.source.database.server.name", "testc"); config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); - config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," + - "inventory.table_datatypes,inventory.test_date_table"); + config.put("debezium.source.table.whitelist", "inventory.*"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); config.put("debezium.source.include.schema.changes", "false"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 273d0e68..9fb70800 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -220,4 +220,17 @@ public void testSimpleUpload() { } }); } + + @Test + public void testGeomData() { + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.geom"); + ds.show(false); + return ds.count() >= 3; + } catch (Exception e) { + return false; + } + }); + } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index 3a1fb0e0..d706be38 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -28,11 +28,12 @@ class TestIcebergUtil { final String serdeWithSchema = Testing.Files.readResourceAsString("json/serde-with-schema.json"); final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json"); + final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json"); @Test - public void testNestedJsonRecord() { - Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema"))); - assertTrue(exception.getMessage().contains("nested data type")); + public void testNestedJsonRecord() throws JsonProcessingException { + List d = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")); + assertTrue(d.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, 4:")); } @Test @@ -46,6 +47,25 @@ public void testUnwrapJsonRecord() throws IOException { assertEquals(16850, record.getField("order_date")); } + @Test + public void testNestedGeomJsonRecord() throws JsonProcessingException { + JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema); + JsonNode jsonPayload = jsonData.get("payload"); + JsonNode jsonSchema = jsonData.get("schema"); + List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); + Schema schema = new Schema(schemaFields); + System.out.println(schema); + assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>")); + + GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); + GenericRecord g = (GenericRecord) record.getField("g"); + GenericRecord h = (GenericRecord) record.getField("h"); + assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass())); + assertEquals(123, g.get(1, Types.IntegerType.get().typeId().javaClass())); + assertEquals("Record(null, null)", h.toString()); + assertNull(h.get(0, Types.BinaryType.get().typeId().javaClass())); + } + @Test public void valuePayloadWithSchemaAsJsonNode() { // testing Debezium deserializer diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json new file mode 100644 index 00000000..b8cbb0e4 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json @@ -0,0 +1,93 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "default": 0, + "field": "id" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "wkb" + }, + { + "type": "int32", + "optional": true, + "field": "srid" + } + ], + "optional": true, + "name": "io.debezium.data.geometry.Geometry", + "version": 1, + "doc": "Geometry", + "field": "g" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "wkb" + }, + { + "type": "int32", + "optional": true, + "field": "srid" + } + ], + "optional": true, + "name": "io.debezium.data.geometry.Geometry", + "version": 1, + "doc": "Geometry", + "field": "h" + }, + { + "type": "string", + "optional": true, + "field": "__op" + }, + { + "type": "string", + "optional": true, + "field": "__table" + }, + { + "type": "int64", + "optional": true, + "field": "__source_ts_ms" + }, + { + "type": "string", + "optional": true, + "field": "__db" + }, + { + "type": "string", + "optional": true, + "field": "__deleted" + } + ], + "optional": false, + "name": "testc.inventory.geom.Value" + }, + "payload": { + "id": 1, + "g": { + "wkb": "AQEAAAAAAAAAAADwPwAAAAAAAPA/", + "srid": 123 + }, + "h": null, + "__op": "r", + "__table": "geom", + "__source_ts_ms": 1634844424986, + "__db": "postgres", + "__deleted": "false" + } +} diff --git a/docs/DOCS.md b/docs/DOCS.md index 83ecc476..5509afed 100644 --- a/docs/DOCS.md +++ b/docs/DOCS.md @@ -97,8 +97,7 @@ database table = `inventory.customers` will be replicated to `default.testc_cdc_ ## Debezium Event Flattening -Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported. - +Iceberg consumer requires event flattening. ```properties debezium.transforms=unwrap debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState