Skip to content

Commit

Permalink
Add support for array data type (#63)
Browse files Browse the repository at this point in the history
* support array data type
  • Loading branch information
ismailsimsek committed Nov 29, 2021
1 parent 251d38a commit 581fa2e
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
Expand All @@ -35,6 +36,33 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
return getIcebergSchema(eventSchema, "", 0);
}

/**
 * Maps a Debezium event-schema primitive type name to the corresponding Iceberg type.
 * <p>
 * Any type name not listed here falls back to {@code StringType} rather than failing,
 * so new or unmapped source types degrade to their string representation instead of
 * breaking the consumer.
 *
 * @param fieldType type name taken from the event schema's {@code type} attribute
 *                  (e.g. {@code int32}, {@code float64}, {@code boolean})
 * @return the Iceberg primitive type to use for the field
 */
public static PrimitiveType getIcebergFieldType(String fieldType) {
  switch (fieldType) {
    case "int8":
    case "int16":
    case "int32": // int 4 bytes
      return Types.IntegerType.get();
    case "int64": // long 8 bytes
      return Types.LongType.get();
    // NOTE(review): float8/float16 are not standard Connect schema type names;
    // retained defensively — confirm against the connectors actually in use.
    case "float8":
    case "float16":
    case "float32": // float is represented in 32 bits
      return Types.FloatType.get();
    case "float64": // double is represented in 64 bits
      return Types.DoubleType.get();
    case "boolean":
      return Types.BooleanType.get();
    case "string":
      return Types.StringType.get();
    case "bytes":
      return Types.BinaryType.get();
    default:
      // Deliberate lenient fallback: unknown primitive types are stored as strings.
      return Types.StringType.get();
  }
}

public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
Expand All @@ -45,35 +73,17 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str
String fieldType = jsonSchemaFieldNode.get("type").textValue();
LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
switch (fieldType) {
case "int8":
case "int16":
case "int32": // int 4 bytes
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.IntegerType.get()));
break;
case "int64": // long 8 bytes
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.LongType.get()));
break;
case "float8":
case "float16":
case "float32": // float is represented in 32 bits,
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.FloatType.get()));
break;
case "float64": // double is represented in 64 bits
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.DoubleType.get()));
break;
case "boolean":
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.BooleanType.get()));
break;
case "string":
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
break;
case "bytes":
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.BinaryType.get()));
break;
case "array":
throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional()));
//break;
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
PrimitiveType item = IcebergUtil.getIcebergFieldType(items.get("type").textValue());
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
//throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
break;
case "map":
throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
Expand All @@ -84,9 +94,8 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
columnId += subSchema.size();
break;
default:
// default to String type
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, IcebergUtil.getIcebergFieldType(fieldType)));
break;
}
}
Expand Down Expand Up @@ -154,7 +163,7 @@ static Object jsonToGenericRecordVal(Types.NestedField field,
}
break;
case LIST:
val = jsonObjectMapper.convertValue(node, List.class);
val = jsonObjectMapper.convertValue(node, ArrayList.class);
break;
case MAP:
val = jsonObjectMapper.convertValue(node, Map.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ public ConfigSource() {
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.database.server.name", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("debezium.source.include.schema.changes", "false");

config.put("quarkus.log.level", "INFO");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("quarkus.profile", "mysql");
config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,39 @@ public void testConsumingVariousDataTypes() throws Exception {
});
}

@Test
public void testConsumingArrayDataType() throws Exception {
// End-to-end check that PostgreSQL array columns are consumed without error:
// a 1-D integer[] and a 2-D text[][], inserted via both ARRAY[] constructor
// syntax and '{...}' literal syntax.
String sql = " DROP TABLE IF EXISTS inventory.array_data;\n" +
" CREATE TABLE IF NOT EXISTS inventory.array_data (\n" +
" name text,\n" +
" pay_by_quarter integer[],\n" +
" schedule text[][]\n" +
" );\n" +
" INSERT INTO inventory.array_data\n" +
" VALUES " +
"('Carol2',\n" +
" ARRAY[20000, 25000, 25000, 25000],\n" +
" ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n"+
"('Bill',\n" +
" '{10000, 10000, 10000, 10000}',\n" +
" '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" +
" ('Carol1',\n" +
" '{20000, 25000, 25000, 25000}',\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
";";
SourcePostgresqlDB.runSQL(sql);

// Generous timeout: delivery to the Iceberg table depends on the connector's
// batch/flush cadence. Poll until all 3 inserted rows are visible; any read
// failure (e.g. table not created yet) is treated as "not yet" and retried.
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.array_data");
df.show(false);
return df.count() >= 3;
} catch (Exception e) {
return false;
}
});
}

@Test
public void testSchemaChanges() throws Exception {
// TEST add new columns, drop not null constraint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class TestIcebergUtil {
final String serdeWithSchema = Testing.Files.readResourceAsString("json/serde-with-schema.json");
final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json");
final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json");
final String unwrapWithArraySchema = Testing.Files.readResourceAsString("json/serde-with-array.json");

@Test
public void testNestedJsonRecord() throws JsonProcessingException {
Expand All @@ -47,6 +48,20 @@ public void testUnwrapJsonRecord() throws IOException {
assertEquals(16850, record.getField("order_date"));
}

@Test
public void testNestedArrayJsonRecord() throws JsonProcessingException {
// Parse the captured event fixture and split it into its schema and payload parts.
JsonNode event = new ObjectMapper().readTree(unwrapWithArraySchema);
JsonNode payload = event.get("payload");
JsonNode schemaNode = event.get("schema");
// Derive the Iceberg schema; list fields consume two column ids (field + element),
// which is why the ids jump 2 -> 4 -> 6 in the expected struct string.
Schema schema = new Schema(IcebergUtil.getIcebergSchema(schemaNode));
String structRepr = schema.asStruct().toString();
assertTrue(structRepr.contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
// The payload's integer array must survive conversion into the generic record.
GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), payload);
assertTrue(record.toString().contains("[10000, 10001, 10002, 10003]"));
}

@Test
public void testNestedGeomJsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "array",
"items": {
"type": "int32",
"optional": true
},
"optional": true,
"field": "pay_by_quarter"
},
{
"type": "array",
"items": {
"type": "string",
"optional": true
},
"optional": true,
"field": "schedule"
},
{
"type": "string",
"optional": true,
"field": "__op"
},
{
"type": "string",
"optional": true,
"field": "__table"
},
{
"type": "int64",
"optional": true,
"field": "__source_ts_ms"
},
{
"type": "string",
"optional": true,
"field": "__db"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
}
],
"optional": false,
"name": "testc.inventory.array_data.Value"
},
"payload": {
"name": "Bill",
"pay_by_quarter": [
10000,
10001,
10002,
10003
],
"schedule": [
"[Ljava.lang.String;@508917a0",
"[Ljava.lang.String;@7412bd2"
],
"__op": "c",
"__table": "array_data",
"__source_ts_ms": 1638128893618,
"__db": "postgres",
"__deleted": "false"
}
}

0 comments on commit 581fa2e

Please sign in to comment.