Skip to content

Commit

Permalink
Test Field Addition is working (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 15, 2024
1 parent 1eff67a commit 219d0c1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -220,49 +221,60 @@ public void testSchemaChanges() throws Exception {
}

@Test
@Disabled
public void testDataTypeChanges() throws Exception {
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_type_changes;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_type_changes (\n" +
" c_id INTEGER ,\n" +
" c_varchar VARCHAR,\n" +
" c_int2string INTEGER,\n" +
" c_date2string DATE,\n" +
" c_timestamp2string TIMESTAMP,\n" +
" string2int VARCHAR,\n" +
" string2timestamp VARCHAR,\n" +
" string2boolean VARCHAR\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (1, 'STRING-DATA-1', 123, current_date , current_timestamp, 111, current_timestamp, false)";
public void testFieldAddition() throws Exception {
String sql = """
DROP TABLE IF EXISTS inventory.data_type_changes;
CREATE TABLE IF NOT EXISTS inventory.data_type_changes (
c_id INTEGER ,
c_varchar VARCHAR
);
INSERT INTO inventory.data_type_changes
(c_id, c_varchar)
VALUES (1, 'STRING-DATA-1')
""";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)";
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.data_type_changes");
ds.printSchema();
ds.show();
return ds.count() >= 1 &&
Objects.equals(dataTypeString(ds, "c_varchar"), "string");
} catch (Exception e) {
return false;
}
});
sql = "ALTER TABLE inventory.data_type_changes ADD COLUMN c_integer INTEGER;" +
" INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_integer) " +
" VALUES (2, 'STRING-DATA-2', 222)";
SourcePostgresqlDB.runSQL(sql);

SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " +
"ALTER COLUMN c_int2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_date2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " +
"ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " +
"ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " +
"ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean"
);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)";
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.data_type_changes");
ds.printSchema();
ds.show();
System.out.println(dataTypeString(ds, "c_integer"));
return ds.count() >= 2 &&
Objects.equals(dataTypeString(ds, "c_integer"), "integer");
} catch (Exception e) {
return false;
}
});
sql = "ALTER TABLE inventory.data_type_changes ADD COLUMN c_timestamptz TIMESTAMPTZ;" +
" INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_integer, c_timestamptz) " +
" VALUES (3, 'STRING-DATA-3', 333, current_timestamp)";
SourcePostgresqlDB.runSQL(sql);

Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.data_type_changes");
ds.printSchema();
ds.show();
return ds.where("__op == 'r'").count() == 19;
System.out.println(dataTypeString(ds, "c_timestamptz"));
return ds.count() >= 3 &&
Objects.equals(dataTypeString(ds, "c_timestamptz"), "string");
} catch (Exception e) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.BeforeAll;
import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
Expand Down Expand Up @@ -70,6 +71,18 @@ static void setup() {

}

public static String dataTypeString(Dataset<Row> dataset, String colName) {
StructField[] fields = dataset.schema().fields();
String dataType = null;
for(StructField field: fields) {
if(field.name().equals(colName)) {
dataType = field.dataType().typeName();
break;
}
}
return dataType;
}

public static void PGCreateTestDataTable() throws Exception {
// create test table
String sql = "" +
Expand Down

0 comments on commit 219d0c1

Please sign in to comment.