Skip to content

Commit

Permalink
Allow nested fields to be set as identifier (#351)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 15, 2024
1 parent 04f23b0 commit 0416802
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.DebeziumException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
Expand Down Expand Up @@ -234,14 +235,15 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
switch (fieldType) {
case "struct":
int rootStructId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId();
final IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
String subFieldName = subFieldSchema.get("field").textValue();
JsonNode equivalentNestedKeyField = findNodeFieldByName(subFieldName, keySchemaNode);
debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData, equivalentNestedKeyField);
}
// create it as struct, nested type
schemaData.fields().add(Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subSchemaData.fields())));
final Types.NestedField structField = Types.NestedField.of(rootStructId, !isPkField, fieldName, Types.StructType.of(subSchemaData.fields()));
schemaData.fields().add(structField);
return schemaData;
case "map":
if (isPkField) {
Expand All @@ -250,12 +252,12 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
int rootMapId = schemaData.nextFieldId().getAndIncrement();
int keyFieldId = schemaData.nextFieldId().getAndIncrement();
int valFieldId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId();
final IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData, null);
schemaData.nextFieldId().incrementAndGet();
IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId();
final IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData, null);
Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
final Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField));
return schemaData;

Expand All @@ -264,14 +266,14 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
throw new DebeziumException("Cannot set array field '" + fieldName + "' as a identifier field, array types are not supported as an identifier field!");
}
int rootArrayId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId();
final IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData, null);
Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
final Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField));
return schemaData;
default:
// its primitive field
Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType));
final Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType));
schemaData.fields().add(field);
if (isPkField) schemaData.identifierFieldIds().add(field.fieldId());
return schemaData;
Expand Down Expand Up @@ -303,13 +305,38 @@ private static JsonNode findNodeFieldByName(String fieldName, JsonNode node) {
return null;
}

/***
* Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
* @param schemaNode
* @return
*/
/***
 * Converts debezium event fields to their iceberg equivalents, accumulating the
 * converted fields (and, transitively, any identifier field ids discovered by
 * {@code debeziumFieldToIcebergField}) into the given {@code schemaData}.
 *
 * @param schemaNode    debezium schema node whose "fields" array is converted
 * @param keySchemaNode debezium key schema node used to detect identifier (PK)
 *                      fields by name; may be null when the event has no key
 * @param schemaData    mutable accumulator for iceberg fields, identifier field
 *                      ids and the next field id counter
 * @return the same {@code schemaData} instance, for call chaining
 */
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode, IcebergChangeEventSchemaData schemaData) {
  // BUGFIX: the message previously read "Converting iceberg schema to debezium",
  // which reverses the actual direction of the conversion (debezium -> iceberg).
  LOGGER.debug("Converting debezium schema to iceberg:{}", schemaNode);
  for (JsonNode field : getNodeFieldsArray(schemaNode)) {
    String fieldName = field.get("field").textValue();
    // Look up the same-named field in the key schema; a non-null match marks
    // this field (or its nested children) as part of the identifier.
    JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode);
    debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode);
  }

  return schemaData;
}

private Schema icebergSchema(boolean isUnwrapped) {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

final IcebergChangeEventSchemaData schemaData = icebergSchemaFields(valueSchema, keySchema);
IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
if (!isUnwrapped && keySchema != null) {
// NOTE: events are not unwrapped; align the key schema with the event schema so we can scan event and key schemas synchronously
ObjectNode nestedKeySchema = mapper.createObjectNode();
nestedKeySchema.put("type", "struct");
nestedKeySchema.putArray("fields").add(((ObjectNode) keySchema).put("field", "after"));
icebergSchemaFields(valueSchema, nestedKeySchema, schemaData);
} else {
icebergSchemaFields(valueSchema, keySchema, schemaData);
}

if (schemaData.fields().isEmpty()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!");
Expand All @@ -320,23 +347,6 @@ private Schema icebergSchema(boolean isUnwrapped) {

}

/***
 * Converts debezium event fields to their iceberg equivalents and returns the
 * collected iceberg fields wrapped in a freshly created
 * {@link IcebergChangeEventSchemaData} accumulator.
 * @param schemaNode debezium schema node whose "fields" array is converted
 * @param keySchemaNode debezium key schema node used to detect identifier (PK) fields by name; may be null
 * @return the populated schema data holding iceberg fields and identifier field ids
 */
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode) {
IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
// NOTE(review): message direction looks reversed — this converts debezium -> iceberg; confirm and fix the log text
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
for (JsonNode field : getNodeFieldsArray(schemaNode)) {
String fieldName = field.get("field").textValue();
// a non-null match in the key schema marks this field as part of the identifier
JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode);
debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode);
}

return schemaData;
}

private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
switch (fieldType) {
case "int8":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public IcebergChangeEventSchemaData() {
this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(1));
}

public IcebergChangeEventSchemaData copyKeepNextFieldId() {
return new IcebergChangeEventSchemaData(new ArrayList<>(), new HashSet<>(), this.nextFieldId);
public IcebergChangeEventSchemaData copyKeepIdentifierFieldIdsAndNextFieldId() {
return new IcebergChangeEventSchemaData(new ArrayList<>(), this.identifierFieldIds, this.nextFieldId);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.debezium.server.iceberg.tableoperator;

import java.io.IOException;
import java.util.List;

import com.google.common.collect.Sets;
import org.apache.iceberg.*;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

import java.io.IOException;
import java.util.List;

abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {

private final Schema schema;
Expand Down Expand Up @@ -50,11 +50,13 @@ InternalRecordWrapper wrapper() {
@Override
public void write(Record row) throws IOException {
RowDataDeltaWriter writer = route(row);
// TODO: the __op field name should not be hardcoded! When events are unwrapped it is __op; when not unwrapped it is op
if (upsert && !row.getField("__op").equals("c")) {// anything which not an insert is upsert
writer.delete(row);
}
// if its deleted row and upsertKeepDeletes = true then add deleted record to target table
// else deleted records are deleted from target table
// TODO: the __op field name should not be hardcoded! When events are unwrapped it is __op; when not unwrapped it is op
if (
upsertKeepDeletes
|| !(row.getField("__op").equals("d")))// anything which not an insert is upsert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,31 @@

import org.junit.jupiter.api.Test;

import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;

class IcebergChangeEventSchemaDataTest {

@Test
void nextFieldId() {
void testIcebergChangeEventSchemaDataBehaviourAndCloning() {

IcebergChangeEventSchemaData test = new IcebergChangeEventSchemaData(5);
test.identifierFieldIds().add(1);
test.identifierFieldIds().add(3);
assertEquals(6, test.nextFieldId().incrementAndGet());
assertEquals(Set.of(3), test.identifierFieldIds());

IcebergChangeEventSchemaData testSubschemaField = test.copyKeepNextFieldId();
testSubschemaField.nextFieldId().incrementAndGet();
// test cloning and then changing nextFieldId is persisting
IcebergChangeEventSchemaData copy = test.copyKeepIdentifierFieldIdsAndNextFieldId();
assertEquals(6, test.nextFieldId().get());
copy.nextFieldId().incrementAndGet();
assertEquals(7, test.nextFieldId().get());

// test cloning and then changing identifier fields is persisting
assertEquals(Set.of(3), copy.identifierFieldIds());
copy.identifierFieldIds().add(7);
assertEquals(Set.of(3, 7), test.identifierFieldIds());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public void testUnwrapJsonRecord() {
}""");

assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema);
System.out.println(record);
}

@Test
Expand All @@ -108,10 +106,6 @@ public void testNestedArrayJsonRecord() {
12: __deleted: optional string
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema);
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
System.out.println(schema.findField("schedule").type().asListType().elementType());
assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int");
assertEquals(schema.findField("schedule").type().asListType().elementType().toString(), "string");
GenericRecord record = e.asIcebergRecord(schema);
Expand All @@ -124,7 +118,6 @@ public void testNestedArray2JsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
System.out.println(schema.asStruct());
System.out.println(schema);
assertEquals(schema.toString(), """
table {
Expand All @@ -135,10 +128,6 @@ public void testNestedArray2JsonRecord() {
20: tableChanges: optional list<struct<22: type: optional string, 23: id: optional string, 24: table: optional struct<25: defaultCharsetName: optional string, 26: primaryKeyColumnNames: optional list<string>, 29: columns: optional list<struct<31: name: optional string, 32: jdbcType: optional int, 33: nativeType: optional int, 34: typeName: optional string, 35: typeExpression: optional string, 36: charsetName: optional string, 37: length: optional int, 38: scale: optional int, 39: position: optional int, 40: optional: optional boolean, 41: autoIncremented: optional boolean, 42: generated: optional boolean>>>>>
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema.findField("tableChanges"));
System.out.println(schema.findField("tableChanges").type().asListType().elementType());
//GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
//System.out.println(record);
}

@Test
Expand All @@ -147,8 +136,6 @@ public void testNestedGeomJsonRecord() {
unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
GenericRecord record = e.asIcebergRecord(schema);
//System.out.println(schema);
//System.out.println(record);
assertEquals(schema.toString(), """
table {
1: id: optional int
Expand All @@ -175,18 +162,15 @@ public void valuePayloadWithSchemaAsJsonNode() {
final Serde<JsonNode> valueSerde = DebeziumSerdes.payloadJson(JsonNode.class);
valueSerde.configure(Collections.emptyMap(), false);
JsonNode deserializedData = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes());
System.out.println(deserializedData.getClass().getSimpleName());
System.out.println(deserializedData.has("payload"));
assertEquals(deserializedData.getClass().getSimpleName(), "ObjectNode");
System.out.println(deserializedData);
assertTrue(deserializedData.has("after"));
assertTrue(deserializedData.has("op"));
assertTrue(deserializedData.has("before"));
assertFalse(deserializedData.has("schema"));
assertFalse(deserializedData.has("payload"));

valueSerde.configure(Collections.singletonMap("from.field", "schema"), false);
JsonNode deserializedSchema = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes());
System.out.println(deserializedSchema);
assertFalse(deserializedSchema.has("schema"));

}
Expand Down Expand Up @@ -240,4 +224,24 @@ public void testIcebergChangeEventSchemaWithKey() {
}


@Test
public void testIcebergChangeEventSchemaWithNestedKey() throws IOException {
// Build a non-unwrapped change event from fixture files: the key schema only
// declares "order_number" (see serde-unnested-order-key-withschema.json).
String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json"));
String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json"));
TestChangeEvent<String, String> dbzEvent = new TestChangeEvent<>(key, val, "test");
Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema();
// The key field must be marked required inside the nested "after" struct
// (field 7 "after" is required, field 8 "order_number" is required below),
// while all other fields stay optional.
assertEquals("""
table {
1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int>
7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int>
13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string>
31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long>
35: op: optional string
36: ts_ms: optional long
37: ts_us: optional long
38: ts_ns: optional long
}""", schema.toString());
// The nested key field id (8 = after.order_number) must be reported as the
// table's identifier field — this is the behavior the commit introduces.
assertEquals(Set.of(8), schema.identifierFieldIds());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "order_number"
}
],
"optional": false,
"name": "testc.inventory.orders.Key"
},
"payload": {
"order_number": 10004
}
}
Loading

0 comments on commit 0416802

Please sign in to comment.