Support nested data types (#41)
* support nested data types
ismailsimsek committed Oct 22, 2021
1 parent 4b575db commit 062f49f
Showing 7 changed files with 146 additions and 26 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -7,6 +7,8 @@
This project adds an Iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to
replicate database CDC changes to an Iceberg table (cloud storage, HDFS) in real time, without requiring Spark, Kafka, or a streaming platform.

More details are available in the [Debezium Iceberg Consumer Documentation](docs/DOCS.md)

![Debezium Iceberg](docs/images/debezium-iceberg.png)

# Contributing
IcebergUtil.java
@@ -33,7 +33,7 @@ public class IcebergUtil {

public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
LOGGER.debug(eventSchema.toString());
return getIcebergSchema(eventSchema, "", -1);
return getIcebergSchema(eventSchema, "", 0);
}

public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
@@ -80,13 +80,11 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
//break;
case "struct":
throw new RuntimeException("Field:'" + fieldName + "' has nested data type, " +
"nested data types are not supported by consumer");
// //recursive call
// Schema subSchema = SchemaUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, ++columnId);
// schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema.columns())));
// columnId += subSchema.columns().size();
// break;
// create it as struct, nested type
List<Types.NestedField> subSchema = IcebergUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
columnId += subSchema.size();
break;
default:
// default to String type
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
@@ -122,8 +120,8 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonN
return GenericRecord.create(tableFields).copy(mappedResult);
}

-    private static Object jsonToGenericRecordVal(Types.NestedField field,
-                                                 JsonNode node) {
+    static Object jsonToGenericRecordVal(Types.NestedField field,
+                                         JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
@@ -160,14 +158,10 @@ private static Object jsonToGenericRecordVal(Types.NestedField field,
val = jsonObjectMapper.convertValue(node, Map.class);
break;
case STRUCT:
throw new RuntimeException("Cannot process recursive records!");
// Disabled because cannot recursively process StructType/NestedField
// // recursive call to get nested data/record
// Types.StructType nestedField = Types.StructType.of(field);
// GenericRecord r = getIcebergRecord(schema, nestedField, node.get(field.name()));
// return r);
// // throw new RuntimeException("Cannot process recursive record!");
// break;
// create it as struct, nested type
// recursive call to get nested data/record
val = getIcebergRecord(field.type().asStructType(), node);
break;
default:
// default to String type
val = node.asText(null);
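The heart of this change, end to end: `getIcebergSchema` now recurses into `struct` fields instead of throwing, and `jsonToGenericRecordVal` builds a nested `GenericRecord` for `STRUCT` columns. Below is a minimal, hypothetical sketch of the new behaviour; it assumes `IcebergUtil` from this repository is on the classpath, and the event JSON and class name are made up for illustration.

```java
// hypothetical sketch, not part of this commit
// import io.debezium.server.iceberg.IcebergUtil; // assumed package for IcebergUtil
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

import java.util.List;

public class NestedTypeExample {
    public static void main(String[] args) throws Exception {
        // one plain field plus one nested struct field, mirroring the geometry fixture below
        String event = "{\"schema\": {\"type\": \"struct\", \"fields\": ["
                + "{\"type\": \"int32\", \"optional\": false, \"field\": \"id\"},"
                + "{\"type\": \"struct\", \"optional\": true, \"field\": \"g\", \"fields\": ["
                + "{\"type\": \"string\", \"optional\": false, \"field\": \"wkb\"},"
                + "{\"type\": \"int32\", \"optional\": true, \"field\": \"srid\"}]}]},"
                + "\"payload\": {\"id\": 1, \"g\": {\"wkb\": \"AQEA\", \"srid\": 123}}}";
        JsonNode json = new ObjectMapper().readTree(event);

        // column ids now start at 0 and the "struct" case recurses, so ids stay
        // unique across parent and nested columns: id=1, g=2, g.wkb=3, g.srid=4
        List<Types.NestedField> fields = IcebergUtil.getIcebergSchema(json.get("schema"));
        Schema schema = new Schema(fields);
        System.out.println(schema);

        // the STRUCT branch of jsonToGenericRecordVal recurses too, so the nested
        // payload object becomes a nested GenericRecord
        GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), json.get("payload"));
        GenericRecord g = (GenericRecord) record.getField("g");
        System.out.println(g.getField("srid")); // 123
    }
}
```

Assigning child ids after the parent's (`columnId += subSchema.size()`) keeps field ids unique across the whole schema, which Iceberg requires.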
ConfigSource.java
@@ -38,7 +38,7 @@ public ConfigSource() {
// ==== configure batch behaviour/size ====
// Positive integer value that specifies the maximum size of each batch of events that should be processed during
// each iteration of this connector. Defaults to 2048.
config.put("debezium.source.max.batch.size", "2");
config.put("debezium.source.max.batch.size", "1255");
// Positive integer value that specifies the number of milliseconds the connector should wait for new change
// events to appear before it starts processing a batch of events. Defaults to 1000 milliseconds, or 1 second.
config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds!
@@ -68,8 +68,7 @@ public ConfigSource() {
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.database.server.name", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," +
"inventory.table_datatypes,inventory.test_date_table");
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
config.put("debezium.source.include.schema.changes", "false");
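One note on the whitelist change above: Debezium treats `table.whitelist` entries as regular expressions, so `inventory.*` matches every table in the `inventory` schema, including the new `geom` table exercised by `testGeomData` below. A tiny, hypothetical illustration (class name made up):

```java
import java.util.regex.Pattern;

public class WhitelistExample {
    public static void main(String[] args) {
        // Debezium matches whitelist regexes against fully-qualified table ids
        Pattern whitelist = Pattern.compile("inventory.*");
        System.out.println(whitelist.matcher("inventory.geom").matches());      // true
        System.out.println(whitelist.matcher("inventory.customers").matches()); // true
        System.out.println(whitelist.matcher("public.orders").matches());       // false
    }
}
```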
@@ -220,4 +220,17 @@ public void testSimpleUpload() {
}
});
}

@Test
public void testGeomData() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.geom");
ds.show(false);
return ds.count() >= 3;
} catch (Exception e) {
return false;
}
});
}
}
TestIcebergUtil.java
@@ -28,11 +28,12 @@
class TestIcebergUtil {
final String serdeWithSchema = Testing.Files.readResourceAsString("json/serde-with-schema.json");
final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json");
+    final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json");

@Test
-    public void testNestedJsonRecord() {
-        Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")));
-        assertTrue(exception.getMessage().contains("nested data type"));
+    public void testNestedJsonRecord() throws JsonProcessingException {
+        List<Types.NestedField> d = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema"));
+        assertTrue(d.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, 4:"));
}

@Test
@@ -46,6 +47,25 @@ public void testUnwrapJsonRecord() throws IOException {
assertEquals(16850, record.getField("order_date"));
}

@Test
public void testNestedGeomJsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema);
JsonNode jsonPayload = jsonData.get("payload");
JsonNode jsonSchema = jsonData.get("schema");
List<Types.NestedField> schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
System.out.println(schema);
assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));

GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
GenericRecord g = (GenericRecord) record.getField("g");
GenericRecord h = (GenericRecord) record.getField("h");
assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));
assertEquals(123, g.get(1, Types.IntegerType.get().typeId().javaClass()));
assertEquals("Record(null, null)", h.toString());
assertNull(h.get(0, Types.BinaryType.get().typeId().javaClass()));
}

@Test
public void valuePayloadWithSchemaAsJsonNode() {
// testing Debezium deserializer
json/serde-with-schema_geom.json (new file)
@@ -0,0 +1,93 @@
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "wkb"
},
{
"type": "int32",
"optional": true,
"field": "srid"
}
],
"optional": true,
"name": "io.debezium.data.geometry.Geometry",
"version": 1,
"doc": "Geometry",
"field": "g"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "wkb"
},
{
"type": "int32",
"optional": true,
"field": "srid"
}
],
"optional": true,
"name": "io.debezium.data.geometry.Geometry",
"version": 1,
"doc": "Geometry",
"field": "h"
},
{
"type": "string",
"optional": true,
"field": "__op"
},
{
"type": "string",
"optional": true,
"field": "__table"
},
{
"type": "int64",
"optional": true,
"field": "__source_ts_ms"
},
{
"type": "string",
"optional": true,
"field": "__db"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
}
],
"optional": false,
"name": "testc.inventory.geom.Value"
},
"payload": {
"id": 1,
"g": {
"wkb": "AQEAAAAAAAAAAADwPwAAAAAAAPA/",
"srid": 123
},
"h": null,
"__op": "r",
"__table": "geom",
"__source_ts_ms": 1634844424986,
"__db": "postgres",
"__deleted": "false"
}
}
3 changes: 1 addition & 2 deletions docs/DOCS.md
@@ -97,8 +97,7 @@ database table = `inventory.customers` will be replicated to `default.testc_cdc_

## Debezium Event Flattening

-Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported.
-
+Iceberg consumer requires event flattening.
```properties
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
```
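For context, the unwrap setup that would produce the metadata columns seen in the new test fixture (`__op`, `__table`, `__source_ts_ms`, `__db`, `__deleted`) looks roughly like the sketch below. The `add.fields` selection and `delete.handling.mode` value are assumptions inferred from that fixture, not part of this commit:

```properties
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# assumed: copy change metadata into the flattened record as __op, __table, __source_ts_ms, __db
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
# assumed: keep deletes as rows flagged by a __deleted column
debezium.transforms.unwrap.delete.handling.mode=rewrite
```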
