diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml
index 9f9e1faf..999301b9 100644
--- a/debezium-server-iceberg-sink/pom.xml
+++ b/debezium-server-iceberg-sink/pom.xml
@@ -157,7 +157,7 @@
io.minio
minio
- 8.0.3
+ 8.3.1
test
@@ -169,7 +169,7 @@
org.awaitility
awaitility
- 4.0.3
+ 4.1.0
test
@@ -188,7 +188,13 @@
org.testcontainers
testcontainers
- 1.15.1
+ ${version.testcontainers}
+ test
+
+
+ org.testcontainers
+ mongodb
+ ${version.testcontainers}
test
@@ -201,6 +207,12 @@
debezium-connector-mysql
test
+
+ io.debezium
+ debezium-connector-mongodb
+ ${version.debezium}
+ test
+
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
index 8e879800..ca99ebf0 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
@@ -18,9 +18,6 @@
import java.time.Duration;
import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -47,18 +44,6 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")
String sinkType;
- @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
- String tablePrefix;
- @ConfigProperty(name = "debezium.sink.iceberg.warehouse")
- String warehouseLocation;
- @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
- String namespace;
-
- protected org.apache.iceberg.Table getTable(String table) {
- HadoopCatalog catalog = getIcebergCatalog();
- return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
- }
-
@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
index 990a2d6c..5a0dc9c2 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
@@ -14,12 +14,15 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.eclipse.microprofile.config.ConfigProvider;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.BeforeAll;
import static io.debezium.server.iceberg.ConfigSource.S3_BUCKET;
@@ -29,12 +32,25 @@
* @author Ismail Simsek
*/
public class BaseSparkTest {
+
+ @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
+ String tablePrefix;
+ @ConfigProperty(name = "debezium.sink.iceberg.warehouse")
+ String warehouseLocation;
+ @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
+ String namespace;
+
protected static final SparkConf sparkconf = new SparkConf()
.setAppName("CDC-S3-Batch-Spark-Sink")
.setMaster("local[2]");
private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
protected static SparkSession spark;
+ protected org.apache.iceberg.Table getTable(String table) {
+ HadoopCatalog catalog = getIcebergCatalog();
+ return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
+ }
+
protected HadoopCatalog getIcebergCatalog() {
// loop and set hadoopConf
Configuration hadoopConf = new Configuration();
@@ -153,7 +169,7 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception {
}
public Dataset getTableData(String table) {
- return spark.newSession().sql("SELECT input_file_name() as input_file, * FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
+ return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
}
}
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
index d36bc1ad..5832ba93 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
@@ -38,10 +38,8 @@ public class S3Minio implements QuarkusTestResourceLifecycleManager {
static final String DEFAULT_IMAGE = "minio/minio:latest";
static final String DEFAULT_STORAGE_DIRECTORY = "/data";
static final String HEALTH_ENDPOINT = "/minio/health/ready";
- public static MinioClient client;
-
-
static private final GenericContainer> container = new GenericContainer<>(DockerImageName.parse(DEFAULT_IMAGE))
+ .withExposedPorts(MINIO_DEFAULT_PORT)
.waitingFor(new HttpWaitStrategy()
.forPath(HEALTH_ENDPOINT)
.forPort(MINIO_DEFAULT_PORT)
@@ -50,6 +48,7 @@ public class S3Minio implements QuarkusTestResourceLifecycleManager {
.withEnv("MINIO_SECRET_KEY", MINIO_SECRET_KEY)
.withEnv("MINIO_REGION_NAME", ConfigSource.S3_REGION)
.withCommand("server " + DEFAULT_STORAGE_DIRECTORY);
+ public static MinioClient client;
public static List- getObjectList(String bucketName) {
List
- objects = new ArrayList<>();
@@ -71,7 +70,7 @@ public static void listFiles() {
try {
List bucketList = client.listBuckets();
for (Bucket bucket : bucketList) {
- System.out.printf("Bucket:%s ROOT", bucket.name());
+ System.out.printf("Bucket:%s ROOT", bucket.name());
Iterable> results = client.listObjects(ListObjectsArgs.builder().bucket(bucket.name()).recursive(true).build());
for (Result
- result : results) {
Item item = result.get();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java
index de315aea..4474479d 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java
@@ -30,24 +30,22 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager {
public static final String MYSQL_PASSWORD = "mysqlpw";
public static final String MYSQL_DEBEZIUM_USER = "debezium";
public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz";
- public static final String MYSQL_IMAGE = "debezium/example-mysql:1.5";
+ public static final String MYSQL_IMAGE = "debezium/example-mysql:1.7.0.Final";
public static final String MYSQL_HOST = "127.0.0.1";
public static final String MYSQL_DATABASE = "inventory";
public static final Integer MYSQL_PORT_DEFAULT = 3306;
private static final Logger LOGGER = LoggerFactory.getLogger(SourceMysqlDB.class);
- static private GenericContainer> container;
-
- @Override
- public void stop() {
- if (container != null) {
- container.stop();
- }
- }
+ static private final GenericContainer> container = new GenericContainer<>(MYSQL_IMAGE)
+ .waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
+ .withEnv("MYSQL_USER", MYSQL_USER)
+ .withEnv("MYSQL_PASSWORD", MYSQL_PASSWORD)
+ .withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD)
+ .withStartupTimeout(Duration.ofSeconds(30));
public static void runSQL(String query) throws SQLException, ClassNotFoundException {
try {
- String url = "jdbc:mysql://" + MYSQL_HOST + ":" + getMappedPort() + "/" + MYSQL_DATABASE + "?useSSL=false";
+ String url = "jdbc:mysql://" + MYSQL_HOST + ":" + container.getMappedPort(MYSQL_PORT_DEFAULT) + "/" + MYSQL_DATABASE + "?useSSL=false";
Class.forName("com.mysql.cj.jdbc.Driver");
Connection con = DriverManager.getConnection(url, MYSQL_USER, MYSQL_PASSWORD);
Statement st = con.createStatement();
@@ -61,12 +59,6 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept
@Override
public Map start() {
- container = new GenericContainer<>(MYSQL_IMAGE)
- .waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
- .withEnv("MYSQL_USER", MYSQL_USER)
- .withEnv("MYSQL_PASSWORD", MYSQL_PASSWORD)
- .withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD)
- .withStartupTimeout(Duration.ofSeconds(30));
container.start();
Map params = new ConcurrentHashMap<>();
@@ -78,9 +70,11 @@ public Map start() {
return params;
}
- public static Integer getMappedPort() {
- return container.getMappedPort(MYSQL_PORT_DEFAULT);
+ @Override
+ public void stop() {
+ if (container != null) {
+ container.stop();
+ }
}
-
}
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
index ca37bd1c..556bbf0d 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
@@ -28,17 +28,24 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager {
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
- public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.5";
+ public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.7.0.Final";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT_DEFAULT = 5432;
private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class);
- private static GenericContainer> container;
+ private static GenericContainer> container = new GenericContainer<>(POSTGRES_IMAGE)
+ .waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
+ .withEnv("POSTGRES_USER", POSTGRES_USER)
+ .withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
+ .withEnv("POSTGRES_DB", POSTGRES_DBNAME)
+ .withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
+ .withEnv("LANG", "en_US.utf8")
+ .withStartupTimeout(Duration.ofSeconds(30));
public static void runSQL(String query) throws SQLException, ClassNotFoundException {
try {
- String url = "jdbc:postgresql://" + POSTGRES_HOST + ":" + getMappedPort() + "/" + POSTGRES_DBNAME;
+ String url = "jdbc:postgresql://" + POSTGRES_HOST + ":" + container.getMappedPort(POSTGRES_PORT_DEFAULT) + "/" + POSTGRES_DBNAME;
Class.forName("org.postgresql.Driver");
Connection con = DriverManager.getConnection(url, POSTGRES_USER, POSTGRES_PASSWORD);
Statement st = con.createStatement();
@@ -50,20 +57,8 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept
}
}
- public static Integer getMappedPort() {
- return container.getMappedPort(POSTGRES_PORT_DEFAULT);
- }
-
@Override
public Map start() {
- container = new GenericContainer<>(POSTGRES_IMAGE)
- .waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
- .withEnv("POSTGRES_USER", POSTGRES_USER)
- .withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
- .withEnv("POSTGRES_DB", POSTGRES_DBNAME)
- .withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
- .withEnv("LANG", "en_US.utf8")
- .withStartupTimeout(Duration.ofSeconds(30));
container.start();
Map params = new ConcurrentHashMap<>();
diff --git a/pom.xml b/pom.xml
index 42f88de9..e15f29a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,14 +27,15 @@
8
true
- 3.0.7
+ 3.0.9
3.3.0
2.12.6
- 0.12.0
+ 0.12.1
3.1.2
3.3.1
- 2.16.88
- 1.11.1
+ 2.17.75
+ 1.12.2
+ 1.15.3
1.8.0.Final
8.0.27
@@ -136,6 +137,13 @@
${version.jackson}
true
+
+
+ com.squareup.okhttp3
+ okhttp
+ 4.9.2
+ test
+