From fe264f84b8b5a513af3da882ea8f9e0b4ce0fd08 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 11 Jun 2022 09:16:40 +0200 Subject: [PATCH 1/2] Add UUID support --- .../java/io/debezium/server/iceberg/IcebergChangeEvent.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 565543e8..fa33c452 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -114,6 +114,8 @@ private Type.PrimitiveType icebergFieldType(String fieldType) { return Types.BooleanType.get(); case "string": return Types.StringType.get(); + case "uuid": + return Types.UUIDType.get(); case "bytes": return Types.BinaryType.get(); default: @@ -147,6 +149,9 @@ private Object jsonValToIcebergVal(Types.NestedField field, // if the node is not a value node (method isValueNode returns false), convert it to string. val = node.isValueNode() ? node.asText(null) : node.toString(); break; + case UUID: + val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString()); + break; case BINARY: try { val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue()); From 125c49971e0bd04a28e4fd7f38ca7b0c512770e4 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 11 Jun 2022 09:17:58 +0200 Subject: [PATCH 2/2] Add UUID support --- .../server/iceberg/IcebergChangeEvent.java | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index fa33c452..30955fb9 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -35,11 +35,7 @@ public class IcebergChangeEvent { protected final JsonNode key; JsonSchema jsonSchema; - public IcebergChangeEvent(String destination, - JsonNode value, - JsonNode key, - JsonNode valueSchema, - JsonNode keySchema) { + public IcebergChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) { this.destination = destination; this.value = value; this.key = key; @@ -125,8 +121,7 @@ private Type.PrimitiveType icebergFieldType(String fieldType) { } } - private Object jsonValToIcebergVal(Types.NestedField field, - JsonNode node) { + private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) { LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); final Object val; switch (field.type().typeId()) { @@ -156,8 +151,7 @@ private Object jsonValToIcebergVal(Types.NestedField field, try { val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue()); } catch (IOException e) { - LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e); - throw new RuntimeException("Failed Processing Event!", e); + throw new RuntimeException("Failed to convert binary value to iceberg value, field: " + field.name(), e); } break; case LIST: @@ -273,8 +267,7 @@ private List icebergSchema(JsonNode eventSchema, String schem return icebergSchema(eventSchema, schemaName, columnId, false); } - private List icebergSchema(JsonNode eventSchema, String schemaName, int columnId, - boolean addSourceTsField) { + private List icebergSchema(JsonNode eventSchema, String schemaName, int columnId, boolean addSourceTsField) { List schemaColumns = new ArrayList<>(); String schemaType = eventSchema.get("type").textValue(); LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType); @@ -290,13 +283,11 @@ private List icebergSchema(JsonNode eventSchema, String schem String listItemType = items.get("type").textValue(); if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) { - throw new RuntimeException("Complex nested array types are not supported," + - " array[" + listItemType + "], field " + fieldName); + throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName); } Type.PrimitiveType item = icebergFieldType(listItemType); - schemaColumns.add(Types.NestedField.optional( - columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); } else { throw new RuntimeException("Unexpected Array type for field " + fieldName); }