Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve array type handling #64

Merged
merged 1 commit into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str
case "array":
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
PrimitiveType item = IcebergUtil.getIcebergFieldType(items.get("type").textValue());
String listItemType = items.get("type").textValue();
if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName);
}
PrimitiveType item = IcebergUtil.getIcebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
//throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
Expand Down Expand Up @@ -104,9 +108,9 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str

public static boolean hasSchema(JsonNode jsonNode) {
return jsonNode != null
&& jsonNode.has("schema")
&& jsonNode.get("schema").has("fields")
&& jsonNode.get("schema").get("fields").isArray();
&& jsonNode.has("schema")
&& jsonNode.get("schema").has("fields")
&& jsonNode.get("schema").get("fields").isArray();
}

public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
Expand Down Expand Up @@ -158,7 +162,7 @@ static Object jsonToGenericRecordVal(Types.NestedField field,
try {
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
} catch (IOException e) {
LOGGER.error("Failed converting '" + field.name() + "' binary value to iceberg record", e);
LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e);
throw new RuntimeException("Failed Processing Event!", e);
}
break;
Expand Down Expand Up @@ -197,11 +201,11 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
}

public static List<Types.NestedField> getIcebergFieldsFromEventSchema(byte[] eventVal) {
if(eventVal == null){

if (eventVal == null) {
return new ArrayList<>();
}

try {
JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);
if (IcebergUtil.hasSchema(jsonEvent)) {
Expand All @@ -219,7 +223,7 @@ public static Schema getSchema(List<Types.NestedField> tableColumns,
List<Types.NestedField> keyColumns) {

Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

Expand All @@ -240,7 +244,7 @@ public static Schema getSchema(List<Types.NestedField> tableColumns,
}

}

return new Schema(tableColumns, identifierFieldIds);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class TestIcebergUtil {
final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json");
final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json");
final String unwrapWithArraySchema = Testing.Files.readResourceAsString("json/serde-with-array.json");
final String unwrapWithArraySchema2 = Testing.Files.readResourceAsString("json/serde-with-array2.json");

@Test
public void testNestedJsonRecord() throws JsonProcessingException {
Expand All @@ -56,12 +57,34 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException {
List<Types.NestedField> schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
//System.out.println(schema.asStruct());
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
System.out.println(schema.findField("schedule").type().asListType().elementType());
assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(),"int");
assertEquals(schema.findField("schedule").type().asListType().elementType().toString(),"string");
GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
//System.out.println(record);
assertTrue( record.toString().contains("[10000, 10001, 10002, 10003]"));
}

@Test
public void testNestedArray2JsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithArraySchema2);
JsonNode jsonPayload = jsonData.get("payload");
JsonNode jsonSchema = jsonData.get("schema");

assertThrows(RuntimeException.class, () -> {
List<Types.NestedField> schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
System.out.println(schema.asStruct());
System.out.println(schema);
System.out.println(schema.findField("tableChanges"));
System.out.println(schema.findField("tableChanges").type().asListType().elementType());
});
//GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
//System.out.println(record);
}

@Test
public void testNestedGeomJsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "databaseName"
},
{
"type": "string",
"optional": true,
"field": "schemaName"
},
{
"type": "string",
"optional": true,
"field": "ddl"
},
{
"type": "array",
"items": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "type"
},
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "defaultCharsetName"
},
{
"type": "array",
"items": {
"type": "string",
"optional": false
},
"optional": true,
"field": "primaryKeyColumnNames"
},
{
"type": "array",
"items": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int32",
"optional": false,
"field": "jdbcType"
},
{
"type": "int32",
"optional": true,
"field": "nativeType"
},
{
"type": "string",
"optional": false,
"field": "typeName"
},
{
"type": "string",
"optional": true,
"field": "typeExpression"
},
{
"type": "string",
"optional": true,
"field": "charsetName"
},
{
"type": "int32",
"optional": true,
"field": "length"
},
{
"type": "int32",
"optional": true,
"field": "scale"
},
{
"type": "int32",
"optional": false,
"field": "position"
},
{
"type": "boolean",
"optional": true,
"field": "optional"
},
{
"type": "boolean",
"optional": true,
"field": "autoIncremented"
},
{
"type": "boolean",
"optional": true,
"field": "generated"
}
],
"optional": false,
"name": "io.debezium.connector.schema.Column"
},
"optional": false,
"field": "columns"
}
],
"optional": false,
"name": "io.debezium.connector.schema.Table",
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.schema.Change"
},
"optional": false,
"field": "tableChanges"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.SchemaChangeValue"
},
"payload": {
"source": {
"version": "1.7.0.Final",
"connector": "mysql",
"name": "testc",
"ts_ms": 1638187055631,
"snapshot": "true",
"db": "inventory",
"sequence": null,
"table": "geom",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": null,
"query": null
},
"databaseName": "inventory",
"schemaName": null,
"ddl": "CREATE TABLE `geom` (\n `id` int(11) NOT NULL AUTO_INCREMENT,\n `g` geometry NOT NULL,\n `h` geometry DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=latin1",
"tableChanges": []
}
}