diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index c9aecadc..1e364553 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -19,6 +19,7 @@ on:
env:
SPARK_LOCAL_IP: 127.0.0.1
+ AWS_REGION: us-east-1
jobs:
build:
@@ -33,4 +34,4 @@ jobs:
distribution: 'temurin'
java-version: 11
- name: Build with Maven
- run: mvn -B package --file pom.xml
+ run: mvn -B package --file pom.xml -Dsurefire.skipAfterFailureCount=1
diff --git a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
index 6625124a..e9f40f15 100644
--- a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
+++ b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
@@ -22,6 +22,21 @@ debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+# S3 config using JdbcCatalog catalog And S3FileIO
+debezium.sink.iceberg.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog
+debezium.sink.iceberg.uri=jdbc_db_url
+debezium.sink.iceberg.jdbc.user=my_user
+debezium.sink.iceberg.jdbc.password=my_password
+debezium.sink.iceberg.table-namespace=debeziumdata
+debezium.sink.iceberg.catalog-name=iceberg
+debezium.sink.iceberg.warehouse=s3a://my_bucket/iceberg_warehouse
+# Use S3FileIO
+debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
+debezium.sink.iceberg.s3.endpoint=http://localhost:9000
+debezium.sink.iceberg.s3.path-style-access=true
+debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
+debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
+
# S3 config without hadoop catalog. Using InMemoryCatalog catalog And S3FileIO
### using mino as S3
debezium.sink.iceberg.s3.endpoint=http://localhost:9000;
diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml
index f588fe5c..f9079e91 100644
--- a/debezium-server-iceberg-sink/pom.xml
+++ b/debezium-server-iceberg-sink/pom.xml
@@ -231,6 +231,12 @@
${version.testcontainers}
test
+
+ org.testcontainers
+ mysql
+ ${version.testcontainers}
+ test
+
org.testcontainers
mongodb
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java
index 257a12d3..9b58c99b 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java
@@ -34,7 +34,7 @@
@Disabled // @TODO fix
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMangoDB.class, restrictToAnnotatedClass = true)
-@TestProfile(IcebergChangeConsumerMangodbTest.IcebergChangeConsumerMangodbTestProfile.class)
+@TestProfile(IcebergChangeConsumerMangodbTest.TestProfile.class)
public class IcebergChangeConsumerMangodbTest extends BaseSparkTest {
@Test
@@ -52,7 +52,7 @@ public void testSimpleUpload() {
});
}
- public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile {
+ public static class TestProfile implements QuarkusTestProfile {
@Override
public Map getConfigOverrides() {
Map config = new HashMap<>();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
index 4782c89e..80850aa7 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
@@ -34,7 +34,7 @@
@Disabled // @TODO remove spark with antlr4 version
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
-@TestProfile(IcebergChangeConsumerMysqlTest.IcebergChangeConsumerMysqlTestProfile.class)
+@TestProfile(IcebergChangeConsumerMysqlTest.TestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseTest {
@Test
@@ -80,7 +80,7 @@ public void testSimpleUpload() throws Exception {
}
- public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile {
+ public static class TestProfile implements QuarkusTestProfile {
@Override
public Map getConfigOverrides() {
Map config = new HashMap<>();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
index 98eef82f..79afb5a8 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
@@ -44,7 +44,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
-@TestProfile(IcebergChangeConsumerTest.IcebergChangeConsumerTestProfile.class)
+@TestProfile(IcebergChangeConsumerTest.TestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
@@ -307,7 +307,7 @@ public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
}
- public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {
+ public static class TestProfile implements QuarkusTestProfile {
@Override
public Map getConfigOverrides() {
Map config = new HashMap<>();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java
index ed798843..b66d4873 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java
@@ -34,7 +34,7 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
-@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class)
+@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.TestProfile.class)
public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest {
@Inject
@@ -148,7 +148,7 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("first_name= 'user2'").count(), 0);
}
- public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile {
+ public static class TestProfile implements QuarkusTestProfile {
@Override
public Map getConfigOverrides() {
Map config = new HashMap<>();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java
index 3eef7487..1c07ce14 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java
@@ -34,7 +34,7 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
-@TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class)
+@TestProfile(IcebergChangeConsumerUpsertTest.TestProfile.class)
public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {
@Inject
@@ -170,7 +170,7 @@ public void testSimpleUpsertNoKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1 AND __op= 'c' AND first_name= 'user2'").count(), 2);
}
- public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile {
+ public static class TestProfile implements QuarkusTestProfile {
@Override
public Map getConfigOverrides() {
Map config = new HashMap<>();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
index 355b2fe9..a4b31aa1 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
@@ -34,7 +34,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
-@TestProfile(IcebergEventsChangeConsumerTest.IcebergEventsChangeConsumerTestProfile.class)
+@TestProfile(IcebergEventsChangeConsumerTest.TestProfile.class)
public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")
String sinkType;
@@ -56,7 +56,7 @@ public void testSimpleUpload() {
S3Minio.listFiles();
}
- public static class IcebergEventsChangeConsumerTestProfile implements QuarkusTestProfile {
+ public static class TestProfile implements QuarkusTestProfile {
@Override
public Map getConfigOverrides() {
Map config = new HashMap<>();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
index 76fd1c2e..6092f0d2 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
@@ -17,7 +17,10 @@
public class TestConfigSource implements ConfigSource {
public static final String S3_REGION = "us-east-1";
- public static final String S3_BUCKET = "test-bucket";
+ public static final String S3_BUCKET_NAME = "test-bucket";
+ public static final String CATALOG_TABLE_NAMESPACE = "debeziumevents";
+ public static final String ICEBERG_CATALOG_NAME = "iceberg";
+ public static final String S3_BUCKET = "s3a://" + S3_BUCKET_NAME + "/iceberg_warehouse";
protected Map config = new HashMap<>();
@@ -38,12 +41,10 @@ public TestConfigSource() {
config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds!
// iceberg
config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_");
- config.put("debezium.sink.iceberg.table-namespace", "debeziumevents");
- config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET);
- config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
+ config.put("debezium.sink.iceberg.table-namespace", CATALOG_TABLE_NAMESPACE);
+ config.put("debezium.sink.iceberg.catalog-name", ICEBERG_CATALOG_NAME);
+ // use hadoop catalog for tests
config.put("debezium.sink.iceberg.type", "hadoop");
- config.put("debezium.sink.iceberg.catalog-name", "mycatalog");
- //config.put("debezium.sink.iceberg.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog");
// enable disable schema
config.put("debezium.format.value.schemas.enable", "true");
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java
index 81ade996..62e07552 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java
@@ -8,6 +8,8 @@
package io.debezium.server.iceberg.batchsizewait;
+import io.debezium.server.iceberg.testresources.S3Minio;
+import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
@@ -21,7 +23,8 @@
import org.junit.jupiter.api.Test;
@QuarkusTest
-@TestProfile(DynamicBatchSizeWaitTest.DynamicBatchSizeWaitTestProfile.class)
+@TestProfile(DynamicBatchSizeWaitTest.TestProfile.class)
+@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
class DynamicBatchSizeWaitTest {
@Inject
@@ -71,7 +74,7 @@ void shouldDecreaseSleepMs() {
Assertions.assertTrue(dynamicSleep.getWaitMs(120) <= 100);
}
- public static class DynamicBatchSizeWaitTestProfile implements QuarkusTestProfile {
+ public static class TestProfile implements QuarkusTestProfile {
@Override
public Map getConfigOverrides() {
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java
index 2c548cb8..f3a3ecdd 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java
@@ -28,7 +28,7 @@
import org.junit.jupiter.api.Test;
@QuarkusTest
-@TestProfile(MaxBatchSizeWaitTest.MaxBatchSizeWaitTestProfile.class)
+@TestProfile(MaxBatchSizeWaitTest.TestProfile.class)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
class MaxBatchSizeWaitTest extends BaseSparkTest {
@@ -63,7 +63,7 @@ public void testBatchsizeWait() throws Exception {
});
}
- public static class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile {
+ public static class TestProfile implements QuarkusTestProfile {
@Override
public Map getConfigOverrides() {
Map config = new HashMap<>();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
index 7e7e5e22..1608d3db 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
@@ -21,6 +21,7 @@
import org.apache.spark.sql.SparkSession;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.BeforeAll;
+import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;
/**
@@ -43,20 +44,30 @@ static void setup() {
sparkconf
.set("spark.ui.enabled", "false")
.set("spark.eventLog.enabled", "false")
- .set("spark.hadoop.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY)
- .set("spark.hadoop.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY)
- // minio specific setting using minio as S3
- .set("spark.hadoop.fs.s3a.endpoint", "http://localhost:" + S3Minio.getMappedPort())
- .set("spark.hadoop.fs.s3a.path.style.access", "true")
- .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
-
- // enable iceberg SQL Extensions
+ // enable iceberg SQL Extensions and Catalog
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+ // hadoop catalog
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
- //.set("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog")
- .set("spark.sql.catalog.spark_catalog.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse")
- .set("spark.sql.warehouse.dir", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
+ .set("spark.sql.catalog.spark_catalog.warehouse", S3_BUCKET)
+ .set("spark.sql.catalog.spark_catalog.default-namespaces", CATALOG_TABLE_NAMESPACE)
+ .set("spark.sql.warehouse.dir", S3_BUCKET)
+// // JdbcCatalog catalog, add additional catalog
+// .set("spark.sql.defaultCatalog", ICEBERG_CATALOG_NAME)
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog")
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".warehouse", S3_BUCKET)
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".cache-enabled", "false")
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".catalog-impl", JdbcCatalog.class.getName())
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".default-namespaces", CATALOG_TABLE_NAMESPACE)
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".uri", JdbcCatalogDB.container.getJdbcUrl())
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".jdbc.user", JdbcCatalogDB.container.getUsername())
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".jdbc.password", JdbcCatalogDB.container.getPassword())
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString())
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.path-style-access", "true")
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.access-key-id", S3Minio.MINIO_ACCESS_KEY)
+// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.secret-access-key", S3Minio.MINIO_SECRET_KEY)
+ ;
BaseSparkTest.spark = SparkSession
.builder()
@@ -124,8 +135,7 @@ protected HadoopCatalog getIcebergCatalog() {
}
public Dataset getTableData(String table) {
- table = "debeziumevents.debeziumcdc_" + table.replace(".", "_");
- //System.out.println("--loading-->" + table);
+ table = CATALOG_TABLE_NAMESPACE + ".debeziumcdc_" + table.replace(".", "_");
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
}
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/JdbcCatalogDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/JdbcCatalogDB.java
new file mode 100644
index 00000000..db931fb3
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/JdbcCatalogDB.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.testresources;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.testcontainers.containers.MySQLContainer;
+
+public class JdbcCatalogDB implements QuarkusTestResourceLifecycleManager {
+ public static MySQLContainer> container = new MySQLContainer<>();
+
+ @Override
+ public Map start() {
+ container.start();
+
+ Map config = new ConcurrentHashMap<>();
+
+ config.put("debezium.sink.iceberg.catalog-impl", JdbcCatalog.class.getName());
+ config.put("debezium.sink.iceberg.uri", container.getJdbcUrl());
+ config.put("debezium.sink.iceberg.jdbc.user", container.getUsername());
+ config.put("debezium.sink.iceberg.jdbc.password", container.getPassword());
+
+ return config;
+ }
+
+ @Override
+ public void stop() {
+ if (container != null) {
+ container.stop();
+ }
+ }
+
+}
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
index 2cbd5388..af330270 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
@@ -28,6 +28,8 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.DockerImageName;
+import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;
+import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET_NAME;
public class S3Minio implements QuarkusTestResourceLifecycleManager {
@@ -121,19 +123,29 @@ public Map start() {
client.ignoreCertCheck();
client.makeBucket(MakeBucketArgs.builder()
.region(TestConfigSource.S3_REGION)
- .bucket(TestConfigSource.S3_BUCKET)
+ .bucket(S3_BUCKET_NAME)
.build());
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("Minio Started!");
- Map params = new ConcurrentHashMap<>();
- params.put("debezium.sink.iceberg.fs.s3a.endpoint", "http://localhost:" + container.getMappedPort(MINIO_DEFAULT_PORT).toString());
- params.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY);
- params.put("debezium.sink.iceberg.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY);
- params.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");
-
- return params;
+ Map config = new ConcurrentHashMap<>();
+ // FOR JDBC CATALOG
+ config.put("debezium.sink.iceberg.s3.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString());
+ config.put("debezium.sink.iceberg.s3.path-style-access", "true");
+ config.put("debezium.sink.iceberg.s3.access-key-id", S3Minio.MINIO_ACCESS_KEY);
+ config.put("debezium.sink.iceberg.s3.secret-access-key", S3Minio.MINIO_SECRET_KEY);
+ config.put("debezium.sink.iceberg.s3.region", TestConfigSource.S3_REGION);
+ config.put("debezium.sink.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
+ config.put("debezium.sink.iceberg.warehouse", S3_BUCKET);
+ // FOR HADOOP CATALOG
+ config.put("debezium.sink.iceberg.fs.s3a.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString());
+ config.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY);
+ config.put("debezium.sink.iceberg.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY);
+ config.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");
+ config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET_NAME);
+
+ return config;
}
diff --git a/pom.xml b/pom.xml
index 903eb2bf..dc466e59 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,18 +39,11 @@
2.2.1.Final
8.0.32
- 3.0.4.Final
+ 3.1.0.Final
4.8
-
-
- orgapacheiceberg-release
- https://repository.apache.org/content/repositories/orgapacheiceberg-1134/
-
-
-