diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 444cfce6..a5e0570a 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -33,6 +33,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -220,49 +221,60 @@ public void testSchemaChanges() throws Exception { } @Test - @Disabled - public void testDataTypeChanges() throws Exception { - String sql = "\n" + - " DROP TABLE IF EXISTS inventory.data_type_changes;\n" + - " CREATE TABLE IF NOT EXISTS inventory.data_type_changes (\n" + - " c_id INTEGER ,\n" + - " c_varchar VARCHAR,\n" + - " c_int2string INTEGER,\n" + - " c_date2string DATE,\n" + - " c_timestamp2string TIMESTAMP,\n" + - " string2int VARCHAR,\n" + - " string2timestamp VARCHAR,\n" + - " string2boolean VARCHAR\n" + - " );"; - SourcePostgresqlDB.runSQL(sql); - sql = "INSERT INTO inventory.data_type_changes " + - " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " + - " VALUES (1, 'STRING-DATA-1', 123, current_date , current_timestamp, 111, current_timestamp, false)"; + public void testFieldAddition() throws Exception { + String sql = """ + DROP TABLE IF EXISTS inventory.data_type_changes; + CREATE TABLE IF NOT EXISTS inventory.data_type_changes ( + c_id INTEGER , + c_varchar VARCHAR + ); + INSERT INTO inventory.data_type_changes + (c_id, c_varchar) + VALUES (1, 'STRING-DATA-1') + """; SourcePostgresqlDB.runSQL(sql); - sql = "INSERT INTO inventory.data_type_changes " + - " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " + - " VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)"; + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.data_type_changes"); + ds.printSchema(); + ds.show(); + return ds.count() >= 1 && + Objects.equals(dataTypeString(ds, "c_varchar"), "string"); + } catch (Exception e) { + return false; + } + }); + sql = "ALTER TABLE inventory.data_type_changes ADD COLUMN c_integer INTEGER;" + + " INSERT INTO inventory.data_type_changes " + + " (c_id, c_varchar, c_integer) " + + " VALUES (2, 'STRING-DATA-2', 222)"; SourcePostgresqlDB.runSQL(sql); - SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " + - "ALTER COLUMN c_int2string TYPE VARCHAR(555), " + - "ALTER COLUMN c_date2string TYPE VARCHAR(555), " + - "ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " + - "ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " + - "ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " + - "ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean" - ); - sql = "INSERT INTO inventory.data_type_changes " + - " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " + - " VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)"; + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.data_type_changes"); + ds.printSchema(); + ds.show(); + System.out.println(dataTypeString(ds, "c_integer")); + return ds.count() >= 2 && + Objects.equals(dataTypeString(ds, "c_integer"), "integer"); + } catch (Exception e) { + return false; + } + }); + sql = "ALTER TABLE inventory.data_type_changes ADD COLUMN c_timestamptz TIMESTAMPTZ;" + + " INSERT INTO inventory.data_type_changes " + + " (c_id, c_varchar, c_integer, c_timestamptz) " + + " VALUES (3, 'STRING-DATA-3', 333, current_timestamp)"; SourcePostgresqlDB.runSQL(sql); - Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { try { Dataset ds = getTableData("testc.inventory.data_type_changes"); ds.printSchema(); ds.show(); - return ds.where("__op == 'r'").count() == 19; + System.out.println(dataTypeString(ds, "c_timestamptz")); + return ds.count() >= 3 && + Objects.equals(dataTypeString(ds, "c_timestamptz"), "string"); } catch (Exception e) { return false; } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java index f81fd743..767a0e7e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java @@ -20,6 +20,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; import org.eclipse.microprofile.config.ConfigProvider; import org.junit.jupiter.api.BeforeAll; import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE; @@ -70,6 +71,18 @@ static void setup() { } + public static String dataTypeString(Dataset dataset, String colName) { + StructField[] fields = dataset.schema().fields(); + String dataType = null; + for(StructField field: fields) { + if(field.name().equals(colName)) { + dataType = field.dataType().typeName(); + break; + } + } + return dataType; + } + public static void PGCreateTestDataTable() throws Exception { // create test table String sql = "" +