Skip to content

Commit

Permalink
test testDataTypeChanges (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Oct 26, 2021
1 parent a866277 commit 2f3d56b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ static Object jsonToGenericRecordVal(Types.NestedField field,
val = node.isNull() ? null : node.asBoolean();
break;
case STRING:
val = node.asText(null);
// if the node is not a value node (method isValueNode returns false), convert it to string.
val = node.isValueNode() ? node.asText(null) : node.toString();
break;
case BINARY:
try {
Expand All @@ -166,7 +167,8 @@ static Object jsonToGenericRecordVal(Types.NestedField field,
break;
default:
// default to String type
val = node.asText(null);
// if the node is not a value node (method isValueNode returns false), convert it to string.
val = node.isValueNode() ? node.asText(null) : node.toString();
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,23 +42,23 @@
@QuarkusTestResource(SourcePostgresqlDB.class)
@TestProfile(IcebergChangeConsumerTestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

protected org.apache.iceberg.Table getTable(String table) {
HadoopCatalog catalog = getIcebergCatalog();
return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
}

@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
Expand Down Expand Up @@ -132,21 +133,21 @@ public void testSchemaChanges() throws Exception {
//ds.show();
return
ds.where("__op == 'r'").count() == 4 // snapshot rows. initial table data
&& ds.where("__op == 'u'").count() == 3 // 3 update
&& ds.where("__op == 'c'").count() == 1 // 1 insert
&& ds.where("__op == 'd'").count() == 1 // 1 insert
&& ds.where("first_name == 'George__UPDATE1'").count() == 3
&& ds.where("first_name == 'SallyUSer2'").count() == 1
&& ds.where("last_name is null").count() == 1
&& ds.where("id == '1004'").where("__op == 'd'").count() == 1;
&& ds.where("__op == 'u'").count() == 3 // 3 update
&& ds.where("__op == 'c'").count() == 1 // 1 insert
&& ds.where("__op == 'd'").count() == 1 // 1 insert
&& ds.where("first_name == 'George__UPDATE1'").count() == 3
&& ds.where("first_name == 'SallyUSer2'").count() == 1
&& ds.where("last_name is null").count() == 1
&& ds.where("id == '1004'").where("__op == 'd'").count() == 1;
} catch (Exception e) {
return false;
}
});

// added columns are not recognized by iceberg
getTableData("testc.inventory.customers").show();

// TEST add new columns to iceberg table then check if its data populated!
Table table = getTable("testc.inventory.customers");
// NOTE column list below are in reverse order!! testing the behaviour purpose!
Expand All @@ -159,7 +160,7 @@ public void testSchemaChanges() throws Exception {
// insert row after defining new column in target iceberg table
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");

// remove column from source
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers DROP COLUMN email;");
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
Expand All @@ -185,21 +186,63 @@ public void testSchemaChanges() throws Exception {

// CASE 2:(Removing column from source) (An extra column in iceberg table)
// these columns are populated with null value

// CASE 3:(Renaming column from source)
// this is CASE 2 + CASE 1 : old column will be populated with null values and new column will not be recognized
// and populated till it's added to iceberg table

S3Minio.listFiles();

}

// @Test
// @Disabled
// public void testDataTypeChanges() throws Exception {
// @TODO change boolean to string. string to bolean
// @TODO change int to string, string to int
// }

@Test
@Disabled
public void testDataTypeChanges() throws Exception {
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_type_changes;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_type_changes (\n" +
" c_id INTEGER ,\n" +
" c_varchar VARCHAR,\n" +
" c_int2string INTEGER,\n" +
" c_date2string DATE,\n" +
" c_timestamp2string TIMESTAMP,\n" +
" string2int VARCHAR,\n" +
" string2timestamp VARCHAR,\n" +
" string2boolean VARCHAR\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (1, 'STRING-DATA-1', 123, current_date , current_timestamp, 111, current_timestamp, false)";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)";
SourcePostgresqlDB.runSQL(sql);

SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " +
"ALTER COLUMN c_int2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_date2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " +
"ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " +
"ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " +
"ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean"
);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)";
SourcePostgresqlDB.runSQL(sql);

Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.data_type_changes");
ds.show();
return ds.where("__op == 'r'").count() == 19;
} catch (Exception e) {
return false;
}
});
}

@Test
public void testSimpleUpload() {
Expand All @@ -212,7 +255,7 @@ public void testSimpleUpload() {
return false;
}
});

// test nested data(struct) consumed
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Expand Down

0 comments on commit 2f3d56b

Please sign in to comment.