Skip to content

Commit

Permalink
Add support for map data type (#232)
Browse files Browse the repository at this point in the history
* Add support for map type

* Add support for map type
  • Loading branch information
ismailsimsek committed Sep 9, 2023
1 parent bd7b556 commit 0841617
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,11 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
}
break;
case "map":
throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
//break;
// A Kafka Connect JSON map schema carries two sub-schemas: "keys" describes the key type
// and "values" describes the value type. Read each from its own node so the Iceberg map
// does not end up with the key type used for both sides.
String keyFieldType = jsonSchemaFieldNode.get("keys").get("type").textValue();
String valueFieldType = jsonSchemaFieldNode.get("values").get("type").textValue();
// Ids are consumed in order: key id, then value id, then the id of the map column itself.
Types.MapType mapField = Types.MapType.ofOptional(columnId, ++columnId, icebergFieldType(fieldName+".keys", keyFieldType), icebergFieldType(fieldName+".values", valueFieldType));
schemaColumns.add(Types.NestedField.optional(++columnId,fieldName, mapField));
break;
case "struct":
// create it as struct, nested type
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
// IF NOT EXISTS keeps this setup step idempotent, matching the DROP/CREATE TABLE statements below.
SourcePostgresqlDB.runSQL("CREATE EXTENSION IF NOT EXISTS hstore;");
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
Expand All @@ -77,21 +78,25 @@ public void testConsumingVariousDataTypes() throws Exception {
" c_uuid UUID,\n" +
" c_bytea BYTEA,\n" +
" c_json JSON,\n" +
" c_jsonb JSONB\n" +
" c_jsonb JSONB,\n" +
" c_hstore_keyval hstore,\n" +
" c_last_field VARCHAR\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_types (" +
"c_id, " +
"c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " +
"c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " +
"c_json, c_jsonb) " +
"c_json, c_jsonb, c_hstore_keyval, c_last_field) " +
"VALUES (1, null, null, null,null,null,null," +
"null,null,null,null,null,null,null," +
"null,null)," +
"null,null, null, null)," +
"(2, 'val_text', 'A', 123, current_date , current_timestamp, current_timestamp," +
"'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," +
"'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea," +
"'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb" +
"'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb, " +
"'mapkey1=>1, mapkey2=>2'::hstore, " +
"'stringvalue' " +
")";
SourcePostgresqlDB.runSQL(sql);
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
Expand All @@ -106,6 +111,15 @@ public void testConsumingVariousDataTypes() throws Exception {
return false;
}
});
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.data_types");
df.show(true);
return df.count() == 2;
} catch (Exception e) {
return false;
}
});
}

@Test
Expand Down Expand Up @@ -160,7 +174,7 @@ public void testSchemaChanges() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
//ds.show();
ds.show();
return
ds.where("__op == 'r'").count() == 4 // snapshot rows. initial table data
&& ds.where("__op == 'u'").count() == 3 // 3 update
Expand Down Expand Up @@ -314,6 +328,7 @@ public Map<String, String> getConfigOverrides() {
config.put("debezium.sink.iceberg.write.format.default", "orc");
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
//config.put("debezium.sink.iceberg.destination-regexp-replace", "");
config.put("debezium.source.hstore.handling.mode", "map");
return config;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager {
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:2.1.2.Final";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:2.3";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT_DEFAULT = 5432;
private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class);
Expand Down

0 comments on commit 0841617

Please sign in to comment.