Skip to content

Commit

Permalink
improve code (#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 15, 2024
1 parent 7727566 commit f5121dd
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,23 @@ public ChangeEventSchema changeEventSchema() {
}

/**
 * Builds the Iceberg schema for this change event.
 *
 * <p>Delegates to the event's {@code ChangeEventSchema}, telling it whether the event value
 * is flattened (unwrapped) or still in the nested envelope form, as reported by
 * {@link #isUnwrapped()}.
 *
 * @return the Iceberg schema produced by the change event schema
 */
public Schema icebergSchema() {
  // The schema-level method takes the unwrapped flag; the former no-arg call is gone.
  return changeEventSchema().icebergSchema(this.isUnwrapped());
}

/**
 * Returns the destination held by this event.
 *
 * @return the value of the {@code destination} field
 */
public String destination() {
  return this.destination;
}

/**
 * Tells whether this event's value is flattened rather than a nested change envelope.
 *
 * <p>The value is treated as an envelope when it has {@code before}, {@code after} and
 * {@code source} members, {@code source} is an object, and at least one of the two row
 * images ({@code before} / {@code after}) is an object. Accepting either row image keeps
 * delete events, whose {@code after} member is JSON {@code null}, classified as envelopes.
 *
 * @return {@code true} when the value does not look like an envelope, {@code false} otherwise
 */
public boolean isUnwrapped() {
  // Read the value once instead of re-fetching it for every check.
  final JsonNode event = this.value();

  final boolean hasEnvelopeMembers =
      event.has("after") && event.has("source") && event.has("before");
  if (!hasEnvelopeMembers) {
    return true;
  }

  // A create event has only "after" populated, a delete event only "before".
  final boolean hasRowImage = event.get("after").isObject() || event.get("before").isObject();
  final boolean isEnvelope = hasRowImage && event.get("source").isObject();
  return !isEnvelope;
}

/**
 * Converts this event's value into an Iceberg record laid out according to the given schema.
 *
 * @param schema Iceberg schema whose struct type drives the conversion
 * @return the record produced by the struct-based {@code asIcebergRecord} overload
 */
public GenericRecord asIcebergRecord(Schema schema) {
  // Evaluate the struct first, then the value, matching the original argument order.
  final var structType = schema.asStruct();
  final var payload = value();
  return asIcebergRecord(structType, payload);
}
Expand Down Expand Up @@ -266,7 +276,27 @@ public int hashCode() {
return Objects.hash(valueSchema, keySchema);
}

private Schema icebergSchema() {
/**
 * Returns the {@code fields} array of a schema node, or an empty array when there is none.
 *
 * <p>The result is always safe to iterate: a {@code null} node, a JSON null node, a node
 * without a {@code fields} member, or a {@code fields} member that is not an array all
 * yield an empty array node.
 *
 * @param node schema node to inspect; may be {@code null}
 * @return the node's {@code fields} array, or a new empty array node
 */
private static JsonNode getNodeFieldsArray(JsonNode node) {
  if (node != null && !node.isNull()) {
    // get() returns null for a missing member, so one lookup covers both has() and get().
    final JsonNode fields = node.get("fields");
    if (fields != null && fields.isArray()) {
      return fields;
    }
  }

  // Fall back to an empty array (not an object) so the result type matches the method name.
  return mapper.createArrayNode();
}

/**
 * Looks up an entry of a schema node's {@code fields} array by its {@code field} name.
 *
 * @param fieldName name to match against each entry's {@code field} text value
 * @param node schema node whose {@code fields} array is searched
 * @return the first matching entry, or {@code null} when no entry matches
 */
private static JsonNode findNodeFieldByName(String fieldName, JsonNode node) {
  for (JsonNode candidate : getNodeFieldsArray(node)) {
    final JsonNode nameNode = candidate.get("field");
    // Entries without a "field" member are skipped instead of raising a NullPointerException.
    if (nameNode != null && Objects.equals(nameNode.textValue(), fieldName)) {
      return candidate;
    }
  }

  return null;
}

private Schema icebergSchema(boolean isUnwrapped) {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
Expand Down Expand Up @@ -313,14 +343,13 @@ private Schema icebergSchema() {
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
AtomicReference<Integer> fieldId = new AtomicReference<>(1);
if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
schemaNode.get("fields").forEach(field -> {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
fieldId.set(df.getKey() + 1);
schemaColumns.add(df.getValue());
});
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
for (JsonNode field : getNodeFieldsArray(schemaNode)) {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
fieldId.set(df.getKey() + 1);
schemaColumns.add(df.getValue());
}

return schemaColumns;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ public void testNestedJsonRecord() {
serdeWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
System.out.println(schema.toString());
assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " +
"4:"));
assertEquals(schema.toString(), ("""
table {
1: before: optional struct<2: id: optional int, 3: first_name: optional string, 4: last_name: optional string, 5: email: optional string>
6: after: optional struct<7: id: optional int, 8: first_name: optional string, 9: last_name: optional string, 10: email: optional string>
11: source: optional struct<12: version: optional string, 13: connector: optional string, 14: name: optional string, 15: ts_ms: optional long, 16: snapshot: optional boolean, 17: db: optional string, 18: table: optional string, 19: server_id: optional long, 20: gtid: optional string, 21: file: optional string, 22: pos: optional long, 23: row: optional int, 24: thread: optional long, 25: query: optional string>
26: op: optional string
27: ts_ms: optional long
}"""));
}

@Test
Expand All @@ -62,6 +68,19 @@ public void testUnwrapJsonRecord() {
GenericRecord record = e.asIcebergRecord(schema);
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
assertEquals(schema.toString(), """
table {
1: id: optional int
2: order_date: optional int
3: purchaser: optional int
4: quantity: optional int
5: product_id: optional int
6: __op: optional string
7: __table: optional string
8: __lsn: optional long
9: __source_ts_ms: optional timestamptz
10: __deleted: optional string
}""");
System.out.println(schema);
System.out.println(record);
}
Expand All @@ -72,10 +91,19 @@ public void testNestedArrayJsonRecord() {
unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null);

Schema schema = e.icebergSchema();
assertEquals(schema.toString(), """
table {
1: name: optional string
2: pay_by_quarter: optional list<int>
5: schedule: optional list<string>
8: __op: optional string
9: __table: optional string
10: __source_ts_ms: optional timestamptz
11: __db: optional string
12: __deleted: optional string
}""");
System.out.println(schema);
System.out.println(schema.asStruct());
assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 5: schedule: optional list<string>, 8:"));
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
System.out.println(schema.findField("schedule").type().asListType().elementType());
assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int");
Expand All @@ -92,7 +120,14 @@ public void testNestedArray2JsonRecord() {
Schema schema = e.icebergSchema();
System.out.println(schema.asStruct());
System.out.println(schema);
assertTrue(schema.asStruct().toString().contains("20: tableChanges: optional list<struct<22: type: optional string,"));
assertEquals(schema.toString(), """
table {
1: source: optional struct<2: version: optional string, 3: connector: optional string, 4: name: optional string, 5: ts_ms: optional long, 6: snapshot: optional string, 7: db: optional string, 8: sequence: optional string, 9: table: optional string, 10: server_id: optional long, 11: gtid: optional string, 12: file: optional string, 13: pos: optional long, 14: row: optional int, 15: thread: optional long, 16: query: optional string>
17: databaseName: optional string
18: schemaName: optional string
19: ddl: optional string
20: tableChanges: optional list<struct<22: type: optional string, 23: id: optional string, 24: table: optional struct<25: defaultCharsetName: optional string, 26: primaryKeyColumnNames: optional list<string>, 29: columns: optional list<struct<31: name: optional string, 32: jdbcType: optional int, 33: nativeType: optional int, 34: typeName: optional string, 35: typeExpression: optional string, 36: charsetName: optional string, 37: length: optional int, 38: scale: optional int, 39: position: optional int, 40: optional: optional boolean, 41: autoIncremented: optional boolean, 42: generated: optional boolean>>>>>
}""");
System.out.println(schema.findField("tableChanges"));
System.out.println(schema.findField("tableChanges").type().asListType().elementType());
//GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
Expand All @@ -107,7 +142,17 @@ public void testNestedGeomJsonRecord() {
GenericRecord record = e.asIcebergRecord(schema);
//System.out.println(schema);
//System.out.println(record);
assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));
assertEquals(schema.toString(), """
table {
1: id: optional int
2: g: optional struct<3: wkb: optional string, 4: srid: optional int>
5: h: optional struct<6: wkb: optional string, 7: srid: optional int>
8: __op: optional string
9: __table: optional string
10: __source_ts_ms: optional timestamptz
11: __db: optional string
12: __deleted: optional string
}""");
GenericRecord g = (GenericRecord) record.getField("g");
GenericRecord h = (GenericRecord) record.getField("h");
assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));
Expand Down

0 comments on commit f5121dd

Please sign in to comment.