From 48d7355804fae0bbe01ec550c9d7be4e6e7a38b8 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 18 Mar 2024 19:44:45 +0100 Subject: [PATCH] Test upsert with tables without PK (#295) * Test upsert with tables without PK * Update TestConfigSource.java * Update IcebergChangeConsumerUpsertTest.java * Update IcebergChangeConsumerUpsertTest.java * Update IcebergChangeConsumerUpsertTest.java --- .../IcebergChangeConsumerUpsertTest.java | 45 +++++++++++++++++-- .../server/iceberg/TestConfigSource.java | 1 + .../testresources/SourcePostgresqlDB.java | 2 +- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 1c07ce14..3e6ced79 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -8,15 +8,14 @@ package io.debezium.server.iceberg; -import io.debezium.server.iceberg.testresources.BaseSparkTest; -import io.debezium.server.iceberg.testresources.S3Minio; -import io.debezium.server.iceberg.testresources.TestChangeEvent; -import io.debezium.server.iceberg.testresources.TestUtil; +import io.debezium.server.iceberg.testresources.*; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; +import java.sql.SQLException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,6 +24,7 @@ import jakarta.inject.Inject; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -34,6 +34,7 @@ */ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) +@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) @TestProfile(IcebergChangeConsumerUpsertTest.TestProfile.class) public class IcebergChangeConsumerUpsertTest extends BaseSparkTest { @@ -170,6 +171,42 @@ public void testSimpleUpsertNoKey() throws Exception { Assertions.assertEquals(ds.where("id = 1 AND __op= 'c' AND first_name= 'user2'").count(), 2); } + + @Test + public void testTableUpsertNokey() throws SQLException, ClassNotFoundException { + String sql = "\n" + + " DROP TABLE IF EXISTS inventory.table_without_pk;\n" + + " CREATE TABLE IF NOT EXISTS inventory.table_without_pk (\n" + + " c_id INTEGER ,\n" + + " c_varchar VARCHAR\n" + + " );" + + "ALTER TABLE inventory.table_without_pk REPLICA IDENTITY FULL;"; + SourcePostgresqlDB.runSQL(sql); + SourcePostgresqlDB.runSQL( + "INSERT INTO inventory.table_without_pk (c_id, c_varchar) VALUES (1, 'STRING-DATA-1');" + + "INSERT INTO inventory.table_without_pk (c_id, c_varchar) VALUES (2, 'STRING-DATA-2');"); + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.table_without_pk"); + ds.show(); + return ds.count() == 2; + } catch (Exception e) { + return false; + } + }); + SourcePostgresqlDB.runSQL("UPDATE inventory.table_without_pk SET c_varchar='STRING-UPDATE-1'; "); + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.table_without_pk"); + ds.show(); + return ds.count() == 4 + && ds.where("__op == 'u'").count() == 2; + } catch (Exception e) { + return false; + } + }); + } + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 0287a72f..f6dbfd18 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -68,6 +68,7 @@ public TestConfigSource() { config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); config.put("debezium.source.table.whitelist", "inventory.*"); + config.put("%postgresql.debezium.source.replica.identity.autoset.values", "inventory.*:FULL"); config.put("quarkus.log.level", "INFO"); config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java index 903ae7aa..9a1f7f29 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java @@ -28,7 +28,7 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager { public static final String POSTGRES_USER = "postgres"; public static final String POSTGRES_PASSWORD = "postgres"; public static final String POSTGRES_DBNAME = "postgres"; - public static final String POSTGRES_IMAGE = "debezium/example-postgres:2.3"; + public static final String POSTGRES_IMAGE = "debezium/example-postgres:2.5"; public static final String POSTGRES_HOST = "localhost"; public static final Integer POSTGRES_PORT_DEFAULT = 5432; private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class);