Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve code and schema conversion tests #346

Merged
merged 1 commit into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,23 @@ public ChangeEventSchema changeEventSchema() {
}

/**
 * Returns the Iceberg schema produced by this event's {@code changeEventSchema()}.
 *
 * NOTE(review): two return statements appear back to back below. They look like the removed
 * and added sides of a diff hunk rendered without +/- markers; as plain Java the second one
 * (which passes {@code this.isUnwrapped()}) would be unreachable. Confirm which line is current.
 */
public Schema icebergSchema() {
return changeEventSchema().icebergSchema();
return changeEventSchema().icebergSchema(this.isUnwrapped());
}

/**
 * Exposes the destination held by this event.
 *
 * @return the value of the {@code destination} field, unchanged
 */
public String destination() {
  return this.destination;
}

/**
 * Reports whether this event's value is a flattened ("unwrapped") row rather than a full
 * change-event envelope.
 *
 * <p>The value is treated as an envelope when it has {@code before}, {@code after} and
 * {@code source} members, {@code source} is a JSON object, and {@code after} is either a JSON
 * object or an explicit JSON null. Accepting null matters because Debezium documents delete
 * events as carrying {@code "after": null}; requiring an object there would report such an
 * envelope as unwrapped.
 *
 * @return {@code false} when the value has the envelope shape described above,
 *     {@code true} otherwise
 */
public boolean isUnwrapped() {
  // Read the payload once instead of calling value() for every probe.
  final var payload = this.value();

  final boolean hasEnvelopeMembers =
      payload.has("after") && payload.has("source") && payload.has("before");
  if (!hasEnvelopeMembers) {
    return true;
  }

  // has() is true for explicit nulls, so get("after") is non-null here but may be a JSON null.
  final var after = payload.get("after");
  final boolean afterFitsEnvelope = after.isObject() || after.isNull();

  return !(afterFitsEnvelope && payload.get("source").isObject());
}

/**
 * Converts this event's value into an Iceberg record laid out according to {@code schema}.
 *
 * <p>Delegates to the struct-based overload, passing {@code schema.asStruct()} together with
 * {@code value()}.
 *
 * @param schema Iceberg schema whose struct type drives the conversion
 * @return the record returned by the struct-based overload
 */
public GenericRecord asIcebergRecord(Schema schema) {
  final var struct = schema.asStruct();
  return asIcebergRecord(struct, this.value());
}
Expand Down Expand Up @@ -266,7 +276,27 @@ public int hashCode() {
return Objects.hash(valueSchema, keySchema);
}

private Schema icebergSchema() {
/**
 * Returns the {@code fields} array of a schema node, or an empty array node when there is none.
 *
 * @param node schema node to inspect; may be {@code null} or a JSON null
 * @return {@code node.get("fields")} when that member exists and is an array, otherwise a new
 *     empty array node (never {@code null}), so callers can iterate the result directly
 */
private static JsonNode getNodeFieldsArray(JsonNode node) {
  // get() yields null for a missing member and for non-object nodes (JSON null included),
  // so one lookup covers the separate isNull / has checks.
  final JsonNode fields = (node == null) ? null : node.get("fields");
  if (fields != null && fields.isArray()) {
    return fields;
  }

  // Hand back an empty *array* so both paths return the same kind of node.
  return mapper.createArrayNode();
}

/**
 * Looks up an entry of {@code node}'s {@code fields} array by its {@code field} name.
 *
 * @param fieldName name to match against each entry's {@code field} text value
 * @param node schema node whose {@code fields} array is searched
 * @return the first matching entry, or {@code null} when no entry matches
 */
private static JsonNode findNodeFieldByName(String fieldName, JsonNode node) {
  for (JsonNode field : getNodeFieldsArray(node)) {
    final JsonNode nameNode = field.get("field");
    // Entries without a "field" member cannot match; skip them instead of dereferencing null.
    if (nameNode != null && Objects.equals(nameNode.textValue(), fieldName)) {
      return field;
    }
  }

  return null;
}

private Schema icebergSchema(boolean isUnwrapped) {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
Expand Down Expand Up @@ -313,14 +343,13 @@ private Schema icebergSchema() {
/**
 * Converts the entries of a schema node's {@code fields} array into Iceberg nested fields.
 *
 * <p>Ids start at 1. Each entry is converted by {@code debeziumFieldToIcebergField} using the
 * entry's {@code field} text as its name, and the next entry starts at the returned key plus
 * one (presumably the key is the last id the converted field used; confirm against that
 * helper).
 *
 * NOTE(review): the {@code if}/{@code forEach} form and the {@code for} loop below appear to be
 * the removed and added sides of one diff hunk shown without +/- markers; the braces do not
 * balance if both are kept. Confirm which form is current.
 *
 * @param schemaNode schema node whose {@code fields} entries are converted
 * @return the converted columns in iteration order
 */
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
// Next free field id, boxed so it can be updated from inside a lambda.
AtomicReference<Integer> fieldId = new AtomicReference<>(1);
if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
// NOTE(review): the wording reads reversed; the code builds Iceberg fields from the node.
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
schemaNode.get("fields").forEach(field -> {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
// Continue numbering after the key reported for the field just converted.
fieldId.set(df.getKey() + 1);
schemaColumns.add(df.getValue());
});
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
// getNodeFieldsArray supplies the node that is iterated here.
for (JsonNode field : getNodeFieldsArray(schemaNode)) {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
fieldId.set(df.getKey() + 1);
schemaColumns.add(df.getValue());
}

return schemaColumns;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ public void testNestedJsonRecord() {
serdeWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
System.out.println(schema.toString());
assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " +
"4:"));
assertEquals(schema.toString(), ("""
table {
1: before: optional struct<2: id: optional int, 3: first_name: optional string, 4: last_name: optional string, 5: email: optional string>
6: after: optional struct<7: id: optional int, 8: first_name: optional string, 9: last_name: optional string, 10: email: optional string>
11: source: optional struct<12: version: optional string, 13: connector: optional string, 14: name: optional string, 15: ts_ms: optional long, 16: snapshot: optional boolean, 17: db: optional string, 18: table: optional string, 19: server_id: optional long, 20: gtid: optional string, 21: file: optional string, 22: pos: optional long, 23: row: optional int, 24: thread: optional long, 25: query: optional string>
26: op: optional string
27: ts_ms: optional long
}"""));
}

@Test
Expand All @@ -62,6 +68,19 @@ public void testUnwrapJsonRecord() {
GenericRecord record = e.asIcebergRecord(schema);
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
assertEquals(schema.toString(), """
table {
1: id: optional int
2: order_date: optional int
3: purchaser: optional int
4: quantity: optional int
5: product_id: optional int
6: __op: optional string
7: __table: optional string
8: __lsn: optional long
9: __source_ts_ms: optional timestamptz
10: __deleted: optional string
}""");
System.out.println(schema);
System.out.println(record);
}
Expand All @@ -72,10 +91,19 @@ public void testNestedArrayJsonRecord() {
unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null);

Schema schema = e.icebergSchema();
assertEquals(schema.toString(), """
table {
1: name: optional string
2: pay_by_quarter: optional list<int>
5: schedule: optional list<string>
8: __op: optional string
9: __table: optional string
10: __source_ts_ms: optional timestamptz
11: __db: optional string
12: __deleted: optional string
}""");
System.out.println(schema);
System.out.println(schema.asStruct());
assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 5: schedule: optional list<string>, 8:"));
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
System.out.println(schema.findField("schedule").type().asListType().elementType());
assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int");
Expand All @@ -92,7 +120,14 @@ public void testNestedArray2JsonRecord() {
Schema schema = e.icebergSchema();
System.out.println(schema.asStruct());
System.out.println(schema);
assertTrue(schema.asStruct().toString().contains("20: tableChanges: optional list<struct<22: type: optional string,"));
assertEquals(schema.toString(), """
table {
1: source: optional struct<2: version: optional string, 3: connector: optional string, 4: name: optional string, 5: ts_ms: optional long, 6: snapshot: optional string, 7: db: optional string, 8: sequence: optional string, 9: table: optional string, 10: server_id: optional long, 11: gtid: optional string, 12: file: optional string, 13: pos: optional long, 14: row: optional int, 15: thread: optional long, 16: query: optional string>
17: databaseName: optional string
18: schemaName: optional string
19: ddl: optional string
20: tableChanges: optional list<struct<22: type: optional string, 23: id: optional string, 24: table: optional struct<25: defaultCharsetName: optional string, 26: primaryKeyColumnNames: optional list<string>, 29: columns: optional list<struct<31: name: optional string, 32: jdbcType: optional int, 33: nativeType: optional int, 34: typeName: optional string, 35: typeExpression: optional string, 36: charsetName: optional string, 37: length: optional int, 38: scale: optional int, 39: position: optional int, 40: optional: optional boolean, 41: autoIncremented: optional boolean, 42: generated: optional boolean>>>>>
}""");
System.out.println(schema.findField("tableChanges"));
System.out.println(schema.findField("tableChanges").type().asListType().elementType());
//GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
Expand All @@ -107,7 +142,17 @@ public void testNestedGeomJsonRecord() {
GenericRecord record = e.asIcebergRecord(schema);
//System.out.println(schema);
//System.out.println(record);
assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));
assertEquals(schema.toString(), """
table {
1: id: optional int
2: g: optional struct<3: wkb: optional string, 4: srid: optional int>
5: h: optional struct<6: wkb: optional string, 7: srid: optional int>
8: __op: optional string
9: __table: optional string
10: __source_ts_ms: optional timestamptz
11: __db: optional string
12: __deleted: optional string
}""");
GenericRecord g = (GenericRecord) record.getField("g");
GenericRecord h = (GenericRecord) record.getField("h");
assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));
Expand Down