Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add new test to see effects of Data Type Changes #47

Merged
merged 1 commit into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ static Object jsonToGenericRecordVal(Types.NestedField field,
val = node.isNull() ? null : node.asBoolean();
break;
case STRING:
val = node.asText(null);
// if the node is not a value node (method isValueNode returns false), convert it to string.
val = node.isValueNode() ? node.asText(null) : node.toString();
break;
case BINARY:
try {
Expand All @@ -166,7 +167,8 @@ static Object jsonToGenericRecordVal(Types.NestedField field,
break;
default:
// default to String type
val = node.asText(null);
// if the node is not a value node (method isValueNode returns false), convert it to string.
val = node.isValueNode() ? node.asText(null) : node.toString();
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,23 +42,23 @@
@QuarkusTestResource(SourcePostgresqlDB.class)
@TestProfile(IcebergChangeConsumerTestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

protected org.apache.iceberg.Table getTable(String table) {
HadoopCatalog catalog = getIcebergCatalog();
return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
}

@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
Expand Down Expand Up @@ -132,21 +133,21 @@ public void testSchemaChanges() throws Exception {
//ds.show();
return
ds.where("__op == 'r'").count() == 4 // snapshot rows. initial table data
&& ds.where("__op == 'u'").count() == 3 // 3 update
&& ds.where("__op == 'c'").count() == 1 // 1 insert
&& ds.where("__op == 'd'").count() == 1 // 1 insert
&& ds.where("first_name == 'George__UPDATE1'").count() == 3
&& ds.where("first_name == 'SallyUSer2'").count() == 1
&& ds.where("last_name is null").count() == 1
&& ds.where("id == '1004'").where("__op == 'd'").count() == 1;
&& ds.where("__op == 'u'").count() == 3 // 3 update
&& ds.where("__op == 'c'").count() == 1 // 1 insert
&& ds.where("__op == 'd'").count() == 1 // 1 insert
&& ds.where("first_name == 'George__UPDATE1'").count() == 3
&& ds.where("first_name == 'SallyUSer2'").count() == 1
&& ds.where("last_name is null").count() == 1
&& ds.where("id == '1004'").where("__op == 'd'").count() == 1;
} catch (Exception e) {
return false;
}
});

// added columns are not recognized by iceberg
getTableData("testc.inventory.customers").show();

// TEST add new columns to iceberg table then check if its data populated!
Table table = getTable("testc.inventory.customers");
// NOTE column list below are in reverse order!! testing the behaviour purpose!
Expand All @@ -159,7 +160,7 @@ public void testSchemaChanges() throws Exception {
// insert row after defining new column in target iceberg table
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");

// remove column from source
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers DROP COLUMN email;");
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
Expand All @@ -185,21 +186,63 @@ public void testSchemaChanges() throws Exception {

// CASE 2:(Removing column from source) (An extra column in iceberg table)
// these columns are populated with null value

// CASE 3:(Renaming column from source)
// this is CASE 2 + CASE 1 : old column will be populated with null values and new column will not be recognized
// and populated till it's added to iceberg table

S3Minio.listFiles();

}

// @Test
// @Disabled
// public void testDataTypeChanges() throws Exception {
// @TODO change boolean to string. string to bolean
// @TODO change int to string, string to int
// }

@Test
@Disabled
public void testDataTypeChanges() throws Exception {
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_type_changes;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_type_changes (\n" +
" c_id INTEGER ,\n" +
" c_varchar VARCHAR,\n" +
" c_int2string INTEGER,\n" +
" c_date2string DATE,\n" +
" c_timestamp2string TIMESTAMP,\n" +
" string2int VARCHAR,\n" +
" string2timestamp VARCHAR,\n" +
" string2boolean VARCHAR\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (1, 'STRING-DATA-1', 123, current_date , current_timestamp, 111, current_timestamp, false)";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)";
SourcePostgresqlDB.runSQL(sql);

SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " +
"ALTER COLUMN c_int2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_date2string TYPE VARCHAR(555), " +
"ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " +
"ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " +
"ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " +
"ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean"
);
sql = "INSERT INTO inventory.data_type_changes " +
" (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
" VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)";
SourcePostgresqlDB.runSQL(sql);

Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.data_type_changes");
ds.show();
return ds.where("__op == 'r'").count() == 19;
} catch (Exception e) {
return false;
}
});
}

@Test
public void testSimpleUpload() {
Expand All @@ -212,7 +255,7 @@ public void testSimpleUpload() {
return false;
}
});

// test nested data(struct) consumed
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Expand Down