Unit test schema changes #43

Merged 4 commits on Oct 25, 2021
@@ -110,10 +110,12 @@ public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode
LOGGER.debug("Processing nested field:{}", tableFields);

for (Types.NestedField field : tableFields.fields()) {
// Set the value to null if the json event does not have the field
if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
mappedResult.put(field.name(), null);
continue;
}
// get the field's value from the json event and map it to an iceberg value
mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name())));
}
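The hunk above delegates per-field conversion to a jsonToGenericRecordVal helper that is not shown in this diff. Below is a minimal sketch of what such a mapper could look like, assuming the Iceberg Types/Type.TypeID and Jackson JsonNode APIs already used by the surrounding class; the real implementation in the repository likely covers more types (dates, timestamps, decimals, lists, maps).

static Object jsonToGenericRecordVal(Types.NestedField field, JsonNode node) {
  // sketch only: convert a single JSON field to an Iceberg value based on the field's type id
  if (node == null || node.isNull()) {
    return null;
  }
  switch (field.type().typeId()) {
    case INTEGER: return node.asInt();
    case LONG:    return node.asLong();
    case DOUBLE:  return node.asDouble();
    case BOOLEAN: return node.asBoolean();
    case STRING:  return node.asText();
    case STRUCT:  // nested structs recurse into getIcebergRecord with the struct's own fields
      return getIcebergRecord(field.type().asStructType(), node);
    default:      return node.asText(); // fallback for types this sketch does not cover
  }
}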

@@ -9,17 +9,11 @@
package io.debezium.server.iceberg;

import io.debezium.server.TestConfigSource;
import io.debezium.util.Testing;

import java.nio.file.Path;

import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;

public class ConfigSource extends TestConfigSource {

public static final String S3_REGION = "us-east-1";
public static final String S3_BUCKET = "test-bucket";
public static final Path HISTORY_FILE = Testing.Files.createTestingPath("dbhistory.txt").toAbsolutePath();

@Override
public int getOrdinal() {
@@ -62,9 +56,8 @@ public ConfigSource() {
config.put("debezium.transforms.unwrap.drop.tombstones", "true");

// DEBEZIUM SOURCE conf
config.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
config.put("debezium.source.database.history", "io.debezium.relational.history.FileDatabaseHistory");
config.put("debezium.source.database.history.file.filename", HISTORY_FILE.toAbsolutePath().toString());
config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
config.put("debezium.source.database.history", "io.debezium.relational.history.MemoryDatabaseHistory");
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.database.server.name", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
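The test configuration above keeps Debezium offsets and database history purely in memory, so each run starts from a clean state. Per-test overrides are layered on top of these defaults through the Quarkus test profiles referenced further down (for example IcebergChangeConsumerMysqlTestProfile). A minimal sketch of such a profile follows, with a hypothetical class name and illustrative override values:

import java.util.Map;
import io.quarkus.test.junit.QuarkusTestProfile;

// Hypothetical example; the real profiles in this repository may override different keys.
public class ExampleTestProfile implements QuarkusTestProfile {
  @Override
  public Map<String, String> getConfigOverrides() {
    // override the ConfigSource defaults for a single test class
    return Map.of(
        "debezium.source.max.batch.size", "100",
        "debezium.source.offset.flush.interval.ms", "1000");
  }
}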
@@ -11,7 +11,6 @@
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
@@ -21,7 +20,6 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Test;

/**
@@ -35,12 +33,19 @@
@TestProfile(IcebergChangeConsumerMysqlTestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {

@Test
public void testSimpleUpload() throws Exception {

@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
Integer maxBatchSize;
Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.customers");
return df.filter("id is not null").count() >= 4;
} catch (Exception e) {
return false;
}
});
// S3Minio.listFiles();

@Test
public void testTombstoneEvents() throws Exception {
// create test table
String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" +
" c_id INTEGER ," +
@@ -57,32 +62,19 @@ public void testTombstoneEvents() throws Exception {
SourceMysqlDB.runSQL(sqlInsert);
SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);
SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.test_delete_table");
df.show();
return df.count() >= 12; // 4 insert 4 delete 4 insert!
} catch (Exception e) {
return false;
}
});
}

@Test
public void testSimpleUpload() {
Testing.Print.enable();

Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.customers");
return df.filter("id is not null").count() >= 4;
return df.count() == 20; // 3 x 4 inserts + 2 x 4 deletes = 20 change rows in append mode
} catch (Exception e) {
return false;
}
});

S3Minio.listFiles();
}

}
@@ -8,21 +8,15 @@

package io.debezium.server.iceberg;

import io.debezium.server.DebeziumServer;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -31,9 +25,10 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
@@ -46,53 +41,29 @@
@QuarkusTestResource(SourcePostgresqlDB.class)
@TestProfile(IcebergChangeConsumerTestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

@Inject
DebeziumServer server;

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

{
// Testing.Debug.enable();
Testing.Files.delete(ConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(ConfigSource.OFFSET_STORE_PATH);
}

private HadoopCatalog getIcebergCatalog() {
// loop and set hadoopConf
Configuration hadoopConf = new Configuration();
for (String name : ConfigProvider.getConfig().getPropertyNames()) {
if (name.startsWith("debezium.sink.iceberg.")) {
hadoopConf.set(name.substring("debezium.sink.iceberg.".length()),
ConfigProvider.getConfig().getValue(name, String.class));
}
}
HadoopCatalog icebergCatalog = new HadoopCatalog();
icebergCatalog.setConf(hadoopConf);

Map<String, String> configMap = new HashMap<>();
hadoopConf.forEach(e-> configMap.put(e.getKey(), e.getValue()));
icebergCatalog.initialize("iceberg", configMap);
return icebergCatalog;
}

private org.apache.iceberg.Table getTable(String table) {

protected org.apache.iceberg.Table getTable(String table) {
HadoopCatalog catalog = getIcebergCatalog();
return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
}

@Test
public void testDatatypes() throws Exception {
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.table_datatypes;\n" +
" CREATE TABLE IF NOT EXISTS inventory.table_datatypes (\n" +
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
" c_id INTEGER ,\n" +
" c_text TEXT,\n" +
" c_varchar VARCHAR,\n" +
@@ -104,14 +75,14 @@ public void testDatatypes() throws Exception {
" c_decimal DECIMAL(18,4),\n" +
" c_numeric NUMERIC(18,4),\n" +
" c_interval INTERVAL,\n" +
" c_boolean BOOLean,\n" +
" c_boolean BOOLEAN,\n" +
" c_uuid UUID,\n" +
" c_bytea BYTEA,\n" +
" c_json json,\n" +
" c_jsonb jsonb\n" +
" c_json JSON,\n" +
" c_jsonb JSONB\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.table_datatypes (" +
sql = "INSERT INTO inventory.data_types (" +
"c_id, " +
"c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " +
"c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " +
@@ -127,7 +98,7 @@ public void testDatatypes() throws Exception {
SourcePostgresqlDB.runSQL(sql);
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.table_datatypes");
Dataset<Row> df = getTableData("testc.inventory.data_types");
df.show(true);
return df.where("c_text is null AND c_varchar is null AND c_int is null " +
"AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
@@ -140,17 +111,8 @@
}

@Test
public void testIcebergConsumer() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
//ds.show();
return ds.count() >= 4;
} catch (Exception e) {
return false;
}
});

public void testSchemaChanges() throws Exception {
// TEST add new columns, drop not null constraint
SourcePostgresqlDB.runSQL("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;");
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ADD test_varchar_column varchar(255);");
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ADD test_boolean_column boolean;");
@@ -167,46 +129,77 @@ public void testIcebergConsumer() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
ds.show();
return ds.where("first_name == 'George__UPDATE1'").count() == 3
//ds.show();
return
ds.where("__op == 'r'").count() == 4 // snapshot rows. initial table data
&& ds.where("__op == 'u'").count() == 3 // 3 update
&& ds.where("__op == 'c'").count() == 1 // 1 insert
&& ds.where("__op == 'd'").count() == 1 // 1 delete
&& ds.where("first_name == 'George__UPDATE1'").count() == 3
&& ds.where("first_name == 'SallyUSer2'").count() == 1
&& ds.where("last_name is null").count() == 1
&& ds.where("id == '1004'").where("__op == 'd'").count() == 1;
} catch (Exception e) {
return false;
}
});


// added columns are not recognized by iceberg
getTableData("testc.inventory.customers").show();
// add new columns to iceberg table! and check if new column values are populated!

// TEST: add the new columns to the iceberg table, then check that their data is populated
Table table = getTable("testc.inventory.customers");

// !!!!! IMPORTANT !!! column list here is in reverse order!! for testing purpose!
// NOTE: the column list below is intentionally in reverse order, to test this behaviour
table.updateSchema()
// test_date_column is Long type because debezium serializes date type as number
// NOTE: test_date_column is a Long type because Debezium serializes date values as numbers
.addColumn("test_date_column", Types.LongType.get())
.addColumn("test_boolean_column", Types.BooleanType.get())
.addColumn("test_varchar_column", Types.StringType.get())
.commit();

// insert a row after defining the new columns in the target iceberg table
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');");

// remove column from source
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers DROP COLUMN email;");
SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " +
"(default,'User3','lastname_value3','test_varchar_value3',true, '2020-01-01'::DATE);");
"(default,'User3','lastname_value3','after-dropping-email-column-from-source',true, '2020-01-01'::DATE);");

Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
ds.show();
return ds.where("first_name == 'User3'").count() == 1
&& ds.where("test_varchar_column == 'test_varchar_value3'").count() == 1;
&& ds.where("first_name == 'After-Defining-Iceberg-fields'").count() == 1
&& ds.where("test_varchar_column == 'after-dropping-email-column-from-source' AND email is null").count() == 1;
} catch (Exception e) {
return false;
}
});
getTableData("testc.inventory.customers").show();

// CASE 1: a new column is added on the source (the column is missing in the iceberg table)
// the new column's data is ignored until the same column is defined in the iceberg table;
// that is, if a column is not found in the iceberg table, its data is dropped and not copied to the target.
// once the column is added to the iceberg table, its data is recognized and populated from then on.

// CASE 2: a column is removed from the source (an extra column remains in the iceberg table)
// the extra column is populated with null values.

// CASE 3: a column is renamed on the source
// this is CASE 2 plus CASE 1: the old column is populated with null values, and the new column is not
// recognized or populated until it is added to the iceberg table; see the sketch below.
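// A hedged sketch of CASE 3 (column names below are illustrative only): renaming
// test_varchar_column to renamed_column on the source behaves like a drop plus an add, so the sink
// only starts populating the new name once the iceberg table is updated the same way, e.g.:
// SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers RENAME COLUMN test_varchar_column TO renamed_column;");
// getTable("testc.inventory.customers").updateSchema().addColumn("renamed_column", Types.StringType.get()).commit();
// after that, new change events populate renamed_column while the old test_varchar_column stays null.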

S3Minio.listFiles();

}

// @Test
// @Disabled
// public void testDataTypeChanges() throws Exception {
// @TODO change boolean to string, and string to boolean
// @TODO change int to string, string to int
// }

@Test
public void testSimpleUpload() {
@@ -219,10 +212,8 @@ public void testSimpleUpload() {
return false;
}
});
}

@Test
public void testGeomData() {

// test that nested (struct) data is consumed
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.geom");
@@ -38,7 +38,7 @@ public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
String sinkType;

@Test
public void testIcebergEvents() {
public void testSimpleUpload() {
Assertions.assertEquals(sinkType, "icebergevents");
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
@@ -41,11 +41,11 @@ public void testBatchsizeWait() throws Exception {
}
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.test_date_table");
df.createOrReplaceGlobalTempView("test_date_table_batch_size");
Dataset<Row> df = getTableData("testc.inventory.test_data");
df.createOrReplaceGlobalTempView("test_data_batch_size");
df = spark
.sql("SELECT substring(input_file,94,60) as input_file, " +
"count(*) as batch_size FROM global_temp.test_date_table_batch_size group " +
"count(*) as batch_size FROM global_temp.test_data_batch_size group " +
"by 1");
//df.show(false);
return df.filter("batch_size = " + maxBatchSize).count() >= 5;