Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade debezium and dependencies #70

Merged
merged 1 commit into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions debezium-server-iceberg-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.0.3</version>
<version>8.3.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -169,7 +169,7 @@
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -188,7 +188,13 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.15.1</version>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -201,6 +207,12 @@
<artifactId>debezium-connector-mysql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>${version.debezium}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
import java.time.Duration;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand All @@ -47,18 +44,6 @@ public class IcebergChangeConsumerTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

protected org.apache.iceberg.Table getTable(String table) {
HadoopCatalog catalog = getIcebergCatalog();
return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
}

@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.BeforeAll;
import static io.debezium.server.iceberg.ConfigSource.S3_BUCKET;

Expand All @@ -29,12 +32,25 @@
* @author Ismail Simsek
*/
public class BaseSparkTest {

@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

protected static final SparkConf sparkconf = new SparkConf()
.setAppName("CDC-S3-Batch-Spark-Sink")
.setMaster("local[2]");
private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
protected static SparkSession spark;

protected org.apache.iceberg.Table getTable(String table) {
HadoopCatalog catalog = getIcebergCatalog();
return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
}

protected HadoopCatalog getIcebergCatalog() {
// loop and set hadoopConf
Configuration hadoopConf = new Configuration();
Expand Down Expand Up @@ -153,7 +169,7 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception {
}

public Dataset<Row> getTableData(String table) {
return spark.newSession().sql("SELECT input_file_name() as input_file, * FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ public class S3Minio implements QuarkusTestResourceLifecycleManager {
static final String DEFAULT_IMAGE = "minio/minio:latest";
static final String DEFAULT_STORAGE_DIRECTORY = "/data";
static final String HEALTH_ENDPOINT = "/minio/health/ready";
public static MinioClient client;


static private final GenericContainer<?> container = new GenericContainer<>(DockerImageName.parse(DEFAULT_IMAGE))
.withExposedPorts(MINIO_DEFAULT_PORT)
.waitingFor(new HttpWaitStrategy()
.forPath(HEALTH_ENDPOINT)
.forPort(MINIO_DEFAULT_PORT)
Expand All @@ -50,6 +48,7 @@ public class S3Minio implements QuarkusTestResourceLifecycleManager {
.withEnv("MINIO_SECRET_KEY", MINIO_SECRET_KEY)
.withEnv("MINIO_REGION_NAME", ConfigSource.S3_REGION)
.withCommand("server " + DEFAULT_STORAGE_DIRECTORY);
public static MinioClient client;

public static List<Item> getObjectList(String bucketName) {
List<Item> objects = new ArrayList<>();
Expand All @@ -71,7 +70,7 @@ public static void listFiles() {
try {
List<Bucket> bucketList = client.listBuckets();
for (Bucket bucket : bucketList) {
System.out.printf("Bucket:%s ROOT", bucket.name());
System.out.printf("Bucket:%s ROOT", bucket.name());
Iterable<Result<Item>> results = client.listObjects(ListObjectsArgs.builder().bucket(bucket.name()).recursive(true).build());
for (Result<Item> result : results) {
Item item = result.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,22 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager {
public static final String MYSQL_PASSWORD = "mysqlpw";
public static final String MYSQL_DEBEZIUM_USER = "debezium";
public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz";
public static final String MYSQL_IMAGE = "debezium/example-mysql:1.5";
public static final String MYSQL_IMAGE = "debezium/example-mysql:1.7.0.Final";
public static final String MYSQL_HOST = "127.0.0.1";
public static final String MYSQL_DATABASE = "inventory";
public static final Integer MYSQL_PORT_DEFAULT = 3306;
private static final Logger LOGGER = LoggerFactory.getLogger(SourceMysqlDB.class);

static private GenericContainer<?> container;

@Override
public void stop() {
if (container != null) {
container.stop();
}
}
static private final GenericContainer<?> container = new GenericContainer<>(MYSQL_IMAGE)
.waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
.withEnv("MYSQL_USER", MYSQL_USER)
.withEnv("MYSQL_PASSWORD", MYSQL_PASSWORD)
.withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD)
.withStartupTimeout(Duration.ofSeconds(30));

public static void runSQL(String query) throws SQLException, ClassNotFoundException {
try {
String url = "jdbc:mysql://" + MYSQL_HOST + ":" + getMappedPort() + "/" + MYSQL_DATABASE + "?useSSL=false";
String url = "jdbc:mysql://" + MYSQL_HOST + ":" + container.getMappedPort(MYSQL_PORT_DEFAULT) + "/" + MYSQL_DATABASE + "?useSSL=false";
Class.forName("com.mysql.cj.jdbc.Driver");
Connection con = DriverManager.getConnection(url, MYSQL_USER, MYSQL_PASSWORD);
Statement st = con.createStatement();
Expand All @@ -61,12 +59,6 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept

@Override
public Map<String, String> start() {
container = new GenericContainer<>(MYSQL_IMAGE)
.waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
.withEnv("MYSQL_USER", MYSQL_USER)
.withEnv("MYSQL_PASSWORD", MYSQL_PASSWORD)
.withEnv("MYSQL_ROOT_PASSWORD", MYSQL_ROOT_PASSWORD)
.withStartupTimeout(Duration.ofSeconds(30));
container.start();

Map<String, String> params = new ConcurrentHashMap<>();
Expand All @@ -78,9 +70,11 @@ public Map<String, String> start() {
return params;
}

public static Integer getMappedPort() {
return container.getMappedPort(MYSQL_PORT_DEFAULT);
@Override
public void stop() {
if (container != null) {
container.stop();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,24 @@ public class SourcePostgresqlDB implements QuarkusTestResourceLifecycleManager {
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.5";
public static final String POSTGRES_IMAGE = "debezium/example-postgres:1.7.0.Final";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT_DEFAULT = 5432;
private static final Logger LOGGER = LoggerFactory.getLogger(SourcePostgresqlDB.class);

private static GenericContainer<?> container;
private static GenericContainer<?> container = new GenericContainer<>(POSTGRES_IMAGE)
.waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
.withEnv("POSTGRES_USER", POSTGRES_USER)
.withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.withEnv("POSTGRES_DB", POSTGRES_DBNAME)
.withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
.withEnv("LANG", "en_US.utf8")
.withStartupTimeout(Duration.ofSeconds(30));

public static void runSQL(String query) throws SQLException, ClassNotFoundException {
try {

String url = "jdbc:postgresql://" + POSTGRES_HOST + ":" + getMappedPort() + "/" + POSTGRES_DBNAME;
String url = "jdbc:postgresql://" + POSTGRES_HOST + ":" + container.getMappedPort(POSTGRES_PORT_DEFAULT) + "/" + POSTGRES_DBNAME;
Class.forName("org.postgresql.Driver");
Connection con = DriverManager.getConnection(url, POSTGRES_USER, POSTGRES_PASSWORD);
Statement st = con.createStatement();
Expand All @@ -50,20 +57,8 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept
}
}

public static Integer getMappedPort() {
return container.getMappedPort(POSTGRES_PORT_DEFAULT);
}

@Override
public Map<String, String> start() {
container = new GenericContainer<>(POSTGRES_IMAGE)
.waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
.withEnv("POSTGRES_USER", POSTGRES_USER)
.withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.withEnv("POSTGRES_DB", POSTGRES_DBNAME)
.withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
.withEnv("LANG", "en_US.utf8")
.withStartupTimeout(Duration.ofSeconds(30));
container.start();

Map<String, String> params = new ConcurrentHashMap<>();
Expand Down
16 changes: 12 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
<release>8</release>
<skipITs>true</skipITs>

<version.groovy>3.0.7</version.groovy>
<version.groovy>3.0.9</version.groovy>
<version.assembly.plugin>3.3.0</version.assembly.plugin>
<version.jackson>2.12.6</version.jackson>
<version.iceberg>0.12.0</version.iceberg>
<version.iceberg>0.12.1</version.iceberg>
<version.spark>3.1.2</version.spark>
<version.hadoop>3.3.1</version.hadoop>
<version.awssdk>2.16.88</version.awssdk>
<version.parquet>1.11.1</version.parquet>
<version.awssdk>2.17.75</version.awssdk>
<version.parquet>1.12.2</version.parquet>
<version.testcontainers>1.15.3</version.testcontainers>
<!-- Debezium -->
<version.debezium>1.8.0.Final</version.debezium>
<version.mysql.driver>8.0.27</version.mysql.driver>
Expand Down Expand Up @@ -136,6 +137,13 @@
<version>${version.jackson}</version>
<optional>true</optional>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.2</version>
<scope>test</scope>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down