Skip to content

Commit

Permalink
add check for array type (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Nov 29, 2021
1 parent 581fa2e commit d5651cd
Show file tree
Hide file tree
Showing 3 changed files with 290 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str
case "array":
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
PrimitiveType item = IcebergUtil.getIcebergFieldType(items.get("type").textValue());
String listItemType = items.get("type").textValue();
if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName);
}
PrimitiveType item = IcebergUtil.getIcebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
//throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
Expand Down Expand Up @@ -104,9 +108,9 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str

public static boolean hasSchema(JsonNode jsonNode) {
return jsonNode != null
&& jsonNode.has("schema")
&& jsonNode.get("schema").has("fields")
&& jsonNode.get("schema").get("fields").isArray();
&& jsonNode.has("schema")
&& jsonNode.get("schema").has("fields")
&& jsonNode.get("schema").get("fields").isArray();
}

public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
Expand Down Expand Up @@ -158,7 +162,7 @@ static Object jsonToGenericRecordVal(Types.NestedField field,
try {
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
} catch (IOException e) {
LOGGER.error("Failed converting '" + field.name() + "' binary value to iceberg record", e);
LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e);
throw new RuntimeException("Failed Processing Event!", e);
}
break;
Expand Down Expand Up @@ -197,11 +201,11 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
}

public static List<Types.NestedField> getIcebergFieldsFromEventSchema(byte[] eventVal) {
if(eventVal == null){

if (eventVal == null) {
return new ArrayList<>();
}

try {
JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);
if (IcebergUtil.hasSchema(jsonEvent)) {
Expand All @@ -219,7 +223,7 @@ public static Schema getSchema(List<Types.NestedField> tableColumns,
List<Types.NestedField> keyColumns) {

Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

Expand All @@ -240,7 +244,7 @@ public static Schema getSchema(List<Types.NestedField> tableColumns,
}

}

return new Schema(tableColumns, identifierFieldIds);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class TestIcebergUtil {
final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json");
final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json");
final String unwrapWithArraySchema = Testing.Files.readResourceAsString("json/serde-with-array.json");
final String unwrapWithArraySchema2 = Testing.Files.readResourceAsString("json/serde-with-array2.json");

@Test
public void testNestedJsonRecord() throws JsonProcessingException {
Expand All @@ -56,12 +57,34 @@ public void testNestedArrayJsonRecord() throws JsonProcessingException {
List<Types.NestedField> schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
//System.out.println(schema.asStruct());
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType());
System.out.println(schema.findField("schedule").type().asListType().elementType());
assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(),"int");
assertEquals(schema.findField("schedule").type().asListType().elementType().toString(),"string");
GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
//System.out.println(record);
assertTrue( record.toString().contains("[10000, 10001, 10002, 10003]"));
}

@Test
public void testNestedArray2JsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithArraySchema2);
JsonNode jsonPayload = jsonData.get("payload");
JsonNode jsonSchema = jsonData.get("schema");

assertThrows(RuntimeException.class, () -> {
List<Types.NestedField> schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
System.out.println(schema.asStruct());
System.out.println(schema);
System.out.println(schema.findField("tableChanges"));
System.out.println(schema.findField("tableChanges").type().asListType().elementType());
});
//GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
//System.out.println(record);
}

@Test
public void testNestedGeomJsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "databaseName"
},
{
"type": "string",
"optional": true,
"field": "schemaName"
},
{
"type": "string",
"optional": true,
"field": "ddl"
},
{
"type": "array",
"items": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "type"
},
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "defaultCharsetName"
},
{
"type": "array",
"items": {
"type": "string",
"optional": false
},
"optional": true,
"field": "primaryKeyColumnNames"
},
{
"type": "array",
"items": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int32",
"optional": false,
"field": "jdbcType"
},
{
"type": "int32",
"optional": true,
"field": "nativeType"
},
{
"type": "string",
"optional": false,
"field": "typeName"
},
{
"type": "string",
"optional": true,
"field": "typeExpression"
},
{
"type": "string",
"optional": true,
"field": "charsetName"
},
{
"type": "int32",
"optional": true,
"field": "length"
},
{
"type": "int32",
"optional": true,
"field": "scale"
},
{
"type": "int32",
"optional": false,
"field": "position"
},
{
"type": "boolean",
"optional": true,
"field": "optional"
},
{
"type": "boolean",
"optional": true,
"field": "autoIncremented"
},
{
"type": "boolean",
"optional": true,
"field": "generated"
}
],
"optional": false,
"name": "io.debezium.connector.schema.Column"
},
"optional": false,
"field": "columns"
}
],
"optional": false,
"name": "io.debezium.connector.schema.Table",
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.schema.Change"
},
"optional": false,
"field": "tableChanges"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.SchemaChangeValue"
},
"payload": {
"source": {
"version": "1.7.0.Final",
"connector": "mysql",
"name": "testc",
"ts_ms": 1638187055631,
"snapshot": "true",
"db": "inventory",
"sequence": null,
"table": "geom",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": null,
"query": null
},
"databaseName": "inventory",
"schemaName": null,
"ddl": "CREATE TABLE `geom` (\n `id` int(11) NOT NULL AUTO_INCREMENT,\n `g` geometry NOT NULL,\n `h` geometry DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=latin1",
"tableChanges": []
}
}

0 comments on commit d5651cd

Please sign in to comment.