diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java index feb51fdc..ea19464e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java @@ -13,9 +13,12 @@ import io.debezium.server.iceberg.testresources.SourceMangoDB; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -30,12 +33,12 @@ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourceMangoDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerMangodbTestProfile.class) +@TestProfile(IcebergChangeConsumerMangodbTest.IcebergChangeConsumerMangodbTestProfile.class) public class IcebergChangeConsumerMangodbTest extends BaseSparkTest { @Test public void testSimpleUpload() { - + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { try { Dataset df = getTableData("testc.inventory.products"); @@ -47,4 +50,35 @@ public void testSimpleUpload() { }); } + public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile { + + //This method allows us to override configuration properties. + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("quarkus.profile", "mongodb"); + config.put("%mongodb.debezium.source.connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); + config.put("%mongodb.debezium.transforms.unwrap.type", "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"); + config.put("%mongodb.debezium.transforms.unwrap.add.fields", "op,source.ts_ms,db"); + config.put("%mongodb.debezium.sink.iceberg.allow-field-addition", "false"); + config.put("%mongodb.debezium.source.mongodb.name", "testc"); + config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok + config.put("%mongodb.debezium.source.collection.include.list", "inventory.products"); + + // IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!! + config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield"); + config.put("%mongodb.debezium.transforms.renamekeyfield.type", + "org.apache.kafka.connect.transforms.ReplaceField$Key"); + config.put("%mongodb.debezium.transforms.renamekeyfield.renames", "id:_id"); + + return config; + } + + @Override + public String getConfigProfile() { + return "mongodb"; + } + + } + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTestProfile.java deleted file mode 100644 index 237a31b0..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTestProfile.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import io.quarkus.test.junit.QuarkusTestProfile; - -import java.util.HashMap; -import java.util.Map; - -public class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - config.put("quarkus.profile", "mongodb"); - config.put("%mongodb.debezium.source.connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); - config.put("%mongodb.debezium.transforms.unwrap.type", "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"); - config.put("%mongodb.debezium.transforms.unwrap.add.fields", "op,source.ts_ms,db"); - config.put("%mongodb.debezium.sink.iceberg.allow-field-addition", "false"); - config.put("%mongodb.debezium.source.mongodb.name", "testc"); - config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok - config.put("%mongodb.debezium.source.collection.include.list", "inventory.products"); - - // IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!! - config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield"); - config.put("%mongodb.debezium.transforms.renamekeyfield.type", - "org.apache.kafka.connect.transforms.ReplaceField$Key"); - config.put("%mongodb.debezium.transforms.renamekeyfield.renames", "id:_id"); - - return config; - } - - @Override - public String getConfigProfile() { - return "mongodb"; - } - -} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java index d6733f8e..02ec8bb1 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java @@ -13,9 +13,12 @@ import io.debezium.server.iceberg.testresources.SourceMysqlDB; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -30,7 +33,7 @@ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerMysqlTestProfile.class) +@TestProfile(IcebergChangeConsumerMysqlTest.IcebergChangeConsumerMysqlTestProfile.class) public class IcebergChangeConsumerMysqlTest extends BaseSparkTest { @Test @@ -77,4 +80,23 @@ public void testSimpleUpload() throws Exception { } + public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile { + + //This method allows us to override configuration properties. + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("quarkus.profile", "mysql"); + config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector"); + config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); + return config; + } + + @Override + public String getConfigProfile() { + return "mysql"; + } + + } + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java deleted file mode 100644 index 1873ddbe..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import io.quarkus.test.junit.QuarkusTestProfile; - -import java.util.HashMap; -import java.util.Map; - -public class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - config.put("quarkus.profile", "mysql"); - config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector"); - config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table"); - return config; - } - - @Override - public String getConfigProfile() { - return "mysql"; - } - -} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index f1a88042..5b79718e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -13,9 +13,12 @@ import io.debezium.server.iceberg.testresources.SourcePostgresqlDB; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import javax.inject.Inject; import org.apache.iceberg.catalog.Namespace; @@ -38,7 +41,7 @@ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerTestProfile.class) +@TestProfile(IcebergChangeConsumerTest.IcebergChangeConsumerTestProfile.class) public class IcebergChangeConsumerTest extends BaseSparkTest { protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class); @@ -291,4 +294,18 @@ public void testMapDestination() { assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); } + public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile { + + //This method allows us to override configuration properties. + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("debezium.sink.iceberg.write.format.default", "orc"); + config.put("debezium.sink.iceberg.destination-regexp", "\\d"); + //config.put("debezium.sink.iceberg.destination-regexp-replace", ""); + + return config; + } + } + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java deleted file mode 100644 index 2b66de78..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import io.quarkus.test.junit.QuarkusTestProfile; - -import java.util.HashMap; -import java.util.Map; - -public class IcebergChangeConsumerTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - config.put("debezium.sink.iceberg.write.format.default", "orc"); - config.put("debezium.sink.iceberg.destination-regexp", "\\d"); - //config.put("debezium.sink.iceberg.destination-regexp-replace", ""); - - return config; - } -} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java index b69ed085..aa426d0a 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java @@ -11,10 +11,13 @@ import io.debezium.server.iceberg.testresources.*; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.inject.Inject; import org.apache.spark.sql.Dataset; @@ -30,7 +33,7 @@ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class) +@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class) public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest { @Inject @@ -142,5 +145,18 @@ public void testSimpleUpsertCompositeKey() throws Exception { Assertions.assertEquals(ds.count(), 0); Assertions.assertEquals(ds.where("first_name= 'user2'").count(), 0); } - + + public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile { + + //This method allows us to override configuration properties. + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + + config.put("debezium.sink.iceberg.upsert", "true"); + config.put("debezium.sink.iceberg.upsert-keep-deletes", "false"); + return config; + } + } + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 8fc176fb..3cd0f0e7 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -11,10 +11,13 @@ import io.debezium.server.iceberg.testresources.*; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.inject.Inject; import org.apache.spark.sql.Dataset; @@ -30,7 +33,7 @@ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerUpsertTestProfile.class) +@TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class) public class IcebergChangeConsumerUpsertTest extends BaseSparkTest { @Inject @@ -165,4 +168,17 @@ public void testSimpleUpsertNoKey() throws Exception { Assertions.assertEquals(ds.where("id = 1 AND __op= 'c' AND first_name= 'user2'").count(), 2); } + public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile { + + //This method allows us to override configuration properties. + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + + config.put("debezium.sink.iceberg.upsert", "true"); + config.put("debezium.sink.iceberg.upsert-keep-deletes", "true"); + return config; + } + } + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestDeleteDeletesProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestDeleteDeletesProfile.java deleted file mode 100644 index 3255deb3..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestDeleteDeletesProfile.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import io.quarkus.test.junit.QuarkusTestProfile; - -import java.util.HashMap; -import java.util.Map; - -public class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - - config.put("debezium.sink.iceberg.upsert", "true"); - config.put("debezium.sink.iceberg.upsert-keep-deletes", "false"); - return config; - } -} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestProfile.java deleted file mode 100644 index c3849096..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestProfile.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import io.quarkus.test.junit.QuarkusTestProfile; - -import java.util.HashMap; -import java.util.Map; - -public class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - - config.put("debezium.sink.iceberg.upsert", "true"); - config.put("debezium.sink.iceberg.upsert-keep-deletes", "true"); - return config; - } -} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java index 52b1e413..cca7060c 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java @@ -13,9 +13,12 @@ import io.debezium.server.iceberg.testresources.SourcePostgresqlDB; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -32,7 +35,7 @@ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergEventsChangeConsumerTestProfile.class) +@TestProfile(IcebergEventsChangeConsumerTest.IcebergEventsChangeConsumerTestProfile.class) public class IcebergEventsChangeConsumerTest extends BaseSparkTest { @ConfigProperty(name = "debezium.sink.type") String sinkType; @@ -54,4 +57,17 @@ public void testSimpleUpload() { S3Minio.listFiles(); } + public static class IcebergEventsChangeConsumerTestProfile implements QuarkusTestProfile { + + //This method allows us to override configuration properties. + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + + config.put("debezium.sink.type", "icebergevents"); + + return config; + } + } + } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTestProfile.java deleted file mode 100644 index 9c18fb6b..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTestProfile.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import io.quarkus.test.junit.QuarkusTestProfile; - -import java.util.HashMap; -import java.util.Map; - -public class IcebergEventsChangeConsumerTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - - config.put("debezium.sink.type", "icebergevents"); - - return config; - } -} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java index a2a77e08..91777558 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java @@ -9,8 +9,11 @@ package io.debezium.server.iceberg.batchsizewait; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; +import java.util.HashMap; +import java.util.Map; import javax.inject.Inject; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -18,7 +21,7 @@ import org.junit.jupiter.api.Test; @QuarkusTest -@TestProfile(DynamicBatchSizeWaitTestProfile.class) +@TestProfile(DynamicBatchSizeWaitTest.DynamicBatchSizeWaitTestProfile.class) class DynamicBatchSizeWaitTest { @Inject @@ -68,4 +71,17 @@ void shouldDecreaseSleepMs() { Assertions.assertTrue(dynamicSleep.getWaitMs(120) <= 100); } + public static class DynamicBatchSizeWaitTestProfile implements QuarkusTestProfile { + + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + config.put("quarkus.arc.selected-alternatives", "DynamicBatchSizeWait"); + config.put("debezium.source.max.batch.size", "100"); + config.put("debezium.sink.batch.batch-size-wait.max-wait-ms", "5000"); + config.put("debezium.source.poll.interval.ms", "5000"); + return config; + } + } + } \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTestProfile.java deleted file mode 100644 index 3c0dbebf..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTestProfile.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg.batchsizewait; - -import io.quarkus.test.junit.QuarkusTestProfile; - -import java.util.HashMap; -import java.util.Map; - -public class DynamicBatchSizeWaitTestProfile implements QuarkusTestProfile { - - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - config.put("quarkus.arc.selected-alternatives", "DynamicBatchSizeWait"); - config.put("debezium.source.max.batch.size", "100"); - config.put("debezium.sink.batch.batch-size-wait.max-wait-ms", "5000"); - config.put("debezium.source.poll.interval.ms", "5000"); - return config; - } -} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java index 6ccc3a64..dfd2ac3e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java @@ -13,9 +13,12 @@ import io.debezium.server.iceberg.testresources.SourcePostgresqlDB; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import javax.inject.Inject; import org.apache.spark.sql.Dataset; @@ -25,7 +28,7 @@ import org.junit.jupiter.api.Test; @QuarkusTest -@TestProfile(MaxBatchSizeWaitTestProfile.class) +@TestProfile(MaxBatchSizeWaitTest.MaxBatchSizeWaitTestProfile.class) @QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) class MaxBatchSizeWaitTest extends BaseSparkTest { @@ -60,4 +63,22 @@ public void testBatchsizeWait() throws Exception { }); } + public static class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile { + @Override + public Map getConfigOverrides() { + Map config = new HashMap<>(); + // wait + config.put("debezium.sink.batch.batch-size-wait", "MaxBatchSizeWait"); + config.put("debezium.sink.batch.metrics.snapshot-mbean", "debezium.postgres:type=connector-metrics,context=snapshot,server=testc"); + config.put("debezium.sink.batch.metrics.streaming-mbean", "debezium.postgres:type=connector-metrics,context=streaming,server=testc"); + config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); + config.put("debezium.source.max.batch.size", "5000"); + config.put("debezium.source.max.queue.size", "70000"); + //config.put("debezium.source.poll.interval.ms", "1000"); + config.put("debezium.sink.batch.batch-size-wait.max-wait-ms", "5000"); + config.put("debezium.sink.batch.batch-size-wait.wait-interval-ms", "1000"); + config.put("quarkus.log.category.\"io.debezium.server.iceberg.batchsizewait\".level", "DEBUG"); + return config; + } + } } \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java deleted file mode 100644 index 2a99a2a2..00000000 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTestProfile.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg.batchsizewait; - -import io.quarkus.test.junit.QuarkusTestProfile; - -import java.util.HashMap; -import java.util.Map; - -public class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile { - - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - // wait - config.put("debezium.sink.batch.batch-size-wait", "MaxBatchSizeWait"); - config.put("debezium.sink.batch.metrics.snapshot-mbean", "debezium.postgres:type=connector-metrics,context=snapshot,server=testc"); - config.put("debezium.sink.batch.metrics.streaming-mbean", "debezium.postgres:type=connector-metrics,context=streaming,server=testc"); - config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - config.put("debezium.source.max.batch.size", "5000"); - config.put("debezium.source.max.queue.size", "70000"); - //config.put("debezium.source.poll.interval.ms", "1000"); - config.put("debezium.sink.batch.batch-size-wait.max-wait-ms", "5000"); - config.put("debezium.sink.batch.batch-size-wait.wait-interval-ms", "1000"); - config.put("quarkus.log.category.\"io.debezium.server.iceberg.batchsizewait\".level", "DEBUG"); - return config; - } -} \ No newline at end of file