Skip to content

Commit

Permalink
Improve tests (#109)
Browse files Browse the repository at this point in the history
* Cleanup tests

* Cleanup tests
  • Loading branch information
ismailsimsek committed Sep 4, 2022
1 parent de5b248 commit debc790
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ConfigSource() {
// debezium unwrap message
config.put("debezium.transforms", "unwrap");
config.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db");
config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db,ts_ms");
config.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite");
config.put("debezium.transforms.unwrap.drop.tombstones", "true");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public void testSimpleUpload() {
}

public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand All @@ -64,7 +62,6 @@ public Map<String, String> getConfigOverrides() {
config.put("%mongodb.debezium.source.mongodb.name", "testc");
config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok
config.put("%mongodb.debezium.source.collection.include.list", "inventory.products");

// IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!!
config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield");
config.put("%mongodb.debezium.transforms.renamekeyfield.type",
Expand All @@ -78,7 +75,6 @@ public Map<String, String> getConfigOverrides() {
public String getConfigProfile() {
return "mongodb";
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ public void testSimpleUpload() throws Exception {
}

public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand All @@ -96,7 +94,6 @@ public Map<String, String> getConfigOverrides() {
public String getConfigProfile() {
return "mysql";
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
ds.show();
ds.show(false);
return ds.count() >= 3;
} catch (Exception e) {
return false;
Expand Down Expand Up @@ -295,15 +295,12 @@ public void testMapDestination() {
}

public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("debezium.sink.iceberg.write.format.default", "orc");
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
//config.put("debezium.sink.iceberg.destination-regexp-replace", "");

return config;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.*;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.TestChangeEvent;
import io.debezium.server.iceberg.testresources.TestUtil;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
Expand All @@ -32,17 +35,17 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class)
public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest {

@Inject
IcebergChangeConsumer consumer;
final static Long TEST_EPOCH_MS = 1577840461000L;

@Test
public void testSimpleUpsert() throws Exception {

String dest = "inventory.customers_upsert";
String dest = "testc.inventory.customers_upsert";
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.of(dest, 1, "c"));
records.add(TestChangeEvent.of(dest, 2, "c"));
Expand Down Expand Up @@ -72,17 +75,17 @@ public void testSimpleUpsert() throws Exception {

records.clear();
// incase of duplicate records it should only keep the latest by epoch ts
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", 5L));
records.add(TestChangeEvent.of(dest, 4, "d", "Updatedname-4-V3", 6L));
records.add(TestChangeEvent.of(dest, 5, "d", 7L));
records.add(TestChangeEvent.of(dest, 6, "r", 8L));
records.add(TestChangeEvent.of(dest, 6, "r", 9L));
records.add(TestChangeEvent.of(dest, 6, "u", 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", 11L));
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L));
records.add(TestChangeEvent.of(dest, 4, "d", "Updatedname-4-V3", TEST_EPOCH_MS + 6L));
records.add(TestChangeEvent.of(dest, 5, "d", TEST_EPOCH_MS + 7L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 8L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 9L));
records.add(TestChangeEvent.of(dest, 6, "u", TEST_EPOCH_MS + 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -95,10 +98,10 @@ public void testSimpleUpsert() throws Exception {
// in case of duplicate records including epoch ts, its should keep latest one based on operation priority
// ("c", 1, "r", 2, "u", 3, "d", 4);
records.clear();
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", 1L));
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -107,10 +110,10 @@ public void testSimpleUpsert() throws Exception {

// if its not standard insert followed by update! should keep latest one
records.clear();
records.add(TestChangeEvent.of(dest, 7, "u", 1L));
records.add(TestChangeEvent.of(dest, 7, "d", 2L));
records.add(TestChangeEvent.of(dest, 7, "r", 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", 4L));
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 7, "d", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 7, "r", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -120,13 +123,13 @@ public void testSimpleUpsert() throws Exception {

@Test
public void testSimpleUpsertCompositeKey() throws Exception {
String dest = "inventory.customers_upsert_compositekey";
String dest = "testc.inventory.customers_upsert_compositekey";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_compositekey");
Expand All @@ -135,10 +138,10 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1").count(), 2);

records.clear();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user1", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user2", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_compositekey");
ds.show();
Expand All @@ -147,12 +150,9 @@ public void testSimpleUpsertCompositeKey() throws Exception {
}

public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "false");
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.*;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.TestChangeEvent;
import io.debezium.server.iceberg.testresources.TestUtil;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
Expand All @@ -32,16 +35,16 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class)
public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {

@Inject
IcebergChangeConsumer consumer;
final static Long TEST_EPOCH_MS = 1577840461000L;

@Test
public void testSimpleUpsert() throws Exception {
String dest = "inventory.customers_upsert";
String dest = "testc.inventory.customers_upsert";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.of(dest, 1, "c"));
Expand Down Expand Up @@ -72,17 +75,17 @@ public void testSimpleUpsert() throws Exception {

records.clear();
// incase of duplicate records it should only keep the latest by epoch ts
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", 5L));
records.add(TestChangeEvent.of(dest, 4, "r", "Updatedname-4-V3", 6L));
records.add(TestChangeEvent.of(dest, 5, "r", 7L));
records.add(TestChangeEvent.of(dest, 6, "r", 8L));
records.add(TestChangeEvent.of(dest, 6, "r", 9L));
records.add(TestChangeEvent.of(dest, 6, "u", 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", 11L));
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L));
records.add(TestChangeEvent.of(dest, 4, "r", "Updatedname-4-V3", TEST_EPOCH_MS + 6L));
records.add(TestChangeEvent.of(dest, 5, "r", TEST_EPOCH_MS + 7L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 8L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 9L));
records.add(TestChangeEvent.of(dest, 6, "u", TEST_EPOCH_MS + 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.sort("id").show(false);
Expand All @@ -95,10 +98,10 @@ public void testSimpleUpsert() throws Exception {
// in case of duplicate records including epoch ts, its should keep latest one based on operation priority
// ("c", 1, "r", 2, "u", 3, "d", 4);
records.clear();
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", 1L));
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -107,10 +110,10 @@ public void testSimpleUpsert() throws Exception {

// if its not standard insert followed by update! should keep latest one
records.clear();
records.add(TestChangeEvent.of(dest, 7, "u", 1L));
records.add(TestChangeEvent.of(dest, 7, "u", 2L));
records.add(TestChangeEvent.of(dest, 7, "r", 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", 4L));
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 7, "r", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -120,13 +123,13 @@ public void testSimpleUpsert() throws Exception {

@Test
public void testSimpleUpsertCompositeKey() throws Exception {
String dest = "inventory.customers_upsert_compositekey";
String dest = "testc.inventory.customers_upsert_compositekey";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_compositekey");
Expand All @@ -135,7 +138,7 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1").count(), 2);

records.clear();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_compositekey");
ds.show();
Expand All @@ -145,22 +148,22 @@ public void testSimpleUpsertCompositeKey() throws Exception {

@Test
public void testSimpleUpsertNoKey() throws Exception {
String dest = "inventory.customers_upsert_nokey";
String dest = "testc.inventory.customers_upsert_nokey";
// when there is no PK it should fall back to append mode
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user1", 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
consumer.handleBatch(records, TestUtil.getCommitter());
Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_nokey");
ds.show();
Assertions.assertEquals(ds.count(), 3);
Assertions.assertEquals(ds.where("id = 1").count(), 3);

records.clear();
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user2", 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_nokey");
ds.show();
Expand All @@ -169,12 +172,9 @@ public void testSimpleUpsertNoKey() throws Exception {
}

public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
return config;
Expand Down
Loading

0 comments on commit debc790

Please sign in to comment.