Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve tests #109

Merged
merged 2 commits into from
Sep 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ConfigSource() {
// debezium unwrap message
config.put("debezium.transforms", "unwrap");
config.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db");
config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db,ts_ms");
config.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite");
config.put("debezium.transforms.unwrap.drop.tombstones", "true");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public void testSimpleUpload() {
}

public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand All @@ -64,7 +62,6 @@ public Map<String, String> getConfigOverrides() {
config.put("%mongodb.debezium.source.mongodb.name", "testc");
config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok
config.put("%mongodb.debezium.source.collection.include.list", "inventory.products");

// IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!!
config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield");
config.put("%mongodb.debezium.transforms.renamekeyfield.type",
Expand All @@ -78,7 +75,6 @@ public Map<String, String> getConfigOverrides() {
public String getConfigProfile() {
return "mongodb";
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ public void testSimpleUpload() throws Exception {
}

public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand All @@ -96,7 +94,6 @@ public Map<String, String> getConfigOverrides() {
public String getConfigProfile() {
return "mysql";
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.customers");
ds.show();
ds.show(false);
return ds.count() >= 3;
} catch (Exception e) {
return false;
Expand Down Expand Up @@ -295,15 +295,12 @@ public void testMapDestination() {
}

public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("debezium.sink.iceberg.write.format.default", "orc");
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
//config.put("debezium.sink.iceberg.destination-regexp-replace", "");

return config;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.*;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.TestChangeEvent;
import io.debezium.server.iceberg.testresources.TestUtil;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
Expand All @@ -32,17 +35,17 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class)
public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest {

@Inject
IcebergChangeConsumer consumer;
final static Long TEST_EPOCH_MS = 1577840461000L;

@Test
public void testSimpleUpsert() throws Exception {

String dest = "inventory.customers_upsert";
String dest = "testc.inventory.customers_upsert";
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.of(dest, 1, "c"));
records.add(TestChangeEvent.of(dest, 2, "c"));
Expand Down Expand Up @@ -72,17 +75,17 @@ public void testSimpleUpsert() throws Exception {

records.clear();
// incase of duplicate records it should only keep the latest by epoch ts
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", 5L));
records.add(TestChangeEvent.of(dest, 4, "d", "Updatedname-4-V3", 6L));
records.add(TestChangeEvent.of(dest, 5, "d", 7L));
records.add(TestChangeEvent.of(dest, 6, "r", 8L));
records.add(TestChangeEvent.of(dest, 6, "r", 9L));
records.add(TestChangeEvent.of(dest, 6, "u", 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", 11L));
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L));
records.add(TestChangeEvent.of(dest, 4, "d", "Updatedname-4-V3", TEST_EPOCH_MS + 6L));
records.add(TestChangeEvent.of(dest, 5, "d", TEST_EPOCH_MS + 7L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 8L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 9L));
records.add(TestChangeEvent.of(dest, 6, "u", TEST_EPOCH_MS + 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -95,10 +98,10 @@ public void testSimpleUpsert() throws Exception {
// in case of duplicate records including epoch ts, its should keep latest one based on operation priority
// ("c", 1, "r", 2, "u", 3, "d", 4);
records.clear();
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", 1L));
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -107,10 +110,10 @@ public void testSimpleUpsert() throws Exception {

// if its not standard insert followed by update! should keep latest one
records.clear();
records.add(TestChangeEvent.of(dest, 7, "u", 1L));
records.add(TestChangeEvent.of(dest, 7, "d", 2L));
records.add(TestChangeEvent.of(dest, 7, "r", 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", 4L));
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 7, "d", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 7, "r", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -120,13 +123,13 @@ public void testSimpleUpsert() throws Exception {

@Test
public void testSimpleUpsertCompositeKey() throws Exception {
String dest = "inventory.customers_upsert_compositekey";
String dest = "testc.inventory.customers_upsert_compositekey";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_compositekey");
Expand All @@ -135,10 +138,10 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1").count(), 2);

records.clear();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user1", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user2", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_compositekey");
ds.show();
Expand All @@ -147,12 +150,9 @@ public void testSimpleUpsertCompositeKey() throws Exception {
}

public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "false");
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.*;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.TestChangeEvent;
import io.debezium.server.iceberg.testresources.TestUtil;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
Expand All @@ -32,16 +35,16 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class)
public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {

@Inject
IcebergChangeConsumer consumer;
final static Long TEST_EPOCH_MS = 1577840461000L;

@Test
public void testSimpleUpsert() throws Exception {
String dest = "inventory.customers_upsert";
String dest = "testc.inventory.customers_upsert";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.of(dest, 1, "c"));
Expand Down Expand Up @@ -72,17 +75,17 @@ public void testSimpleUpsert() throws Exception {

records.clear();
// incase of duplicate records it should only keep the latest by epoch ts
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", 5L));
records.add(TestChangeEvent.of(dest, 4, "r", "Updatedname-4-V3", 6L));
records.add(TestChangeEvent.of(dest, 5, "r", 7L));
records.add(TestChangeEvent.of(dest, 6, "r", 8L));
records.add(TestChangeEvent.of(dest, 6, "r", 9L));
records.add(TestChangeEvent.of(dest, 6, "u", 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", 11L));
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L));
records.add(TestChangeEvent.of(dest, 4, "r", "Updatedname-4-V3", TEST_EPOCH_MS + 6L));
records.add(TestChangeEvent.of(dest, 5, "r", TEST_EPOCH_MS + 7L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 8L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 9L));
records.add(TestChangeEvent.of(dest, 6, "u", TEST_EPOCH_MS + 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.sort("id").show(false);
Expand All @@ -95,10 +98,10 @@ public void testSimpleUpsert() throws Exception {
// in case of duplicate records including epoch ts, its should keep latest one based on operation priority
// ("c", 1, "r", 2, "u", 3, "d", 4);
records.clear();
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", 1L));
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -107,10 +110,10 @@ public void testSimpleUpsert() throws Exception {

// if its not standard insert followed by update! should keep latest one
records.clear();
records.add(TestChangeEvent.of(dest, 7, "u", 1L));
records.add(TestChangeEvent.of(dest, 7, "u", 2L));
records.add(TestChangeEvent.of(dest, 7, "r", 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", 4L));
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 7, "r", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -120,13 +123,13 @@ public void testSimpleUpsert() throws Exception {

@Test
public void testSimpleUpsertCompositeKey() throws Exception {
String dest = "inventory.customers_upsert_compositekey";
String dest = "testc.inventory.customers_upsert_compositekey";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_compositekey");
Expand All @@ -135,7 +138,7 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1").count(), 2);

records.clear();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user2", 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_compositekey");
ds.show();
Expand All @@ -145,22 +148,22 @@ public void testSimpleUpsertCompositeKey() throws Exception {

@Test
public void testSimpleUpsertNoKey() throws Exception {
String dest = "inventory.customers_upsert_nokey";
String dest = "testc.inventory.customers_upsert_nokey";
// when there is no PK it should fall back to append mode
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user1", 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user1", 2L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
consumer.handleBatch(records, TestUtil.getCommitter());
Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_nokey");
ds.show();
Assertions.assertEquals(ds.count(), 3);
Assertions.assertEquals(ds.where("id = 1").count(), 3);

records.clear();
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user2", 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "r", "user1", 3L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_nokey");
ds.show();
Expand All @@ -169,12 +172,9 @@ public void testSimpleUpsertNoKey() throws Exception {
}

public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
return config;
Expand Down
Loading