Skip to content

Commit

Permalink
upgrade debezium and quarkus version (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Dec 30, 2021
1 parent e618b47 commit 98cf398
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 61 deletions.
18 changes: 15 additions & 3 deletions debezium-server-iceberg-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.0.3</version>
<version>8.3.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -169,7 +169,7 @@
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -188,7 +188,13 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.15.1</version>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -201,6 +207,12 @@
<artifactId>debezium-connector-mysql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>${version.debezium}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
import java.time.Duration;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand All @@ -47,18 +44,6 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

protected org.apache.iceberg.Table getTable(String table) {
HadoopCatalog catalog = getIcebergCatalog();
return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
}

@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.BeforeAll;
import static io.debezium.server.iceberg.ConfigSource.S3_BUCKET;

Expand All @@ -29,12 +32,25 @@
* @author Ismail Simsek
*/
public class BaseSparkTest {

@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

protected static final SparkConf sparkconf = new SparkConf()
.setAppName("CDC-S3-Batch-Spark-Sink")
.setMaster("local[2]");
private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
protected static SparkSession spark;

protected org.apache.iceberg.Table getTable(String table) {
HadoopCatalog catalog = getIcebergCatalog();
return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
}

protected HadoopCatalog getIcebergCatalog() {
// loop and set hadoopConf
Configuration hadoopConf = new Configuration();
Expand Down Expand Up @@ -153,7 +169,7 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception {
}

public Dataset<Row> getTableData(String table) {
return spark.newSession().sql("SELECT input_file_name() as input_file, * FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ public class S3Minio implements QuarkusTestResourceLifecycleManager {
static final String DEFAULT_IMAGE = "minio/minio:latest";
static final String DEFAULT_STORAGE_DIRECTORY = "/data";
static final String HEALTH_ENDPOINT = "/minio/health/ready";
public static MinioClient client;


static private final GenericContainer<?> container = new GenericContainer<>(DockerImageName.parse(DEFAULT_IMAGE))
.withExposedPorts(MINIO_DEFAULT_PORT)
.waitingFor(new HttpWaitStrategy()
.forPath(HEALTH_ENDPOINT)
.forPort(MINIO_DEFAULT_PORT)
Expand All @@ -50,6 +48,7 @@ public class S3Minio implements QuarkusTestResourceLifecycleManager {
.withEnv("MINIO_SECRET_KEY", MINIO_SECRET_KEY)
.withEnv("MINIO_REGION_NAME", ConfigSource.S3_REGION)
.withCommand("server " + DEFAULT_STORAGE_DIRECTORY);
public static MinioClient client;

public static List<Item> getObjectList(String bucketName) {
List<Item> objects = new ArrayList<>();
Expand All @@ -71,7 +70,7 @@ public static void listFiles() {
try {
List<Bucket> bucketList = client.listBuckets();
for (Bucket bucket : bucketList) {
System.out.printf("Bucket:%s ROOT", bucket.name());
System.out.printf("Bucket:%s ROOT", bucket.name());
Iterable<Result<Item>> results = client.listObjects(ListObjectsArgs.builder().bucket(bucket.name()).recursive(true).build());
for (Result<Item> result : results) {
Item item = result.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,22 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager {
public static final String MYSQL_PASSWORD = "mysqlpw";
public static final String MYSQL_DEBEZIUM_USER = "debezium";
public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz";
public static final String MYSQL_IMAGE = "debezium/example-mysql:1.5";
public static final String MYSQL_IMAGE = "debezium/example-mysql:1.7.0.Final";
public static final String MYSQL_HOST = "127.0.0.1";
public static final String MYSQL_DATABASE = "inventory";
public static final Integer MYSQL_PORT_DEFAULT = 3306;
private static final Logger LOGGER = LoggerFactory.getLogger(SourceMysqlDB.class);

static private GenericContainer<?> container;

@Override
public void stop() {
if (container != null) {
container.stop();
}
}
static private final GenericContainer<?> container = new GenericContainer<>(MYSQL_IMAGE)
.waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
.withEnv("MYSQL_USER", MYSQL_USER)
.withEnv("MYSQL_PASSWORD", MYSQL_PASSWORD)
.withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD)
.withStartupTimeout(Duration.ofSeconds(30));

public static void runSQL(String query) throws SQLException, ClassNotFoundException {
try {
String url = "jdbc:mysql://" + MYSQL_HOST + ":" + getMappedPort() + "/" + MYSQL_DATABASE + "?useSSL=false";
String url = "jdbc:mysql://" + MYSQL_HOST + ":" + container.getMappedPort(MYSQL_PORT_DEFAULT) + "/" + MYSQL_DATABASE + "?useSSL=false";
Class.forName("com.mysql.cj.jdbc.Driver");
Connection con = DriverManager.getConnection(url, MYSQL_USER, MYSQL_PASSWORD);
Statement st = con.createStatement();
Expand All @@ -61,12 +59,6 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept

@Override
public Map<String, String> start() {
container = new GenericContainer<>(MYSQL_IMAGE)
.waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
.withEnv("MYSQL_USER", MYSQL_USER)
.withEnv("MYSQL_PASSWORD", MYSQL_PASSWORD)
.withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD)
.withStartupTimeout(Duration.ofSeconds(30));
container.start();

Map<String, String> params = new ConcurrentHashMap<>();
Expand All @@ -78,9 +70,11 @@ public Map<String, String> start() {
return params;
}

public static Integer getMappedPort() {
return container.getMappedPort(MYSQL_PORT_DEFAULT);
@Override
public void stop() {
if (container != null) {
container.stop();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,24 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager {
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.5";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.7.0.Final";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT_DEFAULT = 5432;
private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class);

private static GenericContainer<?> container;
private static GenericContainer<?> container = new GenericContainer<>(POSTGRES_IMAGE)
.waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
.withEnv("POSTGRES_USER", POSTGRES_USER)
.withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.withEnv("POSTGRES_DB", POSTGRES_DBNAME)
.withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
.withEnv("LANG", "en_US.utf8")
.withStartupTimeout(Duration.ofSeconds(30));

public static void runSQL(String query) throws SQLException, ClassNotFoundException {
try {

String url = "jdbc:postgresql://" + POSTGRES_HOST + ":" + getMappedPort() + "/" + POSTGRES_DBNAME;
String url = "jdbc:postgresql://" + POSTGRES_HOST + ":" + container.getMappedPort(POSTGRES_PORT_DEFAULT) + "/" + POSTGRES_DBNAME;
Class.forName("org.postgresql.Driver");
Connection con = DriverManager.getConnection(url, POSTGRES_USER, POSTGRES_PASSWORD);
Statement st = con.createStatement();
Expand All @@ -50,20 +57,8 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept
}
}

public static Integer getMappedPort() {
return container.getMappedPort(POSTGRES_PORT_DEFAULT);
}

@Override
public Map<String, String> start() {
container = new GenericContainer<>(POSTGRES_IMAGE)
.waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
.withEnv("POSTGRES_USER", POSTGRES_USER)
.withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.withEnv("POSTGRES_DB", POSTGRES_DBNAME)
.withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
.withEnv("LANG", "en_US.utf8")
.withStartupTimeout(Duration.ofSeconds(30));
container.start();

Map<String, String> params = new ConcurrentHashMap<>();
Expand Down
16 changes: 12 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
<release>8</release>
<skipITs>true</skipITs>

<version.groovy>3.0.7</version.groovy>
<version.groovy>3.0.9</version.groovy>
<version.assembly.plugin>3.3.0</version.assembly.plugin>
<version.jackson>2.12.6</version.jackson>
<version.iceberg>0.12.0</version.iceberg>
<version.iceberg>0.12.1</version.iceberg>
<version.spark>3.1.2</version.spark>
<version.hadoop>3.3.1</version.hadoop>
<version.awssdk>2.16.88</version.awssdk>
<version.parquet>1.11.1</version.parquet>
<version.awssdk>2.17.75</version.awssdk>
<version.parquet>1.12.2</version.parquet>
<version.testcontainers>1.15.3</version.testcontainers>
<!-- Debezium -->
<version.debezium>1.8.0.Final</version.debezium>
<version.mysql.driver>8.0.27</version.mysql.driver>
Expand Down Expand Up @@ -136,6 +137,13 @@
<version>${version.jackson}</version>
<optional>true</optional>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.2</version>
<scope>test</scope>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down

0 comments on commit 98cf398

Please sign in to comment.