From a866277b808e68dc037a20ff0419f5fca5b0c154 Mon Sep 17 00:00:00 2001
From: ismail simsek
Date: Mon, 25 Oct 2021 17:04:27 +0200
Subject: [PATCH] Unit test schema changes (#43)

* cleanup tests
---
 .../debezium/server/iceberg/IcebergUtil.java  |   2 +
 .../debezium/server/iceberg/ConfigSource.java |  11 +-
 .../IcebergChangeConsumerMysqlTest.java       |  36 ++---
 .../iceberg/IcebergChangeConsumerTest.java    | 131 ++++++++----------
 .../IcebergEventsChangeConsumerTest.java      |   2 +-
 .../batchsizewait/MaxBatchSizeWaitTest.java   |   6 +-
 .../iceberg/testresources/BaseSparkTest.java  |  34 +++--
 7 files changed, 107 insertions(+), 115 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
index c17ec16a..4f8e8ec8 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
@@ -110,10 +110,12 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonN
     LOGGER.debug("Processing nested field:{}", tableFields);

     for (Types.NestedField field : tableFields.fields()) {
+      // Set the value to null if the json event doesn't have this field
       if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
         mappedResult.put(field.name(), null);
         continue;
       }
+      // otherwise get the field's value from the json event and map it to its iceberg value
       mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name())));
     }

diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
index f1faa470..40e72109 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
@@ -9,17 +9,11 @@
 package io.debezium.server.iceberg;

 import io.debezium.server.TestConfigSource;
-import io.debezium.util.Testing;
-
-import java.nio.file.Path;
-
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;

 public class ConfigSource extends TestConfigSource {

   public static final String S3_REGION = "us-east-1";
   public static final String S3_BUCKET = "test-bucket";
-  public static final Path HISTORY_FILE = Testing.Files.createTestingPath("dbhistory.txt").toAbsolutePath();

   @Override
   public int getOrdinal() {
@@ -62,9 +56,8 @@ public ConfigSource() {
     config.put("debezium.transforms.unwrap.drop.tombstones", "true");

     // DEBEZIUM SOURCE conf
-    config.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
-    config.put("debezium.source.database.history", "io.debezium.relational.history.FileDatabaseHistory");
-    config.put("debezium.source.database.history.file.filename", HISTORY_FILE.toAbsolutePath().toString());
+    config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
+    config.put("debezium.source.database.history", "io.debezium.relational.history.MemoryDatabaseHistory");
     config.put("debezium.source.offset.flush.interval.ms", "60000");
     config.put("debezium.source.database.server.name", "testc");
     config.put("%postgresql.debezium.source.schema.whitelist", "inventory");

diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
index 9c048a2f..2ffa4617 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
@@ -11,7 +11,6 @@
 import io.debezium.server.iceberg.testresources.BaseSparkTest;
 import io.debezium.server.iceberg.testresources.S3Minio;
 import io.debezium.server.iceberg.testresources.SourceMysqlDB;
-import io.debezium.util.Testing;
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.TestProfile;
@@ -21,7 +20,6 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.awaitility.Awaitility;
-import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.junit.jupiter.api.Test;

 /**
@@ -35,12 +33,19 @@
 @TestProfile(IcebergChangeConsumerMysqlTestProfile.class)
 public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {

+  @Test
+  public void testSimpleUpload() throws Exception {
-  @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
-  Integer maxBatchSize;
+    Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
+      try {
+        Dataset df = getTableData("testc.inventory.customers");
+        return df.filter("id is not null").count() >= 4;
+      } catch (Exception e) {
+        return false;
+      }
+    });
+    // S3Minio.listFiles();

-  @Test
-  public void testTombstoneEvents() throws Exception {
     // create test table
     String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" +
                        " c_id INTEGER ," +
@@ -57,32 +62,19 @@ public void testTombstoneEvents() throws Exception {
     SourceMysqlDB.runSQL(sqlInsert);
     SourceMysqlDB.runSQL(sqlDelete);
     SourceMysqlDB.runSQL(sqlInsert);
+    SourceMysqlDB.runSQL(sqlDelete);
+    SourceMysqlDB.runSQL(sqlInsert);

     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
         Dataset df = getTableData("testc.inventory.test_delete_table");
         df.show();
-        return df.count() >= 12; // 4 insert 4 delete 4 insert!
-      } catch (Exception e) {
-        return false;
-      }
-    });
-  }
-
-  @Test
-  public void testSimpleUpload() {
-    Testing.Print.enable();
-
-    Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
-      try {
-        Dataset df = getTableData("testc.inventory.customers");
-        return df.filter("id is not null").count() >= 4;
+        return df.count() == 20; // 4 rows x 3 inserts + 4 rows x 2 deletes = 20 append-only events
       } catch (Exception e) {
         return false;
       }
     });
-    S3Minio.listFiles();
   }

 }

diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
index 9fb70800..8732da08 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
@@ -8,21 +8,15 @@

 package io.debezium.server.iceberg;

-import io.debezium.server.DebeziumServer;
 import io.debezium.server.iceberg.testresources.BaseSparkTest;
 import io.debezium.server.iceberg.testresources.S3Minio;
 import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
-import io.debezium.util.Testing;
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.TestProfile;

 import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import javax.inject.Inject;

-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -31,9 +25,10 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.awaitility.Awaitility;
-import org.eclipse.microprofile.config.ConfigProvider;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import static org.junit.jupiter.api.Assertions.assertEquals;

 /**
@@ -46,53 +41,29 @@
 @QuarkusTestResource(SourcePostgresqlDB.class)
 @TestProfile(IcebergChangeConsumerTestProfile.class)
 public class IcebergChangeConsumerTest extends BaseSparkTest {
-
-  @Inject
-  DebeziumServer server;
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
   @ConfigProperty(name = "debezium.sink.type")
   String sinkType;
+
   @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
   String tablePrefix;
   @ConfigProperty(name = "debezium.sink.iceberg.warehouse")
   String warehouseLocation;
   @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
   String namespace;
-
-  {
-    // Testing.Debug.enable();
-    Testing.Files.delete(ConfigSource.OFFSET_STORE_PATH);
-    Testing.Files.createTestingFile(ConfigSource.OFFSET_STORE_PATH);
-  }
-
-  private HadoopCatalog getIcebergCatalog() {
-    // loop and set hadoopConf
-    Configuration hadoopConf = new Configuration();
-    for (String name : ConfigProvider.getConfig().getPropertyNames()) {
-      if (name.startsWith("debezium.sink.iceberg.")) {
-        hadoopConf.set(name.substring("debezium.sink.iceberg.".length()),
-            ConfigProvider.getConfig().getValue(name, String.class));
-      }
-    }
-    HadoopCatalog icebergCatalog = new HadoopCatalog();
-    icebergCatalog.setConf(hadoopConf);
-
-    Map configMap = new HashMap<>();
-    hadoopConf.forEach(e-> configMap.put(e.getKey(), e.getValue()));
-    icebergCatalog.initialize("iceberg", configMap);
-    return icebergCatalog;
-  }
-
-  private org.apache.iceberg.Table getTable(String table) {
+
+  protected org.apache.iceberg.Table getTable(String table) {
     HadoopCatalog catalog = getIcebergCatalog();
     return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
   }
-
+
   @Test
-  public void testDatatypes() throws Exception {
+  public void testConsumingVariousDataTypes() throws Exception {
     assertEquals(sinkType, "iceberg");
     String sql = "\n" +
-        " DROP TABLE IF EXISTS inventory.table_datatypes;\n" +
-        " CREATE TABLE IF NOT EXISTS inventory.table_datatypes (\n" +
+        " DROP TABLE IF EXISTS inventory.data_types;\n" +
+        " CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
         " c_id INTEGER ,\n" +
         " c_text TEXT,\n" +
         " c_varchar VARCHAR,\n" +
@@ -104,14 +75,14 @@ public void testDatatypes() throws Exception {
         " c_decimal DECIMAL(18,4),\n" +
         " c_numeric NUMERIC(18,4),\n" +
         " c_interval INTERVAL,\n" +
-        " c_boolean BOOLean,\n" +
+        " c_boolean BOOLEAN,\n" +
         " c_uuid UUID,\n" +
         " c_bytea BYTEA,\n" +
-        " c_json json,\n" +
-        " c_jsonb jsonb\n" +
+        " c_json JSON,\n" +
+        " c_jsonb JSONB\n" +
         " );";
     SourcePostgresqlDB.runSQL(sql);
-    sql = "INSERT INTO inventory.table_datatypes (" +
+    sql = "INSERT INTO inventory.data_types (" +
         "c_id, " +
         "c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " +
         "c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " +
@@ -127,7 +98,7 @@ public void testDatatypes() throws Exception {
     SourcePostgresqlDB.runSQL(sql);
     Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
       try {
-        Dataset df = getTableData("testc.inventory.table_datatypes");
+        Dataset df = getTableData("testc.inventory.data_types");
         df.show(true);
         return df.where("c_text is null AND c_varchar is null AND c_int is null " +
             "AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
@@ -140,17 +111,8 @@
   }

   @Test
-  public void testIcebergConsumer() throws Exception {
-    Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
-      try {
-        Dataset ds = getTableData("testc.inventory.customers");
-        //ds.show();
-        return ds.count() >= 4;
-      } catch (Exception e) {
-        return false;
-      }
-    });
-
+  public void testSchemaChanges() throws Exception {
+    // TEST: add new columns, drop a not-null constraint
     SourcePostgresqlDB.runSQL("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;");
     SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ADD test_varchar_column varchar(255);");
     SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ADD test_boolean_column boolean;");
@@ -167,8 +129,13 @@
     Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
       try {
         Dataset ds = getTableData("testc.inventory.customers");
-        ds.show();
-        return ds.where("first_name == 'George__UPDATE1'").count() == 3
+        //ds.show();
+        return
+            ds.where("__op == 'r'").count() == 4 // 4 snapshot rows: initial table data
+            && ds.where("__op == 'u'").count() == 3 // 3 updates
+            && ds.where("__op == 'c'").count() == 1 // 1 insert
+            && ds.where("__op == 'd'").count() == 1 // 1 delete
+            && ds.where("first_name == 'George__UPDATE1'").count() == 3
             && ds.where("first_name == 'SallyUSer2'").count() == 1
             && ds.where("last_name is null").count() == 1
             && ds.where("id == '1004'").where("__op == 'd'").count() == 1;
       } catch (Exception e) {
         return false;
       }
     });
-
+
+    // columns added in the source are not yet recognized by the iceberg table
     getTableData("testc.inventory.customers").show();
-    // add new columns to iceberg table! and check if new column values are populated!
+
+    // TEST: add the new columns to the iceberg table, then check that their data is populated
     Table table = getTable("testc.inventory.customers");
-
-    // !!!!! IMPORTANT !!! column list here is in reverse order!! for testing purpose!
+    // NOTE: the column list below is intentionally in reverse order, to test adding columns out of order
     table.updateSchema()
-        // test_date_column is Long type because debezium serializes date type as number
+        // NOTE: test_date_column is Long type because debezium serializes the date type as a number
         .addColumn("test_date_column", Types.LongType.get())
         .addColumn("test_boolean_column", Types.BooleanType.get())
         .addColumn("test_varchar_column", Types.StringType.get())
         .commit();
-
+    // insert a row after defining the new columns in the target iceberg table
+    SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
+        "(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");
+
+    // remove a column from the source table
     SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers DROP COLUMN email;");
     SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
-        "(default,'User3','lastname_value3','test_varchar_value3',true, '2020-01-01'::DATE);");
+        "(default,'User3','lastname_value3','after-dropping-email-column-from-source',true, '2020-01-01'::DATE);");

     Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
       try {
         Dataset ds = getTableData("testc.inventory.customers");
         ds.show();
         return ds.where("first_name == 'User3'").count() == 1
-            && ds.where("test_varchar_column == 'test_varchar_value3'").count() == 1;
+            && ds.where("first_name == 'After-Defining-Iceberg-fields'").count() == 1
+            && ds.where("test_varchar_column == 'after-dropping-email-column-from-source' AND email is null").count() == 1;
       } catch (Exception e) {
         return false;
       }
     });
+    getTableData("testc.inventory.customers").show();
+
+    // CASE 1: (adding a new column to the source; the column is missing in the iceberg table)
+    // data for the new column is ignored until the same column is defined in the iceberg table
+    // for example: if a column is not found in the iceberg table, its data is dropped and not copied to the target!
+    // once the same column is added to the iceberg table, its data is recognized and populated
+
+    // CASE 2: (removing a column from the source; an extra column remains in the iceberg table)
+    // such columns are populated with null values
+
+    // CASE 3: (renaming a column in the source)
+    // this is CASE 2 + CASE 1: the old column is populated with null values, and the new column is not recognized
+    // or populated until it is added to the iceberg table
+
+    S3Minio.listFiles();
   }
+
+//  @Test
+//  @Disabled
+//  public void testDataTypeChanges() throws Exception {
+//    @TODO change boolean to string, string to boolean
+//    @TODO change int to string, string to int
+//  }

   @Test
   public void testSimpleUpload() {
@@ -219,10 +212,8 @@ public void testSimpleUpload() {
         return false;
       }
     });
-  }
-
-  @Test
-  public void testGeomData() {
+
+    // test that nested (struct) data is consumed
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
         Dataset ds = getTableData("testc.inventory.geom");

diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
index 6a00fef1..caa899f4 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
@@ -38,7 +38,7 @@ public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
   String sinkType;

   @Test
-  public void testIcebergEvents() {
+  public void testSimpleUpload() {
     Assertions.assertEquals(sinkType, "icebergevents");
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {

diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java
index f58bf9a8..ea1463e0 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java
@@ -41,11 +41,11 @@ public void testBatchsizeWait() throws Exception {
     }
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
-        Dataset df = getTableData("testc.inventory.test_date_table");
-        df.createOrReplaceGlobalTempView("test_date_table_batch_size");
+        Dataset df = getTableData("testc.inventory.test_data");
+        df.createOrReplaceGlobalTempView("test_data_batch_size");
         df = spark
             .sql("SELECT substring(input_file,94,60) as input_file, " +
-                "count(*) as batch_size FROM global_temp.test_date_table_batch_size group " +
+                "count(*) as batch_size FROM global_temp.test_data_batch_size group " +
                 "by 1");
         //df.show(false);
         return df.filter("batch_size = " + maxBatchSize).count() >= 5;

diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
index 6c2ac758..990a2d6c 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
@@ -8,12 +8,13 @@

 package io.debezium.server.iceberg.testresources;

-import io.debezium.server.iceberg.ConfigSource;
 import io.debezium.server.iceberg.IcebergUtil;
-import io.debezium.util.Testing;

+import java.util.HashMap;
 import java.util.Map;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -23,7 +24,7 @@
 import static io.debezium.server.iceberg.ConfigSource.S3_BUCKET;

 /**
- * Integration test that verifies basic reading from PostgreSQL database and writing to s3 destination.
+ * Base class for integration tests; uses Spark to verify the consumed data.
  *
  * @author Ismail Simsek
  */
@@ -34,9 +35,22 @@
 public class BaseSparkTest {

   private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
   protected static SparkSession spark;

-  static {
-    Testing.Files.delete(ConfigSource.OFFSET_STORE_PATH);
-    Testing.Files.createTestingFile(ConfigSource.OFFSET_STORE_PATH);
+  protected HadoopCatalog getIcebergCatalog() {
+    // copy debezium.sink.iceberg.* settings (prefix stripped) into the hadoop Configuration
+    Configuration hadoopConf = new Configuration();
+    for (String name : ConfigProvider.getConfig().getPropertyNames()) {
+      if (name.startsWith("debezium.sink.iceberg.")) {
+        hadoopConf.set(name.substring("debezium.sink.iceberg.".length()),
+            ConfigProvider.getConfig().getValue(name, String.class));
+      }
+    }
+    HadoopCatalog icebergCatalog = new HadoopCatalog();
+    icebergCatalog.setConf(hadoopConf);
+
+    Map configMap = new HashMap<>();
+    hadoopConf.forEach(e-> configMap.put(e.getKey(), e.getValue()));
+    icebergCatalog.initialize("iceberg", configMap);
+    return icebergCatalog;
   }

   @BeforeAll
@@ -73,7 +87,7 @@ static void setup() {
   public static void PGCreateTestDataTable() throws Exception {
     // create test table
     String sql = "" +
-        " CREATE TABLE IF NOT EXISTS inventory.test_date_table (\n" +
+        " CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
         " c_id INTEGER ,\n" +
         " c_text TEXT,\n" +
         " c_varchar VARCHAR" +
@@ -94,7 +108,7 @@ public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
           if (addRandomDelay) {
             Thread.sleep(TestUtil.randomInt(20000, 100000));
           }
-          String sql = "INSERT INTO inventory.test_date_table (c_id, c_text, c_varchar ) " +
+          String sql = "INSERT INTO inventory.test_data (c_id, c_text, c_varchar ) " +
              "VALUES ";
           StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')");
           for (int i = 0; i < 100; i++) {
@@ -115,7 +129,7 @@ public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
   public static void mysqlCreateTestDataTable() throws Exception {
     // create test table
     String sql = "\n" +
-        " CREATE TABLE IF NOT EXISTS inventory.test_date_table (\n" +
+        " CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
         " c_id INTEGER ,\n" +
         " c_text TEXT,\n" +
         " c_varchar TEXT\n" +
@@ -126,7 +140,7 @@
   public static int mysqlLoadTestDataTable(int numRows) throws Exception {
     int numInsert = 0;
     do {
-      String sql = "INSERT INTO inventory.test_date_table (c_id, c_text, c_varchar ) " +
+      String sql = "INSERT INTO inventory.test_data (c_id, c_text, c_varchar ) " +
          "VALUES ";
       StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')");
       for (int i = 0; i < 10; i++) {
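
The catalog wiring this patch moves into BaseSparkTest can be read in isolation. Below is a minimal, self-contained sketch of the same pattern; it is not part of the patch itself, and the class name IcebergCatalogSketch, the hardcoded "default" namespace, and the empty table prefix are illustrative assumptions that mirror the test defaults. It assumes the MicroProfile Config API used in the test code above is on the classpath. Every debezium.sink.iceberg.* property is copied into a Hadoop Configuration with the prefix stripped, and a destination table is loaded the way IcebergChangeConsumerTest.getTable() does.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.eclipse.microprofile.config.ConfigProvider;

public class IcebergCatalogSketch {

  // Mirrors BaseSparkTest.getIcebergCatalog(): every "debezium.sink.iceberg.*"
  // property is copied into the Hadoop Configuration with the prefix stripped,
  // then the same key/value pairs are used to initialize the HadoopCatalog.
  public static HadoopCatalog icebergCatalog() {
    Configuration hadoopConf = new Configuration();
    for (String name : ConfigProvider.getConfig().getPropertyNames()) {
      if (name.startsWith("debezium.sink.iceberg.")) {
        hadoopConf.set(name.substring("debezium.sink.iceberg.".length()),
            ConfigProvider.getConfig().getValue(name, String.class));
      }
    }
    HadoopCatalog catalog = new HadoopCatalog();
    catalog.setConf(hadoopConf);
    Map<String, String> props = new HashMap<>();
    hadoopConf.forEach(e -> props.put(e.getKey(), e.getValue()));
    catalog.initialize("iceberg", props);
    return catalog;
  }

  // Mirrors IcebergChangeConsumerTest.getTable(): a destination name such as
  // "testc.inventory.customers" maps to the identifier
  // default.testc_inventory_customers (assuming no table prefix is configured).
  public static Table loadDestinationTable(String destination) {
    return icebergCatalog().loadTable(
        TableIdentifier.of(Namespace.of("default"), destination.replace(".", "_")));
  }
}

Building the catalog from the same stripped property map the sink itself consumes means a table written by the consumer can be read back through an identically configured catalog, which is what lets getTable() and the Spark-based getTableData() checks agree on table locations.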