From 33d1cfdd1fe288fc4dac574e82dc1826ee7a7ed8 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 9 Sep 2023 11:47:18 +0200 Subject: [PATCH 1/2] Add support to map type --- .../server/iceberg/IcebergChangeEvent.java | 7 +++++-- .../iceberg/IcebergChangeConsumerTest.java | 16 +++++++++++----- .../testresources/SourcePostgresqlDB.java | 2 +- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 80ea8bfe..ef819282 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -295,8 +295,11 @@ private List icebergSchema(JsonNode eventSchema, String schem } break; case "map": - throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!"); - //break; + String keyFieldType = jsonSchemaFieldNode.get("keys").get("type").textValue(); + String valueFieldType = jsonSchemaFieldNode.get("values").get("type").textValue(); + Types.MapType mapField = Types.MapType.ofOptional(columnId, ++columnId, icebergFieldType(fieldName+".keys", keyFieldType), icebergFieldType(fieldName+".values", valueFieldType)); + schemaColumns.add(Types.NestedField.optional(++columnId,fieldName, mapField)); + break; case "struct": // create it as struct, nested type List subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 79afb5a8..9a869cc0 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java 
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -59,6 +59,7 @@ public class IcebergChangeConsumerTest extends BaseSparkTest { @Test public void testConsumingVariousDataTypes() throws Exception { assertEquals(sinkType, "iceberg"); + SourcePostgresqlDB.runSQL("CREATE EXTENSION hstore;"); String sql = "\n" + " DROP TABLE IF EXISTS inventory.data_types;\n" + " CREATE TABLE IF NOT EXISTS inventory.data_types (\n" + @@ -77,21 +78,25 @@ public void testConsumingVariousDataTypes() throws Exception { " c_uuid UUID,\n" + " c_bytea BYTEA,\n" + " c_json JSON,\n" + - " c_jsonb JSONB\n" + + " c_jsonb JSONB,\n" + + " c_hstore_keyval hstore,\n" + + " c_last_field VARCHAR\n" + " );"; SourcePostgresqlDB.runSQL(sql); sql = "INSERT INTO inventory.data_types (" + "c_id, " + "c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " + "c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " + - "c_json, c_jsonb) " + + "c_json, c_jsonb, c_hstore_keyval, c_last_field) " + "VALUES (1, null, null, null,null,null,null," + "null,null,null,null,null,null,null," + - "null,null)," + + "null,null, null, null)," + "(2, 'val_text', 'A', 123, current_date , current_timestamp, current_timestamp," + "'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," + "'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea," + - "'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb" + + "'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb, " + + "'akey=>1,akey=>2'::hstore, " + + "'stringvalue' " + ")"; SourcePostgresqlDB.runSQL(sql); Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> { @@ -160,7 +165,7 @@ public void testSchemaChanges() throws Exception { Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { try { Dataset ds = getTableData("testc.inventory.customers"); - //ds.show(); + ds.show(); return ds.where("__op == 'r'").count() == 4 // snapshot rows. 
initial table data && ds.where("__op == 'u'").count() == 3 // 3 update @@ -314,6 +319,7 @@ public Map getConfigOverrides() { config.put("debezium.sink.iceberg.write.format.default", "orc"); config.put("debezium.sink.iceberg.destination-regexp", "\\d"); //config.put("debezium.sink.iceberg.destination-regexp-replace", ""); + config.put("debezium.source.hstore.handling.mode", "map"); return config; } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java index a7c01cfc..7036fd29 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java @@ -28,7 +28,7 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager { public static final String POSTGRES_USER = "postgres"; public static final String POSTGRES_PASSWORD = "postgres"; public static final String POSTGRES_DBNAME = "postgres"; - public static final String POSTGRES_IMAGE = "debezium/example-postgres:2.1.2.Final"; + public static final String POSTGRES_IMAGE = "debezium/example-postgres:2.3"; public static final String POSTGRES_HOST = "localhost"; public static final Integer POSTGRES_PORT_DEFAULT = 5432; private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class); From 5d8ee99ed1a0081a2de7133ff2b5750dcead1e47 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 9 Sep 2023 14:52:58 +0200 Subject: [PATCH 2/2] Add support to map type --- .../server/iceberg/IcebergChangeConsumerTest.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java 
b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 9a869cc0..0ecc5422 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -95,7 +95,7 @@ public void testConsumingVariousDataTypes() throws Exception { "'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," + "'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea," + "'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb, " + - "'akey=>1,akey=>2'::hstore, " + + "'mapkey1=>1, mapkey2=>2'::hstore, " + "'stringvalue' " + ")"; SourcePostgresqlDB.runSQL(sql); @@ -111,6 +111,15 @@ public void testConsumingVariousDataTypes() throws Exception { return false; } }); + Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> { + try { + Dataset df = getTableData("testc.inventory.data_types"); + df.show(true); + return df.count() == 2; + } catch (Exception e) { + return false; + } + }); } @Test