From 2f3d56b18a80f8aa6805df7b7c72a260fdc9e2aa Mon Sep 17 00:00:00 2001 From: ismail simsek Date: Wed, 27 Oct 2021 00:05:28 +0200 Subject: [PATCH] test testDataTypeChanges (#47) --- .../debezium/server/iceberg/IcebergUtil.java | 6 +- .../iceberg/IcebergChangeConsumerTest.java | 91 ++++++++++++++----- 2 files changed, 71 insertions(+), 26 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 4f8e8ec8..bc852ba4 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -143,7 +143,8 @@ static Object jsonToGenericRecordVal(Types.NestedField field, val = node.isNull() ? null : node.asBoolean(); break; case STRING: - val = node.asText(null); + // if the node is not a value node (method isValueNode returns false), convert it to string. + val = node.isValueNode() ? node.asText(null) : node.toString(); break; case BINARY: try { @@ -166,7 +167,8 @@ static Object jsonToGenericRecordVal(Types.NestedField field, break; default: // default to String type - val = node.asText(null); + // if the node is not a value node (method isValueNode returns false), convert it to string. + val = node.isValueNode() ? node.asText(null) : node.toString(); break; } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 8732da08..4106bb23 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -26,6 +26,7 @@ import org.apache.spark.sql.Row; import org.awaitility.Awaitility; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,23 +42,23 @@ @QuarkusTestResource(SourcePostgresqlDB.class) @TestProfile(IcebergChangeConsumerTestProfile.class) public class IcebergChangeConsumerTest extends BaseSparkTest { - + protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class); @ConfigProperty(name = "debezium.sink.type") String sinkType; - + @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") String tablePrefix; @ConfigProperty(name = "debezium.sink.iceberg.warehouse") String warehouseLocation; @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") String namespace; - + protected org.apache.iceberg.Table getTable(String table) { HadoopCatalog catalog = getIcebergCatalog(); return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_"))); } - + @Test public void testConsumingVariousDataTypes() throws Exception { assertEquals(sinkType, "iceberg"); @@ -132,21 +133,21 @@ public void testSchemaChanges() throws Exception { //ds.show(); return ds.where("__op == 'r'").count() == 4 // snapshot rows. initial table data - && ds.where("__op == 'u'").count() == 3 // 3 update - && ds.where("__op == 'c'").count() == 1 // 1 insert - && ds.where("__op == 'd'").count() == 1 // 1 insert - && ds.where("first_name == 'George__UPDATE1'").count() == 3 - && ds.where("first_name == 'SallyUSer2'").count() == 1 - && ds.where("last_name is null").count() == 1 - && ds.where("id == '1004'").where("__op == 'd'").count() == 1; + && ds.where("__op == 'u'").count() == 3 // 3 update + && ds.where("__op == 'c'").count() == 1 // 1 insert + && ds.where("__op == 'd'").count() == 1 // 1 insert + && ds.where("first_name == 'George__UPDATE1'").count() == 3 + && ds.where("first_name == 'SallyUSer2'").count() == 1 + && ds.where("last_name is null").count() == 1 + && ds.where("id == '1004'").where("__op == 'd'").count() == 1; } catch (Exception e) { return false; } }); - + // added columns are not recognized by iceberg getTableData("testc.inventory.customers").show(); - + // TEST add new columns to iceberg table then check if its data populated! Table table = getTable("testc.inventory.customers"); // NOTE column list below are in reverse order!! testing the behaviour purpose! @@ -159,7 +160,7 @@ public void testSchemaChanges() throws Exception { // insert row after defining new column in target iceberg table SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " + "(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');"); - + // remove column from source SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers DROP COLUMN email;"); SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " + @@ -185,21 +186,63 @@ public void testSchemaChanges() throws Exception { // CASE 2:(Removing column from source) (An extra column in iceberg table) // these columns are populated with null value - + // CASE 3:(Renaming column from source) // this is CASE 2 + CASE 1 : old column will be populated with null values and new column will not be recognized // and populated till it's added to iceberg table - + S3Minio.listFiles(); } - -// @Test -// @Disabled -// public void testDataTypeChanges() throws Exception { -// @TODO change boolean to string. string to bolean -// @TODO change int to string, string to int -// } + + @Test + @Disabled + public void testDataTypeChanges() throws Exception { + String sql = "\n" + + " DROP TABLE IF EXISTS inventory.data_type_changes;\n" + + " CREATE TABLE IF NOT EXISTS inventory.data_type_changes (\n" + + " c_id INTEGER ,\n" + + " c_varchar VARCHAR,\n" + + " c_int2string INTEGER,\n" + + " c_date2string DATE,\n" + + " c_timestamp2string TIMESTAMP,\n" + + " string2int VARCHAR,\n" + + " string2timestamp VARCHAR,\n" + + " string2boolean VARCHAR\n" + + " );"; + SourcePostgresqlDB.runSQL(sql); + sql = "INSERT INTO inventory.data_type_changes " + + " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " + + " VALUES (1, 'STRING-DATA-1', 123, current_date , current_timestamp, 111, current_timestamp, false)"; + SourcePostgresqlDB.runSQL(sql); + sql = "INSERT INTO inventory.data_type_changes " + + " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " + + " VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)"; + SourcePostgresqlDB.runSQL(sql); + + SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " + + "ALTER COLUMN c_int2string TYPE VARCHAR(555), " + + "ALTER COLUMN c_date2string TYPE VARCHAR(555), " + + "ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " + + "ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " + + "ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " + + "ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean" + ); + sql = "INSERT INTO inventory.data_type_changes " + + " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " + + " VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)"; + SourcePostgresqlDB.runSQL(sql); + + Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { + try { + Dataset ds = getTableData("testc.inventory.data_type_changes"); + ds.show(); + return ds.where("__op == 'r'").count() == 19; + } catch (Exception e) { + return false; + } + }); + } @Test public void testSimpleUpload() { @@ -212,7 +255,7 @@ public void testSimpleUpload() { return false; } }); - + // test nested data(struct) consumed Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { try {