diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 6db1111a..3db022bc 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -84,6 +85,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu String warehouseLocation; @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS") String defaultFs; + @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp", defaultValue = "") + protected Optional destinationRegexp; + @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "") + protected Optional destinationRegexpReplace; @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") String tablePrefix; @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") @@ -154,13 +159,12 @@ public void handleBatch(List> records, DebeziumEngin throw new DebeziumException(ex); } }) - .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable)); + .collect(Collectors.groupingBy(IcebergChangeEvent::destination)); // consume list of events for each destination table - for (Map.Entry> event : result.entrySet()) { - final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey()); - Table icebergTable = this.loadIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0)); - icebergTableOperator.addToTable(icebergTable, event.getValue()); + for (Map.Entry> tableEvents : result.entrySet()) { + Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0)); + icebergTableOperator.addToTable(icebergTable, tableEvents.getValue()); } // workaround! somehow offset is not saved to file unless we call committer.markProcessed @@ -203,4 +207,11 @@ protected void logConsumerProgress(long numUploadedEvents) { } } + public TableIdentifier mapDestination(String destination) { + final String tableName = destination + .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse("")) + .replace(".", "_"); + + return TableIdentifier.of(Namespace.of(namespace), tablePrefix + tableName); + } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index be20a265..565543e8 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -62,8 +62,8 @@ public Schema icebergSchema() { return jsonSchema.icebergSchema(); } - public String destinationTable() { - return destination.replace(".", "_"); + public String destination() { + return destination; } public GenericRecord asIcebergRecord(Schema schema) { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index ae4b2d4a..cbbf3f16 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -16,7 +16,10 @@ import io.quarkus.test.junit.TestProfile; import java.time.Duration; +import javax.inject.Inject; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.awaitility.Awaitility; @@ -42,6 +45,11 @@ public class IcebergChangeConsumerTest extends BaseSparkTest { @ConfigProperty(name = "debezium.sink.type") String sinkType; + @Inject + IcebergChangeConsumer icebergConsumer; + @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") + String namespace; + @Test public void testConsumingVariousDataTypes() throws Exception { assertEquals(sinkType, "iceberg"); @@ -277,4 +285,10 @@ public void testPartitionedTable() { S3Minio.listFiles(); } + @Test + public void testMapDestination() { + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); + } + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java index f7923cf8..2b66de78 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java @@ -20,6 +20,8 @@ public class IcebergChangeConsumerTestProfile implements QuarkusTestProfile { public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("debezium.sink.iceberg.write.format.default", "orc"); + config.put("debezium.sink.iceberg.destination-regexp", "\\d"); + //config.put("debezium.sink.iceberg.destination-regexp-replace", ""); return config; } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index f0e86bdf..9250e91a 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -55,7 +55,7 @@ class IcebergTableOperatorTest extends BaseSparkTest { public Table createTable(IcebergChangeEvent sampleEvent) { HadoopCatalog icebergCatalog = getIcebergCatalog(); - final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destinationTable()); + final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination()); return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java index b51e2a2e..751d53ad 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java @@ -30,7 +30,7 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager { public static final String MYSQL_PASSWORD = "mysqlpw"; public static final String MYSQL_DEBEZIUM_USER = "debezium"; public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz"; - public static final String MYSQL_IMAGE = "debezium/example-mysql:1.7.0.Final"; + public static final String MYSQL_IMAGE = "debezium/example-mysql:1.9.2.Final"; public static final String MYSQL_HOST = "127.0.0.1"; public static final String MYSQL_DATABASE = "inventory"; public static final Integer MYSQL_PORT_DEFAULT = 3306; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java index 0ff7a71a..8a0ab6a1 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java @@ -28,7 +28,7 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager { public static final String POSTGRES_USER = "postgres"; public static final String POSTGRES_PASSWORD = "postgres"; public static final String POSTGRES_DBNAME = "postgres"; - public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.7.0.Final"; + public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.9.2.Final"; public static final String POSTGRES_HOST = "localhost"; public static final Integer POSTGRES_PORT_DEFAULT = 5432; private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class); diff --git a/pom.xml b/pom.xml index 1f5c50b1..4c1ffb3b 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,7 @@ 3.3.1 2.17.120 1.12.2 - 1.15.3 + 1.17.1 3.1.0 1.9.2.Final