From f9c564af26a7163369d0111095eede32ec29aa3c Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 15 Jun 2024 15:31:11 +0200 Subject: [PATCH] Improve setting identifier fields --- .../server/iceberg/IcebergChangeEvent.java | 59 ++++++++----------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 7b324857..dc2a0f75 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -13,7 +13,6 @@ import io.debezium.DebeziumException; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.slf4j.Logger; @@ -229,42 +228,52 @@ protected JsonNode keySchema() { * @param schemaData keeps information of iceberg schema like fields, nextFieldId and identifier fields * @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField. */ - private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData) { + private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData, JsonNode keySchemaNode) { String fieldType = fieldSchema.get("type").textValue(); + boolean isPkField = !(keySchemaNode == null || keySchemaNode.isNull()); switch (fieldType) { case "struct": int rootStructId = schemaData.nextFieldId().getAndIncrement(); IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId(); for (JsonNode subFieldSchema : fieldSchema.get("fields")) { String subFieldName = subFieldSchema.get("field").textValue(); - debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData); + JsonNode equivalentNestedKeyField = findNodeFieldByName(subFieldName, keySchemaNode); + debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData, equivalentNestedKeyField); } // create it as struct, nested type schemaData.fields().add(Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subSchemaData.fields()))); return schemaData; case "map": + if (isPkField) { + throw new DebeziumException("Cannot set map field '" + fieldName + "' as a identifier field, is not supported as a identifier field!"); + } int rootMapId = schemaData.nextFieldId().getAndIncrement(); int keyFieldId = schemaData.nextFieldId().getAndIncrement(); int valFieldId = schemaData.nextFieldId().getAndIncrement(); IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId(); - debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData); + debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData, null); schemaData.nextFieldId().incrementAndGet(); IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId(); - debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData); + debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData, null); Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type()); schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField)); return schemaData; case "array": + if (isPkField) { + throw new DebeziumException("Cannot set array field '" + fieldName + "' as a identifier field, is not supported as a identifier field!"); + } int rootArrayId = schemaData.nextFieldId().getAndIncrement(); IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId(); - debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData); + debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData, null); Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type()); schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField)); return schemaData; default: // its primitive field - schemaData.fields().add(Types.NestedField.optional(schemaData.nextFieldId().getAndIncrement(), fieldName, icebergPrimitiveField(fieldName, fieldType))); + Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType)); + schemaData.fields().add(field); + if (isPkField) schemaData.identifierFieldIds().add(field.fieldId()); return schemaData; } } @@ -300,37 +309,15 @@ private Schema icebergSchema(boolean isUnwrapped) { throw new RuntimeException("Failed to get schema from debezium event, event schema is null"); } - final IcebergChangeEventSchemaData tableColumns = icebergSchemaFields(valueSchema); + final IcebergChangeEventSchemaData schemaData = icebergSchemaFields(valueSchema, keySchema); - if (tableColumns.fields().isEmpty()) { + if (schemaData.fields().isEmpty()) { throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!"); } - final IcebergChangeEventSchemaData keyColumns = icebergSchemaFields(keySchema); - Set identifierFieldIds = new HashSet<>(); - - for (Types.NestedField ic : keyColumns.fields()) { - boolean found = false; - - ListIterator colsIterator = tableColumns.fields().listIterator(); - while (colsIterator.hasNext()) { - Types.NestedField tc = colsIterator.next(); - if (Objects.equals(tc.name(), ic.name())) { - identifierFieldIds.add(tc.fieldId()); - // set column as required its part of identifier filed - colsIterator.set(tc.asRequired()); - found = true; - break; - } - } - - if (!found) { - throw new ValidationException("Debezium key/identifier field `" + ic.name() + "` not found in event columns!"); - } - - } + // @TODO validate key fields are correctly set!? + return new Schema(schemaData.fields(), schemaData.identifierFieldIds()); - return new Schema(tableColumns.fields(), identifierFieldIds); } /*** @@ -338,11 +325,13 @@ private Schema icebergSchema(boolean isUnwrapped) { * @param schemaNode * @return */ - private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode) { + private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode) { IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData(); LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); for (JsonNode field : getNodeFieldsArray(schemaNode)) { - debeziumFieldToIcebergField(field, field.get("field").textValue(), schemaData); + String fieldName = field.get("field").textValue(); + JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode); + debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode); } return schemaData;