Skip to content

Commit

Permalink
Improve tests and Add JDBC catalog with S3FileIO example config (#203)
Browse files Browse the repository at this point in the history
* Improve tests and Add using JDBC catalog with S3FileIO example configuration

* Improve tests and Add using JDBC catalog with S3FileIO example configuration

* Improve tests and Add using JDBC catalog with S3FileIO example configuration

* Remove hadoop (and aws-java-sdk-bundle) dependencies from release

* Remove hadoop (and aws-java-sdk-bundle) dependencies from release
  • Loading branch information
ismailsimsek committed Jun 1, 2023
1 parent 0728a23 commit 6804857
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 52 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ on:

env:
SPARK_LOCAL_IP: 127.0.0.1
AWS_REGION: us-east-1

jobs:
build:
Expand All @@ -33,4 +34,4 @@ jobs:
distribution: 'temurin'
java-version: 11
- name: Build with Maven
run: mvn -B package --file pom.xml
run: mvn -B package --file pom.xml -Dsurefire.skipAfterFailureCount=1
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

# S3 config using the JdbcCatalog and S3FileIO
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog
debezium.sink.iceberg.uri=jdbc_db_url
debezium.sink.iceberg.jdbc.user=my_user
debezium.sink.iceberg.jdbc.password=my_password
debezium.sink.iceberg.table-namespace=debeziumdata
debezium.sink.iceberg.catalog-name=iceberg
debezium.sink.iceberg.warehouse=s3a://my_bucket/iceberg_warehouse
# Use S3FileIO
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.endpoint=http://localhost:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY

# S3 config without a hadoop catalog, using the InMemoryCatalog and S3FileIO
### using minio as S3
debezium.sink.iceberg.s3.endpoint=http://localhost:9000;
Expand Down
6 changes: 6 additions & 0 deletions debezium-server-iceberg-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@Disabled // @TODO fix
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMangoDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMangodbTest.IcebergChangeConsumerMangodbTestProfile.class)
@TestProfile(IcebergChangeConsumerMangodbTest.TestProfile.class)
public class IcebergChangeConsumerMangodbTest extends BaseSparkTest {

@Test
Expand All @@ -52,7 +52,7 @@ public void testSimpleUpload() {
});
}

public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@Disabled // @TODO remove spark with antlr4 version
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMysqlTest.IcebergChangeConsumerMysqlTestProfile.class)
@TestProfile(IcebergChangeConsumerMysqlTest.TestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseTest {

@Test
Expand Down Expand Up @@ -80,7 +80,7 @@ public void testSimpleUpload() throws Exception {

}

public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerTest.IcebergChangeConsumerTestProfile.class)
@TestProfile(IcebergChangeConsumerTest.TestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
Expand Down Expand Up @@ -307,7 +307,7 @@ public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
}

public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class)
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.TestProfile.class)
public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest {

@Inject
Expand Down Expand Up @@ -148,7 +148,7 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("first_name= 'user2'").count(), 0);
}

public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class)
@TestProfile(IcebergChangeConsumerUpsertTest.TestProfile.class)
public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {

@Inject
Expand Down Expand Up @@ -170,7 +170,7 @@ public void testSimpleUpsertNoKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1 AND __op= 'c' AND first_name= 'user2'").count(), 2);
}

public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergEventsChangeConsumerTest.IcebergEventsChangeConsumerTestProfile.class)
@TestProfile(IcebergEventsChangeConsumerTest.TestProfile.class)
public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")
String sinkType;
Expand All @@ -56,7 +56,7 @@ public void testSimpleUpload() {
S3Minio.listFiles();
}

public static class IcebergEventsChangeConsumerTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
public class TestConfigSource implements ConfigSource {

public static final String S3_REGION = "us-east-1";
public static final String S3_BUCKET = "test-bucket";
public static final String S3_BUCKET_NAME = "test-bucket";
public static final String CATALOG_TABLE_NAMESPACE = "debeziumevents";
public static final String ICEBERG_CATALOG_NAME = "iceberg";
public static final String S3_BUCKET = "s3a://" + S3_BUCKET_NAME + "/iceberg_warehouse";
protected Map<String, String> config = new HashMap<>();


Expand All @@ -38,12 +41,10 @@ public TestConfigSource() {
config.put("debezium.source.poll.interval.ms", "10000"); // 10 seconds
// iceberg
config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_");
config.put("debezium.sink.iceberg.table-namespace", "debeziumevents");
config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET);
config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
config.put("debezium.sink.iceberg.table-namespace", CATALOG_TABLE_NAMESPACE);
config.put("debezium.sink.iceberg.catalog-name", ICEBERG_CATALOG_NAME);
// use hadoop catalog for tests
config.put("debezium.sink.iceberg.type", "hadoop");
config.put("debezium.sink.iceberg.catalog-name", "mycatalog");
//config.put("debezium.sink.iceberg.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog");

// enable disable schema
config.put("debezium.format.value.schemas.enable", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package io.debezium.server.iceberg.batchsizewait;

import io.debezium.server.iceberg.testresources.S3Minio;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
Expand All @@ -21,7 +23,8 @@
import org.junit.jupiter.api.Test;

@QuarkusTest
@TestProfile(DynamicBatchSizeWaitTest.DynamicBatchSizeWaitTestProfile.class)
@TestProfile(DynamicBatchSizeWaitTest.TestProfile.class)
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
class DynamicBatchSizeWaitTest {

@Inject
Expand Down Expand Up @@ -71,7 +74,7 @@ void shouldDecreaseSleepMs() {
Assertions.assertTrue(dynamicSleep.getWaitMs(120) <= 100);
}

public static class DynamicBatchSizeWaitTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {

@Override
public Map<String, String> getConfigOverrides() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.junit.jupiter.api.Test;

@QuarkusTest
@TestProfile(MaxBatchSizeWaitTest.MaxBatchSizeWaitTestProfile.class)
@TestProfile(MaxBatchSizeWaitTest.TestProfile.class)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
class MaxBatchSizeWaitTest extends BaseSparkTest {
Expand Down Expand Up @@ -63,7 +63,7 @@ public void testBatchsizeWait() throws Exception {
});
}

public static class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.spark.sql.SparkSession;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.BeforeAll;
import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;

/**
Expand All @@ -43,20 +44,30 @@ static void setup() {
sparkconf
.set("spark.ui.enabled", "false")
.set("spark.eventLog.enabled", "false")
.set("spark.hadoop.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY)
.set("spark.hadoop.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY)
// minio specific setting using minio as S3
.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:" + S3Minio.getMappedPort())
.set("spark.hadoop.fs.s3a.path.style.access", "true")
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

// enable iceberg SQL Extensions
// enable iceberg SQL Extensions and Catalog
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
// hadoop catalog
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
//.set("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog")
.set("spark.sql.catalog.spark_catalog.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse")
.set("spark.sql.warehouse.dir", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
.set("spark.sql.catalog.spark_catalog.warehouse", S3_BUCKET)
.set("spark.sql.catalog.spark_catalog.default-namespaces", CATALOG_TABLE_NAMESPACE)
.set("spark.sql.warehouse.dir", S3_BUCKET)
// // JdbcCatalog catalog, add additional catalog
// .set("spark.sql.defaultCatalog", ICEBERG_CATALOG_NAME)
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog")
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".warehouse", S3_BUCKET)
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".cache-enabled", "false")
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".catalog-impl", JdbcCatalog.class.getName())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".default-namespaces", CATALOG_TABLE_NAMESPACE)
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".uri", JdbcCatalogDB.container.getJdbcUrl())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".jdbc.user", JdbcCatalogDB.container.getUsername())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".jdbc.password", JdbcCatalogDB.container.getPassword())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.path-style-access", "true")
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.access-key-id", S3Minio.MINIO_ACCESS_KEY)
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.secret-access-key", S3Minio.MINIO_SECRET_KEY)
;

BaseSparkTest.spark = SparkSession
.builder()
Expand Down Expand Up @@ -124,8 +135,7 @@ protected HadoopCatalog getIcebergCatalog() {
}

public Dataset<Row> getTableData(String table) {
table = "debeziumevents.debeziumcdc_" + table.replace(".", "_");
//System.out.println("--loading-->" + table);
table = CATALOG_TABLE_NAMESPACE + ".debeziumcdc_" + table.replace(".", "_");
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg.testresources;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.iceberg.jdbc.JdbcCatalog;
import org.testcontainers.containers.MySQLContainer;

/**
 * Quarkus test resource that boots a MySQL container to back an Iceberg
 * {@link JdbcCatalog} during tests.
 *
 * <p>On {@link #start()} it launches the container and hands the connection
 * details to the application as {@code debezium.sink.iceberg.*} config
 * overrides, so the sink under test uses the JDBC catalog instead of the
 * default one.
 */
public class JdbcCatalogDB implements QuarkusTestResourceLifecycleManager {
  // Shared container so every test resource instance points at the same DB.
  // NOTE(review): `new MySQLContainer<>()` without an explicit image tag is
  // deprecated in newer Testcontainers releases — confirm version in use.
  public static MySQLContainer<?> container = new MySQLContainer<>();

  @Override
  public Map<String, String> start() {
    container.start();

    Map<String, String> catalogProps = new ConcurrentHashMap<>();

    // Point the Iceberg sink at the JDBC catalog backed by this container.
    catalogProps.put("debezium.sink.iceberg.catalog-impl", JdbcCatalog.class.getName());
    catalogProps.put("debezium.sink.iceberg.uri", container.getJdbcUrl());
    catalogProps.put("debezium.sink.iceberg.jdbc.user", container.getUsername());
    catalogProps.put("debezium.sink.iceberg.jdbc.password", container.getPassword());

    return catalogProps;
  }

  @Override
  public void stop() {
    // Defensive null check mirrors the standard lifecycle-manager pattern.
    if (container != null) {
      container.stop();
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.DockerImageName;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET_NAME;

public class S3Minio implements QuarkusTestResourceLifecycleManager {

Expand Down Expand Up @@ -121,19 +123,29 @@ public Map<String, String> start() {
client.ignoreCertCheck();
client.makeBucket(MakeBucketArgs.builder()
.region(TestConfigSource.S3_REGION)
.bucket(TestConfigSource.S3_BUCKET)
.bucket(S3_BUCKET_NAME)
.build());
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("Minio Started!");
Map<String, String> params = new ConcurrentHashMap<>();
params.put("debezium.sink.iceberg.fs.s3a.endpoint", "http://localhost:" + container.getMappedPort(MINIO_DEFAULT_PORT).toString());
params.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY);
params.put("debezium.sink.iceberg.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY);
params.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");

return params;
Map<String, String> config = new ConcurrentHashMap<>();
// FOR JDBC CATALOG
config.put("debezium.sink.iceberg.s3.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString());
config.put("debezium.sink.iceberg.s3.path-style-access", "true");
config.put("debezium.sink.iceberg.s3.access-key-id", S3Minio.MINIO_ACCESS_KEY);
config.put("debezium.sink.iceberg.s3.secret-access-key", S3Minio.MINIO_SECRET_KEY);
config.put("debezium.sink.iceberg.s3.region", TestConfigSource.S3_REGION);
config.put("debezium.sink.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
config.put("debezium.sink.iceberg.warehouse", S3_BUCKET);
// FOR HADOOP CATALOG
config.put("debezium.sink.iceberg.fs.s3a.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString());
config.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY);
config.put("debezium.sink.iceberg.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY);
config.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");
config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET_NAME);

return config;
}


Expand Down
Loading

0 comments on commit 6804857

Please sign in to comment.