Support nested data types #41

Merged: 9 commits, Oct 22, 2021

Changes from all commits
2 changes: 2 additions & 0 deletions README.md
@@ -7,6 +7,8 @@
This project adds an Iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to
replicate database CDC changes to an Iceberg table (Cloud Storage, HDFS) in real time, without requiring Spark, Kafka, or a streaming platform.

More detail is available in the [Debezium Iceberg Consumer Documentation](docs/DOCS.md)

![Debezium Iceberg](docs/images/debezium-iceberg.png)

# Contributing
@@ -33,7 +33,7 @@ public class IcebergUtil {

public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
LOGGER.debug(eventSchema.toString());
-    return getIcebergSchema(eventSchema, "", -1);
+    return getIcebergSchema(eventSchema, "", 0);
}

public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
@@ -80,13 +80,11 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str
        //schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
        //break;
      case "struct":
-        throw new RuntimeException("Field:'" + fieldName + "' has nested data type, " +
-            "nested data types are not supported by consumer");
-        // //recursive call
-        // Schema subSchema = SchemaUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, ++columnId);
-        // schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema.columns())));
-        // columnId += subSchema.columns().size();
-        // break;
+        // create it as struct, nested type
+        List<Types.NestedField> subSchema = IcebergUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId);
+        schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
+        columnId += subSchema.size();
+        break;
default:
// default to String type
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
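The struct case now recurses instead of throwing. Below is a dependency-free sketch of the depth-first column-id bookkeeping this performs; Iceberg's `Types.NestedField` is replaced by a plain record and the Debezium JSON schema by a nested `Map`, both substitutions being assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaWalkSketch {

    record Column(int id, String name) {}

    // A nested Map value models a struct field; leaves are type names.
    // Ids grow depth-first: a struct claims one id, then its children claim
    // the ids immediately after it, matching columnId += subSchema.size().
    static int walk(Map<String, Object> schema, List<Column> out, int nextId) {
        for (Map.Entry<String, Object> field : schema.entrySet()) {
            nextId++; // each field, struct or leaf, consumes one column id
            out.add(new Column(nextId, field.getKey()));
            if (field.getValue() instanceof Map<?, ?> struct) {
                @SuppressWarnings("unchecked")
                Map<String, Object> children = (Map<String, Object>) struct;
                nextId = walk(children, out, nextId); // recursive call
            }
        }
        return nextId;
    }

    public static void main(String[] args) {
        Map<String, Object> geom = new LinkedHashMap<>();
        geom.put("wkb", "string");
        geom.put("srid", "int32");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", "int32");
        row.put("g", geom); // nested struct, like the geometry column in the tests
        List<Column> cols = new ArrayList<>();
        walk(row, cols, 0); // starting at 0, not -1, mirrors the fix above
        System.out.println(cols);
    }
}
```

Starting the walk at 0 gives the first field id 1 and the nested `wkb`/`srid` fields ids 3 and 4, which is the numbering the new `testNestedGeomJsonRecord` asserts.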
@@ -122,8 +120,8 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonN
return GenericRecord.create(tableFields).copy(mappedResult);
}

-  private static Object jsonToGenericRecordVal(Types.NestedField field,
-                                               JsonNode node) {
+  static Object jsonToGenericRecordVal(Types.NestedField field,
+                                       JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
@@ -160,14 +158,10 @@ private static Object jsonToGenericRecordVal(Types.NestedField field,
val = jsonObjectMapper.convertValue(node, Map.class);
break;
      case STRUCT:
-        throw new RuntimeException("Cannot process recursive records!");
-        // Disabled because cannot recursively process StructType/NestedField
-        // // recursive call to get nested data/record
-        // Types.StructType nestedField = Types.StructType.of(field);
-        // GenericRecord r = getIcebergRecord(schema, nestedField, node.get(field.name()));
-        // return r);
-        // // throw new RuntimeException("Cannot process recursive record!");
-        // break;
+        // create it as struct, nested type
+        // recursive call to get nested data/record
+        val = getIcebergRecord(field.type().asStructType(), node);
+        break;
default:
// default to String type
val = node.asText(null);
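The STRUCT case now delegates back to `getIcebergRecord` for the subtree. A minimal sketch of that recursion follows, with Iceberg's `GenericRecord` and Jackson's `JsonNode` both modeled with plain Maps; that substitution is an assumption made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordWalkSketch {

    // Leaves (strings, numbers, null) pass through; Map values are rebuilt
    // field by field, the way the STRUCT case delegates to getIcebergRecord.
    static Object toVal(Object node) {
        if (node instanceof Map<?, ?> struct) {
            Map<String, Object> rec = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : struct.entrySet()) {
                rec.put(String.valueOf(e.getKey()), toVal(e.getValue())); // recurse
            }
            return rec;
        }
        return node;
    }

    public static void main(String[] args) {
        // shaped like the payload in json/serde-with-schema_geom.json
        Map<String, Object> g = new LinkedHashMap<>();
        g.put("wkb", "AQEAAAAAAAAAAADwPwAAAAAAAPA/");
        g.put("srid", 123);
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("id", 1);
        payload.put("g", g);
        // note: the real consumer maps a null struct to Record(null, null),
        // as the new test asserts; this simplified sketch just keeps null
        payload.put("h", null);
        System.out.println(toVal(payload));
    }
}
```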
@@ -38,7 +38,7 @@ public ConfigSource() {
// ==== configure batch behaviour/size ====
// Positive integer value that specifies the maximum size of each batch of events that should be processed during
// each iteration of this connector. Defaults to 2048.
-    config.put("debezium.source.max.batch.size", "2");
+    config.put("debezium.source.max.batch.size", "1255");
// Positive integer value that specifies the number of milliseconds the connector should wait for new change
// events to appear before it starts processing a batch of events. Defaults to 1000 milliseconds, or 1 second.
config.put("debezium.source.poll.interval.ms", "10000"); // 10 seconds
@@ -68,8 +68,7 @@ public ConfigSource() {
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.database.server.name", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
-    config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," +
-        "inventory.table_datatypes,inventory.test_date_table");
+    config.put("debezium.source.table.whitelist", "inventory.*");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
config.put("debezium.source.include.schema.changes", "false");
@@ -220,4 +220,17 @@ public void testSimpleUpload() {
}
});
}

@Test
public void testGeomData() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.geom");
ds.show(false);
return ds.count() >= 3;
} catch (Exception e) {
return false;
}
});
}
}
@@ -28,11 +28,12 @@
class TestIcebergUtil {
final String serdeWithSchema = Testing.Files.readResourceAsString("json/serde-with-schema.json");
final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json");
final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json");

  @Test
-  public void testNestedJsonRecord() {
-    Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")));
-    assertTrue(exception.getMessage().contains("nested data type"));
+  public void testNestedJsonRecord() throws JsonProcessingException {
+    List<Types.NestedField> d = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema"));
+    assertTrue(d.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, 4:"));
  }

@Test
@@ -46,6 +47,25 @@ public void testUnwrapJsonRecord() throws IOException {
assertEquals(16850, record.getField("order_date"));
}

@Test
public void testNestedGeomJsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema);
JsonNode jsonPayload = jsonData.get("payload");
JsonNode jsonSchema = jsonData.get("schema");
List<Types.NestedField> schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
System.out.println(schema);
assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));

GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
GenericRecord g = (GenericRecord) record.getField("g");
GenericRecord h = (GenericRecord) record.getField("h");
assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));
assertEquals(123, g.get(1, Types.IntegerType.get().typeId().javaClass()));
assertEquals("Record(null, null)", h.toString());
assertNull(h.get(0, Types.BinaryType.get().typeId().javaClass()));
}

@Test
public void valuePayloadWithSchemaAsJsonNode() {
// testing Debezium deserializer
@@ -0,0 +1,93 @@ json/serde-with-schema_geom.json (new test resource)
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "wkb"
},
{
"type": "int32",
"optional": true,
"field": "srid"
}
],
"optional": true,
"name": "io.debezium.data.geometry.Geometry",
"version": 1,
"doc": "Geometry",
"field": "g"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "wkb"
},
{
"type": "int32",
"optional": true,
"field": "srid"
}
],
"optional": true,
"name": "io.debezium.data.geometry.Geometry",
"version": 1,
"doc": "Geometry",
"field": "h"
},
{
"type": "string",
"optional": true,
"field": "__op"
},
{
"type": "string",
"optional": true,
"field": "__table"
},
{
"type": "int64",
"optional": true,
"field": "__source_ts_ms"
},
{
"type": "string",
"optional": true,
"field": "__db"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
}
],
"optional": false,
"name": "testc.inventory.geom.Value"
},
"payload": {
"id": 1,
"g": {
"wkb": "AQEAAAAAAAAAAADwPwAAAAAAAPA/",
"srid": 123
},
"h": null,
"__op": "r",
"__table": "geom",
"__source_ts_ms": 1634844424986,
"__db": "postgres",
"__deleted": "false"
}
}
3 changes: 1 addition & 2 deletions docs/DOCS.md
@@ -97,8 +97,7 @@ database table = `inventory.customers` will be replicated to `default.testc_cdc_

## Debezium Event Flattening

-Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported.
-
+Iceberg consumer requires event flattening.
```properties
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
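# The metadata columns in the new test resource (__op, __table, __source_ts_ms,
# __db, __deleted) suggest additional ExtractNewRecordState options along these
# lines; the values below are an inference from those tests, not copied from
# this repository's configuration, so treat them as an assumption to verify.
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true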
```