Skip to content

Commit

Permalink
Improve test resources (#107)
Browse files Browse the repository at this point in the history
* Use restrictToAnnotatedClass=true for quarkus tests
  • Loading branch information
ismailsimsek committed Sep 4, 2022
1 parent 18b6879 commit de5b248
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 258 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import io.debezium.server.iceberg.testresources.SourceMangoDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand All @@ -30,12 +33,12 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMangoDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMangodbTestProfile.class)
@TestProfile(IcebergChangeConsumerMangodbTest.IcebergChangeConsumerMangodbTestProfile.class)
public class IcebergChangeConsumerMangodbTest extends BaseSparkTest {

@Test
public void testSimpleUpload() {

Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.products");
Expand All @@ -47,4 +50,35 @@ public void testSimpleUpload() {
});
}

public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("quarkus.profile", "mongodb");
config.put("%mongodb.debezium.source.connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
config.put("%mongodb.debezium.transforms.unwrap.type", "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState");
config.put("%mongodb.debezium.transforms.unwrap.add.fields", "op,source.ts_ms,db");
config.put("%mongodb.debezium.sink.iceberg.allow-field-addition", "false");
config.put("%mongodb.debezium.source.mongodb.name", "testc");
config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok
config.put("%mongodb.debezium.source.collection.include.list", "inventory.products");

// IMPORTANT !!! FIX MongoDbConnector KEY FIELD NAME "id"=>"_id" !!!
config.put("%mongodb.debezium.transforms", "unwrap,renamekeyfield");
config.put("%mongodb.debezium.transforms.renamekeyfield.type",
"org.apache.kafka.connect.transforms.ReplaceField$Key");
config.put("%mongodb.debezium.transforms.renamekeyfield.renames", "id:_id");

return config;
}

@Override
public String getConfigProfile() {
return "mongodb";
}

}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand All @@ -30,7 +33,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMysqlTestProfile.class)
@TestProfile(IcebergChangeConsumerMysqlTest.IcebergChangeConsumerMysqlTestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {

@Test
Expand Down Expand Up @@ -77,4 +80,23 @@ public void testSimpleUpload() throws Exception {

}

public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("quarkus.profile", "mysql");
config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
return config;
}

@Override
public String getConfigProfile() {
return "mysql";
}

}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;

import org.apache.iceberg.catalog.Namespace;
Expand All @@ -38,7 +41,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerTestProfile.class)
@TestProfile(IcebergChangeConsumerTest.IcebergChangeConsumerTestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
Expand Down Expand Up @@ -291,4 +294,18 @@ public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
}

public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("debezium.sink.iceberg.write.format.default", "orc");
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
//config.put("debezium.sink.iceberg.destination-regexp-replace", "");

return config;
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import io.debezium.server.iceberg.testresources.*;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;

import org.apache.spark.sql.Dataset;
Expand All @@ -30,7 +33,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class)
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class)
public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest {

@Inject
Expand Down Expand Up @@ -142,5 +145,18 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.count(), 0);
Assertions.assertEquals(ds.where("first_name= 'user2'").count(), 0);
}


public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "false");
return config;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import io.debezium.server.iceberg.testresources.*;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;

import org.apache.spark.sql.Dataset;
Expand All @@ -30,7 +33,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertTestProfile.class)
@TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class)
public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {

@Inject
Expand Down Expand Up @@ -165,4 +168,17 @@ public void testSimpleUpsertNoKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1 AND __op= 'c' AND first_name= 'user2'").count(), 2);
}

public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile {

//This method allows us to override configuration properties.
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.iceberg.upsert", "true");
config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
return config;
}
}

}

This file was deleted.

This file was deleted.

Loading

0 comments on commit de5b248

Please sign in to comment.