From f4afeb88470f5c66c416f1c19f1918aaee822120 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 15 Dec 2021 21:56:36 +0100 Subject: [PATCH] upgrade debezium and quarkus version --- debezium-server-iceberg-sink/pom.xml | 18 +++++++++-- .../iceberg/IcebergChangeConsumerTest.java | 15 --------- .../iceberg/testresources/BaseSparkTest.java | 18 ++++++++++- .../server/iceberg/testresources/S3Minio.java | 7 ++-- .../iceberg/testresources/SourceMysqlDB.java | 32 ++++++++----------- .../testresources/SourcePostgresqlDB.java | 25 ++++++--------- pom.xml | 16 +++++++--- 7 files changed, 70 insertions(+), 61 deletions(-) diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml index 9f9e1faf..999301b9 100644 --- a/debezium-server-iceberg-sink/pom.xml +++ b/debezium-server-iceberg-sink/pom.xml @@ -157,7 +157,7 @@ io.minio minio - 8.0.3 + 8.3.1 test @@ -169,7 +169,7 @@ org.awaitility awaitility - 4.0.3 + 4.1.0 test @@ -188,7 +188,13 @@ org.testcontainers testcontainers - 1.15.1 + ${version.testcontainers} + test + + + org.testcontainers + mongodb + ${version.testcontainers} test @@ -201,6 +207,12 @@ debezium-connector-mysql test + + io.debezium + debezium-connector-mongodb + ${version.debezium} + test + diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 8e879800..ca99ebf0 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -18,9 +18,6 @@ import java.time.Duration; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -47,18 +44,6 @@ public class IcebergChangeConsumerTest extends BaseSparkTest { @ConfigProperty(name = "debezium.sink.type") String sinkType; - @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") - String tablePrefix; - @ConfigProperty(name = "debezium.sink.iceberg.warehouse") - String warehouseLocation; - @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") - String namespace; - - protected org.apache.iceberg.Table getTable(String table) { - HadoopCatalog catalog = getIcebergCatalog(); - return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_"))); - } - @Test public void testConsumingVariousDataTypes() throws Exception { assertEquals(sinkType, "iceberg"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java index 990a2d6c..5a0dc9c2 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java @@ -14,12 +14,15 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.junit.jupiter.api.BeforeAll; import static io.debezium.server.iceberg.ConfigSource.S3_BUCKET; @@ -29,12 +32,25 @@ * @author Ismail Simsek */ public class BaseSparkTest { + + @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") + String tablePrefix; + @ConfigProperty(name = "debezium.sink.iceberg.warehouse") + String warehouseLocation; + @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") + String namespace; + protected static final SparkConf sparkconf = new SparkConf() .setAppName("CDC-S3-Batch-Spark-Sink") .setMaster("local[2]"); private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch."; protected static SparkSession spark; + protected org.apache.iceberg.Table getTable(String table) { + HadoopCatalog catalog = getIcebergCatalog(); + return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_"))); + } + protected HadoopCatalog getIcebergCatalog() { // loop and set hadoopConf Configuration hadoopConf = new Configuration(); @@ -153,7 +169,7 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception { } public Dataset getTableData(String table) { - return spark.newSession().sql("SELECT input_file_name() as input_file, * FROM debeziumevents.debeziumcdc_" + table.replace(".", "_")); + return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM debeziumevents.debeziumcdc_" + table.replace(".", "_")); } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java index d36bc1ad..5832ba93 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java @@ -38,10 +38,8 @@ public class S3Minio implements QuarkusTestResourceLifecycleManager { static final String DEFAULT_IMAGE = "minio/minio:latest"; static final String DEFAULT_STORAGE_DIRECTORY = "/data"; static final String HEALTH_ENDPOINT = "/minio/health/ready"; - public static MinioClient client; - - static private final GenericContainer container = new GenericContainer<>(DockerImageName.parse(DEFAULT_IMAGE)) + .withExposedPorts(MINIO_DEFAULT_PORT) .waitingFor(new HttpWaitStrategy() .forPath(HEALTH_ENDPOINT) .forPort(MINIO_DEFAULT_PORT) @@ -50,6 +48,7 @@ public class S3Minio implements QuarkusTestResourceLifecycleManager { .withEnv("MINIO_SECRET_KEY", MINIO_SECRET_KEY) .withEnv("MINIO_REGION_NAME", ConfigSource.S3_REGION) .withCommand("server " + DEFAULT_STORAGE_DIRECTORY); + public static MinioClient client; public static List getObjectList(String bucketName) { List objects = new ArrayList<>(); @@ -71,7 +70,7 @@ public static void listFiles() { try { List bucketList = client.listBuckets(); for (Bucket bucket : bucketList) { - System.out.printf("Bucket:%s ROOT", bucket.name()); + System.out.printf("Bucket:%s ROOT", bucket.name()); Iterable> results = client.listObjects(ListObjectsArgs.builder().bucket(bucket.name()).recursive(true).build()); for (Result result : results) { Item item = result.get(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java index de315aea..4474479d 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java @@ -30,24 +30,22 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager { public static final String MYSQL_PASSWORD = "mysqlpw"; public static final String MYSQL_DEBEZIUM_USER = "debezium"; public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz"; - public static final String MYSQL_IMAGE = "debezium/example-mysql:1.5"; + public static final String MYSQL_IMAGE = "debezium/example-mysql:1.7.0.Final"; public static final String MYSQL_HOST = "127.0.0.1"; public static final String MYSQL_DATABASE = "inventory"; public static final Integer MYSQL_PORT_DEFAULT = 3306; private static final Logger LOGGER = LoggerFactory.getLogger(SourceMysqlDB.class); - static private GenericContainer container; - - @Override - public void stop() { - if (container != null) { - container.stop(); - } - } + static private final GenericContainer container = new GenericContainer<>(MYSQL_IMAGE) + .waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2)) + .withEnv("MYSQL_USER", MYSQL_USER) + .withEnv("MYSQL_PASSWORD", MYSQL_PASSWORD) + .withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD) + .withStartupTimeout(Duration.ofSeconds(30)); public static void runSQL(String query) throws SQLException, ClassNotFoundException { try { - String url = "jdbc:mysql://" + MYSQL_HOST + ":" + getMappedPort() + "/" + MYSQL_DATABASE + "?useSSL=false"; + String url = "jdbc:mysql://" + MYSQL_HOST + ":" + container.getMappedPort(MYSQL_PORT_DEFAULT) + "/" + MYSQL_DATABASE + "?useSSL=false"; Class.forName("com.mysql.cj.jdbc.Driver"); Connection con = DriverManager.getConnection(url, MYSQL_USER, MYSQL_PASSWORD); Statement st = con.createStatement(); @@ -61,12 +59,6 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept @Override public Map start() { - container = new GenericContainer<>(MYSQL_IMAGE) - .waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2)) - .withEnv("MYSQL_USER", MYSQL_USER) - .withEnv("MYSQL_PASSWORD", MYSQL_PASSWORD) - .withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD) - .withStartupTimeout(Duration.ofSeconds(30)); container.start(); Map params = new ConcurrentHashMap<>(); @@ -78,9 +70,11 @@ public Map start() { return params; } - public static Integer getMappedPort() { - return container.getMappedPort(MYSQL_PORT_DEFAULT); + @Override + public void stop() { + if (container != null) { + container.stop(); + } } - } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java index ca37bd1c..556bbf0d 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java @@ -28,17 +28,24 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager { public static final String POSTGRES_USER = "postgres"; public static final String POSTGRES_PASSWORD = "postgres"; public static final String POSTGRES_DBNAME = "postgres"; - public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.5"; + public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.7.0.Final"; public static final String POSTGRES_HOST = "localhost"; public static final Integer POSTGRES_PORT_DEFAULT = 5432; private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class); - private static GenericContainer container; + private static GenericContainer container = new GenericContainer<>(POSTGRES_IMAGE) + .waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2)) + .withEnv("POSTGRES_USER", POSTGRES_USER) + .withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD) + .withEnv("POSTGRES_DB", POSTGRES_DBNAME) + .withEnv("POSTGRES_INITDB_ARGS", "-E UTF8") + .withEnv("LANG", "en_US.utf8") + .withStartupTimeout(Duration.ofSeconds(30)); public static void runSQL(String query) throws SQLException, ClassNotFoundException { try { - String url = "jdbc:postgresql://" + POSTGRES_HOST + ":" + getMappedPort() + "/" + POSTGRES_DBNAME; + String url = "jdbc:postgresql://" + POSTGRES_HOST + ":" + container.getMappedPort(POSTGRES_PORT_DEFAULT) + "/" + POSTGRES_DBNAME; Class.forName("org.postgresql.Driver"); Connection con = DriverManager.getConnection(url, POSTGRES_USER, POSTGRES_PASSWORD); Statement st = con.createStatement(); @@ -50,20 +57,8 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept } } - public static Integer getMappedPort() { - return container.getMappedPort(POSTGRES_PORT_DEFAULT); - } - @Override public Map start() { - container = new GenericContainer<>(POSTGRES_IMAGE) - .waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2)) - .withEnv("POSTGRES_USER", POSTGRES_USER) - .withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD) - .withEnv("POSTGRES_DB", POSTGRES_DBNAME) - .withEnv("POSTGRES_INITDB_ARGS", "-E UTF8") - .withEnv("LANG", "en_US.utf8") - .withStartupTimeout(Duration.ofSeconds(30)); container.start(); Map params = new ConcurrentHashMap<>(); diff --git a/pom.xml b/pom.xml index 42f88de9..e15f29a0 100644 --- a/pom.xml +++ b/pom.xml @@ -27,14 +27,15 @@ 8 true - 3.0.7 + 3.0.9 3.3.0 2.12.6 - 0.12.0 + 0.12.1 3.1.2 3.3.1 - 2.16.88 - 1.11.1 + 2.17.75 + 1.12.2 + 1.15.3 1.8.0.Final 8.0.27 @@ -136,6 +137,13 @@ ${version.jackson} true + + + com.squareup.okhttp3 + okhttp + 4.9.2 + test +