From ffe60cf2b141ba2db58a6fc8d5fc63600aba525d Mon Sep 17 00:00:00 2001
From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Tue, 30 May 2023 18:01:14 +0200
Subject: [PATCH 1/5] Improve tests and add an example configuration for using the JDBC catalog with S3FileIO

---
 .../conf/application.properties.example | 15 +++++++
 debezium-server-iceberg-sink/pom.xml | 6 +++
 .../IcebergChangeConsumerMangodbTest.java | 4 +-
 .../IcebergChangeConsumerMysqlTest.java | 4 +-
 .../iceberg/IcebergChangeConsumerTest.java | 4 +-
 ...ChangeConsumerUpsertDeleteDeletesTest.java | 4 +-
 .../IcebergChangeConsumerUpsertTest.java | 4 +-
 .../IcebergEventsChangeConsumerTest.java | 4 +-
 .../server/iceberg/TestConfigSource.java | 13 +++--- 
 .../DynamicBatchSizeWaitTest.java | 4 +-
 .../batchsizewait/MaxBatchSizeWaitTest.java | 4 +-
 .../iceberg/testresources/BaseSparkTest.java | 36 ++++++++++------
 .../iceberg/testresources/JdbcCatalogDB.java | 43 +++++++++++++++++++
 .../server/iceberg/testresources/S3Minio.java | 27 ++++++++----
 pom.xml | 2 +-
 15 files changed, 130 insertions(+), 44 deletions(-)
 create mode 100644 debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/JdbcCatalogDB.java

diff --git a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
index 6625124a..e9f40f15 100644
--- a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
+++ b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
@@ -22,6 +22,21 @@ debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
 debezium.sink.iceberg.fs.s3a.path.style.access=true
 debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+# S3 config using the JdbcCatalog and S3FileIO
+debezium.sink.iceberg.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog
+debezium.sink.iceberg.uri=jdbc_db_url
+debezium.sink.iceberg.jdbc.user=my_user
+debezium.sink.iceberg.jdbc.password=my_password
+debezium.sink.iceberg.table-namespace=debeziumdata
+debezium.sink.iceberg.catalog-name=iceberg
+debezium.sink.iceberg.warehouse=s3a://my_bucket/iceberg_warehouse
+# Use S3FileIO
+debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
+debezium.sink.iceberg.s3.endpoint=http://localhost:9000
+debezium.sink.iceberg.s3.path-style-access=true
+debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
+debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
+
 # S3 config without hadoop catalog.
Using InMemoryCatalog catalog And S3FileIO ### using mino as S3 debezium.sink.iceberg.s3.endpoint=http://localhost:9000; diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml index f588fe5c..f9079e91 100644 --- a/debezium-server-iceberg-sink/pom.xml +++ b/debezium-server-iceberg-sink/pom.xml @@ -231,6 +231,12 @@ ${version.testcontainers} test + + org.testcontainers + mysql + ${version.testcontainers} + test + org.testcontainers mongodb diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java index 257a12d3..9b58c99b 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMangodbTest.java @@ -34,7 +34,7 @@ @Disabled // @TODO fix @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourceMangoDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerMangodbTest.IcebergChangeConsumerMangodbTestProfile.class) +@TestProfile(IcebergChangeConsumerMangodbTest.TestProfile.class) public class IcebergChangeConsumerMangodbTest extends BaseSparkTest { @Test @@ -52,7 +52,7 @@ public void testSimpleUpload() { }); } - public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile { + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { Map config = new HashMap<>(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java index 4782c89e..80850aa7 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java @@ -34,7 +34,7 @@ @Disabled // @TODO remove spark with antlr4 version @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerMysqlTest.IcebergChangeConsumerMysqlTestProfile.class) +@TestProfile(IcebergChangeConsumerMysqlTest.TestProfile.class) public class IcebergChangeConsumerMysqlTest extends BaseTest { @Test @@ -80,7 +80,7 @@ public void testSimpleUpload() throws Exception { } - public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile { + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { Map config = new HashMap<>(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 98eef82f..79afb5a8 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -44,7 +44,7 @@ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = 
SourcePostgresqlDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerTest.IcebergChangeConsumerTestProfile.class) +@TestProfile(IcebergChangeConsumerTest.TestProfile.class) public class IcebergChangeConsumerTest extends BaseSparkTest { protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class); @@ -307,7 +307,7 @@ public void testMapDestination() { assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); } - public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile { + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { Map config = new HashMap<>(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java index ed798843..b66d4873 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java @@ -34,7 +34,7 @@ */ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class) +@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.TestProfile.class) public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest { @Inject @@ -148,7 +148,7 @@ public void testSimpleUpsertCompositeKey() throws Exception { Assertions.assertEquals(ds.where("first_name= 'user2'").count(), 0); } - public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile { + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { Map config = new HashMap<>(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index 3eef7487..1c07ce14 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -34,7 +34,7 @@ */ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class) +@TestProfile(IcebergChangeConsumerUpsertTest.TestProfile.class) public class IcebergChangeConsumerUpsertTest extends BaseSparkTest { @Inject @@ -170,7 +170,7 @@ public void testSimpleUpsertNoKey() throws Exception { Assertions.assertEquals(ds.where("id = 1 AND __op= 'c' AND first_name= 'user2'").count(), 2); } - public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile { + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { Map config = new HashMap<>(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java 
b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java index 355b2fe9..a4b31aa1 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java @@ -34,7 +34,7 @@ @QuarkusTest @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) -@TestProfile(IcebergEventsChangeConsumerTest.IcebergEventsChangeConsumerTestProfile.class) +@TestProfile(IcebergEventsChangeConsumerTest.TestProfile.class) public class IcebergEventsChangeConsumerTest extends BaseSparkTest { @ConfigProperty(name = "debezium.sink.type") String sinkType; @@ -56,7 +56,7 @@ public void testSimpleUpload() { S3Minio.listFiles(); } - public static class IcebergEventsChangeConsumerTestProfile implements QuarkusTestProfile { + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { Map config = new HashMap<>(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 76fd1c2e..6092f0d2 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -17,7 +17,10 @@ public class TestConfigSource implements ConfigSource { public static final String S3_REGION = "us-east-1"; - public static final String S3_BUCKET = "test-bucket"; + public static final String S3_BUCKET_NAME = "test-bucket"; + public static final String CATALOG_TABLE_NAMESPACE = "debeziumevents"; + public static final String ICEBERG_CATALOG_NAME = "iceberg"; + public static final String S3_BUCKET = "s3a://" + S3_BUCKET_NAME + "/iceberg_warehouse"; protected Map config = new HashMap<>(); @@ -38,12 +41,10 @@ public TestConfigSource() { config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds! 
// iceberg config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_"); - config.put("debezium.sink.iceberg.table-namespace", "debeziumevents"); - config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET); - config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse"); + config.put("debezium.sink.iceberg.table-namespace", CATALOG_TABLE_NAMESPACE); + config.put("debezium.sink.iceberg.catalog-name", ICEBERG_CATALOG_NAME); + // use hadoop catalog for tests config.put("debezium.sink.iceberg.type", "hadoop"); - config.put("debezium.sink.iceberg.catalog-name", "mycatalog"); - //config.put("debezium.sink.iceberg.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog"); // enable disable schema config.put("debezium.format.value.schemas.enable", "true"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java index 81ade996..9e611452 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java @@ -21,7 +21,7 @@ import org.junit.jupiter.api.Test; @QuarkusTest -@TestProfile(DynamicBatchSizeWaitTest.DynamicBatchSizeWaitTestProfile.class) +@TestProfile(DynamicBatchSizeWaitTest.TestProfile.class) class DynamicBatchSizeWaitTest { @Inject @@ -71,7 +71,7 @@ void shouldDecreaseSleepMs() { Assertions.assertTrue(dynamicSleep.getWaitMs(120) <= 100); } - public static class DynamicBatchSizeWaitTestProfile implements QuarkusTestProfile { + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java index 2c548cb8..f3a3ecdd 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java @@ -28,7 +28,7 @@ import org.junit.jupiter.api.Test; @QuarkusTest -@TestProfile(MaxBatchSizeWaitTest.MaxBatchSizeWaitTestProfile.class) +@TestProfile(MaxBatchSizeWaitTest.TestProfile.class) @QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true) @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) class MaxBatchSizeWaitTest extends BaseSparkTest { @@ -63,7 +63,7 @@ public void testBatchsizeWait() throws Exception { }); } - public static class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile { + public static class TestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { Map config = new HashMap<>(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java index 7e7e5e22..1608d3db 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java @@ -21,6 
+21,7 @@ import org.apache.spark.sql.SparkSession; import org.eclipse.microprofile.config.ConfigProvider; import org.junit.jupiter.api.BeforeAll; +import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE; import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET; /** @@ -43,20 +44,30 @@ static void setup() { sparkconf .set("spark.ui.enabled", "false") .set("spark.eventLog.enabled", "false") - .set("spark.hadoop.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY) - .set("spark.hadoop.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY) - // minio specific setting using minio as S3 - .set("spark.hadoop.fs.s3a.endpoint", "http://localhost:" + S3Minio.getMappedPort()) - .set("spark.hadoop.fs.s3a.path.style.access", "true") - .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") - - // enable iceberg SQL Extensions + // enable iceberg SQL Extensions and Catalog .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + // hadoop catalog .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") .set("spark.sql.catalog.spark_catalog.type", "hadoop") - //.set("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog") - .set("spark.sql.catalog.spark_catalog.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse") - .set("spark.sql.warehouse.dir", "s3a://" + S3_BUCKET + "/iceberg_warehouse"); + .set("spark.sql.catalog.spark_catalog.warehouse", S3_BUCKET) + .set("spark.sql.catalog.spark_catalog.default-namespaces", CATALOG_TABLE_NAMESPACE) + .set("spark.sql.warehouse.dir", S3_BUCKET) +// // JdbcCatalog catalog, add additional catalog +// .set("spark.sql.defaultCatalog", ICEBERG_CATALOG_NAME) +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog") +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".warehouse", S3_BUCKET) +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".cache-enabled", "false") +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".catalog-impl", JdbcCatalog.class.getName()) +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".default-namespaces", CATALOG_TABLE_NAMESPACE) +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".uri", JdbcCatalogDB.container.getJdbcUrl()) +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".jdbc.user", JdbcCatalogDB.container.getUsername()) +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".jdbc.password", JdbcCatalogDB.container.getPassword()) +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".io-impl", "org.apache.iceberg.aws.s3.S3FileIO") +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString()) +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.path-style-access", "true") +// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.access-key-id", S3Minio.MINIO_ACCESS_KEY) +// .set("spark.sql.catalog." 
+ ICEBERG_CATALOG_NAME + ".s3.secret-access-key", S3Minio.MINIO_SECRET_KEY) + ; BaseSparkTest.spark = SparkSession .builder() @@ -124,8 +135,7 @@ protected HadoopCatalog getIcebergCatalog() { } public Dataset getTableData(String table) { - table = "debeziumevents.debeziumcdc_" + table.replace(".", "_"); - //System.out.println("--loading-->" + table); + table = CATALOG_TABLE_NAMESPACE + ".debeziumcdc_" + table.replace(".", "_"); return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/JdbcCatalogDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/JdbcCatalogDB.java new file mode 100644 index 00000000..db931fb3 --- /dev/null +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/JdbcCatalogDB.java @@ -0,0 +1,43 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.testresources; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.testcontainers.containers.MySQLContainer; + +public class JdbcCatalogDB implements QuarkusTestResourceLifecycleManager { + public static MySQLContainer container = new MySQLContainer<>(); + + @Override + public Map start() { + container.start(); + + Map config = new ConcurrentHashMap<>(); + + config.put("debezium.sink.iceberg.catalog-impl", JdbcCatalog.class.getName()); + config.put("debezium.sink.iceberg.uri", container.getJdbcUrl()); + config.put("debezium.sink.iceberg.jdbc.user", container.getUsername()); + config.put("debezium.sink.iceberg.jdbc.password", container.getPassword()); + + return config; + } + + @Override + public void stop() { + if (container != null) { + container.stop(); + } + } + +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java index 2cbd5388..b2dcc516 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java @@ -28,6 +28,8 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.utility.DockerImageName; +import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET; +import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET_NAME; public class S3Minio implements QuarkusTestResourceLifecycleManager { @@ -121,19 +123,28 @@ public Map start() { client.ignoreCertCheck(); client.makeBucket(MakeBucketArgs.builder() .region(TestConfigSource.S3_REGION) - .bucket(TestConfigSource.S3_BUCKET) + .bucket(S3_BUCKET_NAME) .build()); } catch (Exception e) { e.printStackTrace(); } LOGGER.info("Minio Started!"); - Map params = new ConcurrentHashMap<>(); - params.put("debezium.sink.iceberg.fs.s3a.endpoint", "http://localhost:" + container.getMappedPort(MINIO_DEFAULT_PORT).toString()); - params.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY); - params.put("debezium.sink.iceberg.fs.s3a.secret.key", 
S3Minio.MINIO_SECRET_KEY);
- params.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");
-
- return params;
+ Map config = new ConcurrentHashMap<>();
+ // FOR JDBC CATALOG
+ config.put("debezium.sink.iceberg.s3.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString());
+ config.put("debezium.sink.iceberg.s3.path-style-access", "true");
+ config.put("debezium.sink.iceberg.s3.access-key-id", S3Minio.MINIO_ACCESS_KEY);
+ config.put("debezium.sink.iceberg.s3.secret-access-key", S3Minio.MINIO_SECRET_KEY);
+ config.put("debezium.sink.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
+ config.put("debezium.sink.iceberg.warehouse", S3_BUCKET);
+ // FOR HADOOP CATALOG
+ config.put("debezium.sink.iceberg.fs.s3a.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString());
+ config.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY);
+ config.put("debezium.sink.iceberg.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY);
+ config.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");
+ config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET_NAME);
+
+ return config;
 }

diff --git a/pom.xml b/pom.xml
index 903eb2bf..046a8de4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@ 2.2.1.Final 8.0.32 - 3.0.4.Final + 3.1.0.Final 4.8

From 8fdb5c80e80bd8f4221ac5c959023f50b5ae7459 Mon Sep 17 00:00:00 2001
From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 1 Jun 2023 10:00:56 +0200
Subject: [PATCH 2/5] Improve tests and add an example configuration for using the JDBC catalog with S3FileIO

---
 pom.xml | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/pom.xml b/pom.xml
index 046a8de4..dc466e59 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,13 +44,6 @@ 4.8 - - - orgapacheiceberg-release - https://repository.apache.org/content/repositories/orgapacheiceberg-1134/ - - -

From 97f05a7b550b55f0135e39c21c2c9203b2d5e977 Mon Sep 17 00:00:00 2001
From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 1 Jun 2023 10:19:20 +0200
Subject: [PATCH 3/5] Improve tests and add an example configuration for using the JDBC catalog with S3FileIO

---
 .../server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java
index 9e611452..62e07552 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWaitTest.java
@@ -8,6 +8,8 @@ package io.debezium.server.iceberg.batchsizewait;
+import io.debezium.server.iceberg.testresources.S3Minio;
+import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.QuarkusTestProfile;
 import io.quarkus.test.junit.TestProfile;
@@ -22,6 +24,7 @@ @QuarkusTest
 @TestProfile(DynamicBatchSizeWaitTest.TestProfile.class)
+@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
 class DynamicBatchSizeWaitTest {

 @Inject

From 303316cb9e738884501a67c87a1c0e05c59b7c43 Mon Sep 17 00:00:00 2001
From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 1 Jun 2023 10:58:38 +0200
Subject: [PATCH 4/5] Remove hadoop (and aws-java-sdk-bundle) dependencies from release
---
.../java/io/debezium/server/iceberg/testresources/S3Minio.java | 1 + 1 file changed, 1 insertion(+) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java index b2dcc516..af330270 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java @@ -135,6 +135,7 @@ public Map start() { config.put("debezium.sink.iceberg.s3.path-style-access", "true"); config.put("debezium.sink.iceberg.s3.access-key-id", S3Minio.MINIO_ACCESS_KEY); config.put("debezium.sink.iceberg.s3.secret-access-key", S3Minio.MINIO_SECRET_KEY); + config.put("debezium.sink.iceberg.s3.region", TestConfigSource.S3_REGION); config.put("debezium.sink.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); config.put("debezium.sink.iceberg.warehouse", S3_BUCKET); // FOR HADOOP CATALOG From e8f13e57985769e3f1486447283efb8e66acb04d Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 1 Jun 2023 11:07:15 +0200 Subject: [PATCH 5/5] Remove hadoop (and aws-java-sdk-bundle) dependencies from release --- .github/workflows/build.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c9aecadc..1e364553 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,6 +19,7 @@ on: env: SPARK_LOCAL_IP: 127.0.0.1 + AWS_REGION: us-east-1 jobs: build: @@ -33,4 +34,4 @@ jobs: distribution: 'temurin' java-version: 11 - name: Build with Maven - run: mvn -B package --file pom.xml + run: mvn -B package --file pom.xml -Dsurefire.skipAfterFailureCount=1
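
For reference, the JDBC-catalog properties added in PATCH 1/5 correspond to plain Iceberg catalog properties once the debezium.sink.iceberg. prefix is stripped. Below is a minimal, hypothetical sketch (not part of the patches above) of initializing the same catalog programmatically; the class name, JDBC URL, and credentials are placeholders, and it assumes the JDBC driver plus the iceberg-aws and hadoop-common jars are on the classpath.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;

public class JdbcCatalogSmokeTest {
  public static void main(String[] args) {
    // Same keys as the application.properties example, without the
    // "debezium.sink.iceberg." prefix. All values are placeholders.
    Map<String, String> props = new HashMap<>();
    props.put("catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog");
    props.put("uri", "jdbc:postgresql://localhost:5432/iceberg_catalog"); // placeholder JDBC URL
    props.put("jdbc.user", "my_user");
    props.put("jdbc.password", "my_password");
    props.put("warehouse", "s3a://my_bucket/iceberg_warehouse");
    props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
    props.put("s3.endpoint", "http://localhost:9000");
    props.put("s3.path-style-access", "true");
    props.put("s3.access-key-id", "MY_ACCESS_KEY");
    props.put("s3.secret-access-key", "MY_SECRET_KEY");

    // buildIcebergCatalog instantiates the class named by "catalog-impl"
    // (JdbcCatalog here) and wires the FileIO named by "io-impl" (S3FileIO).
    Catalog catalog = CatalogUtil.buildIcebergCatalog("iceberg", props, new Configuration());

    // List the namespaces the sink would write into (e.g. "debeziumdata").
    for (Namespace ns : ((SupportsNamespaces) catalog).listNamespaces()) {
      System.out.println(ns);
    }
  }
}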