Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve setting identifier fields #350

Merged
merged 1 commit into from
Jun 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.debezium.DebeziumException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
Expand Down Expand Up @@ -229,42 +228,52 @@ protected JsonNode keySchema() {
* @param schemaData keeps information of iceberg schema like fields, nextFieldId and identifier fields
* @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField.
*/
private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData) {
private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData, JsonNode keySchemaNode) {
String fieldType = fieldSchema.get("type").textValue();
boolean isPkField = !(keySchemaNode == null || keySchemaNode.isNull());
switch (fieldType) {
case "struct":
int rootStructId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId();
for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
String subFieldName = subFieldSchema.get("field").textValue();
debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData);
JsonNode equivalentNestedKeyField = findNodeFieldByName(subFieldName, keySchemaNode);
debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData, equivalentNestedKeyField);
}
// create it as struct, nested type
schemaData.fields().add(Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subSchemaData.fields())));
return schemaData;
case "map":
if (isPkField) {
throw new DebeziumException("Cannot set map field '" + fieldName + "' as a identifier field, map types are not supported as an identifier field!");
}
int rootMapId = schemaData.nextFieldId().getAndIncrement();
int keyFieldId = schemaData.nextFieldId().getAndIncrement();
int valFieldId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData);
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData, null);
schemaData.nextFieldId().incrementAndGet();
IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData);
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData, null);
Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField));
return schemaData;

case "array":
if (isPkField) {
throw new DebeziumException("Cannot set array field '" + fieldName + "' as a identifier field, array types are not supported as an identifier field!");
}
int rootArrayId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData);
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData, null);
Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField));
return schemaData;
default:
// its primitive field
schemaData.fields().add(Types.NestedField.optional(schemaData.nextFieldId().getAndIncrement(), fieldName, icebergPrimitiveField(fieldName, fieldType)));
Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType));
schemaData.fields().add(field);
if (isPkField) schemaData.identifierFieldIds().add(field.fieldId());
return schemaData;
}
}
Expand Down Expand Up @@ -300,49 +309,29 @@ private Schema icebergSchema(boolean isUnwrapped) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

final IcebergChangeEventSchemaData tableColumns = icebergSchemaFields(valueSchema);
final IcebergChangeEventSchemaData schemaData = icebergSchemaFields(valueSchema, keySchema);

if (tableColumns.fields().isEmpty()) {
if (schemaData.fields().isEmpty()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!");
}

final IcebergChangeEventSchemaData keyColumns = icebergSchemaFields(keySchema);
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns.fields()) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.fields().listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set column as required its part of identifier filed
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Debezium key/identifier field `" + ic.name() + "` not found in event columns!");
}

}
// @TODO validate key fields are correctly set!?
return new Schema(schemaData.fields(), schemaData.identifierFieldIds());

return new Schema(tableColumns.fields(), identifierFieldIds);
}

/***
* Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
* @param schemaNode
* @return
*/
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode) {
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode) {
IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
for (JsonNode field : getNodeFieldsArray(schemaNode)) {
debeziumFieldToIcebergField(field, field.get("field").textValue(), schemaData);
String fieldName = field.get("field").textValue();
JsonNode equivalentKeyFieldNode = findNodeFieldByName(fieldName, keySchemaNode);
debeziumFieldToIcebergField(field, fieldName, schemaData, equivalentKeyFieldNode);
}

return schemaData;
Expand Down