Skip to content

Commit

Permalink
Support UUID Type (#89)
Browse files Browse the repository at this point in the history
* Add UUID support
  • Loading branch information
ismailsimsek committed Jun 14, 2022
1 parent 63e1793 commit f1c9b3c
Showing 1 changed file with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ public class IcebergChangeEvent {
protected final JsonNode key;
final JsonSchema jsonSchema;

public IcebergChangeEvent(String destination,
JsonNode value,
JsonNode key,
JsonNode valueSchema,
JsonNode keySchema) {
public IcebergChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
this.destination = destination;
this.value = value;
this.key = key;
Expand Down Expand Up @@ -114,6 +110,8 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
return Types.BooleanType.get();
case "string":
return Types.StringType.get();
case "uuid":
return Types.UUIDType.get();
case "bytes":
return Types.BinaryType.get();
default:
Expand All @@ -123,8 +121,7 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
}
}

private Object jsonValToIcebergVal(Types.NestedField field,
JsonNode node) {
private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
Expand All @@ -147,12 +144,14 @@ private Object jsonValToIcebergVal(Types.NestedField field,
// if the node is not a value node (method isValueNode returns false), convert it to string.
val = node.isValueNode() ? node.asText(null) : node.toString();
break;
case UUID:
val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString());
break;
case BINARY:
try {
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
} catch (IOException e) {
LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e);
throw new RuntimeException("Failed Processing Event!", e);
throw new RuntimeException("Failed to convert binary value to iceberg value, field: " + field.name(), e);
}
break;
case LIST:
Expand Down Expand Up @@ -268,8 +267,7 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
return icebergSchema(eventSchema, schemaName, columnId, false);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
boolean addSourceTsField) {
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId, boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
Expand All @@ -285,13 +283,11 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
String listItemType = items.get("type").textValue();

if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex nested array types are not supported," +
" array[" + listItemType + "], field " + fieldName);
throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName);
}

Type.PrimitiveType item = icebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
Expand Down

0 comments on commit f1c9b3c

Please sign in to comment.