Skip to content

Commit

Permalink
Improve conversion of debezium schema to iceberg schema
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 15, 2024
1 parent 6308f72 commit da43d81
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
switch (fieldType) {
case "struct":
int rootStructId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData subSchemaData = schemaData.copyWithNextFieldId();
IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId();
for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
String subFieldName = subFieldSchema.get("field").textValue();
debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData);
Expand All @@ -246,18 +246,18 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
int rootMapId = schemaData.nextFieldId().getAndIncrement();
int keyFieldId = schemaData.nextFieldId().getAndIncrement();
int valFieldId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData keySchemaData = schemaData.copyWithNextFieldId();
IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData);
schemaData.nextFieldId().incrementAndGet();
IcebergChangeEventSchemaData valSchemaData = schemaData.copyWithNextFieldId();
IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData);
Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField));
return schemaData;

case "array":
int rootArrayId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData arraySchemaData = schemaData.copyWithNextFieldId();
IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData);
Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,7 @@
import java.util.concurrent.atomic.AtomicInteger;

record IcebergChangeEventSchemaData(List<Types.NestedField> fields, Set<Integer> identifierFieldIds,
AtomicInteger nextFieldId) {

public IcebergChangeEventSchemaData(List<Types.NestedField> fields, Set<Integer> identifierFieldIds, AtomicInteger nextFieldId) {
this.nextFieldId = nextFieldId;
this.fields = fields;
this.identifierFieldIds = identifierFieldIds;
}

public IcebergChangeEventSchemaData(AtomicInteger nextFieldId) {
this(new ArrayList<>(), new HashSet<>(), nextFieldId);
}
AtomicInteger nextFieldId) {


public IcebergChangeEventSchemaData(Integer nextFieldId) {
Expand All @@ -30,8 +20,7 @@ public IcebergChangeEventSchemaData() {
this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(1));
}


public IcebergChangeEventSchemaData copyWithNextFieldId() {
public IcebergChangeEventSchemaData copyKeepNextFieldId() {
return new IcebergChangeEventSchemaData(new ArrayList<>(), new HashSet<>(), this.nextFieldId);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.debezium.server.iceberg;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class IcebergChangeEventSchemaDataTest {

@Test
void nextFieldId() {

IcebergChangeEventSchemaData test = new IcebergChangeEventSchemaData(5);
test.identifierFieldIds().add(1);
assertEquals(6, test.nextFieldId().incrementAndGet());

IcebergChangeEventSchemaData testSubschemaField = test.copyKeepNextFieldId();
testSubschemaField.nextFieldId().incrementAndGet();
assertEquals(7, test.nextFieldId().get());
}

}

0 comments on commit da43d81

Please sign in to comment.