Skip to content

Commit

Permalink
Improve naming, rename JsonSchema to ChangeEventSchema (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 12, 2024
1 parent a47eb1d commit 7727566
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ public JsonNode value() {
return value;
}

public JsonSchema jsonSchema() {
public ChangeEventSchema changeEventSchema() {
try {
return new JsonSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
return new ChangeEventSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
} catch (IOException e) {
throw new DebeziumException("Failed to get event schema", e);
}
}

public Schema icebergSchema() {
return jsonSchema().icebergSchema();
return changeEventSchema().icebergSchema();
}

public String destination() {
Expand Down Expand Up @@ -105,40 +105,6 @@ private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonN
return record;
}

private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
case "int32": // int 4 bytes
return Types.IntegerType.get();
case "int64": // long 8 bytes
if (TS_MS_FIELDS.contains(fieldName)) {
return Types.TimestampType.withZone();
} else {
return Types.LongType.get();
}
case "float8":
case "float16":
case "float32": // float is represented in 32 bits,
return Types.FloatType.get();
case "double":
case "float64": // double is represented in 64 bits
return Types.DoubleType.get();
case "boolean":
return Types.BooleanType.get();
case "string":
return Types.StringType.get();
case "uuid":
return Types.UUIDType.get();
case "bytes":
return Types.BinaryType.get();
default:
// default to String type
return Types.StringType.get();
//throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!");
}
}

private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
Expand Down Expand Up @@ -228,80 +194,12 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
return val;
}

  /**
   * Converts the given debezium field to its iceberg field equivalent. Recurses for
   * complex/nested types (struct, map, array). Iceberg requires a unique id per field,
   * so the id sequence is threaded through the recursion: each call consumes ids and
   * returns the last id it assigned so the caller can continue the sequence.
   *
   * @param fieldSchema JsonNode representation of the debezium field schema.
   * @param fieldName   name of the debezium field.
   * @param fieldId     id sequence position to start assigning iceberg field ids from.
   * @return map entry: Key is the last id assigned to the iceberg field, Value is the converted iceberg NestedField.
   */
  private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) {
    String fieldType = fieldSchema.get("type").textValue();
    switch (fieldType) {
      case "struct":
        // struct type: the struct itself takes the current id, sub-fields consume subsequent ids
        int rootStructId = fieldId;
        List<Types.NestedField> subFields = new ArrayList<>();
        for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
          fieldId += 1;
          String subFieldName = subFieldSchema.get("field").textValue();
          Map.Entry<Integer, Types.NestedField> subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId);
          subFields.add(subField.getValue());
          // continue the id sequence from wherever the recursive call stopped
          fieldId = subField.getKey();
        }
        // create it as struct, nested type
        return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields)));
      case "map":
        // map type: reserve ids for the map field itself plus its key and value entries
        int rootMapId = fieldId;
        int keyFieldId = fieldId + 1;
        int valFieldId = fieldId + 2;
        fieldId = fieldId + 3;
        // convert key and value schemas recursively ("keys"/"values" per connect map schema)
        Map.Entry<Integer, Types.NestedField> keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId);
        fieldId = keyField.getKey() + 1;
        Map.Entry<Integer, Types.NestedField> valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId);
        fieldId = valField.getKey();
        Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type());
        return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField));

      case "array":
        // array type: convert the element schema ("items"), then assign the list element id
        int rootArrayId = fieldId;
        fieldId += 1;
        Map.Entry<Integer, Types.NestedField> listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId);
        fieldId = listItemsField.getKey() + 1;
        Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type());
        return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField));
      default:
        // its primitive field
        return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType)));
    }
  }

/***
* Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
* @param schemaNode
* @return
*/
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
AtomicReference<Integer> fieldId = new AtomicReference<>(1);
if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
schemaNode.get("fields").forEach(field -> {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
fieldId.set(df.getKey() + 1);
schemaColumns.add(df.getValue());
});
}
return schemaColumns;
}


public static class JsonSchema {
public static class ChangeEventSchema {
private final JsonNode valueSchema;
private final JsonNode keySchema;

JsonSchema(JsonNode valueSchema, JsonNode keySchema) {
ChangeEventSchema(JsonNode valueSchema, JsonNode keySchema) {
this.valueSchema = valueSchema;
this.keySchema = keySchema;
}
Expand All @@ -314,12 +212,53 @@ protected JsonNode keySchema() {
return keySchema;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JsonSchema that = (JsonSchema) o;
return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema);
    /**
     * Converts the given debezium field to its iceberg field equivalent. Recurses for
     * complex/nested types (struct, map, array). Iceberg requires a unique id per field,
     * so the id sequence is threaded through the recursion: each call consumes ids and
     * returns the last id it assigned so the caller can continue the sequence.
     *
     * @param fieldSchema JsonNode representation of the debezium field schema.
     * @param fieldName   name of the debezium field.
     * @param fieldId     id sequence position to start assigning iceberg field ids from.
     * @return map entry: Key is the last id assigned to the iceberg field, Value is the converted iceberg NestedField.
     */
    private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) {
      String fieldType = fieldSchema.get("type").textValue();
      switch (fieldType) {
        case "struct":
          // struct type: the struct itself takes the current id, sub-fields consume subsequent ids
          int rootStructId = fieldId;
          List<Types.NestedField> subFields = new ArrayList<>();
          for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
            fieldId += 1;
            String subFieldName = subFieldSchema.get("field").textValue();
            Map.Entry<Integer, Types.NestedField> subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId);
            subFields.add(subField.getValue());
            // continue the id sequence from wherever the recursive call stopped
            fieldId = subField.getKey();
          }
          // create it as struct, nested type
          return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields)));
        case "map":
          // map type: reserve ids for the map field itself plus its key and value entries
          int rootMapId = fieldId;
          int keyFieldId = fieldId + 1;
          int valFieldId = fieldId + 2;
          fieldId = fieldId + 3;
          // convert key and value schemas recursively ("keys"/"values" per connect map schema)
          Map.Entry<Integer, Types.NestedField> keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId);
          fieldId = keyField.getKey() + 1;
          Map.Entry<Integer, Types.NestedField> valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId);
          fieldId = valField.getKey();
          Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type());
          return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField));

        case "array":
          // array type: convert the element schema ("items"), then assign the list element id
          int rootArrayId = fieldId;
          fieldId += 1;
          Map.Entry<Integer, Types.NestedField> listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId);
          fieldId = listItemsField.getKey() + 1;
          Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type());
          return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField));
        default:
          // its primitive field
          return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType)));
      }
    }

@Override
Expand Down Expand Up @@ -365,6 +304,70 @@ private Schema icebergSchema() {

return new Schema(tableColumns, identifierFieldIds);
}

/***
* Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
* @param schemaNode
* @return
*/
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
AtomicReference<Integer> fieldId = new AtomicReference<>(1);
if (schemaNode != null && !schemaNode.isNull() && schemaNode.has("fields") && schemaNode.get("fields").isArray()) {
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
schemaNode.get("fields").forEach(field -> {
Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId.get());
fieldId.set(df.getKey() + 1);
schemaColumns.add(df.getValue());
});
}
return schemaColumns;
}

private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
case "int32": // int 4 bytes
return Types.IntegerType.get();
case "int64": // long 8 bytes
if (TS_MS_FIELDS.contains(fieldName)) {
return Types.TimestampType.withZone();
} else {
return Types.LongType.get();
}
case "float8":
case "float16":
case "float32": // float is represented in 32 bits,
return Types.FloatType.get();
case "double":
case "float64": // double is represented in 64 bits
return Types.DoubleType.get();
case "boolean":
return Types.BooleanType.get();
case "string":
return Types.StringType.get();
case "uuid":
return Types.UUIDType.get();
case "bytes":
return Types.BinaryType.get();
default:
// default to String type
return Types.StringType.get();
//throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!");
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChangeEventSchema that = (ChangeEventSchema) o;
return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema);
}


}


}
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
// if field additions not enabled add set of events to table
addToTablePerSchema(icebergTable, events);
} else {
Map<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
Map<IcebergChangeEvent.ChangeEventSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
events.stream()
.collect(Collectors.groupingBy(IcebergChangeEvent::jsonSchema));
.collect(Collectors.groupingBy(IcebergChangeEvent::changeEventSchema));
LOGGER.debug("Batch got {} records with {} different schema!!", events.size(), eventsGroupedBySchema.keySet().size());

for (Map.Entry<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
for (Map.Entry<IcebergChangeEvent.ChangeEventSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema());
// add set of events to table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public static TestChangeEvent<Object, Object> of(String destination, Integer id,
.build();

final String key = "{" +
"\"schema\":" + t.jsonSchema().keySchema() + "," +
"\"schema\":" + t.changeEventSchema().keySchema() + "," +
"\"payload\":" + t.key() +
"} ";
final String val = "{" +
"\"schema\":" + t.jsonSchema().valueSchema() + "," +
"\"schema\":" + t.changeEventSchema().valueSchema() + "," +
"\"payload\":" + t.value() +
"} ";
return new TestChangeEvent<>(key, val, destination);
Expand All @@ -71,11 +71,11 @@ public static TestChangeEvent<Object, Object> ofCompositeKey(String destination,
.build();

final String key = "{" +
"\"schema\":" + t.jsonSchema().keySchema() + "," +
"\"schema\":" + t.changeEventSchema().keySchema() + "," +
"\"payload\":" + t.key() +
"} ";
final String val = "{" +
"\"schema\":" + t.jsonSchema().valueSchema() + "," +
"\"schema\":" + t.changeEventSchema().valueSchema() + "," +
"\"payload\":" + t.value() +
"} ";

Expand Down Expand Up @@ -106,7 +106,7 @@ public static TestChangeEvent<Object, Object> ofNoKey(String destination, Intege
.build();

final String val = "{" +
"\"schema\":" + t.jsonSchema().valueSchema() + "," +
"\"schema\":" + t.changeEventSchema().valueSchema() + "," +
"\"payload\":" + t.value() +
"} ";
return new TestChangeEvent<>(null, val, destination);
Expand Down

0 comments on commit 7727566

Please sign in to comment.