From 2027eb45dfdceaab4739d96e523f79a00ce76586 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 18 Mar 2024 18:54:05 +0100 Subject: [PATCH 1/5] Test upsert with tables without PK --- .../server/iceberg/testresources/SourcePostgresqlDB.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java index 903ae7aa..9a1f7f29 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java @@ -28,7 +28,7 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager { public static final String POSTGRES_USER = "postgres"; public static final String POSTGRES_PASSWORD = "postgres"; public static final String POSTGRES_DBNAME = "postgres"; - public static final String POSTGRES_IMAGE = "debezium/example-postgres:2.3"; + public static final String POSTGRES_IMAGE = "debezium/example-postgres:2.5"; public static final String POSTGRES_HOST = "localhost"; public static final Integer POSTGRES_PORT_DEFAULT = 5432; private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class); From 86efa35a0ac0bf8d5f5b20c8a065c86c21aceca7 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 18 Mar 2024 18:54:55 +0100 Subject: [PATCH 2/5] Update TestConfigSource.java --- .../test/java/io/debezium/server/iceberg/TestConfigSource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 0287a72f..f6dbfd18 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -68,6 +68,7 @@ public TestConfigSource() { config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); config.put("debezium.source.table.whitelist", "inventory.*"); + config.put("%postgresql.debezium.source.replica.identity.autoset.values", "inventory.*:FULL"); config.put("quarkus.log.level", "INFO"); config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN"); From 65f09d519e8907c0602fcd86e3c15e56cdf6cfc5 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 18 Mar 2024 18:55:44 +0100 Subject: [PATCH 3/5] Update IcebergChangeConsumerUpsertTest.java --- .../IcebergChangeConsumerUpsertTest.java | 46 +++++++++++++++++-- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 1c07ce14..93f315bb 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -8,15 +8,14 @@ package io.debezium.server.iceberg; -import io.debezium.server.iceberg.testresources.BaseSparkTest; -import io.debezium.server.iceberg.testresources.S3Minio; -import io.debezium.server.iceberg.testresources.TestChangeEvent; -import io.debezium.server.iceberg.testresources.TestUtil; +import io.debezium.server.iceberg.testresources.*; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; +import java.sql.SQLException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,6 +24,7 @@ import jakarta.inject.Inject; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -34,6 +34,7 @@ */ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) +@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) @TestProfile(IcebergChangeConsumerUpsertTest.TestProfile.class) public class IcebergChangeConsumerUpsertTest extends BaseSparkTest { @@ -170,6 +171,43 @@ public void testSimpleUpsertNoKey() throws Exception { Assertions.assertEquals(ds.where("id = 1 AND __op= 'c' AND first_name= 'user2'").count(), 2); } + + @Test + public void testTableUpsertNokey() throws SQLException, ClassNotFoundException { + String sql = "\n" + + " DROP TABLE IF EXISTS inventory.table_without_pk;\n" + + " CREATE TABLE IF NOT EXISTS inventory.table_without_pk (\n" + + " c_id INTEGER ,\n" + + " c_varchar VARCHAR\n" + + " );" + + "ALTER TABLE inventory.table_without_pk REPLICA IDENTITY FULL;"; + SourcePostgresqlDB.runSQL(sql); + SourcePostgresqlDB.runSQL( + "INSERT INTO inventory.table_without_pk (c_id, c_varchar) VALUES (1, 'STRING-DATA-1');" + + "INSERT INTO inventory.table_without_pk (c_id, c_varchar) VALUES (2, 'STRING-DATA-2');"); + Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.table_without_pk"); + ds.show(); + return ds.count() == 2 + && ds.where("__op == 'r'").count() == 2; + } catch (Exception e) { + return false; + } + }); + SourcePostgresqlDB.runSQL("UPDATE inventory.table_without_pk SET c_varchar='STRING-UPDATE-1'; "); + Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.table_without_pk"); + ds.show(); + return ds.count() == 4 + && ds.where("__op == 'u'").count() == 2; + } catch (Exception e) { + return false; + } + }); + } + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { From c6c1936871c8ca3c85d86a54a56679c532ed7b35 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 18 Mar 2024 19:12:01 +0100 Subject: [PATCH 4/5] Update IcebergChangeConsumerUpsertTest.java --- .../server/iceberg/IcebergChangeConsumerUpsertTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 93f315bb..6ed8da6f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -185,7 +185,7 @@ public void testTableUpsertNokey() throws SQLException, ClassNotFoundException { SourcePostgresqlDB.runSQL( "INSERT INTO inventory.table_without_pk (c_id, c_varchar) VALUES (1, 'STRING-DATA-1');" + "INSERT INTO inventory.table_without_pk (c_id, c_varchar) VALUES (2, 'STRING-DATA-2');"); - Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> { + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { try { Dataset ds = getTableData("testc.inventory.table_without_pk"); ds.show(); @@ -196,7 +196,7 @@ public void testTableUpsertNokey() throws SQLException, ClassNotFoundException { } }); SourcePostgresqlDB.runSQL("UPDATE inventory.table_without_pk SET c_varchar='STRING-UPDATE-1'; "); - Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> { + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { try { Dataset ds = getTableData("testc.inventory.table_without_pk"); ds.show(); From bbacd998917be3cf217a856aed458eb7ab66b773 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 18 Mar 2024 19:34:11 +0100 Subject: [PATCH 5/5] Update IcebergChangeConsumerUpsertTest.java --- .../server/iceberg/IcebergChangeConsumerUpsertTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 6ed8da6f..3e6ced79 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -189,8 +189,7 @@ public void testTableUpsertNokey() throws SQLException, ClassNotFoundException { try { Dataset ds = getTableData("testc.inventory.table_without_pk"); ds.show(); - return ds.count() == 2 - && ds.where("__op == 'r'").count() == 2; + return ds.count() == 2; } catch (Exception e) { return false; }