Skip to content

Commit

Permalink
improve IcebergChangeEvent and iceberg schema generation (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jan 4, 2022
1 parent 6bdfab1 commit cc3d3d6
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
throws InterruptedException {
Instant start = Instant.now();

//group events by destination
Map<String, List<IcebergChangeEvent>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
Expand Down Expand Up @@ -185,7 +186,7 @@ public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, I
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.getSchema(), writeFormat, !upsert);
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public class IcebergChangeEvent {
protected final String destination;
protected final JsonNode value;
protected final JsonNode key;
protected final JsonNode valueSchema;
protected final JsonNode keySchema;
JsonSchema jsonSchema;

public IcebergChangeEvent(String destination,
JsonNode value,
Expand All @@ -44,14 +43,25 @@ public IcebergChangeEvent(String destination,
this.destination = destination;
this.value = value;
this.key = key;
this.valueSchema = valueSchema;
this.keySchema = keySchema;
this.jsonSchema = new JsonSchema(valueSchema, keySchema);
}

/**
 * @return the Debezium event key payload as JSON (may be null for keyless events).
 */
public JsonNode key() {
  return this.key;
}

/**
 * @return the Debezium event value payload as JSON.
 */
public JsonNode value() {
  return this.value;
}

/**
 * @return the combined key/value JSON schema holder for this event.
 */
public JsonSchema jsonSchema() {
  return this.jsonSchema;
}

/**
 * Convenience accessor: derives the Iceberg schema from this event's JSON schema.
 *
 * @return the Iceberg {@link Schema} built from the event's value/key schemas.
 */
public Schema icebergSchema() {
  return this.jsonSchema.icebergSchema();
}

/**
 * Maps the event destination (topic) name to a table name by replacing every
 * dot with an underscore, since dots are not usable in table identifiers.
 *
 * @return the sanitized destination table name.
 */
public String destinationTable() {
  // char-overload of replace: literal substitution, no regex involved
  return destination.replace('.', '_');
}
Expand All @@ -69,49 +79,6 @@ public GenericRecord asIcebergRecord(Schema schema) {
return record;
}

/**
 * Builds a composite hash string from the value and key schemas, used to detect
 * schema changes between events.
 *
 * @return "&lt;valueSchemaHash&gt;-&lt;keySchemaHash&gt;".
 */
public String schemaHashCode() {
  final int valueHash = valueSchema.hashCode();
  final int keyHash = keySchema.hashCode();
  return valueHash + "-" + keyHash;
}

/**
 * Builds the Iceberg {@link Schema} for this event from its Debezium value schema,
 * marking every key-schema field as a required identifier column.
 *
 * @return Iceberg schema with identifier field ids populated from the key schema.
 * @throws RuntimeException    if the value schema is null or yields no columns.
 * @throws ValidationException if a key field has no matching value-schema column.
 */
public Schema getSchema() {
  if (this.valueSchema == null) {
    throw new RuntimeException("Failed to get event schema, event value is null, destination:" + this.destination);
  }

  final List<Types.NestedField> tableColumns = valueSchemaFields();
  if (tableColumns.isEmpty()) {
    throw new RuntimeException("Failed to get schema destination:" + this.destination);
  }

  final List<Types.NestedField> keyColumns = KeySchemaFields();
  final Set<Integer> identifierFieldIds = new HashSet<>();

  for (Types.NestedField keyField : keyColumns) {
    // locate the matching table column by name
    int matchIdx = -1;
    for (int i = 0; i < tableColumns.size(); i++) {
      if (Objects.equals(tableColumns.get(i).name(), keyField.name())) {
        matchIdx = i;
        break;
      }
    }

    if (matchIdx < 0) {
      throw new ValidationException("Table Row identifier field `" + keyField.name() + "` not found in table columns");
    }

    final Types.NestedField matched = tableColumns.get(matchIdx);
    identifierFieldIds.add(matched.fieldId());
    // identifier columns must be required in the Iceberg schema
    tableColumns.set(matchIdx, matched.asRequired());
  }

  return new Schema(tableColumns, identifierFieldIds);
}

private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
LOGGER.debug("Processing nested field:{}", tableFields);
GenericRecord record = GenericRecord.create(tableFields);
Expand All @@ -129,25 +96,6 @@ private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode dat
return record;
}

//getIcebergFieldsFromEventSchema
/**
 * Extracts Iceberg fields from the event key schema.
 *
 * @return Iceberg fields derived from the key schema, or an empty list when
 *         no usable key schema is present.
 */
private List<Types.NestedField> KeySchemaFields() {
  // guard: no key schema, or it lacks an array-valued "fields" node
  if (keySchema == null || !keySchema.has("fields") || !keySchema.get("fields").isArray()) {
    LOGGER.trace("Key schema not found!");
    return new ArrayList<>();
  }
  LOGGER.debug(keySchema.toString());
  return icebergSchema(keySchema, "", 0);
}

/**
 * Extracts Iceberg fields from the event value schema, appending the
 * {@code __source_ts} column (addSourceTsField = true).
 *
 * @return Iceberg fields derived from the value schema, or an empty list when
 *         no usable value schema is present.
 */
private List<Types.NestedField> valueSchemaFields() {
  // guard: no value schema, or it lacks an array-valued "fields" node
  if (valueSchema == null || !valueSchema.has("fields") || !valueSchema.get("fields").isArray()) {
    LOGGER.trace("Event schema not found!");
    return new ArrayList<>();
  }
  LOGGER.debug(valueSchema.toString());
  return icebergSchema(valueSchema, "", 0, true);
}

private Type.PrimitiveType icebergFieldType(String fieldType) {
switch (fieldType) {
case "int8":
Expand Down Expand Up @@ -175,59 +123,6 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
}
}

/**
 * Overload used for nested structs: converts an event schema node without
 * appending the {@code __source_ts} column.
 */
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
  return this.icebergSchema(eventSchema, schemaName, columnId, false);
}

/**
 * Converts a Debezium JSON event schema node into a list of Iceberg nested fields.
 * Field ids are assigned sequentially, incrementing {@code columnId} before each field.
 *
 * NOTE(review): a list field consumes two ids (field id + element id via {@code ++columnId}),
 * but the struct case advances the caller's counter only by {@code subSchema.size()} —
 * nested schemas containing arrays may therefore get colliding field ids; confirm against
 * Iceberg schema validation.
 *
 * @param eventSchema      Debezium schema node; expected to contain "type" and "fields".
 * @param schemaName       name of the enclosing schema/field, used for logging only.
 * @param columnId         last assigned field id; incremented before each new field.
 * @param addSourceTsField when true, appends an extra "__source_ts" timestamptz column.
 * @return Iceberg fields corresponding to the event schema fields (all optional).
 * @throws RuntimeException for unsupported map types and complex/invalid array types.
 */
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
columnId++;
String fieldName = jsonSchemaFieldNode.get("field").textValue();
String fieldType = jsonSchemaFieldNode.get("type").textValue();
LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
switch (fieldType) {
case "array":
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
String listItemType = items.get("type").textValue();
// only arrays of primitive element types are supported
if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName);
}
Type.PrimitiveType item = icebergFieldType(listItemType);
// the list element needs its own field id, hence the extra ++columnId
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
//throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
break;
case "map":
throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
//break;
case "struct":
// create it as struct, nested type
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
// advance past the ids consumed by the nested fields
columnId += subSchema.size();
break;
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
break;
}
}

if (addSourceTsField) {
// append the event source timestamp as an extra tracking column
columnId++;
schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
}
return schemaColumns;
}

private Object jsonValToIcebergVal(Types.NestedField field,
JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
Expand Down Expand Up @@ -281,4 +176,148 @@ private Object jsonValToIcebergVal(Types.NestedField field,
return val;
}

/**
 * Value object pairing the Debezium value schema and key schema of a change event,
 * and converting them into an Iceberg {@link Schema}. Equality is based on both
 * JSON schemas, so it can be used to detect schema changes between events.
 *
 * NOTE: intentionally a non-static inner class — the conversion relies on the
 * enclosing instance's {@code icebergFieldType(String)} helper.
 */
public class JsonSchema {
// raw Debezium JSON schema of the event value (may be null when schemas are disabled)
private final JsonNode valueSchema;
// raw Debezium JSON schema of the event key (null for keyless events)
private final JsonNode keySchema;

JsonSchema(JsonNode valueSchema, JsonNode keySchema) {
this.valueSchema = valueSchema;
this.keySchema = keySchema;
}

/** @return the raw Debezium value schema node. */
public JsonNode valueSchema() {
return valueSchema;
}

/** @return the raw Debezium key schema node. */
public JsonNode keySchema() {
return keySchema;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JsonSchema that = (JsonSchema) o;
return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema);
}

@Override
public int hashCode() {
return Objects.hash(valueSchema, keySchema);
}

//getIcebergFieldsFromEventSchema
/**
 * Extracts Iceberg fields from the key schema; returns an empty list when no
 * usable key schema is present.
 */
private List<Types.NestedField> KeySchemaFields() {
if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
LOGGER.debug(keySchema.toString());
return icebergSchema(keySchema, "", 0);
}
LOGGER.trace("Key schema not found!");
return new ArrayList<>();
}

/**
 * Extracts Iceberg fields from the value schema, appending the "__source_ts"
 * column; returns an empty list when no usable value schema is present.
 */
private List<Types.NestedField> valueSchemaFields() {
if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
LOGGER.debug(valueSchema.toString());
return icebergSchema(valueSchema, "", 0, true);
}
LOGGER.trace("Event schema not found!");
return new ArrayList<>();
}

/**
 * Builds the Iceberg {@link Schema} from the value schema, marking every
 * key-schema field as a required identifier column.
 *
 * @return Iceberg schema with identifier field ids populated from the key schema.
 * @throws RuntimeException    if the value schema is null or yields no columns.
 * @throws ValidationException if a key field has no matching value-schema column.
 */
public Schema icebergSchema() {

if (this.valueSchema == null) {
throw new RuntimeException("Failed to get event schema, event schema is null");
}

final List<Types.NestedField> tableColumns = valueSchemaFields();

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get event schema, event schema has no fields!");
}

final List<Types.NestedField> keyColumns = KeySchemaFields();
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// mark the column as required: it is part of the row identifier
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return new Schema(tableColumns, identifierFieldIds);
}

/**
 * Overload used for nested structs: converts a schema node without appending
 * the "__source_ts" column.
 */
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
return icebergSchema(eventSchema, schemaName, columnId, false);
}

/**
 * Converts a Debezium JSON event schema node into a list of Iceberg nested fields.
 * Field ids are assigned sequentially, incrementing {@code columnId} before each field.
 *
 * NOTE(review): a list field consumes two ids (field id + element id via ++columnId),
 * but the struct case advances the caller's counter only by subSchema.size() — nested
 * schemas containing arrays may get colliding field ids; confirm against Iceberg
 * schema validation.
 *
 * @param eventSchema      Debezium schema node; expected to contain "type" and "fields".
 * @param schemaName       name of the enclosing schema/field, used for logging only.
 * @param columnId         last assigned field id; incremented before each new field.
 * @param addSourceTsField when true, appends an extra "__source_ts" timestamptz column.
 * @return Iceberg fields corresponding to the event schema fields (all optional).
 * @throws RuntimeException for unsupported map types and complex/invalid array types.
 */
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
columnId++;
String fieldName = jsonSchemaFieldNode.get("field").textValue();
String fieldType = jsonSchemaFieldNode.get("type").textValue();
LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
switch (fieldType) {
case "array":
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
String listItemType = items.get("type").textValue();

// only arrays of primitive element types are supported
if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex nested array types are not supported," +
" array[" + listItemType + "], field " + fieldName);
}

Type.PrimitiveType item = icebergFieldType(listItemType);
// the list element needs its own field id, hence the extra ++columnId
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
break;
case "map":
throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
//break;
case "struct":
// create it as struct, nested type
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
// advance past the ids consumed by the nested fields
columnId += subSchema.size();
break;
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
break;
}
}

if (addSourceTsField) {
// append the event source timestamp as an extra tracking column
columnId++;
schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
}
return schemaColumns;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testNestedJsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(serdeWithSchema).get("payload"), null,
mapper.readTree(serdeWithSchema).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " +
"4:"));
}
Expand All @@ -46,7 +46,7 @@ public void testUnwrapJsonRecord() throws IOException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithSchema).get("payload"), null,
mapper.readTree(unwrapWithSchema).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
GenericRecord record = e.asIcebergRecord(schema);
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
Expand All @@ -59,7 +59,7 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithArraySchema).get("payload"), null,
mapper.readTree(unwrapWithArraySchema).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
Expand All @@ -77,7 +77,7 @@ public void testNestedArray2JsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithArraySchema2).get("payload"), null,
mapper.readTree(unwrapWithArraySchema2).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
System.out.println(schema.asStruct());
System.out.println(schema);
System.out.println(schema.findField("tableChanges"));
Expand All @@ -92,7 +92,7 @@ public void testNestedGeomJsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithGeomSchema).get("payload"), null,
mapper.readTree(unwrapWithGeomSchema).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
GenericRecord record = e.asIcebergRecord(schema);
//System.out.println(schema);
//System.out.println(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public static void listFiles() {
try {
List<Bucket> bucketList = client.listBuckets();
for (Bucket bucket : bucketList) {
System.out.printf("Bucket:%s ROOT", bucket.name());
System.out.printf("Bucket:%s ROOT\n", bucket.name());
Iterable<Result<Item>> results = client.listObjects(ListObjectsArgs.builder().bucket(bucket.name()).recursive(true).build());
for (Result<Item> result : results) {
Item item = result.get();
System.out.printf("Bucket:%s Item:%s Size:%s", bucket.name(), item.objectName(), item.size());
System.out.printf("Bucket:%s Item:%s Size:%s\n", bucket.name(), item.objectName(), item.size());
}
}
} catch (Exception e) {
Expand Down

0 comments on commit cc3d3d6

Please sign in to comment.