Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nested fields to be set as identifier #351

Merged
merged 1 commit into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.DebeziumException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
Expand Down Expand Up @@ -234,14 +235,15 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
switch (fieldType) {
case "struct":
int rootStructId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId();
final IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
String subFieldName = subFieldSchema.get("field").textValue();
JsonNode equivalentNestedKeyField = findNodeFieldByName(subFieldName, keySchemaNode);
debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData, equivalentNestedKeyField);
}
// create it as struct, nested type
schemaData.fields().add(Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subSchemaData.fields())));
final Types.NestedField structField = Types.NestedField.of(rootStructId, !isPkField, fieldName, Types.StructType.of(subSchemaData.fields()));
schemaData.fields().add(structField);
return schemaData;
case "map":
if (isPkField) {
Expand All @@ -250,12 +252,12 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
int rootMapId = schemaData.nextFieldId().getAndIncrement();
int keyFieldId = schemaData.nextFieldId().getAndIncrement();
int valFieldId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId();
final IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData, null);
schemaData.nextFieldId().incrementAndGet();
IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId();
final IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData, null);
Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
final Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField));
return schemaData;

Expand All @@ -264,14 +266,14 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
throw new DebeziumException("Cannot set array field '" + fieldName + "' as a identifier field, array types are not supported as an identifier field!");
}
int rootArrayId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId();
final IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData, null);
Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
final Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField));
return schemaData;
default:
// its primitive field
Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType));
final Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType));
schemaData.fields().add(field);
if (isPkField) schemaData.identifierFieldIds().add(field.fieldId());
return schemaData;
Expand Down Expand Up @@ -303,13 +305,38 @@ private static JsonNode findNodeFieldByName(String fieldName, JsonNode node) {
return null;
}

/***
 * Converts Debezium event fields to their Iceberg equivalents, accumulating the
 * converted Iceberg fields (and any identifier field ids discovered along the way)
 * into the supplied {@code schemaData}.
 *
 * @param schemaNode    Debezium schema node whose "fields" array is converted
 * @param keySchemaNode Debezium key schema node; a key field with the same name marks
 *                      the corresponding value field as an identifier field (may be null)
 * @param schemaData    accumulator for converted fields, identifier field ids and the next field id
 * @return the same {@code schemaData} instance, populated with the converted fields
 */
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode, IcebergChangeEventSchemaData schemaData) {
  // Fixed log message: the conversion direction is Debezium -> Iceberg, not the reverse.
  LOGGER.debug("Converting debezium schema to iceberg:{}", schemaNode);
  for (JsonNode field : getNodeFieldsArray(schemaNode)) {
    String fieldName = field.get("field").textValue();
    // Equivalent field in the key schema (if any) flags this field as part of the identifier.
    JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode);
    debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode);
  }

  return schemaData;
}

private Schema icebergSchema(boolean isUnwrapped) {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

final IcebergChangeEventSchemaData schemaData = icebergSchemaFields(valueSchema, keySchema);
IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
if (!isUnwrapped && keySchema != null) {
// NOTE: events are not unwrapped; align the key schema with the event schema so that we can scan event and key schemas synchronously
ObjectNode nestedKeySchema = mapper.createObjectNode();
nestedKeySchema.put("type", "struct");
nestedKeySchema.putArray("fields").add(((ObjectNode) keySchema).put("field", "after"));
icebergSchemaFields(valueSchema, nestedKeySchema, schemaData);
} else {
icebergSchemaFields(valueSchema, keySchema, schemaData);
}

if (schemaData.fields().isEmpty()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!");
Expand All @@ -320,23 +347,6 @@ private Schema icebergSchema(boolean isUnwrapped) {

}

/***
* Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
* @param schemaNode
* @return
*/
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode) {
IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
for (JsonNode field : getNodeFieldsArray(schemaNode)) {
String fieldName = field.get("field").textValue();
JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode);
debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode);
}

return schemaData;
}

private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
switch (fieldType) {
case "int8":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public IcebergChangeEventSchemaData() {
this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(1));
}

public IcebergChangeEventSchemaData copyKeepNextFieldId() {
return new IcebergChangeEventSchemaData(new ArrayList<>(), new HashSet<>(), this.nextFieldId);
public IcebergChangeEventSchemaData copyKeepIdentifierFieldIdsAndNextFieldId() {
return new IcebergChangeEventSchemaData(new ArrayList<>(), this.identifierFieldIds, this.nextFieldId);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.debezium.server.iceberg.tableoperator;

import java.io.IOException;
import java.util.List;

import com.google.common.collect.Sets;
import org.apache.iceberg.*;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

import java.io.IOException;
import java.util.List;

abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {

private final Schema schema;
Expand Down Expand Up @@ -50,11 +50,13 @@ InternalRecordWrapper wrapper() {
@Override
public void write(Record row) throws IOException {
RowDataDeltaWriter writer = route(row);
// @TODO __op field should not be hardcoded! when unwrapped it's __op, when not it's op
if (upsert && !row.getField("__op").equals("c")) {// anything which not an insert is upsert
writer.delete(row);
}
// if its deleted row and upsertKeepDeletes = true then add deleted record to target table
// else deleted records are deleted from target table
// @TODO __op field should not be hardcoded! when unwrapped it's __op, when not it's op
if (
upsertKeepDeletes
|| !(row.getField("__op").equals("d")))// anything which not an insert is upsert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,31 @@

import org.junit.jupiter.api.Test;

import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;

class IcebergChangeEventSchemaDataTest {

@Test
void nextFieldId() {
void testIcebergChangeEventSchemaDataBehaviourAndCloning() {

IcebergChangeEventSchemaData test = new IcebergChangeEventSchemaData(5);
test.identifierFieldIds().add(1);
test.identifierFieldIds().add(3);
assertEquals(6, test.nextFieldId().incrementAndGet());
assertEquals(Set.of(3), test.identifierFieldIds());

IcebergChangeEventSchemaData testSubschemaField = test.copyKeepNextFieldId();
testSubschemaField.nextFieldId().incrementAndGet();
// test cloning and then changing nextFieldId is persisting
IcebergChangeEventSchemaData copy = test.copyKeepIdentifierFieldIdsAndNextFieldId();
assertEquals(6, test.nextFieldId().get());
copy.nextFieldId().incrementAndGet();
assertEquals(7, test.nextFieldId().get());

// test cloning and then changing identifier fields is persisting
assertEquals(Set.of(3), copy.identifierFieldIds());
copy.identifierFieldIds().add(7);
assertEquals(Set.of(3, 7), test.identifierFieldIds());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public void testUnwrapJsonRecord() {
}""");

assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema);
System.out.println(record);
}

@Test
Expand All @@ -108,10 +106,6 @@ public void testNestedArrayJsonRecord() {
12: __deleted: optional string
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema);
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
System.out.println(schema.findField("schedule").type().asListType().elementType());
assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int");
assertEquals(schema.findField("schedule").type().asListType().elementType().toString(), "string");
GenericRecord record = e.asIcebergRecord(schema);
Expand All @@ -124,7 +118,6 @@ public void testNestedArray2JsonRecord() {
IcebergChangeEvent e = new IcebergChangeEvent("test",
unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
System.out.println(schema.asStruct());
System.out.println(schema);
assertEquals(schema.toString(), """
table {
Expand All @@ -135,10 +128,6 @@ public void testNestedArray2JsonRecord() {
20: tableChanges: optional list<struct<22: type: optional string, 23: id: optional string, 24: table: optional struct<25: defaultCharsetName: optional string, 26: primaryKeyColumnNames: optional list<string>, 29: columns: optional list<struct<31: name: optional string, 32: jdbcType: optional int, 33: nativeType: optional int, 34: typeName: optional string, 35: typeExpression: optional string, 36: charsetName: optional string, 37: length: optional int, 38: scale: optional int, 39: position: optional int, 40: optional: optional boolean, 41: autoIncremented: optional boolean, 42: generated: optional boolean>>>>>
}""");
assertEquals(schema.identifierFieldIds(), Set.of());
System.out.println(schema.findField("tableChanges"));
System.out.println(schema.findField("tableChanges").type().asListType().elementType());
//GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
//System.out.println(record);
}

@Test
Expand All @@ -147,8 +136,6 @@ public void testNestedGeomJsonRecord() {
unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema();
GenericRecord record = e.asIcebergRecord(schema);
//System.out.println(schema);
//System.out.println(record);
assertEquals(schema.toString(), """
table {
1: id: optional int
Expand All @@ -175,18 +162,15 @@ public void valuePayloadWithSchemaAsJsonNode() {
final Serde<JsonNode> valueSerde = DebeziumSerdes.payloadJson(JsonNode.class);
valueSerde.configure(Collections.emptyMap(), false);
JsonNode deserializedData = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes());
System.out.println(deserializedData.getClass().getSimpleName());
System.out.println(deserializedData.has("payload"));
assertEquals(deserializedData.getClass().getSimpleName(), "ObjectNode");
System.out.println(deserializedData);
assertTrue(deserializedData.has("after"));
assertTrue(deserializedData.has("op"));
assertTrue(deserializedData.has("before"));
assertFalse(deserializedData.has("schema"));
assertFalse(deserializedData.has("payload"));

valueSerde.configure(Collections.singletonMap("from.field", "schema"), false);
JsonNode deserializedSchema = valueSerde.deserializer().deserialize("xx", serdeWithSchema.getBytes());
System.out.println(deserializedSchema);
assertFalse(deserializedSchema.has("schema"));

}
Expand Down Expand Up @@ -240,4 +224,24 @@ public void testIcebergChangeEventSchemaWithKey() {
}


@Test
public void testIcebergChangeEventSchemaWithNestedKey() throws IOException {
// Event is NOT unwrapped: the key schema describes fields nested under "after",
// so identifier resolution must descend into the nested struct.
String key = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-key-withschema.json"));
String val = Files.readString(Path.of("src/test/resources/json/serde-unnested-order-val-withschema.json"));
TestChangeEvent<String, String> dbzEvent = new TestChangeEvent<>(key, val, "test");
Schema schema = dbzEvent.toIcebergChangeEvent().icebergSchema();
// The "after" struct and its "order_number" field (the key field) become required;
// every other field stays optional.
assertEquals("""
table {
1: before: optional struct<2: order_number: optional int, 3: order_date: optional int, 4: purchaser: optional int, 5: quantity: optional int, 6: product_id: optional int>
7: after: required struct<8: order_number: required int, 9: order_date: optional int, 10: purchaser: optional int, 11: quantity: optional int, 12: product_id: optional int>
13: source: optional struct<14: version: optional string, 15: connector: optional string, 16: name: optional string, 17: ts_ms: optional long, 18: snapshot: optional string, 19: db: optional string, 20: sequence: optional string, 21: ts_us: optional long, 22: ts_ns: optional long, 23: table: optional string, 24: server_id: optional long, 25: gtid: optional string, 26: file: optional string, 27: pos: optional long, 28: row: optional int, 29: thread: optional long, 30: query: optional string>
31: transaction: optional struct<32: id: optional string, 33: total_order: optional long, 34: data_collection_order: optional long>
35: op: optional string
36: ts_ms: optional long
37: ts_us: optional long
38: ts_ns: optional long
}""", schema.toString());
// Field id 8 is after.order_number — the nested field must be registered as the identifier.
assertEquals(Set.of(8), schema.identifierFieldIds());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "order_number"
}
],
"optional": false,
"name": "testc.inventory.orders.Key"
},
"payload": {
"order_number": 10004
}
}
Loading