From 5d280b6bfb7bc315b359fcdd15cb9ae0b3a1fd5f Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 14 May 2022 16:27:10 +0200 Subject: [PATCH 01/12] Use regexp to enable flexible destination mapping --- .../server/iceberg/IcebergChangeConsumer.java | 17 ++++++++++++++--- .../server/iceberg/IcebergChangeEvent.java | 4 ++-- .../tableoperator/IcebergTableOperatorTest.java | 2 +- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 6db1111a..af0a1914 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -84,6 +85,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu String warehouseLocation; @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS") String defaultFs; + @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp", defaultValue = "") + protected Optional destinationRegexp; + @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "") + protected Optional destinationRegexpReplace; @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") String tablePrefix; @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") @@ -154,12 +159,11 @@ public void handleBatch(List> records, DebeziumEngin throw new DebeziumException(ex); } }) - .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable)); + .collect(Collectors.groupingBy(IcebergChangeEvent::destination)); // consume list of events for each destination table for (Map.Entry> event : result.entrySet()) { - final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey()); - Table icebergTable = this.loadIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0)); + Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(event.getKey()), event.getValue().get(0)); icebergTableOperator.addToTable(icebergTable, event.getValue()); } @@ -203,4 +207,11 @@ protected void logConsumerProgress(long numUploadedEvents) { } } + TableIdentifier mapDestination(String destination) { + final String tableName = destination + .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse("")) + .replace(".", "_"); + + return TableIdentifier.of(Namespace.of(namespace), tablePrefix + tableName); + } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index be20a265..565543e8 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -62,8 +62,8 @@ public Schema icebergSchema() { return jsonSchema.icebergSchema(); } - public String destinationTable() { - return destination.replace(".", "_"); + public String destination() { + return destination; } public GenericRecord asIcebergRecord(Schema schema) { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index f0e86bdf..9250e91a 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -55,7 +55,7 @@ class IcebergTableOperatorTest extends BaseSparkTest { public Table createTable(IcebergChangeEvent sampleEvent) { HadoopCatalog icebergCatalog = getIcebergCatalog(); - final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destinationTable()); + final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination()); return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert); } From c9a4b5880a44a7da46a66230e04bfc1ba4aae11f Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 14 May 2022 17:06:56 +0200 Subject: [PATCH 02/12] Use regexp to enable flexible destination mapping --- .../server/iceberg/IcebergChangeConsumer.java | 2 ++ .../server/iceberg/IcebergChangeConsumerTest.java | 14 ++++++++++++++ .../iceberg/IcebergChangeConsumerTestProfile.java | 2 ++ 3 files changed, 18 insertions(+) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index af0a1914..ab1dc7fa 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -9,6 +9,7 @@ package io.debezium.server.iceberg; import io.debezium.DebeziumException; +import io.debezium.annotation.VisibleForTesting; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; @@ -207,6 +208,7 @@ protected void logConsumerProgress(long numUploadedEvents) { } } + @VisibleForTesting TableIdentifier mapDestination(String destination) { final String tableName = destination .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse("")) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index ae4b2d4a..d7dd9ff6 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -16,7 +16,10 @@ import io.quarkus.test.junit.TestProfile; import java.time.Duration; +import javax.inject.Inject; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.awaitility.Awaitility; @@ -42,6 +45,11 @@ public class IcebergChangeConsumerTest extends BaseSparkTest { @ConfigProperty(name = "debezium.sink.type") String sinkType; + @Inject + IcebergChangeConsumer icebergConsumer; + @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") + String namespace; + @Test public void testConsumingVariousDataTypes() throws Exception { assertEquals(sinkType, "iceberg"); @@ -277,4 +285,10 @@ public void testPartitionedTable() { S3Minio.listFiles(); } + @Test + public void testMapDestination() { + assertEquals(TableIdentifier.of(Namespace.of(namespace), "table"), icebergConsumer.mapDestination("table1")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "table"), icebergConsumer.mapDestination("table2")); + } + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java index f7923cf8..a2e8c561 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java @@ -20,6 +20,8 @@ public class IcebergChangeConsumerTestProfile implements QuarkusTestProfile { public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("debezium.sink.iceberg.write.format.default", "orc"); + config.put("debezium.sink.iceberg.destination-regexp", "\\d"); + config.put("debezium.sink.iceberg.destination-regexp-replace", ""); return config; } From a17f171dd4ee9234efdcd19d1794fc490a958c72 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 14 May 2022 17:13:25 +0200 Subject: [PATCH 03/12] Use regexp to enable flexible destination mapping --- .../debezium/server/iceberg/testresources/TestChangeEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java index 1fdbf734..b311d5cb 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java @@ -127,7 +127,7 @@ public V record() { @Override public String destination() { - return destination; + return destination.replace(".", "_"); } @Override From 6efff9bd0cda8591c5e891cc24237e99bc51dce1 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 14 May 2022 17:18:23 +0200 Subject: [PATCH 04/12] Use regexp to enable flexible destination mapping --- .../io/debezium/server/iceberg/testresources/SourceMysqlDB.java | 2 +- .../server/iceberg/testresources/SourcePostgresqlDB.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java index b51e2a2e..751d53ad 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java @@ -30,7 +30,7 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager { public static final String MYSQL_PASSWORD = "mysqlpw"; public static final String MYSQL_DEBEZIUM_USER = "debezium"; public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz"; - public static final String MYSQL_IMAGE = "debezium/example-mysql:1.7.0.Final"; + public static final String MYSQL_IMAGE = "debezium/example-mysql:1.9.2.Final"; public static final String MYSQL_HOST = "127.0.0.1"; public static final String MYSQL_DATABASE = "inventory"; public static final Integer MYSQL_PORT_DEFAULT = 3306; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java index 0ff7a71a..8a0ab6a1 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java @@ -28,7 +28,7 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager { public static final String POSTGRES_USER = "postgres"; public static final String POSTGRES_PASSWORD = "postgres"; public static final String POSTGRES_DBNAME = "postgres"; - public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.7.0.Final"; + public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.9.2.Final"; public static final String POSTGRES_HOST = "localhost"; public static final Integer POSTGRES_PORT_DEFAULT = 5432; private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class); From d8b86fc807df6ba3c0bc4c11b87eed7fdfa3da51 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 14 May 2022 21:28:59 +0200 Subject: [PATCH 05/12] Use regexp to enable flexible destination mapping --- .../debezium/server/iceberg/IcebergChangeConsumer.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index ab1dc7fa..3db022bc 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -9,7 +9,6 @@ package io.debezium.server.iceberg; import io.debezium.DebeziumException; -import io.debezium.annotation.VisibleForTesting; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; @@ -163,9 +162,9 @@ public void handleBatch(List> records, DebeziumEngin .collect(Collectors.groupingBy(IcebergChangeEvent::destination)); // consume list of events for each destination table - for (Map.Entry> event : result.entrySet()) { - Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(event.getKey()), event.getValue().get(0)); - icebergTableOperator.addToTable(icebergTable, event.getValue()); + for (Map.Entry> tableEvents : result.entrySet()) { + Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0)); + icebergTableOperator.addToTable(icebergTable, tableEvents.getValue()); } // workaround! somehow offset is not saved to file unless we call committer.markProcessed @@ -208,8 +207,7 @@ protected void logConsumerProgress(long numUploadedEvents) { } } - @VisibleForTesting - TableIdentifier mapDestination(String destination) { + public TableIdentifier mapDestination(String destination) { final String tableName = destination .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse("")) .replace(".", "_"); From 596e279ffbe31d9b5066f989d71c614b1fc1292e Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 14 May 2022 21:48:46 +0200 Subject: [PATCH 06/12] Use regexp to enable flexible destination mapping --- .../src/main/java/io/debezium/server/iceberg/IcebergUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index a1c33452..47c4454d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -42,7 +42,7 @@ public static Map getConfigSubset(Config config, String prefix) for (String propName : config.getPropertyNames()) { if (propName.startsWith(prefix)) { final String newPropName = propName.substring(prefix.length()); - ret.put(newPropName, config.getValue(propName, String.class)); + ret.put(newPropName, config.getConfigValue(propName).getValue()); } } From 61809695fd02ec9faafcde9e795a417ef869fbc3 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 14 May 2022 22:39:56 +0200 Subject: [PATCH 07/12] Use regexp to enable flexible destination mapping --- .../server/iceberg/IcebergChangeConsumerTestProfile.java | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java index a2e8c561..2b66de78 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java @@ -21,7 +21,7 @@ public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("debezium.sink.iceberg.write.format.default", "orc"); config.put("debezium.sink.iceberg.destination-regexp", "\\d"); - config.put("debezium.sink.iceberg.destination-regexp-replace", ""); + //config.put("debezium.sink.iceberg.destination-regexp-replace", ""); return config; } diff --git a/pom.xml b/pom.xml index 1f5c50b1..4c1ffb3b 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,7 @@ 3.3.1 2.17.120 1.12.2 - 1.15.3 + 1.17.1 3.1.0 1.9.2.Final From a9423c5f8e1c1dd2c38ae5d0f4766ca99cc66c7b Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 15 May 2022 16:12:09 +0200 Subject: [PATCH 08/12] Use regexp to enable flexible destination mapping --- .../io/debezium/server/iceberg/IcebergChangeConsumerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index d7dd9ff6..cbbf3f16 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -287,8 +287,8 @@ public void testPartitionedTable() { @Test public void testMapDestination() { - assertEquals(TableIdentifier.of(Namespace.of(namespace), "table"), icebergConsumer.mapDestination("table1")); - assertEquals(TableIdentifier.of(Namespace.of(namespace), "table"), icebergConsumer.mapDestination("table2")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); } } From ff15e91b9de70e1e3432e57181efd82bf4e840c6 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 15 May 2022 16:24:17 +0200 Subject: [PATCH 09/12] Use regexp to enable flexible destination mapping --- .../server/iceberg/IcebergChangeConsumerTestProfile.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java index 2b66de78..a2e8c561 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java @@ -21,7 +21,7 @@ public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("debezium.sink.iceberg.write.format.default", "orc"); config.put("debezium.sink.iceberg.destination-regexp", "\\d"); - //config.put("debezium.sink.iceberg.destination-regexp-replace", ""); + config.put("debezium.sink.iceberg.destination-regexp-replace", ""); return config; } From bff8ccff7d60da915a14f81bcc3d0eaa90c1dc7a Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 15 May 2022 16:56:01 +0200 Subject: [PATCH 10/12] Use regexp to enable flexible destination mapping --- .../server/iceberg/IcebergChangeConsumerTestProfile.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java index a2e8c561..2b66de78 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java @@ -21,7 +21,7 @@ public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("debezium.sink.iceberg.write.format.default", "orc"); config.put("debezium.sink.iceberg.destination-regexp", "\\d"); - config.put("debezium.sink.iceberg.destination-regexp-replace", ""); + //config.put("debezium.sink.iceberg.destination-regexp-replace", ""); return config; } From 7fb8fe0b1cf442e6297074027b8dc12109faa089 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 15 May 2022 17:31:03 +0200 Subject: [PATCH 11/12] Use regexp to enable flexible destination mapping --- .../src/main/java/io/debezium/server/iceberg/IcebergUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 47c4454d..a1c33452 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -42,7 +42,7 @@ public static Map getConfigSubset(Config config, String prefix) for (String propName : config.getPropertyNames()) { if (propName.startsWith(prefix)) { final String newPropName = propName.substring(prefix.length()); - ret.put(newPropName, config.getConfigValue(propName).getValue()); + ret.put(newPropName, config.getValue(propName, String.class)); } } From 6b589c40bc4fdc5799c773ae4c815e85f299f094 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 15 May 2022 17:42:09 +0200 Subject: [PATCH 12/12] Use regexp to enable flexible destination mapping --- .../debezium/server/iceberg/testresources/TestChangeEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java index b311d5cb..1fdbf734 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java @@ -127,7 +127,7 @@ public V record() { @Override public String destination() { - return destination.replace(".", "_"); + return destination; } @Override