diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index ef819282..02f526d1 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -8,13 +8,6 @@ package io.debezium.server.iceberg; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.*; - import com.fasterxml.jackson.databind.JsonNode; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; @@ -24,6 +17,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + /** * @author Ismail Simsek */ @@ -84,7 +85,7 @@ private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonN return record; } - private static Type.PrimitiveType icebergFieldType(String fieldName, String fieldType) { + private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) { switch (fieldType) { case "int8": case "int16": @@ -161,10 +162,36 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node } break; case LIST: + Types.NestedField listItemsType = field.type().asListType().fields().get(0); + // recursive value mapping when list elements are nested type + if (listItemsType.type().isNestedType()) { + ArrayList listVal = new ArrayList<>(); + node.elements().forEachRemaining(element -> { + listVal.add(jsonValToIcebergVal(field.type().asListType().fields().get(0), element)); + }); + val = listVal; + break; + } + 
val = IcebergChangeConsumer.mapper.convertValue(node, ArrayList.class); break; case MAP: - val = IcebergChangeConsumer.mapper.convertValue(node, Map.class); + Type keyType = field.type().asMapType().keyType(); + Type valType = field.type().asMapType().valueType(); + if (keyType.isPrimitiveType() && valType.isPrimitiveType()) { + val = IcebergChangeConsumer.mapper.convertValue(node, Map.class); + break; + } + // convert complex/nested map value with recursion + HashMap mapVal = new HashMap<>(); + node.fields().forEachRemaining(f -> { + if (valType.isStructType()) { + mapVal.put(f.getKey(), asIcebergRecord(valType.asStructType(), f.getValue())); + } else { + mapVal.put(f.getKey(), f.getValue()); + } + }); + val = mapVal; break; case STRUCT: // create it as struct, nested type @@ -181,7 +208,76 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node return val; } - public class JsonSchema { + /*** + * Converts the given Debezium field to its Iceberg field equivalent, recursing into nested types (struct, map, array). Key, value and item sub-conversions contribute only their type, so the ids passed to them are consumed but unused and assigned ids may have gaps. + * + * @param fieldSchema JsonNode representation of the Debezium field schema; its {@code type} member selects the conversion. + * @param fieldName name of the Debezium field, used as the name of the resulting Iceberg field. + * @param fieldId id given to the resulting Iceberg field; nested fields take the ids that follow it. + * @return map entry whose key is the last id consumed by this field (nested ids included) and whose value is the converted Iceberg NestedField.
+ */ + private static Map.Entry debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) { + String fieldType = fieldSchema.get("type").textValue(); + switch (fieldType) { + case "struct": + // struct type + int rootStructId = fieldId; + List subFields = new ArrayList<>(); + for (JsonNode subFieldSchema : fieldSchema.get("fields")) { + fieldId += 1; + String subFieldName = subFieldSchema.get("field").textValue(); + Map.Entry subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId); + subFields.add(subField.getValue()); + fieldId = subField.getKey(); + } + // create it as struct, nested type + return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields))); + case "map": + int rootMapId = fieldId; + int keyFieldId = fieldId + 1; + int valFieldId = fieldId + 2; + fieldId = fieldId + 3; + Map.Entry keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId); + fieldId = keyField.getKey() + 1; + Map.Entry valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId); + fieldId = valField.getKey(); + Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type()); + return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField)); + + case "array": + int rootArrayId = fieldId; + fieldId += 1; + Map.Entry listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId); + fieldId = listItemsField.getKey() + 1; + Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type()); + return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField)); + default: + // its primitive field + return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName,
+ icebergPrimitiveField(fieldName, fieldType))); + } + } + + /*** + * Converts the fields of a Debezium event schema to their Iceberg equivalents and returns them as a list. NOTE(review): the debug message in the body reads as iceberg-to-debezium, the reverse of the conversion performed here. + * @param schemaNode Debezium schema node; converted only when it is non-null and has an array-valued {@code fields} member. + * @return converted Iceberg fields, with top-level ids starting at 1; an empty list when schemaNode is null or has no {@code fields} array. + */ + private static List icebergSchemaFields(JsonNode schemaNode) { + List schemaColumns = new ArrayList<>(); + AtomicReference fieldId = new AtomicReference<>(1); + if (schemaNode != null && schemaNode.has("fields") && schemaNode.get("fields").isArray()) { + LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode); + schemaNode.get("fields").forEach(field -> { + Map.Entry df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get()); + fieldId.set(df.getKey() + 1); + schemaColumns.add(df.getValue()); + }); + } + return schemaColumns; + } + + + public static class JsonSchema { private final JsonNode valueSchema; private final JsonNode keySchema; @@ -211,38 +307,19 @@ public int hashCode() { return Objects.hash(valueSchema, keySchema); } - //getIcebergFieldsFromEventSchema - private List KeySchemaFields() { - if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) { - LOGGER.debug(keySchema.toString()); - return icebergSchema(keySchema, "", 0); - } - LOGGER.trace("Key schema not found!"); - return new ArrayList<>(); - } - - private List valueSchemaFields() { - if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) { - LOGGER.debug(valueSchema.toString()); - return icebergSchema(valueSchema, "", 0); - } - LOGGER.trace("Event schema not found!"); - return new ArrayList<>(); - } - public Schema icebergSchema() { if (this.valueSchema == null) { throw new RuntimeException("Failed to get event schema, event schema is null"); } - final List tableColumns = valueSchemaFields(); + final List tableColumns = icebergSchemaFields(valueSchema); if (tableColumns.isEmpty()) { throw new RuntimeException("Failed to get event schema, event schema has no fields!"); } - final List keyColumns =
KeySchemaFields(); + final List keyColumns = icebergSchemaFields(keySchema); Set identifierFieldIds = new HashSet<>(); for (Types.NestedField ic : keyColumns) { @@ -268,53 +345,6 @@ public Schema icebergSchema() { return new Schema(tableColumns, identifierFieldIds); } - - private List icebergSchema(JsonNode eventSchema, String schemaName, int columnId) { - List schemaColumns = new ArrayList<>(); - String schemaType = eventSchema.get("type").textValue(); - LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType); - for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) { - columnId++; - String fieldName = jsonSchemaFieldNode.get("field").textValue(); - String fieldType = jsonSchemaFieldNode.get("type").textValue(); - LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType); - switch (fieldType) { - case "array": - JsonNode items = jsonSchemaFieldNode.get("items"); - if (items != null && items.has("type")) { - String listItemType = items.get("type").textValue(); - - if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) { - throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName); - } - - Type.PrimitiveType item = icebergFieldType(fieldName, listItemType); - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); - } else { - throw new RuntimeException("Unexpected Array type for field " + fieldName); - } - break; - case "map": - String keyFieldType = jsonSchemaFieldNode.get("keys").get("type").textValue(); - String varFieldlType = jsonSchemaFieldNode.get("keys").get("type").textValue(); - Types.MapType mapField = Types.MapType.ofOptional(columnId, ++columnId, icebergFieldType(fieldName+".keys", keyFieldType), icebergFieldType(fieldName+".values", varFieldlType)); - schemaColumns.add(Types.NestedField.optional(++columnId,fieldName, mapField)); - 
break; - case "struct": - // create it as struct, nested type - List subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId); - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema))); - columnId += subSchema.size(); - break; - default: //primitive types - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldName, fieldType))); - break; - } - } - - return schemaColumns; - } - } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 0ecc5422..af617677 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -8,6 +8,7 @@ package io.debezium.server.iceberg; +import com.google.common.collect.Lists; import io.debezium.server.iceberg.testresources.BaseSparkTest; import io.debezium.server.iceberg.testresources.S3Minio; import io.debezium.server.iceberg.testresources.SourcePostgresqlDB; @@ -15,12 +16,6 @@ import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; - -import com.google.common.collect.Lists; import jakarta.inject.Inject; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; @@ -34,6 +29,11 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -59,7 +59,6 @@ public class IcebergChangeConsumerTest extends BaseSparkTest { @Test public void 
testConsumingVariousDataTypes() throws Exception { assertEquals(sinkType, "iceberg"); - SourcePostgresqlDB.runSQL("CREATE EXTENSION hstore;"); String sql = "\n" + " DROP TABLE IF EXISTS inventory.data_types;\n" + " CREATE TABLE IF NOT EXISTS inventory.data_types (\n" + @@ -102,7 +101,7 @@ public void testConsumingVariousDataTypes() throws Exception { Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> { try { Dataset df = getTableData("testc.inventory.data_types"); - df.show(true); + df.show(false); return df.where("c_text is null AND c_varchar is null AND c_int is null " + "AND c_date is null AND c_timestamp is null AND c_timestamptz is null " + "AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " + @@ -114,7 +113,7 @@ public void testConsumingVariousDataTypes() throws Exception { Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> { try { Dataset df = getTableData("testc.inventory.data_types"); - df.show(true); + df.show(false); return df.count() == 2; } catch (Exception e) { return false; @@ -128,19 +127,23 @@ public void testConsumingArrayDataType() throws Exception { " CREATE TABLE IF NOT EXISTS inventory.array_data (\n" + " name text,\n" + " pay_by_quarter integer[],\n" + + " c_array_of_map hstore[],\n" + " schedule text[][]\n" + " );\n" + " INSERT INTO inventory.array_data\n" + " VALUES " + "('Carol2',\n" + " ARRAY[20000, 25000, 25000, 25000],\n" + + " ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" + " ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n" + "('Bill',\n" + " '{10000, 10000, 10000, 10000}',\n" + + " ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" + " '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" + " ('Carol1',\n" + - " '{20000, 25000, 25000, 25000}',\n" + - " '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" + + " '{20000, 25000, 25000, 25000}',\n" + + " ARRAY['mapkey1=>1, mapkey2=>2'::hstore],\n" + + " '{{\"breakfast\", \"consulting\"}, 
{\"meeting\", \"lunch\"}}')" + ";"; SourcePostgresqlDB.runSQL(sql); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java similarity index 86% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java index 5dcf8eff..10fccf0d 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeEventTest.java @@ -8,31 +8,31 @@ package io.debezium.server.iceberg; -import io.debezium.serde.DebeziumSerdes; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import io.debezium.serde.DebeziumSerdes; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.types.Types; import org.apache.kafka.common.serialization.Serde; import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; + import static io.debezium.server.iceberg.IcebergChangeConsumer.mapper; import static org.junit.jupiter.api.Assertions.*; -class TestIcebergUtil { +class IcebergChangeEventTest { final String serdeWithSchema = Files.readString(Path.of("src/test/resources/json/serde-with-schema.json")); final String unwrapWithSchema = Files.readString(Path.of("src/test/resources/json/unwrap-with-schema.json")); final String unwrapWithGeomSchema = Files.readString(Path.of("src/test/resources/json/serde-with-schema_geom.json")); final String unwrapWithArraySchema = 
Files.readString(Path.of("src/test/resources/json/serde-with-array.json")); final String unwrapWithArraySchema2 = Files.readString(Path.of("src/test/resources/json/serde-with-array2.json")); - TestIcebergUtil() throws IOException { + IcebergChangeEventTest() throws IOException { } @Test @@ -41,8 +41,9 @@ public void testNestedJsonRecord() throws JsonProcessingException { mapper.readTree(serdeWithSchema).get("payload"), null, mapper.readTree(serdeWithSchema).get("schema"), null); Schema schema = e.icebergSchema(); + System.out.println(schema.toString()); assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " + - "4:")); + "4:")); } @Test @@ -64,7 +65,9 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException { mapper.readTree(unwrapWithArraySchema).get("payload"), null, mapper.readTree(unwrapWithArraySchema).get("schema"), null); Schema schema = e.icebergSchema(); - assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list, 4: schedule: optional list, 6:")); + System.out.println(schema); + System.out.println(schema.asStruct()); + assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list, 5: schedule: optional list, 8:")); System.out.println(schema.asStruct()); System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType()); System.out.println(schema.findField("schedule").type().asListType().elementType()); @@ -77,16 +80,15 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException { @Test public void testNestedArray2JsonRecord() throws JsonProcessingException { - assertThrows(RuntimeException.class, () -> { - IcebergChangeEvent e = new IcebergChangeEvent("test", - mapper.readTree(unwrapWithArraySchema2).get("payload"), null, - mapper.readTree(unwrapWithArraySchema2).get("schema"), null); - Schema schema = e.icebergSchema(); - 
System.out.println(schema.asStruct()); - System.out.println(schema); - System.out.println(schema.findField("tableChanges")); - System.out.println(schema.findField("tableChanges").type().asListType().elementType()); - }); + IcebergChangeEvent e = new IcebergChangeEvent("test", + mapper.readTree(unwrapWithArraySchema2).get("payload"), null, + mapper.readTree(unwrapWithArraySchema2).get("schema"), null); + Schema schema = e.icebergSchema(); + System.out.println(schema.asStruct()); + System.out.println(schema); + assertTrue(schema.asStruct().toString().contains("20: tableChanges: optional list start() { container.start(); + try { + SourcePostgresqlDB.runSQL("CREATE EXTENSION hstore;"); + } catch (SQLException | ClassNotFoundException e) { + throw new RuntimeException(e); + } Map params = new ConcurrentHashMap<>(); params.put("debezium.source.database.hostname", POSTGRES_HOST);