Improve conversion of debezium schema to iceberg schema (#347)
* Improve conversion of debezium schema to iceberg schema
ismailsimsek committed Jun 15, 2024
1 parent 0997cae commit ab211b3
Showing 3 changed files with 85 additions and 41 deletions.
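
The core of the change: instead of threading a plain int field id through debeziumFieldToIcebergField and returning it wrapped in a Map.Entry, the conversion now passes around an IcebergChangeEventSchemaData record that accumulates the converted fields and hands out ids from a shared AtomicInteger. A minimal standalone sketch of the id-allocation idea (illustrative class name only, not code from this commit):

import java.util.concurrent.atomic.AtomicInteger;

class FieldIdAllocationSketch {
  public static void main(String[] args) {
    // Previously each recursive call returned the last id it had used and the
    // caller had to remember to add 1; with a shared counter every id is taken
    // atomically, so nested conversions can never hand out a duplicate id.
    AtomicInteger nextFieldId = new AtomicInteger(1);
    int structId = nextFieldId.getAndIncrement(); // 1 -> the struct field itself
    int childId = nextFieldId.getAndIncrement();  // 2 -> its first sub-field
    System.out.println(structId + " " + childId); // prints: 1 2
  }
}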
@@ -25,7 +25,6 @@
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

import static io.debezium.server.iceberg.IcebergChangeConsumer.keyDeserializer;
import static io.debezium.server.iceberg.IcebergChangeConsumer.valDeserializer;
@@ -227,47 +226,46 @@ protected JsonNode keySchema() {
*
* @param fieldSchema JsonNode representation of debezium field schema.
* @param fieldName name of the debezium field
* @param fieldId id sequence to assign iceberg field, after the conversion.
* @param schemaData keeps information of iceberg schema like fields, nextFieldId and identifier fields
* @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField.
*/
private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) {
private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData) {
String fieldType = fieldSchema.get("type").textValue();
switch (fieldType) {
case "struct":
// struct type
int rootStructId = fieldId;
List<Types.NestedField> subFields = new ArrayList<>();
int rootStructId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepNextFieldId();
for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
fieldId += 1;
String subFieldName = subFieldSchema.get("field").textValue();
Map.Entry<Integer, Types.NestedField> subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId);
subFields.add(subField.getValue());
fieldId = subField.getKey();
debeziumFieldToIcebergField(subFieldSchema, subFieldName, subSchemaData);
}
// create it as struct, nested type
return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields)));
schemaData.fields().add(Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subSchemaData.fields())));
return schemaData;
case "map":
int rootMapId = fieldId;
int keyFieldId = fieldId + 1;
int valFieldId = fieldId + 2;
fieldId = fieldId + 3;
Map.Entry<Integer, Types.NestedField> keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId);
fieldId = keyField.getKey() + 1;
Map.Entry<Integer, Types.NestedField> valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId);
fieldId = valField.getKey();
Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type());
return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField));
int rootMapId = schemaData.nextFieldId().getAndIncrement();
int keyFieldId = schemaData.nextFieldId().getAndIncrement();
int valFieldId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData);
schemaData.nextFieldId().incrementAndGet();
IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData);
Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField));
return schemaData;

case "array":
int rootArrayId = fieldId;
fieldId += 1;
Map.Entry<Integer, Types.NestedField> listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId);
fieldId = listItemsField.getKey() + 1;
Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type());
return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField));
int rootArrayId = schemaData.nextFieldId().getAndIncrement();
IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData);
Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField));
return schemaData;
default:
// its primitive field
return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType)));
schemaData.fields().add(Types.NestedField.optional(schemaData.nextFieldId().getAndIncrement(), fieldName, icebergPrimitiveField(fieldName, fieldType)));
return schemaData;
}
}
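
For reference, the Iceberg Types API that the struct, map and array branches above build on; the field ids used here are made up for illustration and simply follow the order a shared counter would hand them out (the container id first, then the ids it reserves for its children):

import org.apache.iceberg.types.Types;

class IcebergNestedTypesSketch {
  public static void main(String[] args) {
    // A struct field takes one id for itself, then one per sub-field.
    Types.NestedField address = Types.NestedField.optional(1, "address",
        Types.StructType.of(
            Types.NestedField.optional(2, "street", Types.StringType.get()),
            Types.NestedField.optional(3, "zip", Types.IntegerType.get())));

    // A map field reserves separate ids for its key and value types.
    Types.NestedField attributes = Types.NestedField.optional(4, "attributes",
        Types.MapType.ofOptional(5, 6, Types.StringType.get(), Types.StringType.get()));

    // A list field reserves one extra id for its element type.
    Types.NestedField tags = Types.NestedField.optional(7, "tags",
        Types.ListType.ofOptional(8, Types.StringType.get()));

    System.out.println(address + "\n" + attributes + "\n" + tags);
  }
}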

@@ -302,19 +300,19 @@ private Schema icebergSchema(boolean isUnwrapped) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

final List<Types.NestedField> tableColumns = icebergSchemaFields(valueSchema);
final IcebergChangeEventSchemaData tableColumns = icebergSchemaFields(valueSchema);

if (tableColumns.isEmpty()) {
if (tableColumns.fields().isEmpty()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema has no fields!");
}

final List<Types.NestedField> keyColumns = icebergSchemaFields(keySchema);
final IcebergChangeEventSchemaData keyColumns = icebergSchemaFields(keySchema);
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
for (Types.NestedField ic : keyColumns.fields()) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
ListIterator<Types.NestedField> colsIterator = tableColumns.fields().listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
@@ -332,25 +330,22 @@ private Schema icebergSchema(boolean isUnwrapped) {

}

return new Schema(tableColumns, identifierFieldIds);
return new Schema(tableColumns.fields(), identifierFieldIds);
}
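
The Schema constructor used in the return statement above takes the column list together with the set of identifier field ids. A standalone example with hypothetical columns; in Iceberg, identifier fields have to be required columns, so the id column below is declared required:

import java.util.List;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class IcebergSchemaSketch {
  public static void main(String[] args) {
    List<Types.NestedField> columns = List.of(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));
    // Field id 1 ("id") is declared as the row identifier, mirroring how the
    // key schema columns are matched against the value schema columns above.
    Schema schema = new Schema(columns, Set.of(1));
    System.out.println(schema);
  }
}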

/***
* Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
* @param schemaNode
* @return
*/
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
AtomicReference<Integer> fieldId = new AtomicReference<>(1);
private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode) {
IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
for (JsonNode field : getNodeFieldsArray(schemaNode)) {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
fieldId.set(df.getKey() + 1);
schemaColumns.add(df.getValue());
debeziumFieldToIcebergField(field, field.get("field").textValue(), schemaData);
}

return schemaColumns;
return schemaData;
}
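
The schemaNode consumed here is the Kafka Connect style schema that Debezium emits when JSON schemas are enabled: a "struct" node whose "fields" array lists entries carrying "type" and "field" attributes, with nested "fields", "keys"/"values" or "items" for struct, map and array types. A trimmed, purely illustrative example parsed with Jackson:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class DebeziumSchemaNodeSketch {
  public static void main(String[] args) throws Exception {
    String schemaJson = """
        {"type":"struct","fields":[
          {"type":"int32","optional":false,"field":"id"},
          {"type":"string","optional":true,"field":"name"},
          {"type":"struct","optional":true,"field":"address","fields":[
            {"type":"string","optional":true,"field":"street"}]}]}""";
    JsonNode schemaNode = new ObjectMapper().readTree(schemaJson);
    // Three top-level fields: id, name and the nested address struct.
    System.out.println(schemaNode.get("fields").size()); // prints: 3
  }
}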

private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
@@ -0,0 +1,28 @@
package io.debezium.server.iceberg;

import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

record IcebergChangeEventSchemaData(List<Types.NestedField> fields, Set<Integer> identifierFieldIds,
AtomicInteger nextFieldId) {


public IcebergChangeEventSchemaData(Integer nextFieldId) {
this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(nextFieldId));
}

public IcebergChangeEventSchemaData() {
this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(1));
}

public IcebergChangeEventSchemaData copyKeepNextFieldId() {
return new IcebergChangeEventSchemaData(new ArrayList<>(), new HashSet<>(), this.nextFieldId);
}


}
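
Because copyKeepNextFieldId() returns fresh field and identifier collections but keeps the same AtomicInteger, ids consumed while converting a nested structure are never handed out again by the parent. A small usage sketch (the class name is made up; it has to live in the same package, since the record is package-private):

package io.debezium.server.iceberg;

class SchemaDataUsageSketch {
  public static void main(String[] args) {
    IcebergChangeEventSchemaData parent = new IcebergChangeEventSchemaData(1);
    IcebergChangeEventSchemaData child = parent.copyKeepNextFieldId();
    int childId = child.nextFieldId().getAndIncrement();    // 1, taken for a nested field
    int parentId = parent.nextFieldId().getAndIncrement();  // 2, never 1 again -- the counter is shared
    System.out.println(childId + " " + parentId);           // prints: 1 2
    System.out.println(child.fields().isEmpty());           // true: copies start with empty field lists
  }
}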
@@ -0,0 +1,21 @@
package io.debezium.server.iceberg;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class IcebergChangeEventSchemaDataTest {

@Test
void nextFieldId() {

IcebergChangeEventSchemaData test = new IcebergChangeEventSchemaData(5);
test.identifierFieldIds().add(1);
assertEquals(6, test.nextFieldId().incrementAndGet());

IcebergChangeEventSchemaData testSubschemaField = test.copyKeepNextFieldId();
testSubschemaField.nextFieldId().incrementAndGet();
assertEquals(7, test.nextFieldId().get());
}

}
