Skip to content

Commit

Permalink
Improve setting identifier fields
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 15, 2024
1 parent d7452d4 commit f9c564a
Showing 1 changed file with 24 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.debezium.DebeziumException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
Expand Down Expand Up @@ -229,42 +228,52 @@ protected JsonNode keySchema() {
* @param schemaData keeps information of iceberg schema like fields, nextFieldId and identifier fields
* @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField.
*/
private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData) {
private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData, JsonNode keySchemaNode) {
String fieldType = fieldSchema.get("type").textValue();
boolean isPkField = !(keySchemaNode == null || keySchemaNode.isNull());
switch (fieldType) {
case "struct":
int rootStructId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId();
for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
String subFieldName = subFieldSchema.get("field").textValue();
debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData);
JsonNode equivalentNestedKeyField = findNodeFieldByName(subFieldName, keySchemaNode);
debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData, equivalentNestedKeyField);
}
// create it as struct, nested type
schemaData.fields().add(Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subSchemaData.fields())));
return schemaData;
case "map":
if (isPkField) {
throw new DebeziumException("Cannot set map field '" + fieldName + "' as a identifier field, is not supported as a identifier field!");
}
int rootMapId = schemaData.nextFieldId().getAndIncrement();
int keyFieldId = schemaData.nextFieldId().getAndIncrement();
int valFieldId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData);
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData, null);
schemaData.nextFieldId().incrementAndGet();
IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData);
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData, null);
Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField));
return schemaData;

case "array":
if (isPkField) {
throw new DebeziumException("Cannot set array field '" + fieldName + "' as a identifier field, is not supported as a identifier field!");
}
int rootArrayId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData);
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData, null);
Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField));
return schemaData;
default:
// its primitive field
schemaData.fields().add(Types.NestedField.optional(schemaData.nextFieldId().getAndIncrement(), fieldName, icebergPrimitiveField(fieldName, fieldType)));
Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType));
schemaData.fields().add(field);
if (isPkField) schemaData.identifierFieldIds().add(field.fieldId());
return schemaData;
}
}
Expand Down Expand Up @@ -300,49 +309,29 @@ private Schema icebergSchema(boolean isUnwrapped) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

final IcebergChangeEventSchemaData tableColumns = icebergSchemaFields(valueSchema);
final IcebergChangeEventSchemaData schemaData = icebergSchemaFields(valueSchema, keySchema);

if (tableColumns.fields().isEmpty()) {
if (schemaData.fields().isEmpty()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!");
}

final IcebergChangeEventSchemaData keyColumns = icebergSchemaFields(keySchema);
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns.fields()) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.fields().listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set column as required its part of identifier filed
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Debezium key/identifier field `" + ic.name() + "` not found in event columns!");
}

}
// @TODO validate key fields are correctly set!?
return new Schema(schemaData.fields(), schemaData.identifierFieldIds());

return new Schema(tableColumns.fields(), identifierFieldIds);
}

/***
* Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
* @param schemaNode
* @return
*/
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode) {
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode) {
IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
for (JsonNode field : getNodeFieldsArray(schemaNode)) {
debeziumFieldToIcebergField(field, field.get("field").textValue(), schemaData);
String fieldName = field.get("field").textValue();
JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode);
debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode);
}

return schemaData;
Expand Down

0 comments on commit f9c564a

Please sign in to comment.