From 388a14512f1c6a1d619d2f54e0030ac7fb3a1fcf Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Wed, 12 Jun 2024 22:54:18 +0200 Subject: [PATCH] Improve naming, rename JsonSchema to ChangeEventSchema --- .../server/iceberg/IcebergChangeEvent.java | 229 +++++++++--------- .../tableoperator/IcebergTableOperator.java | 6 +- .../server/iceberg/TestChangeEvent.java | 10 +- 3 files changed, 124 insertions(+), 121 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index ebe8b70d..8c25a45c 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -68,16 +68,16 @@ public JsonNode value() { return value; } - public JsonSchema jsonSchema() { + public ChangeEventSchema changeEventSchema() { try { - return new JsonSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema")); + return new ChangeEventSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema")); } catch (IOException e) { throw new DebeziumException("Failed to get event schema", e); } } public Schema icebergSchema() { - return jsonSchema().icebergSchema(); + return changeEventSchema().icebergSchema(); } public String destination() { @@ -105,40 +105,6 @@ private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonN return record; } - private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) { - switch (fieldType) { - case "int8": - case "int16": - case "int32": // int 4 bytes - return Types.IntegerType.get(); - case "int64": // long 8 bytes - if (TS_MS_FIELDS.contains(fieldName)) { - return Types.TimestampType.withZone(); - } else { - return Types.LongType.get(); - } - case "float8": - case "float16": - case "float32": // float is represented in 32 bits, - return Types.FloatType.get(); - case "double": - case "float64": // double is represented in 64 bits - return Types.DoubleType.get(); - case "boolean": - return Types.BooleanType.get(); - case "string": - return Types.StringType.get(); - case "uuid": - return Types.UUIDType.get(); - case "bytes": - return Types.BinaryType.get(); - default: - // default to String type - return Types.StringType.get(); - //throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!"); - } - } - private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) { LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); final Object val; @@ -228,80 +194,12 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node return val; } - /*** - * converts given debezium filed to iceberg field equivalent. does recursion in case of complex/nested types. - * - * @param fieldSchema JsonNode representation of debezium field schema. - * @param fieldName name of the debezium field - * @param fieldId id sequence to assign iceberg field, after the conversion. - * @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField. - */ - private static Map.Entry debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) { - String fieldType = fieldSchema.get("type").textValue(); - switch (fieldType) { - case "struct": - // struct type - int rootStructId = fieldId; - List subFields = new ArrayList<>(); - for (JsonNode subFieldSchema : fieldSchema.get("fields")) { - fieldId += 1; - String subFieldName = subFieldSchema.get("field").textValue(); - Map.Entry subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId); - subFields.add(subField.getValue()); - fieldId = subField.getKey(); - } - // create it as struct, nested type - return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields))); - case "map": - int rootMapId = fieldId; - int keyFieldId = fieldId + 1; - int valFieldId = fieldId + 2; - fieldId = fieldId + 3; - Map.Entry keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId); - fieldId = keyField.getKey() + 1; - Map.Entry valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId); - fieldId = valField.getKey(); - Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type()); - return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField)); - - case "array": - int rootArrayId = fieldId; - fieldId += 1; - Map.Entry listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId); - fieldId = listItemsField.getKey() + 1; - Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type()); - return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField)); - default: - // its primitive field - return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType))); - } - } - - /*** - * Converts debezium event fields to iceberg equivalent and returns list of iceberg fields. - * @param schemaNode - * @return - */ - private static List icebergSchemaFields(JsonNode schemaNode) { - List schemaColumns = new ArrayList<>(); - AtomicReference fieldId = new AtomicReference<>(1); - if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) { - LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); - schemaNode.get("fields").forEach(field -> { - Map.Entry df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get()); - fieldId.set(df.getKey() + 1); - schemaColumns.add(df.getValue()); - }); - } - return schemaColumns; - } - - public static class JsonSchema { + public static class ChangeEventSchema { private final JsonNode valueSchema; private final JsonNode keySchema; - JsonSchema(JsonNode valueSchema, JsonNode keySchema) { + ChangeEventSchema(JsonNode valueSchema, JsonNode keySchema) { this.valueSchema = valueSchema; this.keySchema = keySchema; } @@ -314,12 +212,53 @@ protected JsonNode keySchema() { return keySchema; } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - JsonSchema that = (JsonSchema) o; - return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema); + /*** + * converts given debezium filed to iceberg field equivalent. does recursion in case of complex/nested types. + * + * @param fieldSchema JsonNode representation of debezium field schema. + * @param fieldName name of the debezium field + * @param fieldId id sequence to assign iceberg field, after the conversion. + * @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField. + */ + private static Map.Entry debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) { + String fieldType = fieldSchema.get("type").textValue(); + switch (fieldType) { + case "struct": + // struct type + int rootStructId = fieldId; + List subFields = new ArrayList<>(); + for (JsonNode subFieldSchema : fieldSchema.get("fields")) { + fieldId += 1; + String subFieldName = subFieldSchema.get("field").textValue(); + Map.Entry subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId); + subFields.add(subField.getValue()); + fieldId = subField.getKey(); + } + // create it as struct, nested type + return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields))); + case "map": + int rootMapId = fieldId; + int keyFieldId = fieldId + 1; + int valFieldId = fieldId + 2; + fieldId = fieldId + 3; + Map.Entry keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId); + fieldId = keyField.getKey() + 1; + Map.Entry valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId); + fieldId = valField.getKey(); + Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type()); + return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField)); + + case "array": + int rootArrayId = fieldId; + fieldId += 1; + Map.Entry listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId); + fieldId = listItemsField.getKey() + 1; + Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type()); + return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField)); + default: + // its primitive field + return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType))); + } } @Override @@ -365,6 +304,70 @@ private Schema icebergSchema() { return new Schema(tableColumns, identifierFieldIds); } + + /*** + * Converts debezium event fields to iceberg equivalent and returns list of iceberg fields. + * @param schemaNode + * @return + */ + private static List icebergSchemaFields(JsonNode schemaNode) { + List schemaColumns = new ArrayList<>(); + AtomicReference fieldId = new AtomicReference<>(1); + if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) { + LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); + schemaNode.get("fields").forEach(field -> { + Map.Entry df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get()); + fieldId.set(df.getKey() + 1); + schemaColumns.add(df.getValue()); + }); + } + return schemaColumns; + } + + private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) { + switch (fieldType) { + case "int8": + case "int16": + case "int32": // int 4 bytes + return Types.IntegerType.get(); + case "int64": // long 8 bytes + if (TS_MS_FIELDS.contains(fieldName)) { + return Types.TimestampType.withZone(); + } else { + return Types.LongType.get(); + } + case "float8": + case "float16": + case "float32": // float is represented in 32 bits, + return Types.FloatType.get(); + case "double": + case "float64": // double is represented in 64 bits + return Types.DoubleType.get(); + case "boolean": + return Types.BooleanType.get(); + case "string": + return Types.StringType.get(); + case "uuid": + return Types.UUIDType.get(); + case "bytes": + return Types.BinaryType.get(); + default: + // default to String type + return Types.StringType.get(); + //throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!"); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ChangeEventSchema that = (ChangeEventSchema) o; + return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema); + } + + } + } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 8974668a..3b1423cf 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -149,12 +149,12 @@ public void addToTable(Table icebergTable, List events) { // if field additions not enabled add set of events to table addToTablePerSchema(icebergTable, events); } else { - Map> eventsGroupedBySchema = + Map> eventsGroupedBySchema = events.stream() - .collect(Collectors.groupingBy(IcebergChangeEvent::jsonSchema)); + .collect(Collectors.groupingBy(IcebergChangeEvent::changeEventSchema)); LOGGER.debug("Batch got {} records with {} different schema!!", events.size(), eventsGroupedBySchema.keySet().size()); - for (Map.Entry> schemaEvents : eventsGroupedBySchema.entrySet()) { + for (Map.Entry> schemaEvents : eventsGroupedBySchema.entrySet()) { // extend table schema if new fields found applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema()); // add set of events to table diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java index 6924e6d9..dd2ea415 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEvent.java @@ -49,11 +49,11 @@ public static TestChangeEvent of(String destination, Integer id, .build(); final String key = "{" + - "\"schema\":" + t.jsonSchema().keySchema() + "," + + "\"schema\":" + t.changeEventSchema().keySchema() + "," + "\"payload\":" + t.key() + "} "; final String val = "{" + - "\"schema\":" + t.jsonSchema().valueSchema() + "," + + "\"schema\":" + t.changeEventSchema().valueSchema() + "," + "\"payload\":" + t.value() + "} "; return new TestChangeEvent<>(key, val, destination); @@ -71,11 +71,11 @@ public static TestChangeEvent ofCompositeKey(String destination, .build(); final String key = "{" + - "\"schema\":" + t.jsonSchema().keySchema() + "," + + "\"schema\":" + t.changeEventSchema().keySchema() + "," + "\"payload\":" + t.key() + "} "; final String val = "{" + - "\"schema\":" + t.jsonSchema().valueSchema() + "," + + "\"schema\":" + t.changeEventSchema().valueSchema() + "," + "\"payload\":" + t.value() + "} "; @@ -106,7 +106,7 @@ public static TestChangeEvent ofNoKey(String destination, Intege .build(); final String val = "{" + - "\"schema\":" + t.jsonSchema().valueSchema() + "," + + "\"schema\":" + t.changeEventSchema().valueSchema() + "," + "\"payload\":" + t.value() + "} "; return new TestChangeEvent<>(null, val, destination);