From f9c350b8c658dc697d0d60c1f886118aea275e3c Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Tue, 4 Jan 2022 10:23:36 +0100
Subject: [PATCH] improve IcebergChangeEvent and iceberg schema generation

---
 .../server/iceberg/IcebergChangeConsumer.java |   3 +-
 .../server/iceberg/IcebergChangeEvent.java    | 277 ++++++++++--------
 .../server/iceberg/TestIcebergUtil.java       |  10 +-
 .../server/iceberg/testresources/S3Minio.java |   4 +-
 4 files changed, 167 insertions(+), 127 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index a208a839..6db1111a 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -138,6 +138,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
       throws InterruptedException {
     Instant start = Instant.now();
+    //group events by destination
     Map<String, List<IcebergChangeEvent>> result = records.stream()
         .map((ChangeEvent<Object, Object> e)
@@ -185,7 +186,7 @@ public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, I
       if (!eventSchemaEnabled) {
         throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
       }
-      return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.getSchema(), writeFormat, !upsert);
+      return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
     });
   }

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
index e9f77c8b..be20a265 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
@@ -33,8 +33,7 @@ public class IcebergChangeEvent {
   protected final String destination;
   protected final JsonNode value;
   protected final JsonNode key;
-  protected final JsonNode valueSchema;
-  protected final JsonNode keySchema;
+  JsonSchema jsonSchema;

   public IcebergChangeEvent(String destination,
                             JsonNode value,
@@ -44,14 +43,25 @@ public IcebergChangeEvent(String destination,
     this.destination = destination;
     this.value = value;
     this.key = key;
-    this.valueSchema = valueSchema;
-    this.keySchema = keySchema;
+    this.jsonSchema = new JsonSchema(valueSchema, keySchema);
   }

   public JsonNode key() {
     return key;
   }

+  public JsonNode value() {
+    return value;
+  }
+
+  public JsonSchema jsonSchema() {
+    return jsonSchema;
+  }
+
+  public Schema icebergSchema() {
+    return jsonSchema.icebergSchema();
+  }
+
   public String destinationTable() {
     return destination.replace(".", "_");
   }
@@ -69,49 +79,6 @@ public GenericRecord asIcebergRecord(Schema schema) {
     return record;
   }

-  public String schemaHashCode() {
-    return valueSchema.hashCode() + "-" + keySchema.hashCode();
-  }
-
-  public Schema getSchema() {
-
-    if (this.valueSchema == null) {
-      throw new RuntimeException("Failed to get event schema, event value is null, destination:" + this.destination);
-    }
-
-    final List<Types.NestedField> tableColumns = valueSchemaFields();
-
-    if (tableColumns.isEmpty()) {
-      throw new RuntimeException("Failed to get schema destination:" + this.destination);
-    }
-
-    final List<Types.NestedField> keyColumns = KeySchemaFields();
-    Set<Integer> identifierFieldIds = new HashSet<>();
-
-    for (Types.NestedField ic : keyColumns) {
-      boolean found = false;
-
-      ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
-      while (colsIterator.hasNext()) {
-        Types.NestedField tc = colsIterator.next();
-        if (Objects.equals(tc.name(), ic.name())) {
-          identifierFieldIds.add(tc.fieldId());
-          // set column as required its part of identifier filed
-          colsIterator.set(tc.asRequired());
-          found = true;
-          break;
-        }
-      }
-
-      if (!found) {
-        throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
-      }
-
-    }
-
-    return new Schema(tableColumns, identifierFieldIds);
-  }
-
   private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
     LOGGER.debug("Processing nested field:{}", tableFields);
     GenericRecord record = GenericRecord.create(tableFields);
@@ -129,25 +96,6 @@ private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode dat
     return record;
   }

-  //getIcebergFieldsFromEventSchema
-  private List<Types.NestedField> KeySchemaFields() {
-    if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
-      LOGGER.debug(keySchema.toString());
-      return icebergSchema(keySchema, "", 0);
-    }
-    LOGGER.trace("Key schema not found!");
-    return new ArrayList<>();
-  }
-
-  private List<Types.NestedField> valueSchemaFields() {
-    if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
-      LOGGER.debug(valueSchema.toString());
-      return icebergSchema(valueSchema, "", 0, true);
-    }
-    LOGGER.trace("Event schema not found!");
-    return new ArrayList<>();
-  }
-
   private Type.PrimitiveType icebergFieldType(String fieldType) {
     switch (fieldType) {
       case "int8":
@@ -175,59 +123,6 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
     }
   }

-  private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
-    return icebergSchema(eventSchema, schemaName, columnId, false);
-  }
-
-  private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
-                                                boolean addSourceTsField) {
-    List<Types.NestedField> schemaColumns = new ArrayList<>();
-    String schemaType = eventSchema.get("type").textValue();
-    LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
-    for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
-      columnId++;
-      String fieldName = jsonSchemaFieldNode.get("field").textValue();
-      String fieldType = jsonSchemaFieldNode.get("type").textValue();
-      LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
-      switch (fieldType) {
-        case "array":
-          JsonNode items = jsonSchemaFieldNode.get("items");
-          if (items != null && items.has("type")) {
-            String listItemType = items.get("type").textValue();
-            if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
-              throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName);
-            }
-            Type.PrimitiveType item = icebergFieldType(listItemType);
-            schemaColumns.add(Types.NestedField.optional(
-                columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
-            //throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
-          } else {
-            throw new RuntimeException("Unexpected Array type for field " + fieldName);
-          }
-          break;
-        case "map":
-          throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
-          //schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
-          //break;
-        case "struct":
-          // create it as struct, nested type
-          List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
-          schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
-          columnId += subSchema.size();
-          break;
-        default: //primitive types
-          schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
-          break;
-      }
-    }
-
-    if (addSourceTsField) {
-      columnId++;
-      schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
-    }
-    return schemaColumns;
-  }
-
   private Object jsonValToIcebergVal(Types.NestedField field,
                                      JsonNode node) {
     LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
@@ -281,4 +176,148 @@ private Object jsonValToIcebergVal(Types.NestedField field,
     return val;
   }

+  public class JsonSchema {
+    private final JsonNode valueSchema;
+    private final JsonNode keySchema;
+
+    JsonSchema(JsonNode valueSchema, JsonNode keySchema) {
+      this.valueSchema = valueSchema;
+      this.keySchema = keySchema;
+    }
+
+    public JsonNode valueSchema() {
+      return valueSchema;
+    }
+
+    public JsonNode keySchema() {
+      return keySchema;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      JsonSchema that = (JsonSchema) o;
+      return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(valueSchema, keySchema);
+    }
+
+    //getIcebergFieldsFromEventSchema
+    private List<Types.NestedField> KeySchemaFields() {
+      if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
+        LOGGER.debug(keySchema.toString());
+        return icebergSchema(keySchema, "", 0);
+      }
+      LOGGER.trace("Key schema not found!");
+      return new ArrayList<>();
+    }
+
+    private List<Types.NestedField> valueSchemaFields() {
+      if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
+        LOGGER.debug(valueSchema.toString());
+        return icebergSchema(valueSchema, "", 0, true);
+      }
+      LOGGER.trace("Event schema not found!");
+      return new ArrayList<>();
+    }
+
+    public Schema icebergSchema() {
+
+      if (this.valueSchema == null) {
+        throw new RuntimeException("Failed to get event schema, event schema is null");
+      }
+
+      final List<Types.NestedField> tableColumns = valueSchemaFields();
+
+      if (tableColumns.isEmpty()) {
+        throw new RuntimeException("Failed to get event schema, event schema has no fields!");
+      }
+
+      final List<Types.NestedField> keyColumns = KeySchemaFields();
+      Set<Integer> identifierFieldIds = new HashSet<>();
+
+      for (Types.NestedField ic : keyColumns) {
+        boolean found = false;
+
+        ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
+        while (colsIterator.hasNext()) {
+          Types.NestedField tc = colsIterator.next();
+          if (Objects.equals(tc.name(), ic.name())) {
+            identifierFieldIds.add(tc.fieldId());
+            // set column as required its part of identifier filed
+            colsIterator.set(tc.asRequired());
+            found = true;
+            break;
+          }
+        }
+
+        if (!found) {
+          throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
+        }
+
+      }
+
+      return new Schema(tableColumns, identifierFieldIds);
+    }
+
+    private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
+      return icebergSchema(eventSchema, schemaName, columnId, false);
+    }
+
+    private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
+                                                  boolean addSourceTsField) {
+      List<Types.NestedField> schemaColumns = new ArrayList<>();
+      String schemaType = eventSchema.get("type").textValue();
+      LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
+      for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
+        columnId++;
+        String fieldName = jsonSchemaFieldNode.get("field").textValue();
+        String fieldType = jsonSchemaFieldNode.get("type").textValue();
+        LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
+        switch (fieldType) {
+          case "array":
+            JsonNode items = jsonSchemaFieldNode.get("items");
+            if (items != null && items.has("type")) {
+              String listItemType = items.get("type").textValue();
+
+              if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
+                throw new RuntimeException("Complex nested array types are not supported," +
+                    " array[" + listItemType + "], field " + fieldName);
+              }
+
+              Type.PrimitiveType item = icebergFieldType(listItemType);
+              schemaColumns.add(Types.NestedField.optional(
+                  columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
+            } else {
+              throw new RuntimeException("Unexpected Array type for field " + fieldName);
+            }
+            break;
+          case "map":
+            throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
+            //break;
+          case "struct":
+            // create it as struct, nested type
+            List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
+            schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
+            columnId += subSchema.size();
+            break;
+          default: //primitive types
+            schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
+            break;
+        }
+      }
+
+      if (addSourceTsField) {
+        columnId++;
+        schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
+      }
+      return schemaColumns;
+    }
+
+  }
+
 }
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
index 185fa27c..f92ec654 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
@@ -36,7 +36,7 @@ public void testNestedJsonRecord() throws JsonProcessingException {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
         mapper.readTree(serdeWithSchema).get("payload"), null,
         mapper.readTree(serdeWithSchema).get("schema"), null);
-    Schema schema = e.getSchema();
+    Schema schema = e.icebergSchema();
     assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, "
         + "4:"));
   }
@@ -46,7 +46,7 @@ public void testUnwrapJsonRecord() throws IOException {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
         mapper.readTree(unwrapWithSchema).get("payload"), null,
         mapper.readTree(unwrapWithSchema).get("schema"), null);
-    Schema schema = e.getSchema();
+    Schema schema = e.icebergSchema();
     GenericRecord record = e.asIcebergRecord(schema);
     assertEquals("orders", record.getField("__table").toString());
     assertEquals(16850, record.getField("order_date"));
@@ -59,7 +59,7 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
         mapper.readTree(unwrapWithArraySchema).get("payload"), null,
         mapper.readTree(unwrapWithArraySchema).get("schema"), null);
-    Schema schema = e.getSchema();
+    Schema schema = e.icebergSchema();
     assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
     System.out.println(schema.asStruct());
     System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
@@ -77,7 +77,7 @@ public void testNestedArray2JsonRecord() throws JsonProcessingException {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
         mapper.readTree(unwrapWithArraySchema2).get("payload"), null,
         mapper.readTree(unwrapWithArraySchema2).get("schema"), null);
-    Schema schema = e.getSchema();
+    Schema schema = e.icebergSchema();
     System.out.println(schema.asStruct());
     System.out.println(schema);
     System.out.println(schema.findField("tableChanges"));
@@ -92,7 +92,7 @@ public void testNestedGeomJsonRecord() throws JsonProcessingException {
     IcebergChangeEvent e = new IcebergChangeEvent("test",
         mapper.readTree(unwrapWithGeomSchema).get("payload"), null,
         mapper.readTree(unwrapWithGeomSchema).get("schema"), null);
-    Schema schema = e.getSchema();
+    Schema schema = e.icebergSchema();
     GenericRecord record = e.asIcebergRecord(schema);
     //System.out.println(schema);
     //System.out.println(record);
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
index 5832ba93..36f3e408 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
@@ -70,11 +70,11 @@ public static void listFiles() {
     try {
       List<Bucket> bucketList = client.listBuckets();
       for (Bucket bucket : bucketList) {
-        System.out.printf("Bucket:%s ROOT", bucket.name());
+        System.out.printf("Bucket:%s ROOT\n", bucket.name());
         Iterable<Result<Item>> results = client.listObjects(ListObjectsArgs.builder().bucket(bucket.name()).recursive(true).build());
         for (Result<Item> result : results) {
           Item item = result.get();
-          System.out.printf("Bucket:%s Item:%s Size:%s", bucket.name(), item.objectName(), item.size());
+          System.out.printf("Bucket:%s Item:%s Size:%s\n", bucket.name(), item.objectName(), item.size());
         }
       }
     } catch (Exception e) {
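
For reference, a minimal usage sketch of the API as refactored by this patch (not part of the patch itself): it builds an IcebergChangeEvent the same way the tests above do, derives the Iceberg Schema through the new icebergSchema() accessor (formerly getSchema()), and converts the payload with asIcebergRecord(). The class name and the inline event JSON are hypothetical stand-ins for a real Debezium event produced with debezium.format.value.schemas.enable=true.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import io.debezium.server.iceberg.IcebergChangeEvent;

    public class IcebergChangeEventUsageSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // hypothetical Debezium-style event: a "schema" describing the fields and a "payload" row
        String eventJson = "{\"schema\":{\"type\":\"struct\",\"fields\":["
            + "{\"field\":\"id\",\"type\":\"int32\"},"
            + "{\"field\":\"name\",\"type\":\"string\"}]},"
            + "\"payload\":{\"id\":1,\"name\":\"widget\"}}";
        JsonNode event = mapper.readTree(eventJson);

        // constructor arguments: destination, value, key, value schema, key schema
        // (key and key schema may be null, exactly as in TestIcebergUtil above)
        IcebergChangeEvent e = new IcebergChangeEvent("testc.inventory.orders",
            event.get("payload"), null,
            event.get("schema"), null);

        Schema schema = e.icebergSchema();           // was e.getSchema() before this patch
        GenericRecord record = e.asIcebergRecord(schema);

        // JsonSchema carries equals()/hashCode(), replacing the removed schemaHashCode()
        boolean sameSchema = e.jsonSchema().equals(e.jsonSchema());

        System.out.println(e.destinationTable());    // testc_inventory_orders
        System.out.println(schema.asStruct());
        System.out.println(record + " sameSchema=" + sameSchema);
      }
    }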