improve IcebergChangeEvent and iceberg schema generation #75

Merged: 1 commit, merged on Jan 4, 2022

Changes from all commits
@@ -138,6 +138,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
throws InterruptedException {
Instant start = Instant.now();

//group events by destination
Map<String, List<IcebergChangeEvent>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
@@ -185,7 +186,7 @@ public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, I
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.getSchema(), writeFormat, !upsert);
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert);
});
}

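For context, here is a minimal sketch, not the PR's code verbatim, of the grouping step that the new "group events by destination" comment documents. The helper name groupByDestination is illustrative; destinationTable() is the public accessor shown in the IcebergChangeEvent diff below.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: batch incoming change events per destination so each Iceberg
// table receives a single write per poll. The grouping key uses the public
// IcebergChangeEvent#destinationTable() accessor.
static Map<String, List<IcebergChangeEvent>> groupByDestination(List<IcebergChangeEvent> events) {
  return events.stream()
      .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));
}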
@@ -33,8 +33,7 @@ public class IcebergChangeEvent {
protected final String destination;
protected final JsonNode value;
protected final JsonNode key;
protected final JsonNode valueSchema;
protected final JsonNode keySchema;
JsonSchema jsonSchema;

public IcebergChangeEvent(String destination,
JsonNode value,
@@ -44,14 +43,25 @@ public IcebergChangeEvent(String destination,
this.destination = destination;
this.value = value;
this.key = key;
this.valueSchema = valueSchema;
this.keySchema = keySchema;
this.jsonSchema = new JsonSchema(valueSchema, keySchema);
}

public JsonNode key() {
return key;
}

public JsonNode value() {
return value;
}

public JsonSchema jsonSchema() {
return jsonSchema;
}

public Schema icebergSchema() {
return jsonSchema.icebergSchema();
}

public String destinationTable() {
return destination.replace(".", "_");
}
@@ -69,49 +79,6 @@ public GenericRecord asIcebergRecord(Schema schema) {
return record;
}

public String schemaHashCode() {
return valueSchema.hashCode() + "-" + keySchema.hashCode();
}

public Schema getSchema() {

if (this.valueSchema == null) {
throw new RuntimeException("Failed to get event schema, event value is null, destination:" + this.destination);
}

final List<Types.NestedField> tableColumns = valueSchemaFields();

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get schema destination:" + this.destination);
}

final List<Types.NestedField> keyColumns = KeySchemaFields();
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set column as required, it is part of the identifier field
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return new Schema(tableColumns, identifierFieldIds);
}

private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
LOGGER.debug("Processing nested field:{}", tableFields);
GenericRecord record = GenericRecord.create(tableFields);
@@ -129,25 +96,6 @@ private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode dat
return record;
}

//getIcebergFieldsFromEventSchema
private List<Types.NestedField> KeySchemaFields() {
if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
LOGGER.debug(keySchema.toString());
return icebergSchema(keySchema, "", 0);
}
LOGGER.trace("Key schema not found!");
return new ArrayList<>();
}

private List<Types.NestedField> valueSchemaFields() {
if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
LOGGER.debug(valueSchema.toString());
return icebergSchema(valueSchema, "", 0, true);
}
LOGGER.trace("Event schema not found!");
return new ArrayList<>();
}

private Type.PrimitiveType icebergFieldType(String fieldType) {
switch (fieldType) {
case "int8":
@@ -175,59 +123,6 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
}
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
return icebergSchema(eventSchema, schemaName, columnId, false);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
columnId++;
String fieldName = jsonSchemaFieldNode.get("field").textValue();
String fieldType = jsonSchemaFieldNode.get("type").textValue();
LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
switch (fieldType) {
case "array":
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
String listItemType = items.get("type").textValue();
if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName);
}
Type.PrimitiveType item = icebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
//throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
break;
case "map":
throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
//break;
case "struct":
// create it as struct, nested type
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
columnId += subSchema.size();
break;
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
break;
}
}

if (addSourceTsField) {
columnId++;
schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
}
return schemaColumns;
}

private Object jsonValToIcebergVal(Types.NestedField field,
JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
@@ -281,4 +176,148 @@ private Object jsonValToIcebergVal(Types.NestedField field,
return val;
}

public class JsonSchema {
private final JsonNode valueSchema;
private final JsonNode keySchema;

JsonSchema(JsonNode valueSchema, JsonNode keySchema) {
this.valueSchema = valueSchema;
this.keySchema = keySchema;
}

public JsonNode valueSchema() {
return valueSchema;
}

public JsonNode keySchema() {
return keySchema;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JsonSchema that = (JsonSchema) o;
return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema);
}

@Override
public int hashCode() {
return Objects.hash(valueSchema, keySchema);
}

//getIcebergFieldsFromEventSchema
private List<Types.NestedField> KeySchemaFields() {
if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
LOGGER.debug(keySchema.toString());
return icebergSchema(keySchema, "", 0);
}
LOGGER.trace("Key schema not found!");
return new ArrayList<>();
}

private List<Types.NestedField> valueSchemaFields() {
if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
LOGGER.debug(valueSchema.toString());
return icebergSchema(valueSchema, "", 0, true);
}
LOGGER.trace("Event schema not found!");
return new ArrayList<>();
}

public Schema icebergSchema() {

if (this.valueSchema == null) {
throw new RuntimeException("Failed to get event schema, event schema is null");
}

final List<Types.NestedField> tableColumns = valueSchemaFields();

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get event schema, event schema has no fields!");
}

final List<Types.NestedField> keyColumns = KeySchemaFields();
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set column as required, it is part of the identifier field
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return new Schema(tableColumns, identifierFieldIds);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
return icebergSchema(eventSchema, schemaName, columnId, false);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
columnId++;
String fieldName = jsonSchemaFieldNode.get("field").textValue();
String fieldType = jsonSchemaFieldNode.get("type").textValue();
LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
switch (fieldType) {
case "array":
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
String listItemType = items.get("type").textValue();

if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex nested array types are not supported," +
" array[" + listItemType + "], field " + fieldName);
}

Type.PrimitiveType item = icebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
break;
case "map":
throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
//break;
case "struct":
// create it as struct, nested type
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
columnId += subSchema.size();
break;
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
break;
}
}

if (addSourceTsField) {
columnId++;
schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
}
return schemaColumns;
}

}

}
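Taken together, the refactor moves schema handling into the new JsonSchema value object: the removed schemaHashCode() string is replaced by equals()/hashCode(), and the Iceberg schema is now derived via icebergSchema(). A hedged usage sketch with illustrative names follows; the updated tests below exercise the same API.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;

// Sketch only: detecting a schema change between two events is now a value
// comparison on JsonSchema instead of comparing the old schemaHashCode() strings.
static boolean schemaChanged(IcebergChangeEvent previous, IcebergChangeEvent current) {
  return !previous.jsonSchema().equals(current.jsonSchema());
}

// Deriving the Iceberg schema and building a record, as the updated tests do.
static GenericRecord toIcebergRecord(IcebergChangeEvent event) {
  Schema schema = event.icebergSchema();
  return event.asIcebergRecord(schema);
}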
@@ -36,7 +36,7 @@ public void testNestedJsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(serdeWithSchema).get("payload"), null,
mapper.readTree(serdeWithSchema).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " +
"4:"));
}
@@ -46,7 +46,7 @@ public void testUnwrapJsonRecord() throws IOException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithSchema).get("payload"), null,
mapper.readTree(unwrapWithSchema).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
GenericRecord record = e.asIcebergRecord(schema);
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
@@ -59,7 +59,7 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithArraySchema).get("payload"), null,
mapper.readTree(unwrapWithArraySchema).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
@@ -77,7 +77,7 @@ public void testNestedArray2JsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithArraySchema2).get("payload"), null,
mapper.readTree(unwrapWithArraySchema2).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
System.out.println(schema.asStruct());
System.out.println(schema);
System.out.println(schema.findField("tableChanges"));
@@ -92,7 +92,7 @@ public void testNestedGeomJsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
mapper.readTree(unwrapWithGeomSchema).get("payload"), null,
mapper.readTree(unwrapWithGeomSchema).get("schema"), null);
Schema schema = e.getSchema();
Schema schema = e.icebergSchema();
GenericRecord record = e.asIcebergRecord(schema);
//System.out.println(schema);
//System.out.println(record);
@@ -70,11 +70,11 @@ public static void listFiles() {
try {
List<Bucket> bucketList = client.listBuckets();
for (Bucket bucket : bucketList) {
System.out.printf("Bucket:%s ROOT", bucket.name());
System.out.printf("Bucket:%s ROOT\n", bucket.name());
Iterable<Result<Item>> results = client.listObjects(ListObjectsArgs.builder().bucket(bucket.name()).recursive(true).build());
for (Result<Item> result : results) {
Item item = result.get();
System.out.printf("Bucket:%s Item:%s Size:%s", bucket.name(), item.objectName(), item.size());
System.out.printf("Bucket:%s Item:%s Size:%s\n", bucket.name(), item.objectName(), item.size());
}
}
} catch (Exception e) {
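One side note on the printf fix in the last hunk: appending \n works, and Java format strings also accept %n as a platform-independent line separator, so an equivalent alternative (not what the PR uses) would be:

// Sketch only: same output as the fixed line above, using %n instead of \n;
// bucket and item are the loop variables from the surrounding listFiles() code.
System.out.printf("Bucket:%s Item:%s Size:%s%n", bucket.name(), item.objectName(), item.size());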