Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve tests and Add JDBC catalog with S3FileIO example config #203

Merged
merged 5 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ on:

env:
SPARK_LOCAL_IP: 127.0.0.1
AWS_REGION: us-east-1

jobs:
build:
Expand All @@ -33,4 +34,4 @@ jobs:
distribution: 'temurin'
java-version: 11
- name: Build with Maven
run: mvn -B package --file pom.xml
run: mvn -B package --file pom.xml -Dsurefire.skipAfterFailureCount=1
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

# S3 config using JdbcCatalog and S3FileIO
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog
debezium.sink.iceberg.uri=jdbc_db_url
debezium.sink.iceberg.jdbc.user=my_user
debezium.sink.iceberg.jdbc.password=my_password
debezium.sink.iceberg.table-namespace=debeziumdata
debezium.sink.iceberg.catalog-name=iceberg
debezium.sink.iceberg.warehouse=s3a://my_bucket/iceberg_warehouse
# Use S3FileIO
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.endpoint=http://localhost:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY

# S3 config without hadoop catalog, using InMemoryCatalog and S3FileIO
### using minio as S3
debezium.sink.iceberg.s3.endpoint=http://localhost:9000
Expand Down
6 changes: 6 additions & 0 deletions debezium-server-iceberg-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@Disabled // @TODO fix
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMangoDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMangodbTest.IcebergChangeConsumerMangodbTestProfile.class)
@TestProfile(IcebergChangeConsumerMangodbTest.TestProfile.class)
public class IcebergChangeConsumerMangodbTest extends BaseSparkTest {

@Test
Expand All @@ -52,7 +52,7 @@ public void testSimpleUpload() {
});
}

public static class IcebergChangeConsumerMangodbTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@Disabled // @TODO remove spark with antlr4 version
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerMysqlTest.IcebergChangeConsumerMysqlTestProfile.class)
@TestProfile(IcebergChangeConsumerMysqlTest.TestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseTest {

@Test
Expand Down Expand Up @@ -80,7 +80,7 @@ public void testSimpleUpload() throws Exception {

}

public static class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerTest.IcebergChangeConsumerTestProfile.class)
@TestProfile(IcebergChangeConsumerTest.TestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
Expand Down Expand Up @@ -307,7 +307,7 @@ public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
}

public static class IcebergChangeConsumerTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.IcebergChangeConsumerUpsertTestDeleteDeletesProfile.class)
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.TestProfile.class)
public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest {

@Inject
Expand Down Expand Up @@ -148,7 +148,7 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("first_name= 'user2'").count(), 0);
}

public static class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergChangeConsumerUpsertTest.IcebergChangeConsumerUpsertTestProfile.class)
@TestProfile(IcebergChangeConsumerUpsertTest.TestProfile.class)
public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {

@Inject
Expand Down Expand Up @@ -170,7 +170,7 @@ public void testSimpleUpsertNoKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1 AND __op= 'c' AND first_name= 'user2'").count(), 2);
}

public static class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@TestProfile(IcebergEventsChangeConsumerTest.IcebergEventsChangeConsumerTestProfile.class)
@TestProfile(IcebergEventsChangeConsumerTest.TestProfile.class)
public class IcebergEventsChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")
String sinkType;
Expand All @@ -56,7 +56,7 @@ public void testSimpleUpload() {
S3Minio.listFiles();
}

public static class IcebergEventsChangeConsumerTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
public class TestConfigSource implements ConfigSource {

public static final String S3_REGION = "us-east-1";
public static final String S3_BUCKET = "test-bucket";
public static final String S3_BUCKET_NAME = "test-bucket";
public static final String CATALOG_TABLE_NAMESPACE = "debeziumevents";
public static final String ICEBERG_CATALOG_NAME = "iceberg";
public static final String S3_BUCKET = "s3a://" + S3_BUCKET_NAME + "/iceberg_warehouse";
protected Map<String, String> config = new HashMap<>();


Expand All @@ -38,12 +41,10 @@ public TestConfigSource() {
config.put("debezium.source.poll.interval.ms", "10000"); // 10 seconds
// iceberg
config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_");
config.put("debezium.sink.iceberg.table-namespace", "debeziumevents");
config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET);
config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
config.put("debezium.sink.iceberg.table-namespace", CATALOG_TABLE_NAMESPACE);
config.put("debezium.sink.iceberg.catalog-name", ICEBERG_CATALOG_NAME);
// use hadoop catalog for tests
config.put("debezium.sink.iceberg.type", "hadoop");
config.put("debezium.sink.iceberg.catalog-name", "mycatalog");
//config.put("debezium.sink.iceberg.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog");

// enable disable schema
config.put("debezium.format.value.schemas.enable", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package io.debezium.server.iceberg.batchsizewait;

import io.debezium.server.iceberg.testresources.S3Minio;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
Expand All @@ -21,7 +23,8 @@
import org.junit.jupiter.api.Test;

@QuarkusTest
@TestProfile(DynamicBatchSizeWaitTest.DynamicBatchSizeWaitTestProfile.class)
@TestProfile(DynamicBatchSizeWaitTest.TestProfile.class)
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
class DynamicBatchSizeWaitTest {

@Inject
Expand Down Expand Up @@ -71,7 +74,7 @@ void shouldDecreaseSleepMs() {
Assertions.assertTrue(dynamicSleep.getWaitMs(120) <= 100);
}

public static class DynamicBatchSizeWaitTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {

@Override
public Map<String, String> getConfigOverrides() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.junit.jupiter.api.Test;

@QuarkusTest
@TestProfile(MaxBatchSizeWaitTest.MaxBatchSizeWaitTestProfile.class)
@TestProfile(MaxBatchSizeWaitTest.TestProfile.class)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
class MaxBatchSizeWaitTest extends BaseSparkTest {
Expand Down Expand Up @@ -63,7 +63,7 @@ public void testBatchsizeWait() throws Exception {
});
}

public static class MaxBatchSizeWaitTestProfile implements QuarkusTestProfile {
public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.spark.sql.SparkSession;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.BeforeAll;
import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;

/**
Expand All @@ -43,20 +44,30 @@ static void setup() {
sparkconf
.set("spark.ui.enabled", "false")
.set("spark.eventLog.enabled", "false")
.set("spark.hadoop.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY)
.set("spark.hadoop.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY)
// minio specific setting using minio as S3
.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:" + S3Minio.getMappedPort())
.set("spark.hadoop.fs.s3a.path.style.access", "true")
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

// enable iceberg SQL Extensions
// enable iceberg SQL Extensions and Catalog
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
// hadoop catalog
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
//.set("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog")
.set("spark.sql.catalog.spark_catalog.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse")
.set("spark.sql.warehouse.dir", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
.set("spark.sql.catalog.spark_catalog.warehouse", S3_BUCKET)
.set("spark.sql.catalog.spark_catalog.default-namespaces", CATALOG_TABLE_NAMESPACE)
.set("spark.sql.warehouse.dir", S3_BUCKET)
// // JdbcCatalog catalog, add additional catalog
// .set("spark.sql.defaultCatalog", ICEBERG_CATALOG_NAME)
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog")
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".warehouse", S3_BUCKET)
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".cache-enabled", "false")
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".catalog-impl", JdbcCatalog.class.getName())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".default-namespaces", CATALOG_TABLE_NAMESPACE)
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".uri", JdbcCatalogDB.container.getJdbcUrl())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".jdbc.user", JdbcCatalogDB.container.getUsername())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".jdbc.password", JdbcCatalogDB.container.getPassword())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString())
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.path-style-access", "true")
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.access-key-id", S3Minio.MINIO_ACCESS_KEY)
// .set("spark.sql.catalog." + ICEBERG_CATALOG_NAME + ".s3.secret-access-key", S3Minio.MINIO_SECRET_KEY)
;

BaseSparkTest.spark = SparkSession
.builder()
Expand Down Expand Up @@ -124,8 +135,7 @@ protected HadoopCatalog getIcebergCatalog() {
}

public Dataset<Row> getTableData(String table) {
table = "debeziumevents.debeziumcdc_" + table.replace(".", "_");
//System.out.println("--loading-->" + table);
table = CATALOG_TABLE_NAMESPACE + ".debeziumcdc_" + table.replace(".", "_");
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg.testresources;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.iceberg.jdbc.JdbcCatalog;
import org.testcontainers.containers.MySQLContainer;

/**
 * Quarkus test resource that spins up a MySQL container (via Testcontainers)
 * and exposes it to the application under test as the backing store for an
 * Iceberg {@link JdbcCatalog}.
 *
 * <p>The returned config overrides point the Debezium Iceberg sink at the
 * container's JDBC URL and credentials. The container is shared through a
 * public static field so test code elsewhere can read its connection details
 * (e.g. for configuring a matching Spark catalog).
 */
public class JdbcCatalogDB implements QuarkusTestResourceLifecycleManager {
  // Shared container instance; intentionally public static so sibling test
  // utilities can query getJdbcUrl()/getUsername()/getPassword() directly.
  public static MySQLContainer<?> container = new MySQLContainer<>();

  @Override
  public Map<String, String> start() {
    // Blocks until MySQL is up and accepting connections.
    container.start();

    Map<String, String> sinkConfig = new ConcurrentHashMap<>();
    sinkConfig.put("debezium.sink.iceberg.catalog-impl", JdbcCatalog.class.getName());
    sinkConfig.put("debezium.sink.iceberg.uri", container.getJdbcUrl());
    sinkConfig.put("debezium.sink.iceberg.jdbc.user", container.getUsername());
    sinkConfig.put("debezium.sink.iceberg.jdbc.password", container.getPassword());
    return sinkConfig;
  }

  @Override
  public void stop() {
    // Defensive null check kept from the original; the field is initialized
    // at declaration, so this guards only against external reassignment.
    if (container != null) {
      container.stop();
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.DockerImageName;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET_NAME;

public class S3Minio implements QuarkusTestResourceLifecycleManager {

Expand Down Expand Up @@ -121,19 +123,29 @@ public Map<String, String> start() {
client.ignoreCertCheck();
client.makeBucket(MakeBucketArgs.builder()
.region(TestConfigSource.S3_REGION)
.bucket(TestConfigSource.S3_BUCKET)
.bucket(S3_BUCKET_NAME)
.build());
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("Minio Started!");
Map<String, String> params = new ConcurrentHashMap<>();
params.put("debezium.sink.iceberg.fs.s3a.endpoint", "http://localhost:" + container.getMappedPort(MINIO_DEFAULT_PORT).toString());
params.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY);
params.put("debezium.sink.iceberg.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY);
params.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");

return params;
Map<String, String> config = new ConcurrentHashMap<>();
// FOR JDBC CATALOG
config.put("debezium.sink.iceberg.s3.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString());
config.put("debezium.sink.iceberg.s3.path-style-access", "true");
config.put("debezium.sink.iceberg.s3.access-key-id", S3Minio.MINIO_ACCESS_KEY);
config.put("debezium.sink.iceberg.s3.secret-access-key", S3Minio.MINIO_SECRET_KEY);
config.put("debezium.sink.iceberg.s3.region", TestConfigSource.S3_REGION);
config.put("debezium.sink.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
config.put("debezium.sink.iceberg.warehouse", S3_BUCKET);
// FOR HADOOP CATALOG
config.put("debezium.sink.iceberg.fs.s3a.endpoint", "http://localhost:" + S3Minio.getMappedPort().toString());
config.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY);
config.put("debezium.sink.iceberg.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY);
config.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");
config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET_NAME);

return config;
}


Expand Down
Loading