Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support UUID Type #89

Merged
merged 2 commits into from
Jun 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ public class IcebergChangeEvent {
protected final JsonNode key;
JsonSchema jsonSchema;

public IcebergChangeEvent(String destination,
JsonNode value,
JsonNode key,
JsonNode valueSchema,
JsonNode keySchema) {
public IcebergChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
this.destination = destination;
this.value = value;
this.key = key;
Expand Down Expand Up @@ -114,6 +110,8 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
return Types.BooleanType.get();
case "string":
return Types.StringType.get();
case "uuid":
return Types.UUIDType.get();
case "bytes":
return Types.BinaryType.get();
default:
Expand All @@ -123,8 +121,7 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
}
}

private Object jsonValToIcebergVal(Types.NestedField field,
JsonNode node) {
private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
Expand All @@ -147,12 +144,14 @@ private Object jsonValToIcebergVal(Types.NestedField field,
// if the node is not a value node (method isValueNode returns false), convert it to string.
val = node.isValueNode() ? node.asText(null) : node.toString();
break;
case UUID:
val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString());
break;
case BINARY:
try {
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
} catch (IOException e) {
LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e);
throw new RuntimeException("Failed Processing Event!", e);
throw new RuntimeException("Failed to convert binary value to iceberg value, field: " + field.name(), e);
}
break;
case LIST:
Expand Down Expand Up @@ -268,8 +267,7 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
return icebergSchema(eventSchema, schemaName, columnId, false);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
boolean addSourceTsField) {
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId, boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
Expand All @@ -285,13 +283,11 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
String listItemType = items.get("type").textValue();

if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex nested array types are not supported," +
" array[" + listItemType + "], field " + fieldName);
throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName);
}

Type.PrimitiveType item = icebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
Expand Down