Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve naming, rename JsonSchema to ChangeEventSchema #345

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ public JsonNode value() {
return value;
}

public JsonSchema jsonSchema() {
public ChangeEventSchema changeEventSchema() {
try {
return new JsonSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
return new ChangeEventSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
} catch (IOException e) {
throw new DebeziumException("Failed to get event schema", e);
}
}

public Schema icebergSchema() {
return jsonSchema().icebergSchema();
return changeEventSchema().icebergSchema();
}

public String destination() {
Expand Down Expand Up @@ -105,40 +105,6 @@ private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonN
return record;
}

/**
 * Maps a Debezium primitive type name to the corresponding Iceberg primitive type.
 * Unrecognized type names fall back to string rather than failing.
 *
 * @param fieldName name of the field; well-known *_ts_ms fields are promoted to timestamps.
 * @param fieldType Debezium primitive type name (e.g. "int32", "string", "bytes").
 * @return the equivalent Iceberg primitive type.
 */
private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
    if (fieldType.equals("int8") || fieldType.equals("int16") || fieldType.equals("int32")) {
        // integers up to 4 bytes
        return Types.IntegerType.get();
    }
    if (fieldType.equals("int64")) {
        // long, 8 bytes — known millisecond-timestamp fields become timestamptz
        return TS_MS_FIELDS.contains(fieldName) ? Types.TimestampType.withZone() : Types.LongType.get();
    }
    if (fieldType.equals("float8") || fieldType.equals("float16") || fieldType.equals("float32")) {
        // float is represented in 32 bits
        return Types.FloatType.get();
    }
    if (fieldType.equals("double") || fieldType.equals("float64")) {
        // double is represented in 64 bits
        return Types.DoubleType.get();
    }
    if (fieldType.equals("boolean")) {
        return Types.BooleanType.get();
    }
    if (fieldType.equals("uuid")) {
        return Types.UUIDType.get();
    }
    if (fieldType.equals("bytes")) {
        return Types.BinaryType.get();
    }
    // "string" and any unsupported type default to String type
    return Types.StringType.get();
}

private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
Expand Down Expand Up @@ -228,80 +194,12 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
return val;
}

/***
 * Converts given debezium field to iceberg field equivalent. Does recursion in case of complex/nested types
 * (struct, map, array); primitives are delegated to icebergPrimitiveField.
 *
 * Iceberg field ids are handed out sequentially: the caller passes the next free id, and the returned
 * entry's key is the last id this conversion consumed, so the caller continues numbering from key + 1.
 *
 * @param fieldSchema JsonNode representation of debezium field schema.
 * @param fieldName name of the debezium field
 * @param fieldId id sequence to assign iceberg field, after the conversion.
 * @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField.
 */
private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) {
    String fieldType = fieldSchema.get("type").textValue();
    switch (fieldType) {
        case "struct":
            // struct type: the struct itself keeps the incoming id; sub-fields consume the following ids
            int rootStructId = fieldId;
            List<Types.NestedField> subFields = new ArrayList<>();
            for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
                fieldId += 1;
                String subFieldName = subFieldSchema.get("field").textValue();
                Map.Entry<Integer, Types.NestedField> subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId);
                subFields.add(subField.getValue());
                // continue numbering from the last id the sub-field conversion used
                fieldId = subField.getKey();
            }
            // create it as struct, nested type
            return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields)));
        case "map":
            // map type: reserve ids for the map itself plus its key and value slots up front
            int rootMapId = fieldId;
            int keyFieldId = fieldId + 1;
            int valFieldId = fieldId + 2;
            fieldId = fieldId + 3;
            Map.Entry<Integer, Types.NestedField> keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId);
            fieldId = keyField.getKey() + 1;
            Map.Entry<Integer, Types.NestedField> valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId);
            fieldId = valField.getKey();
            Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type());
            return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField));

        case "array":
            int rootArrayId = fieldId;
            fieldId += 1;
            Map.Entry<Integer, Types.NestedField> listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId);
            fieldId = listItemsField.getKey() + 1;
            // NOTE(review): the element id passed to ofOptional is the id AFTER the items field's range,
            // not the id the items field itself was assigned — confirm this is intended.
            Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type());
            return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField));
        default:
            // its primitive field
            return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType)));
    }
}

/***
 * Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
 *
 * @param schemaNode JsonNode holding the debezium schema; expected to contain a "fields" array.
 * @return list of converted iceberg fields; empty when schemaNode is null/missing or has no "fields" array.
 */
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
    List<Types.NestedField> schemaColumns = new ArrayList<>();
    if (schemaNode == null || schemaNode.isNull() || !schemaNode.has("fields") || !schemaNode.get("fields").isArray()) {
        return schemaColumns;
    }
    // BUGFIX: log message previously said "iceberg schema to debezium" — conversion goes the other way
    LOGGER.debug("Converting debezium schema to iceberg:{}", schemaNode);
    // Field ids are assigned sequentially starting at 1; each conversion reports the last id it used.
    // Plain loop replaces the AtomicReference<Integer> boxing hack that was only needed for the lambda.
    int fieldId = 1;
    for (JsonNode field : schemaNode.get("fields")) {
        Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId);
        fieldId = df.getKey() + 1;
        schemaColumns.add(df.getValue());
    }
    return schemaColumns;
}


public static class JsonSchema {
public static class ChangeEventSchema {
private final JsonNode valueSchema;
private final JsonNode keySchema;

JsonSchema(JsonNode valueSchema, JsonNode keySchema) {
ChangeEventSchema(JsonNode valueSchema, JsonNode keySchema) {
this.valueSchema = valueSchema;
this.keySchema = keySchema;
}
Expand All @@ -314,12 +212,53 @@ protected JsonNode keySchema() {
return keySchema;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JsonSchema that = (JsonSchema) o;
return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema);
/***
 * Converts given debezium field to iceberg field equivalent. Does recursion in case of complex/nested types
 * (struct, map, array); primitives are delegated to icebergPrimitiveField.
 *
 * Iceberg field ids are handed out sequentially: the caller passes the next free id, and the returned
 * entry's key is the last id this conversion consumed, so the caller continues numbering from key + 1.
 *
 * @param fieldSchema JsonNode representation of debezium field schema.
 * @param fieldName name of the debezium field
 * @param fieldId id sequence to assign iceberg field, after the conversion.
 * @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField.
 */
private static Map.Entry<Integer, Types.NestedField> debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, int fieldId) {
    String fieldType = fieldSchema.get("type").textValue();
    switch (fieldType) {
        case "struct":
            // struct type: the struct itself keeps the incoming id; sub-fields consume the following ids
            int rootStructId = fieldId;
            List<Types.NestedField> subFields = new ArrayList<>();
            for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
                fieldId += 1;
                String subFieldName = subFieldSchema.get("field").textValue();
                Map.Entry<Integer, Types.NestedField> subField = debeziumFieldToIcebergField(subFieldSchema, subFieldName, fieldId);
                subFields.add(subField.getValue());
                // continue numbering from the last id the sub-field conversion used
                fieldId = subField.getKey();
            }
            // create it as struct, nested type
            return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootStructId, fieldName, Types.StructType.of(subFields)));
        case "map":
            // map type: reserve ids for the map itself plus its key and value slots up front
            int rootMapId = fieldId;
            int keyFieldId = fieldId + 1;
            int valFieldId = fieldId + 2;
            fieldId = fieldId + 3;
            Map.Entry<Integer, Types.NestedField> keyField = debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", fieldId);
            fieldId = keyField.getKey() + 1;
            Map.Entry<Integer, Types.NestedField> valField = debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", fieldId);
            fieldId = valField.getKey();
            Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyField.getValue().type(), valField.getValue().type());
            return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootMapId, fieldName, mapField));

        case "array":
            int rootArrayId = fieldId;
            fieldId += 1;
            Map.Entry<Integer, Types.NestedField> listItemsField = debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", fieldId);
            fieldId = listItemsField.getKey() + 1;
            // NOTE(review): the element id passed to ofOptional is the id AFTER the items field's range,
            // not the id the items field itself was assigned — confirm this is intended.
            Types.ListType listField = Types.ListType.ofOptional(fieldId, listItemsField.getValue().type());
            return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(rootArrayId, fieldName, listField));
        default:
            // its primitive field
            return new AbstractMap.SimpleEntry<>(fieldId, Types.NestedField.optional(fieldId, fieldName, icebergPrimitiveField(fieldName, fieldType)));
    }
}

@Override
Expand Down Expand Up @@ -365,6 +304,70 @@ private Schema icebergSchema() {

return new Schema(tableColumns, identifierFieldIds);
}

/***
 * Converts debezium event fields to iceberg equivalent and returns list of iceberg fields.
 *
 * @param schemaNode JsonNode holding the debezium schema; expected to contain a "fields" array.
 * @return list of converted iceberg fields; empty when schemaNode is null/missing or has no "fields" array.
 */
private static List<Types.NestedField> icebergSchemaFields(JsonNode schemaNode) {
    List<Types.NestedField> schemaColumns = new ArrayList<>();
    if (schemaNode == null || schemaNode.isNull() || !schemaNode.has("fields") || !schemaNode.get("fields").isArray()) {
        return schemaColumns;
    }
    // BUGFIX: log message previously said "iceberg schema to debezium" — conversion goes the other way
    LOGGER.debug("Converting debezium schema to iceberg:{}", schemaNode);
    // Field ids are assigned sequentially starting at 1; each conversion reports the last id it used.
    // Plain loop replaces the AtomicReference<Integer> boxing hack that was only needed for the lambda.
    int fieldId = 1;
    for (JsonNode field : schemaNode.get("fields")) {
        Map.Entry<Integer, Types.NestedField> df = debeziumFieldToIcebergField(field, field.get("field").textValue(), fieldId);
        fieldId = df.getKey() + 1;
        schemaColumns.add(df.getValue());
    }
    return schemaColumns;
}

/**
 * Maps a Debezium primitive type name to the corresponding Iceberg primitive type.
 * Unrecognized type names fall back to string rather than failing.
 *
 * @param fieldName name of the field; well-known *_ts_ms fields are promoted to timestamps.
 * @param fieldType Debezium primitive type name (e.g. "int32", "string", "bytes").
 * @return the equivalent Iceberg primitive type.
 */
private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType) {
    if (fieldType.equals("int8") || fieldType.equals("int16") || fieldType.equals("int32")) {
        // integers up to 4 bytes
        return Types.IntegerType.get();
    }
    if (fieldType.equals("int64")) {
        // long, 8 bytes — known millisecond-timestamp fields become timestamptz
        return TS_MS_FIELDS.contains(fieldName) ? Types.TimestampType.withZone() : Types.LongType.get();
    }
    if (fieldType.equals("float8") || fieldType.equals("float16") || fieldType.equals("float32")) {
        // float is represented in 32 bits
        return Types.FloatType.get();
    }
    if (fieldType.equals("double") || fieldType.equals("float64")) {
        // double is represented in 64 bits
        return Types.DoubleType.get();
    }
    if (fieldType.equals("boolean")) {
        return Types.BooleanType.get();
    }
    if (fieldType.equals("uuid")) {
        return Types.UUIDType.get();
    }
    if (fieldType.equals("bytes")) {
        return Types.BinaryType.get();
    }
    // "string" and any unsupported type default to String type
    return Types.StringType.get();
}

@Override
public boolean equals(Object o) {
    // Reflexive fast path
    if (this == o) {
        return true;
    }
    // Strict class match (not instanceof) to keep equals symmetric with subclasses
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    final ChangeEventSchema other = (ChangeEventSchema) o;
    // Two schemas are equal when both the value and key schema JSON trees match
    return Objects.equals(valueSchema, other.valueSchema)
        && Objects.equals(keySchema, other.keySchema);
}


}


}
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
// if field additions not enabled add set of events to table
addToTablePerSchema(icebergTable, events);
} else {
Map<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
Map<IcebergChangeEvent.ChangeEventSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
events.stream()
.collect(Collectors.groupingBy(IcebergChangeEvent::jsonSchema));
.collect(Collectors.groupingBy(IcebergChangeEvent::changeEventSchema));
LOGGER.debug("Batch got {} records with {} different schema!!", events.size(), eventsGroupedBySchema.keySet().size());

for (Map.Entry<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
for (Map.Entry<IcebergChangeEvent.ChangeEventSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema());
// add set of events to table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public static TestChangeEvent<Object, Object> of(String destination, Integer id,
.build();

final String key = "{" +
"\"schema\":" + t.jsonSchema().keySchema() + "," +
"\"schema\":" + t.changeEventSchema().keySchema() + "," +
"\"payload\":" + t.key() +
"} ";
final String val = "{" +
"\"schema\":" + t.jsonSchema().valueSchema() + "," +
"\"schema\":" + t.changeEventSchema().valueSchema() + "," +
"\"payload\":" + t.value() +
"} ";
return new TestChangeEvent<>(key, val, destination);
Expand All @@ -71,11 +71,11 @@ public static TestChangeEvent<Object, Object> ofCompositeKey(String destination,
.build();

final String key = "{" +
"\"schema\":" + t.jsonSchema().keySchema() + "," +
"\"schema\":" + t.changeEventSchema().keySchema() + "," +
"\"payload\":" + t.key() +
"} ";
final String val = "{" +
"\"schema\":" + t.jsonSchema().valueSchema() + "," +
"\"schema\":" + t.changeEventSchema().valueSchema() + "," +
"\"payload\":" + t.value() +
"} ";

Expand Down Expand Up @@ -106,7 +106,7 @@ public static TestChangeEvent<Object, Object> ofNoKey(String destination, Intege
.build();

final String val = "{" +
"\"schema\":" + t.jsonSchema().valueSchema() + "," +
"\"schema\":" + t.changeEventSchema().valueSchema() + "," +
"\"payload\":" + t.value() +
"} ";
return new TestChangeEvent<>(null, val, destination);
Expand Down