Add support for array data type #63

Merged 6 commits on Nov 29, 2021
IcebergUtil.java
@@ -18,6 +18,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
@@ -35,6 +36,33 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
return getIcebergSchema(eventSchema, "", 0);
}

public static PrimitiveType getIcebergFieldType(String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
case "int32": // int 4 bytes
return Types.IntegerType.get();
case "int64": // long 8 bytes
return Types.LongType.get();
case "float8":
case "float16":
case "float32": // float is represented in 32 bits,
return Types.FloatType.get();
case "float64": // double is represented in 64 bits
return Types.DoubleType.get();
case "boolean":
return Types.BooleanType.get();
case "string":
return Types.StringType.get();
case "bytes":
return Types.BinaryType.get();
default:
// default to String type
return Types.StringType.get();
//throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!");
}
}

public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
@@ -45,35 +73,17 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
String fieldType = jsonSchemaFieldNode.get("type").textValue();
LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
switch (fieldType) {
case "int8":
case "int16":
case "int32": // int 4 bytes
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.IntegerType.get()));
break;
case "int64": // long 8 bytes
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.LongType.get()));
break;
case "float8":
case "float16":
case "float32": // float is represented in 32 bits,
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.FloatType.get()));
break;
case "float64": // double is represented in 64 bits
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.DoubleType.get()));
break;
case "boolean":
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.BooleanType.get()));
break;
case "string":
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
break;
case "bytes":
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.BinaryType.get()));
break;
case "array":
throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional()));
//break;
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
PrimitiveType item = IcebergUtil.getIcebergFieldType(items.get("type").textValue());
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
//throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
break;
case "map":
throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
@@ -84,9 +94,8 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
columnId += subSchema.size();
break;
default:
// default to String type
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, IcebergUtil.getIcebergFieldType(fieldType)));
break;
}
}
@@ -154,7 +163,7 @@ static Object jsonToGenericRecordVal(Types.NestedField field,
}
break;
case LIST:
val = jsonObjectMapper.convertValue(node, List.class);
val = jsonObjectMapper.convertValue(node, ArrayList.class);
break;
case MAP:
val = jsonObjectMapper.convertValue(node, Map.class);
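As a side note on the IcebergUtil changes above, here is a minimal, self-contained sketch of the two building blocks the patch leans on: `Types.ListType.ofOptional`, which needs a dedicated field ID for the list element (hence the `++columnId` in the array case), and Jackson's `convertValue`, which the LIST branch now uses with `ArrayList.class`. The class and variable names below are illustrative, not taken from the PR.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.List;

public class ArrayTypeSketch {
  public static void main(String[] args) throws Exception {
    // Schema side: an "array" field whose items.type is "int32" maps to list<int>.
    // The list element gets its own field ID (2 here), which is why the patch
    // bumps columnId when it builds the ListType.
    Types.NestedField payByQuarter = Types.NestedField.optional(
        1, "pay_by_quarter",
        Types.ListType.ofOptional(2, Types.IntegerType.get()));
    System.out.println(payByQuarter); // 1: pay_by_quarter: optional list<int>

    // Value side: the LIST branch turns the JSON array node into a plain Java list.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode node = mapper.readTree("[10000, 10001, 10002, 10003]");
    List<?> values = mapper.convertValue(node, ArrayList.class);
    System.out.println(values); // [10000, 10001, 10002, 10003]
  }
}
```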
ConfigSource.java
@@ -61,9 +61,8 @@ public ConfigSource() {
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.database.server.name", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
config.put("debezium.source.table.whitelist", "inventory.*");
config.put("debezium.source.include.schema.changes", "false");

config.put("quarkus.log.level", "INFO");
@@ -21,6 +21,7 @@ public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("quarkus.profile", "mysql");
config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
return config;
}

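For context on why the table whitelist moves: Quarkus applies properties prefixed with `%mysql.` only while the `mysql` profile is active, so keeping the MySQL-only whitelist in the MySQL test profile leaves the PostgreSQL defaults in ConfigSource untouched. A rough sketch of that pattern, assuming only `io.quarkus.test.junit.QuarkusTestProfile`; the class name below is hypothetical and simply mirrors the shape of the profile class changed above.

```java
import io.quarkus.test.junit.QuarkusTestProfile;

import java.util.HashMap;
import java.util.Map;

// Hypothetical name; mirrors the pattern of the MySQL test profile in this PR.
public class MysqlTestProfileSketch implements QuarkusTestProfile {

  @Override
  public Map<String, String> getConfigOverrides() {
    Map<String, String> config = new HashMap<>();
    // Switch the test to the "mysql" profile so "%mysql."-prefixed keys take effect.
    config.put("quarkus.profile", "mysql");
    config.put("%mysql.debezium.source.connector.class",
        "io.debezium.connector.mysql.MySqlConnector");
    config.put("%mysql.debezium.source.table.whitelist",
        "inventory.customers,inventory.test_delete_table");
    return config;
  }
}
```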
@@ -111,6 +111,39 @@ public void testConsumingVariousDataTypes() throws Exception {
});
}

@Test
public void testConsumingArrayDataType() throws Exception {
String sql = " DROP TABLE IF EXISTS inventory.array_data;\n" +
" CREATE TABLE IF NOT EXISTS inventory.array_data (\n" +
" name text,\n" +
" pay_by_quarter integer[],\n" +
" schedule text[][]\n" +
" );\n" +
" INSERT INTO inventory.array_data\n" +
" VALUES " +
"('Carol2',\n" +
" ARRAY[20000, 25000, 25000, 25000],\n" +
" ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n"+
"('Bill',\n" +
" '{10000, 10000, 10000, 10000}',\n" +
" '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" +
" ('Carol1',\n" +
" '{20000, 25000, 25000, 25000}',\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
";";
SourcePostgresqlDB.runSQL(sql);

Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.array_data");
df.show(false);
return df.count() >= 3;
} catch (Exception e) {
return false;
}
});
}

@Test
public void testSchemaChanges() throws Exception {
// TEST add new columns, drop not null constraint
TestIcebergUtil.java
@@ -29,6 +29,7 @@ class TestIcebergUtil {
final String serdeWithSchema = Testing.Files.readResourceAsString("json/serde-with-schema.json");
final String unwrapWithSchema = Testing.Files.readResourceAsString("json/unwrap-with-schema.json");
final String unwrapWithGeomSchema = Testing.Files.readResourceAsString("json/serde-with-schema_geom.json");
final String unwrapWithArraySchema = Testing.Files.readResourceAsString("json/serde-with-array.json");

@Test
public void testNestedJsonRecord() throws JsonProcessingException {
Expand All @@ -47,6 +48,20 @@ public void testUnwrapJsonRecord() throws IOException {
assertEquals(16850, record.getField("order_date"));
}

@Test
public void testNestedArrayJsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithArraySchema);
JsonNode jsonPayload = jsonData.get("payload");
JsonNode jsonSchema = jsonData.get("schema");
List<Types.NestedField> schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
//System.out.println(schema.asStruct());
GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
//System.out.println(record);
assertTrue( record.toString().contains("[10000, 10001, 10002, 10003]"));
}

@Test
public void testNestedGeomJsonRecord() throws JsonProcessingException {
JsonNode jsonData = new ObjectMapper().readTree(unwrapWithGeomSchema);
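The field IDs asserted in testNestedArrayJsonRecord above (1, 2, 4, 6, ...) are not a typo: each list column consumes an extra ID for its element, so pay_by_quarter takes IDs 2 and 3 and schedule starts at 4. A small sketch, assuming only the Iceberg types API, that reproduces the same layout:

```java
import org.apache.iceberg.types.Types;

public class FieldIdSketch {
  public static void main(String[] args) {
    // Mirrors the struct asserted in testNestedArrayJsonRecord: the list element
    // of pay_by_quarter occupies ID 3, so schedule begins at ID 4.
    Types.StructType struct = Types.StructType.of(
        Types.NestedField.optional(1, "name", Types.StringType.get()),
        Types.NestedField.optional(2, "pay_by_quarter",
            Types.ListType.ofOptional(3, Types.IntegerType.get())),
        Types.NestedField.optional(4, "schedule",
            Types.ListType.ofOptional(5, Types.StringType.get())));
    // Expected to print something like:
    // struct<1: name: optional string, 2: pay_by_quarter: optional list<int>,
    //        4: schedule: optional list<string>>
    System.out.println(struct);
  }
}
```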
json/serde-with-array.json (new file)
@@ -0,0 +1,75 @@
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "array",
"items": {
"type": "int32",
"optional": true
},
"optional": true,
"field": "pay_by_quarter"
},
{
"type": "array",
"items": {
"type": "string",
"optional": true
},
"optional": true,
"field": "schedule"
},
{
"type": "string",
"optional": true,
"field": "__op"
},
{
"type": "string",
"optional": true,
"field": "__table"
},
{
"type": "int64",
"optional": true,
"field": "__source_ts_ms"
},
{
"type": "string",
"optional": true,
"field": "__db"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
}
],
"optional": false,
"name": "testc.inventory.array_data.Value"
},
"payload": {
"name": "Bill",
"pay_by_quarter": [
10000,
10001,
10002,
10003
],
"schedule": [
"[Ljava.lang.String;@508917a0",
"[Ljava.lang.String;@7412bd2"
],
"__op": "c",
"__table": "array_data",
"__source_ts_ms": 1638128893618,
"__db": "postgres",
"__deleted": "false"
}
}