diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index d846f594..82ef3ae4 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -44,7 +44,7 @@ public ConfigSource() { // debezium unwrap message config.put("debezium.transforms", "unwrap"); config.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState"); - config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db"); + config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db,ts_ms"); config.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite"); config.put("debezium.transforms.unwrap.drop.tombstones", "true"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java index ea19464e..807f707c 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java @@ -51,8 +51,6 @@ public void testSimpleUpload() { } public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. @Override public Map getConfigOverrides() { Map config = new HashMap<>(); @@ -64,7 +62,6 @@ public Map getConfigOverrides() { config.put("%mongodb.debezium.source.mongodb.name", "testc"); config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok config.put("%mongodb.debezium.source.collection.include.list", "inventory.products"); - // IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!! config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield"); config.put("%mongodb.debezium.transforms.renamekeyfield.type", @@ -78,7 +75,6 @@ public Map getConfigOverrides() { public String getConfigProfile() { return "mongodb"; } - } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java index 02ec8bb1..2c53d7a6 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java @@ -81,8 +81,6 @@ public void testSimpleUpload() throws Exception { } public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. @Override public Map getConfigOverrides() { Map config = new HashMap<>(); @@ -96,7 +94,6 @@ public Map getConfigOverrides() { public String getConfigProfile() { return "mysql"; } - } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 5b79718e..aa29b944 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -254,7 +254,7 @@ public void testSimpleUpload() { Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { try { Dataset ds = getTableData("testc.inventory.customers"); - ds.show(); + ds.show(false); return ds.count() >= 3; } catch (Exception e) { return false; @@ -295,15 +295,12 @@ public void testMapDestination() { } public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. @Override public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("debezium.sink.iceberg.write.format.default", "orc"); config.put("debezium.sink.iceberg.destination-regexp", "\\d"); //config.put("debezium.sink.iceberg.destination-regexp-replace", ""); - return config; } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java index aa426d0a..4c9c8629 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java @@ -8,7 +8,10 @@ package io.debezium.server.iceberg; -import io.debezium.server.iceberg.testresources.*; +import io.debezium.server.iceberg.testresources.BaseSparkTest; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.TestChangeEvent; +import io.debezium.server.iceberg.testresources.TestUtil; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; @@ -32,17 +35,17 @@ */ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) -@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) @TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class) public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest { @Inject IcebergChangeConsumer consumer; + final static Long TEST_EPOCH_MS = 1577840461000L; @Test public void testSimpleUpsert() throws Exception { - String dest = "inventory.customers_upsert"; + String dest = "testc.inventory.customers_upsert"; List> records = new ArrayList<>(); records.add(TestChangeEvent.of(dest, 1, "c")); records.add(TestChangeEvent.of(dest, 2, "c")); @@ -72,17 +75,17 @@ public void testSimpleUpsert() throws Exception { records.clear(); // incase of duplicate records it should only keep the latest by epoch ts - records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", 1L)); - records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", 2L)); - records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", 3L)); - records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", 4L)); - records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", 5L)); - records.add(TestChangeEvent.of(dest, 4, "d", "Updatedname-4-V3", 6L)); - records.add(TestChangeEvent.of(dest, 5, "d", 7L)); - records.add(TestChangeEvent.of(dest, 6, "r", 8L)); - records.add(TestChangeEvent.of(dest, 6, "r", 9L)); - records.add(TestChangeEvent.of(dest, 6, "u", 10L)); - records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", 11L)); + records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L)); + records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L)); + records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L)); + records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L)); + records.add(TestChangeEvent.of(dest, 4, "d", "Updatedname-4-V3", TEST_EPOCH_MS + 6L)); + records.add(TestChangeEvent.of(dest, 5, "d", TEST_EPOCH_MS + 7L)); + records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 8L)); + records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 9L)); + records.add(TestChangeEvent.of(dest, 6, "u", TEST_EPOCH_MS + 10L)); + records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert"); ds.show(); @@ -95,10 +98,10 @@ public void testSimpleUpsert() throws Exception { // in case of duplicate records including epoch ts, its should keep latest one based on operation priority // ("c", 1, "r", 2, "u", 3, "d", 4); records.clear(); - records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", 1L)); - records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", 1L)); - records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", 1L)); - records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", 1L)); + records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert"); ds.show(); @@ -107,10 +110,10 @@ public void testSimpleUpsert() throws Exception { // if its not standard insert followed by update! should keep latest one records.clear(); - records.add(TestChangeEvent.of(dest, 7, "u", 1L)); - records.add(TestChangeEvent.of(dest, 7, "d", 2L)); - records.add(TestChangeEvent.of(dest, 7, "r", 3L)); - records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", 4L)); + records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 7, "d", TEST_EPOCH_MS + 2L)); + records.add(TestChangeEvent.of(dest, 7, "r", TEST_EPOCH_MS + 3L)); + records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert"); ds.show(); @@ -120,13 +123,13 @@ public void testSimpleUpsert() throws Exception { @Test public void testSimpleUpsertCompositeKey() throws Exception { - String dest = "inventory.customers_upsert_compositekey"; + String dest = "testc.inventory.customers_upsert_compositekey"; // test simple inserts List> records = new ArrayList<>(); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", 1L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", 1L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L)); consumer.handleBatch(records, TestUtil.getCommitter()); Dataset ds = getTableData("testc.inventory.customers_upsert_compositekey"); @@ -135,10 +138,10 @@ public void testSimpleUpsertCompositeKey() throws Exception { Assertions.assertEquals(ds.where("id = 1").count(), 2); records.clear(); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user1", 3L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user2", 1L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user1", TEST_EPOCH_MS + 3L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user2", TEST_EPOCH_MS + 1L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert_compositekey"); ds.show(); @@ -147,12 +150,9 @@ public void testSimpleUpsertCompositeKey() throws Exception { } public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. @Override public Map getConfigOverrides() { Map config = new HashMap<>(); - config.put("debezium.sink.iceberg.upsert", "true"); config.put("debezium.sink.iceberg.upsert-keep-deletes", "false"); return config; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 3cd0f0e7..6d42b77a 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -8,7 +8,10 @@ package io.debezium.server.iceberg; -import io.debezium.server.iceberg.testresources.*; +import io.debezium.server.iceberg.testresources.BaseSparkTest; +import io.debezium.server.iceberg.testresources.S3Minio; +import io.debezium.server.iceberg.testresources.TestChangeEvent; +import io.debezium.server.iceberg.testresources.TestUtil; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; @@ -32,16 +35,16 @@ */ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) -@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) @TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class) public class IcebergChangeConsumerUpsertTest extends BaseSparkTest { @Inject IcebergChangeConsumer consumer; + final static Long TEST_EPOCH_MS = 1577840461000L; @Test public void testSimpleUpsert() throws Exception { - String dest = "inventory.customers_upsert"; + String dest = "testc.inventory.customers_upsert"; // test simple inserts List> records = new ArrayList<>(); records.add(TestChangeEvent.of(dest, 1, "c")); @@ -72,17 +75,17 @@ public void testSimpleUpsert() throws Exception { records.clear(); // incase of duplicate records it should only keep the latest by epoch ts - records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", 1L)); - records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", 2L)); - records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", 3L)); - records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", 4L)); - records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", 5L)); - records.add(TestChangeEvent.of(dest, 4, "r", "Updatedname-4-V3", 6L)); - records.add(TestChangeEvent.of(dest, 5, "r", 7L)); - records.add(TestChangeEvent.of(dest, 6, "r", 8L)); - records.add(TestChangeEvent.of(dest, 6, "r", 9L)); - records.add(TestChangeEvent.of(dest, 6, "u", 10L)); - records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", 11L)); + records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L)); + records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L)); + records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L)); + records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L)); + records.add(TestChangeEvent.of(dest, 4, "r", "Updatedname-4-V3", TEST_EPOCH_MS + 6L)); + records.add(TestChangeEvent.of(dest, 5, "r", TEST_EPOCH_MS + 7L)); + records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 8L)); + records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 9L)); + records.add(TestChangeEvent.of(dest, 6, "u", TEST_EPOCH_MS + 10L)); + records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert"); ds.sort("id").show(false); @@ -95,10 +98,10 @@ public void testSimpleUpsert() throws Exception { // in case of duplicate records including epoch ts, its should keep latest one based on operation priority // ("c", 1, "r", 2, "u", 3, "d", 4); records.clear(); - records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", 1L)); - records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", 1L)); - records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", 1L)); - records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", 1L)); + records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert"); ds.show(); @@ -107,10 +110,10 @@ public void testSimpleUpsert() throws Exception { // if its not standard insert followed by update! should keep latest one records.clear(); - records.add(TestChangeEvent.of(dest, 7, "u", 1L)); - records.add(TestChangeEvent.of(dest, 7, "u", 2L)); - records.add(TestChangeEvent.of(dest, 7, "r", 3L)); - records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", 4L)); + records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 2L)); + records.add(TestChangeEvent.of(dest, 7, "r", TEST_EPOCH_MS + 3L)); + records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert"); ds.show(); @@ -120,13 +123,13 @@ public void testSimpleUpsert() throws Exception { @Test public void testSimpleUpsertCompositeKey() throws Exception { - String dest = "inventory.customers_upsert_compositekey"; + String dest = "testc.inventory.customers_upsert_compositekey"; // test simple inserts List> records = new ArrayList<>(); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", 1L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", 1L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L)); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L)); consumer.handleBatch(records, TestUtil.getCommitter()); Dataset ds = getTableData("testc.inventory.customers_upsert_compositekey"); @@ -135,7 +138,7 @@ public void testSimpleUpsertCompositeKey() throws Exception { Assertions.assertEquals(ds.where("id = 1").count(), 2); records.clear(); - records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user2", 1L)); + records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert_compositekey"); ds.show(); @@ -145,12 +148,12 @@ public void testSimpleUpsertCompositeKey() throws Exception { @Test public void testSimpleUpsertNoKey() throws Exception { - String dest = "inventory.customers_upsert_nokey"; + String dest = "testc.inventory.customers_upsert_nokey"; // when there is no PK it should fall back to append mode List> records = new ArrayList<>(); - records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user1", 1L)); - records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", 1L)); - records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user1", 2L)); + records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L)); consumer.handleBatch(records, TestUtil.getCommitter()); Dataset ds = getTableData("testc.inventory.customers_upsert_nokey"); ds.show(); @@ -158,9 +161,9 @@ public void testSimpleUpsertNoKey() throws Exception { Assertions.assertEquals(ds.where("id = 1").count(), 3); records.clear(); - records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", 1L)); - records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user2", 1L)); - records.add(TestChangeEvent.ofNoKey(dest, 1, "r", "user1", 3L)); + records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L)); + records.add(TestChangeEvent.ofNoKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert_nokey"); ds.show(); @@ -169,12 +172,9 @@ public void testSimpleUpsertNoKey() throws Exception { } public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. @Override public Map getConfigOverrides() { Map config = new HashMap<>(); - config.put("debezium.sink.iceberg.upsert", "true"); config.put("debezium.sink.iceberg.upsert-keep-deletes", "true"); return config; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java index cca7060c..f72088ae 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java @@ -58,14 +58,10 @@ public void testSimpleUpload() { } public static class IcebergEventsChangeConsumerTestProfile implements QuarkusTestProfile { - - //This method allows us to override configuration properties. @Override public Map getConfigOverrides() { Map config = new HashMap<>(); - config.put("debezium.sink.type", "icebergevents"); - return config; } }