Unit test schema changes (#43)
* cleanup tests
ismailsimsek committed Oct 25, 2021
1 parent 0ad4efb commit a866277
Showing 7 changed files with 107 additions and 115 deletions.
@@ -110,10 +110,12 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data
LOGGER.debug("Processing nested field:{}", tableFields);

for (Types.NestedField field : tableFields.fields()) {
// Set value to null if the json event doesn't have this field
if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
mappedResult.put(field.name(), null);
continue;
}
// get the value of the field from json event, map it to iceberg value
mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name())));
}

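The hunk above changes getIcebergRecord so that any schema field missing from the incoming json event is explicitly mapped to null instead of failing. A minimal, self-contained sketch of that rule (the class name, sample schema, and sample event below are illustrative, not part of this commit):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class NullFieldRuleSketch {
  public static void main(String[] args) throws Exception {
    // Table schema with three fields; the sample event only carries two of them.
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "first_name", Types.StringType.get()),
        Types.NestedField.optional(3, "email", Types.StringType.get()));

    JsonNode event = new ObjectMapper().readTree("{\"id\": 1004, \"first_name\": \"Sally\"}");

    // Same rule as the hunk above: a field missing from the json event maps to null.
    for (Types.NestedField field : schema.asStruct().fields()) {
      Object value = (!event.has(field.name()) || event.get(field.name()) == null)
          ? null : event.get(field.name()).asText();
      System.out.println(field.name() + " -> " + value); // email -> null
    }
  }
}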
@@ -9,17 +9,11 @@
package io.debezium.server.iceberg;

import io.debezium.server.TestConfigSource;
import io.debezium.util.Testing;

import java.nio.file.Path;

import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;

public class ConfigSource extends TestConfigSource {

public static final String S3_REGION = "us-east-1";
public static final String S3_BUCKET = "test-bucket";
public static final Path HISTORY_FILE = Testing.Files.createTestingPath("dbhistory.txt").toAbsolutePath();

@Override
public int getOrdinal() {
@@ -62,9 +56,8 @@ public ConfigSource() {
config.put("debezium.transforms.unwrap.drop.tombstones", "true");

// DEBEZIUM SOURCE conf
config.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
config.put("debezium.source.database.history", "io.debezium.relational.history.FileDatabaseHistory");
config.put("debezium.source.database.history.file.filename", HISTORY_FILE.toAbsolutePath().toString());
config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
config.put("debezium.source.database.history", "io.debezium.relational.history.MemoryDatabaseHistory");
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.database.server.name", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
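With this change the test config keeps Debezium offsets and database history purely in memory, so no offset or history files are left behind between test runs. Individual tests can still override these defaults through the Quarkus test profiles referenced by the test classes below; a minimal sketch of such a profile, with an override that is illustrative only and not part of this commit:

import io.quarkus.test.junit.QuarkusTestProfile;
import java.util.Map;

public class ExampleIcebergTestProfile implements QuarkusTestProfile {
  @Override
  public Map<String, String> getConfigOverrides() {
    // Illustrative override only: shrink the source batch size for one test class.
    return Map.of("debezium.source.max.batch.size", "100");
  }
}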
@@ -11,7 +11,6 @@
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
@@ -21,7 +20,6 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Test;

/**
@@ -35,12 +33,19 @@
@TestProfile(IcebergChangeConsumerMysqlTestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {

@Test
public void testSimpleUpload() throws Exception {

@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
Integer maxBatchSize;
Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.customers");
return df.filter("id is not null").count() >= 4;
} catch (Exception e) {
return false;
}
});
// S3Minio.listFiles();

@Test
public void testTombstoneEvents() throws Exception {
// create test table
String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" +
" c_id INTEGER ," +
@@ -57,32 +62,19 @@ public void testTombstoneEvents() throws Exception {
SourceMysqlDB.runSQL(sqlInsert);
SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);
SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.test_delete_table");
df.show();
return df.count() >= 12; // 4 insert 4 delete 4 insert!
} catch (Exception e) {
return false;
}
});
}

@Test
public void testSimpleUpload() {
Testing.Print.enable();

Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.customers");
return df.filter("id is not null").count() >= 4;
return df.count() == 20; // 3 x 4 inserts + 2 x 4 deletes = 20 rows
} catch (Exception e) {
return false;
}
});

S3Minio.listFiles();
}

}
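testTombstoneEvents relies on the sink appending change events rather than applying them in place: the three inserts and two deletes of four rows each all land as rows, hence the expected count of 20. A quick way to inspect that breakdown (a sketch only, assuming the table carries the same __op metadata column used by the other tests):

// Sketch, not part of the commit: group the 20 rows by operation type.
Dataset<Row> df = getTableData("testc.inventory.test_delete_table");
df.groupBy("__op").count().show();
// Expected with tombstones dropped: 12 rows with __op = 'c' and 8 rows with __op = 'd'.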
@@ -8,21 +8,15 @@

package io.debezium.server.iceberg;

import io.debezium.server.DebeziumServer;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -31,9 +25,10 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
@@ -46,53 +41,29 @@
@QuarkusTestResource(SourcePostgresqlDB.class)
@TestProfile(IcebergChangeConsumerTestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

@Inject
DebeziumServer server;

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

{
// Testing.Debug.enable();
Testing.Files.delete(ConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(ConfigSource.OFFSET_STORE_PATH);
}

private HadoopCatalog getIcebergCatalog() {
// loop and set hadoopConf
Configuration hadoopConf = new Configuration();
for (String name : ConfigProvider.getConfig().getPropertyNames()) {
if (name.startsWith("debezium.sink.iceberg.")) {
hadoopConf.set(name.substring("debezium.sink.iceberg.".length()),
ConfigProvider.getConfig().getValue(name, String.class));
}
}
HadoopCatalog icebergCatalog = new HadoopCatalog();
icebergCatalog.setConf(hadoopConf);

Map<String, String> configMap = new HashMap<>();
hadoopConf.forEach(e-> configMap.put(e.getKey(), e.getValue()));
icebergCatalog.initialize("iceberg", configMap);
return icebergCatalog;
}

private org.apache.iceberg.Table getTable(String table) {

protected org.apache.iceberg.Table getTable(String table) {
HadoopCatalog catalog = getIcebergCatalog();
return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
}

@Test
public void testDatatypes() throws Exception {
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.table_datatypes;\n" +
" CREATE TABLE IF NOT EXISTS inventory.table_datatypes (\n" +
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
" c_id INTEGER ,\n" +
" c_text TEXT,\n" +
" c_varchar VARCHAR,\n" +
@@ -104,14 +75,14 @@ public void testDatatypes() throws Exception {
" c_decimal DECIMAL(18,4),\n" +
" c_numeric NUMERIC(18,4),\n" +
" c_interval INTERVAL,\n" +
" c_boolean BOOLean,\n" +
" c_boolean BOOLEAN,\n" +
" c_uuid UUID,\n" +
" c_bytea BYTEA,\n" +
" c_json json,\n" +
" c_jsonb jsonb\n" +
" c_json JSON,\n" +
" c_jsonb JSONB\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.table_datatypes (" +
sql = "INSERT INTO inventory.data_types (" +
"c_id, " +
"c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " +
"c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " +
@@ -127,7 +98,7 @@
SourcePostgresqlDB.runSQL(sql);
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.table_datatypes");
Dataset<Row> df = getTableData("testc.inventory.data_types");
df.show(true);
return df.where("c_text is null AND c_varchar is null AND c_int is null " +
"AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
@@ -140,17 +111,8 @@
}

@Test
public void testIcebergConsumer() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
//ds.show();
return ds.count() >= 4;
} catch (Exception e) {
return false;
}
});

public void testSchemaChanges() throws Exception {
// TEST add new columns, drop not null constraint
SourcePostgresqlDB.runSQL("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;");
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ADD test_varchar_column varchar(255);");
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ADD test_boolean_column boolean;");
@@ -167,46 +129,77 @@ public void testIcebergConsumer() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
ds.show();
return ds.where("first_name == 'George__UPDATE1'").count() == 3
//ds.show();
return
ds.where("__op == 'r'").count() == 4 // snapshot rows. initial table data
&& ds.where("__op == 'u'").count() == 3 // 3 update
&& ds.where("__op == 'c'").count() == 1 // 1 insert
&& ds.where("__op == 'd'").count() == 1 // 1 insert
&& ds.where("first_name == 'George__UPDATE1'").count() == 3
&& ds.where("first_name == 'SallyUSer2'").count() == 1
&& ds.where("last_name is null").count() == 1
&& ds.where("id == '1004'").where("__op == 'd'").count() == 1;
} catch (Exception e) {
return false;
}
});


// added columns are not recognized by iceberg
getTableData("testc.inventory.customers").show();
// add new columns to iceberg table! and check if new column values are populated!

// TEST add new columns to the iceberg table, then check that their data is populated!
Table table = getTable("testc.inventory.customers");

// !!!!! IMPORTANT !!! column list here is in reverse order!! for testing purpose!
// NOTE the column list below is in reverse order on purpose, to test this behaviour!
table.updateSchema()
// test_date_column is Long type because debezium serializes date type as number
// NOTE test_date_column is Long type because debezium serializes date type as number
.addColumn("test_date_column", Types.LongType.get())
.addColumn("test_boolean_column", Types.BooleanType.get())
.addColumn("test_varchar_column", Types.StringType.get())
.commit();

// insert row after defining new column in target iceberg table
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");

// remove column from source
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers DROP COLUMN email;");
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'User3','lastname_value3','test_varchar_value3',true, '2020-01-01'::DATE);");
"(default,'User3','lastname_value3','after-dropping-email-column-from-source',true, '2020-01-01'::DATE);");

Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
ds.show();
return ds.where("first_name == 'User3'").count() == 1
&& ds.where("test_varchar_column == 'test_varchar_value3'").count() == 1;
&& ds.where("first_name == 'After-Defining-Iceberg-fields'").count() == 1
&& ds.where("test_varchar_column == 'after-dropping-email-column-from-source' AND email is null").count() == 1;
} catch (Exception e) {
return false;
}
});
getTableData("testc.inventory.customers").show();

// CASE 1: A new column is added to the source (the column is missing in the iceberg table).
// Data for the new column is ignored until the same column is defined in the iceberg table;
// if a column is not found in the iceberg table, its data is dropped and not copied to the target.
// Once the column is added to the iceberg table, its data is recognized and populated.

// CASE 2: A column is removed from the source (the iceberg table has an extra column).
// The extra column is populated with null values.

// CASE 3: A column is renamed in the source.
// This is CASE 2 + CASE 1: the old column is populated with null values, and the new column is not
// recognized or populated until it is added to the iceberg table.

S3Minio.listFiles();

}

// @Test
// @Disabled
// public void testDataTypeChanges() throws Exception {
// @TODO change boolean to string, string to boolean
// @TODO change int to string, string to int
// }

@Test
public void testSimpleUpload() {
@@ -219,10 +212,8 @@ public void testSimpleUpload() {
return false;
}
});
}

@Test
public void testGeomData() {

// test nested data(struct) consumed
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.geom");
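CASE 1 and CASE 2 above are exercised by testSchemaChanges; CASE 3 is only described. A minimal sketch of how a source-side rename could be resolved manually on the iceberg side, using the same UpdateSchema API as the addColumn calls in the test (column names here are illustrative, not part of this commit):

// Illustrative only: rename the iceberg column to match the renamed source column,
// so incoming events populate it again instead of leaving a null "old" column behind.
Table customers = getTable("testc.inventory.customers");
customers.updateSchema()
    .renameColumn("old_column_name", "new_column_name")
    .commit();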
@@ -38,7 +38,7 @@ public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
String sinkType;

@Test
public void testIcebergEvents() {
public void testSimpleUpload() {
Assertions.assertEquals(sinkType, "icebergevents");
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
@@ -41,11 +41,11 @@ public void testBatchsizeWait() throws Exception {
}
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.test_date_table");
df.createOrReplaceGlobalTempView("test_date_table_batch_size");
Dataset<Row> df = getTableData("testc.inventory.test_data");
df.createOrReplaceGlobalTempView("test_data_batch_size");
df = spark
.sql("SELECT substring(input_file,94,60) as input_file, " +
"count(*) as batch_size FROM global_temp.test_date_table_batch_size group " +
"count(*) as batch_size FROM global_temp.test_data_batch_size group " +
"by 1");
//df.show(false);
return df.filter("batch_size = " + maxBatchSize).count() >= 5;
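testBatchsizeWait derives per-batch row counts by grouping rows on the data file they were written to. A sketch of how the input_file column could be produced, assuming it comes from Spark's input_file_name() (this is an assumption; the actual helper lives in BaseSparkTest and is not shown in this diff):

// Sketch only: attach the source data file of every row, then count rows per file.
Dataset<Row> withFile = getTableData("testc.inventory.test_data")
    .withColumn("input_file", org.apache.spark.sql.functions.input_file_name());
withFile.groupBy("input_file").count().show(false);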

