diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index ba406724..3f1f76cc 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -234,7 +234,7 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode switch (fieldType) { case "struct": int rootStructId = schemaData.nextFieldId().getAndIncrement(); - IcebergChangeEventSchemaData subSchemaData = schemaData.copyWithNextFieldId(); + IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId(); for (JsonNode subFieldSchema : fieldSchema.get("fields")) { String subFieldName = subFieldSchema.get("field").textValue(); debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData); @@ -246,10 +246,10 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode int rootMapId = schemaData.nextFieldId().getAndIncrement(); int keyFieldId = schemaData.nextFieldId().getAndIncrement(); int valFieldId = schemaData.nextFieldId().getAndIncrement(); - IcebergChangeEventSchemaData keySchemaData = schemaData.copyWithNextFieldId(); + IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId(); debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData); schemaData.nextFieldId().incrementAndGet(); - IcebergChangeEventSchemaData valSchemaData = schemaData.copyWithNextFieldId(); + IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId(); debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData); Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type()); schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField)); @@ -257,7 +257,7 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode case "array": int rootArrayId = schemaData.nextFieldId().getAndIncrement(); - IcebergChangeEventSchemaData arraySchemaData = schemaData.copyWithNextFieldId(); + IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId(); debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData); Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type()); schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField)); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java index 2a224563..00196ca9 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEventSchemaData.java @@ -9,17 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger; record IcebergChangeEventSchemaData(List fields, Set identifierFieldIds, - AtomicInteger nextFieldId) { - - public IcebergChangeEventSchemaData(List fields, Set identifierFieldIds, AtomicInteger nextFieldId) { - this.nextFieldId = nextFieldId; - this.fields = fields; - this.identifierFieldIds = identifierFieldIds; - } - - public IcebergChangeEventSchemaData(AtomicInteger nextFieldId) { - this(new ArrayList<>(), new HashSet<>(), nextFieldId); - } + AtomicInteger nextFieldId) { public IcebergChangeEventSchemaData(Integer nextFieldId) { @@ -30,8 +20,7 @@ public IcebergChangeEventSchemaData() { this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(1)); } - - public IcebergChangeEventSchemaData copyWithNextFieldId() { + public IcebergChangeEventSchemaData copyKeepNextFieldId() { return new IcebergChangeEventSchemaData(new ArrayList<>(), new HashSet<>(), this.nextFieldId); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java new file mode 100644 index 00000000..dd1f44a6 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventSchemaDataTest.java @@ -0,0 +1,21 @@ +package io.debezium.server.iceberg; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class IcebergChangeEventSchemaDataTest { + + @Test + void nextFieldId() { + + IcebergChangeEventSchemaData test = new IcebergChangeEventSchemaData(5); + test.identifierFieldIds().add(1); + assertEquals(6, test.nextFieldId().incrementAndGet()); + + IcebergChangeEventSchemaData testSubschemaField = test.copyKeepNextFieldId(); + testSubschemaField.nextFieldId().incrementAndGet(); + assertEquals(7, test.nextFieldId().get()); + } + +} \ No newline at end of file