From 8159cb5064f50fcd2457bc7a7d1a5eb927973591 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 22 Oct 2021 18:20:27 +0200 Subject: [PATCH 1/8] support nested data types --- .../debezium/server/iceberg/IcebergUtil.java | 30 +++--- .../debezium/server/iceberg/ConfigSource.java | 2 +- .../iceberg/IcebergChangeConsumerTest.java | 13 +++ .../server/iceberg/TestIcebergUtil.java | 22 +++++ .../json/serde-with-schema_geom.json | 93 +++++++++++++++++++ 5 files changed, 141 insertions(+), 19 deletions(-) create mode 100644 debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 50ce9e33..c17ec16a 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -33,7 +33,7 @@ public class IcebergUtil { public static List getIcebergSchema(JsonNode eventSchema) { LOGGER.debug(eventSchema.toString()); - return getIcebergSchema(eventSchema, "", -1); + return getIcebergSchema(eventSchema, "", 0); } public static List getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) { @@ -80,13 +80,11 @@ public static List getIcebergSchema(JsonNode eventSchema, Str //schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get())); //break; case "struct": - throw new RuntimeException("Field:'" + fieldName + "' has nested data type, " + - "nested data types are not supported by consumer"); -// //recursive call -// Schema subSchema = SchemaUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, ++columnId); -// schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema.columns()))); -// columnId += subSchema.columns().size(); -// break; + // create it as struct, nested type + List subSchema = IcebergUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId); + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema))); + columnId += subSchema.size(); + break; default: // default to String type schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get())); @@ -122,8 +120,8 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonN return GenericRecord.create(tableFields).copy(mappedResult); } - private static Object jsonToGenericRecordVal(Types.NestedField field, - JsonNode node) { + static Object jsonToGenericRecordVal(Types.NestedField field, + JsonNode node) { LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); final Object val; switch (field.type().typeId()) { @@ -160,14 +158,10 @@ private static Object jsonToGenericRecordVal(Types.NestedField field, val = jsonObjectMapper.convertValue(node, Map.class); break; case STRUCT: - throw new RuntimeException("Cannot process recursive records!"); - // Disabled because cannot recursively process StructType/NestedField -// // recursive call to get nested data/record -// Types.StructType nestedField = Types.StructType.of(field); -// GenericRecord r = getIcebergRecord(schema, nestedField, node.get(field.name())); -// return r); -// // throw new RuntimeException("Cannot process recursive record!"); -// break; + // create it as struct, nested type + // recursive call to get nested data/record + val = getIcebergRecord(field.type().asStructType(), node); + break; default: // default to String type val = node.asText(null); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index 4ea5ebde..b89aac41 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -69,7 +69,7 @@ public ConfigSource() { config.put("debezium.source.database.server.name", "testc"); config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," + - "inventory.table_datatypes,inventory.test_date_table"); + "inventory.table_datatypes,inventory.test_date_table,inventory.geom"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); config.put("debezium.source.include.schema.changes", "false"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 273d0e68..9fb70800 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -220,4 +220,17 @@ public void testSimpleUpload() { } }); } + + @Test + public void testGeomData() { + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.geom"); + ds.show(false); + return ds.count() >= 3; + } catch (Exception e) { + return false; + } + }); + } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index 3a1fb0e0..221cc185 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -12,6 +12,7 @@ import io.debezium.util.Testing; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -28,6 +29,7 @@ class TestIcebergUtil { final String serdeWithSchema = Testing.Files.readResourceAsString("json/serde-with-schema.json"); final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json"); + final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json"); @Test public void testNestedJsonRecord() { @@ -46,6 +48,26 @@ public void testUnwrapJsonRecord() throws IOException { assertEquals(16850, record.getField("order_date")); } + @Test + public void testNestedGeomJsonRecord() throws JsonProcessingException { + JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema); + JsonNode jsonPayload = jsonData.get("payload"); + JsonNode jsonSchema = jsonData.get("schema"); + List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); + Schema schema = new Schema(schemaFields); + System.out.println(schema); + assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional binary, 4: srid: optional int>")); + + GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); + GenericRecord g = (GenericRecord) record.getField("g"); + GenericRecord h = (GenericRecord) record.getField("h"); + assertEquals(123, g.get(1, Types.IntegerType.get().typeId().javaClass())); + ByteBuffer gwkbBb = (ByteBuffer) g.get(0, Types.BinaryType.get().typeId().javaClass()); + assertEquals("java.nio.HeapByteBuffer[pos=0 lim=21 cap=21]", gwkbBb.toString()); + assertEquals("Record(null, null)", h.toString()); + assertNull(h.get(0, Types.BinaryType.get().typeId().javaClass())); + } + @Test public void valuePayloadWithSchemaAsJsonNode() { // testing Debezium deserializer diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json new file mode 100644 index 00000000..41a62fbb --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json @@ -0,0 +1,93 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "default": 0, + "field": "id" + }, + { + "type": "struct", + "fields": [ + { + "type": "bytes", + "optional": false, + "field": "wkb" + }, + { + "type": "int32", + "optional": true, + "field": "srid" + } + ], + "optional": true, + "name": "io.debezium.data.geometry.Geometry", + "version": 1, + "doc": "Geometry", + "field": "g" + }, + { + "type": "struct", + "fields": [ + { + "type": "bytes", + "optional": false, + "field": "wkb" + }, + { + "type": "int32", + "optional": true, + "field": "srid" + } + ], + "optional": true, + "name": "io.debezium.data.geometry.Geometry", + "version": 1, + "doc": "Geometry", + "field": "h" + }, + { + "type": "string", + "optional": true, + "field": "__op" + }, + { + "type": "string", + "optional": true, + "field": "__table" + }, + { + "type": "int64", + "optional": true, + "field": "__source_ts_ms" + }, + { + "type": "string", + "optional": true, + "field": "__db" + }, + { + "type": "string", + "optional": true, + "field": "__deleted" + } + ], + "optional": false, + "name": "testc.inventory.geom.Value" + }, + "payload": { + "id": 1, + "g": { + "wkb": "AQEAAAAAAAAAAADwPwAAAAAAAPA/", + "srid": 123 + }, + "h": null, + "__op": "r", + "__table": "geom", + "__source_ts_ms": 1634844424986, + "__db": "postgres", + "__deleted": "false" + } +} From 5516f98c54efb8a419b30f70e72e8c01a4c53037 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 22 Oct 2021 18:27:27 +0200 Subject: [PATCH 2/8] support nested data types --- .../test/java/io/debezium/server/iceberg/TestIcebergUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index 221cc185..14c4ff8b 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -63,7 +63,7 @@ public void testNestedGeomJsonRecord() throws JsonProcessingException { GenericRecord h = (GenericRecord) record.getField("h"); assertEquals(123, g.get(1, Types.IntegerType.get().typeId().javaClass())); ByteBuffer gwkbBb = (ByteBuffer) g.get(0, Types.BinaryType.get().typeId().javaClass()); - assertEquals("java.nio.HeapByteBuffer[pos=0 lim=21 cap=21]", gwkbBb.toString()); + assertEquals("java.nio.HeapByteBuffer", gwkbBb.getClass().getName()); assertEquals("Record(null, null)", h.toString()); assertNull(h.get(0, Types.BinaryType.get().typeId().javaClass())); } From 0352d56a4662eb0b235db104d4811660ec6cf985 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 22 Oct 2021 18:29:48 +0200 Subject: [PATCH 3/8] support nested data types --- .../java/io/debezium/server/iceberg/TestIcebergUtil.java | 6 ++---- .../src/test/resources/json/serde-with-schema_geom.json | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index 14c4ff8b..7ea52b31 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -12,7 +12,6 @@ import io.debezium.util.Testing; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -56,14 +55,13 @@ public void testNestedGeomJsonRecord() throws JsonProcessingException { List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema); Schema schema = new Schema(schemaFields); System.out.println(schema); - assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional binary, 4: srid: optional int>")); + assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>")); GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); GenericRecord g = (GenericRecord) record.getField("g"); GenericRecord h = (GenericRecord) record.getField("h"); + assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass())); assertEquals(123, g.get(1, Types.IntegerType.get().typeId().javaClass())); - ByteBuffer gwkbBb = (ByteBuffer) g.get(0, Types.BinaryType.get().typeId().javaClass()); - assertEquals("java.nio.HeapByteBuffer", gwkbBb.getClass().getName()); assertEquals("Record(null, null)", h.toString()); assertNull(h.get(0, Types.BinaryType.get().typeId().javaClass())); } diff --git a/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json index 41a62fbb..b8cbb0e4 100644 --- a/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json +++ b/debezium-server-iceberg-sink/src/test/resources/json/serde-with-schema_geom.json @@ -12,7 +12,7 @@ "type": "struct", "fields": [ { - "type": "bytes", + "type": "string", "optional": false, "field": "wkb" }, @@ -32,7 +32,7 @@ "type": "struct", "fields": [ { - "type": "bytes", + "type": "string", "optional": false, "field": "wkb" }, From 546d192e04ba4c1902211cc8dfb5f634819c3491 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 22 Oct 2021 18:37:36 +0200 Subject: [PATCH 4/8] support nested data types --- README.md | 2 ++ docs/DOCS.md | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f2c1c288..4bf3278c 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ This project adds iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It could be used to replicate database CDC changes to Iceberg table (Cloud Storage, HDFS) in realtime, without requiring Spark, Kafka or Streaming platform. +More detail available in [Debezium Iceberg Consumer Documentation](docs/DOCS.md) + ![Debezium Iceberg](docs/images/debezium-iceberg.png) # Contributing diff --git a/docs/DOCS.md b/docs/DOCS.md index 83ecc476..5509afed 100644 --- a/docs/DOCS.md +++ b/docs/DOCS.md @@ -97,8 +97,7 @@ database table = `inventory.customers` will be replicated to `default.testc_cdc_ ## Debezium Event Flattening -Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported. - +Iceberg consumer requires event flattening. ```properties debezium.transforms=unwrap debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState From 016e10f5a2ce1daa21285ffe0596ba7e0e3c75e1 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 22 Oct 2021 18:41:53 +0200 Subject: [PATCH 5/8] support nested data types --- .../test/java/io/debezium/server/iceberg/ConfigSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index b89aac41..10ce70ae 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -68,8 +68,8 @@ public ConfigSource() { config.put("debezium.source.offset.flush.interval.ms", "60000"); config.put("debezium.source.database.server.name", "testc"); config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); - config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," + - "inventory.table_datatypes,inventory.test_date_table,inventory.geom"); +// config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," + +// "inventory.table_datatypes,inventory.test_date_table,inventory.geom"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); config.put("debezium.source.include.schema.changes", "false"); From 140e0a149a1882431e848cbeab67fdc61efbffaa Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 22 Oct 2021 18:50:14 +0200 Subject: [PATCH 6/8] support nested data types --- .../java/io/debezium/server/iceberg/TestIcebergUtil.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java index 7ea52b31..d706be38 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java @@ -31,9 +31,9 @@ class TestIcebergUtil { final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json"); @Test - public void testNestedJsonRecord() { - Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema"))); - assertTrue(exception.getMessage().contains("nested data type")); + public void testNestedJsonRecord() throws JsonProcessingException { + List d = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")); + assertTrue(d.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, 4:")); } @Test From f9993314e4680660c0d389cc920341b7f83cfc60 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 22 Oct 2021 20:08:19 +0200 Subject: [PATCH 7/8] support nested data types --- .../src/test/java/io/debezium/server/iceberg/ConfigSource.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index 10ce70ae..2cdddc02 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -68,8 +68,7 @@ public ConfigSource() { config.put("debezium.source.offset.flush.interval.ms", "60000"); config.put("debezium.source.database.server.name", "testc"); config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); -// config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," + -// "inventory.table_datatypes,inventory.test_date_table,inventory.geom"); + config.put("debezium.source.table.whitelist", "inventory.*"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); config.put("debezium.source.include.schema.changes", "false"); From e9863dfc856b72d12c67024ef17f8920640577d0 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 22 Oct 2021 20:37:37 +0200 Subject: [PATCH 8/8] support nested data types --- .../src/test/java/io/debezium/server/iceberg/ConfigSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index 2cdddc02..f1faa470 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -38,7 +38,7 @@ public ConfigSource() { // ==== configure batch behaviour/size ==== // Positive integer value that specifies the maximum size of each batch of events that should be processed during // each iteration of this connector. Defaults to 2048. - config.put("debezium.source.max.batch.size", "2"); + config.put("debezium.source.max.batch.size", "1255"); // Positive integer value that specifies the number of milliseconds the connector should wait for new change // events to appear before it starts processing a batch of events. Defaults to 1000 milliseconds, or 1 second. config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds!