diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 9c749953..3f1f76cc 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -25,7 +25,6 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.*; -import java.util.concurrent.atomic.AtomicReference; import static io.debezium.server.iceberg.IcebergChangeConsumer.keyDeserializer; import static io.debezium.server.iceberg.IcebergChangeConsumer.valDeserializer; @@ -227,47 +226,46 @@ protected JsonNode keySchema() { * * @param fieldSchema JsonNode representation of debezium field schema. * @param fieldName name of the debezium field - * @param fieldId id sequence to assign iceberg field, after the conversion. + * @param schemaData keeps information of iceberg schema like fields, nextFieldId and identifier fields * @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField. */ - private static Map.Entry debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) { + private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData) { String fieldType = fieldSchema.get("type").textValue(); switch (fieldType) { case "struct": - // struct type - int rootStructId = fieldId; - List subFields = new ArrayList<>(); + int rootStructId = schemaData.nextFieldId().getAndIncrement(); + IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId(); for (JsonNode subFieldSchema : fieldSchema.get("fields")) { - fieldId += 1; String subFieldName = subFieldSchema.get("field").textValue(); - Map.Entry subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId); - subFields.add(subField.getValue()); - fieldId = subField.getKey(); + debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData); } // create it as struct, nested type - return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields))); + schemaData.fields().add(Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subSchemaData.fields()))); + return schemaData; case "map": - int rootMapId = fieldId; - int keyFieldId = fieldId + 1; - int valFieldId = fieldId + 2; - fieldId = fieldId + 3; - Map.Entry keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId); - fieldId = keyField.getKey() + 1; - Map.Entry valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId); - fieldId = valField.getKey(); - Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type()); - return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField)); + int rootMapId = schemaData.nextFieldId().getAndIncrement(); + int keyFieldId = schemaData.nextFieldId().getAndIncrement(); + int valFieldId = schemaData.nextFieldId().getAndIncrement(); + IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId(); + debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData); + schemaData.nextFieldId().incrementAndGet(); + IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId(); + debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData); + Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type()); + schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField)); + return schemaData; case "array": - int rootArrayId = fieldId; - fieldId += 1; - Map.Entry listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId); - fieldId = listItemsField.getKey() + 1; - Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type()); - return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField)); + int rootArrayId = schemaData.nextFieldId().getAndIncrement(); + IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId(); + debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData); + Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type()); + schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField)); + return schemaData; default: // its primitive field - return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType))); + schemaData.fields().add(Types.NestedField.optional(schemaData.nextFieldId().getAndIncrement(), fieldName, icebergPrimitiveField(fieldName, fieldType))); + return schemaData; } } @@ -302,19 +300,19 @@ private Schema icebergSchema(boolean isUnwrapped) { throw new RuntimeException("Failed to get schema from debezium event, event schema is null"); } - final List tableColumns = icebergSchemaFields(valueSchema); + final IcebergChangeEventSchemaData tableColumns = icebergSchemaFields(valueSchema); - if (tableColumns.isEmpty()) { + if (tableColumns.fields().isEmpty()) { throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!"); } - final List keyColumns = icebergSchemaFields(keySchema); + final IcebergChangeEventSchemaData keyColumns = icebergSchemaFields(keySchema); Set identifierFieldIds = new HashSet<>(); - for (Types.NestedField ic : keyColumns) { + for (Types.NestedField ic : keyColumns.fields()) { boolean found = false; - ListIterator colsIterator = tableColumns.listIterator(); + ListIterator colsIterator = tableColumns.fields().listIterator(); while (colsIterator.hasNext()) { Types.NestedField tc = colsIterator.next(); if (Objects.equals(tc.name(), ic.name())) { @@ -332,7 +330,7 @@ private Schema icebergSchema(boolean isUnwrapped) { } - return new Schema(tableColumns, identifierFieldIds); + return new Schema(tableColumns.fields(), identifierFieldIds); } /*** @@ -340,17 +338,18 @@ private Schema icebergSchema(boolean isUnwrapped) { * @param schemaNode * @return */ - private static List icebergSchemaFields(JsonNode schemaNode) { - List schemaColumns = new ArrayList<>(); - AtomicReference fieldId = new AtomicReference<>(1); + private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode) { + //List schemaColumns = new ArrayList<>(); + //AtomicReference fieldId = new AtomicReference<>(1); + IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData(); LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); for (JsonNode field : getNodeFieldsArray(schemaNode)) { - Map.Entry df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get()); - fieldId.set(df.getKey() + 1); - schemaColumns.add(df.getValue()); + schemaData = debeziumFieldToIcebergField(field, field.get("field").textValue(), schemaData); + //fieldId.set(df.getKey() + 1); + //schemaColumns.add(df.getValue()); } - return schemaColumns; + return schemaData; } private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) { diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java new file mode 100644 index 00000000..00196ca9 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java @@ -0,0 +1,28 @@ +package io.debezium.server.iceberg; + +import org.apache.iceberg.types.Types; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +record IcebergChangeEventSchemaData(List fields, Set identifierFieldIds, + AtomicInteger nextFieldId) { + + + public IcebergChangeEventSchemaData(Integer nextFieldId) { + this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(nextFieldId)); + } + + public IcebergChangeEventSchemaData() { + this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(1)); + } + + public IcebergChangeEventSchemaData copyKeepNextFieldId() { + return new IcebergChangeEventSchemaData(new ArrayList<>(), new HashSet<>(), this.nextFieldId); + } + + +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java new file mode 100644 index 00000000..dd1f44a6 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java @@ -0,0 +1,21 @@ +package io.debezium.server.iceberg; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class IcebergChangeEventSchemaDataTest { + + @Test + void nextFieldId() { + + IcebergChangeEventSchemaData test = new IcebergChangeEventSchemaData(5); + test.identifierFieldIds().add(1); + assertEquals(6, test.nextFieldId().incrementAndGet()); + + IcebergChangeEventSchemaData testSubschemaField = test.copyKeepNextFieldId(); + testSubschemaField.nextFieldId().incrementAndGet(); + assertEquals(7, test.nextFieldId().get()); + } + +} \ No newline at end of file