Minor cleanup and code improvements (#190)
* Minor code and test improvements

ismailsimsek committed Apr 27, 2023
1 parent f000a1e commit 2c41f46
Showing 4 changed files with 47 additions and 72 deletions.
IcebergChangeConsumer.java
@@ -163,7 +163,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin

// consume list of events for each destination table
for (Map.Entry<String, List<IcebergChangeEvent>> tableEvents : result.entrySet()) {
-      Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
+      Table icebergTable = this.loadIcebergTable(mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
icebergTableOperator.addToTable(icebergTable, tableEvents.getValue());
}

@@ -180,12 +180,11 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
}

/**
-   * @param icebergCatalog iceberg catalog
-   * @param tableId iceberg table identifier
-   * @param sampleEvent sample debezium event. event schema used to create iceberg table when table not found
+   * @param tableId     iceberg table identifier
+   * @param sampleEvent sample debezium event. event schema used to create iceberg table when table not found
* @return iceberg table, throws RuntimeException when table not found and it's not possible to create it
*/
-  public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
+  public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
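Note on the refactor: loadIcebergTable now reads the catalog from the consumer's own icebergCatalog field rather than taking it as an argument, while keeping the Optional-based load-or-create flow: orElseGet runs the fallback only when the table is missing. A minimal self-contained sketch of that pattern, with hypothetical tryLoad/create stand-ins for the IcebergUtil calls:

import java.util.Optional;

class LoadOrCreateSketch {
  // Hypothetical stand-ins for IcebergUtil.loadIcebergTable and table creation.
  static Optional<String> tryLoad(String id) { return Optional.empty(); }
  static String create(String id) { return id + " (created)"; }

  static String loadOrCreate(String id, boolean createEnabled) {
    // orElseGet takes a Supplier, so the fallback executes only on a miss;
    // creation is never attempted when the table already exists.
    return tryLoad(id).orElseGet(() -> {
      if (!createEnabled) {
        throw new RuntimeException("Table '" + id + "' not found!");
      }
      return create(id);
    });
  }

  public static void main(String[] args) {
    System.out.println(loadOrCreate("inventory.customers", true));
  }
}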
IcebergChangeConsumerMysqlTest.java
@@ -20,8 +20,9 @@
import java.util.HashMap;
import java.util.Map;

-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

@@ -37,18 +38,6 @@ public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {

@Test
public void testSimpleUpload() throws Exception {

-    Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
-      try {
-        Dataset<Row> df = getTableData("testc.inventory.customers");
-        return df.filter("id is not null").count() >= 4;
-      } catch (Exception e) {
-        return false;
-      }
-    });
-    // S3Minio.listFiles();
-
// create test table
String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" +
" c_id INTEGER ," +
" c_id2 INTEGER ," +
@@ -62,16 +51,27 @@ public void testSimpleUpload() throws Exception {

SourceMysqlDB.runSQL(sqlCreate);
SourceMysqlDB.runSQL(sqlInsert);
+    Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
+      try {
+
+        CloseableIterable<Record> result = getTableDataV2("testc.inventory.test_delete_table");
+        return Lists.newArrayList(result).size() == 4;
+      } catch (Exception e) {
+        return false;
+      }
+    });

SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);
SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
-        Dataset<Row> df = getTableData("testc.inventory.test_delete_table");
-        df.show();
-        return df.count() == 20; // 4X3 insert 4X2 delete!
+        CloseableIterable<Record> result = getTableDataV2("testc.inventory.test_delete_table");
+        //result.forEach(System.out::println);
+        //System.out.println("======================");
+        return Lists.newArrayList(result).size() >= 20;
} catch (Exception e) {
return false;
}
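Note on the rewritten assertions: the connector writes asynchronously, so the test polls with Awaitility; a read that throws (for example before the connector has created the table) returns false and is retried until the timeout. A sketch of the same idiom with try-with-resources added, since CloseableIterable holds an open scan that the test code above never closes (the close-after-drain placement is an assumption, not part of the commit):

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
  try (CloseableIterable<Record> result = getTableDataV2("testc.inventory.test_delete_table")) {
    // drain the scan into a list, then assert the expected lower bound;
    // inserts and deletes both land as rows, hence 4x3 inserts + 4x2 deletes = 20
    return Lists.newArrayList(result).size() >= 20;
  } catch (Exception e) {
    return false; // not ready yet; Awaitility retries
  }
});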
IcebergSchemaHistoryTest.java
@@ -20,8 +20,8 @@
import java.util.HashMap;
import java.util.Map;

-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

@@ -39,8 +39,7 @@ public class IcebergSchemaHistoryTest extends BaseSparkTest {
public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
-        Dataset<Row> ds = getTableData("testc.inventory.customers");
-        return ds.count() >= 3;
+        return Lists.newArrayList(getTableDataV2("testc.inventory.customers")).size() >= 3;
} catch (Exception e) {
return false;
}
@@ -49,9 +48,7 @@ public void testSimpleUpload() {
// test nested data(struct) consumed
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
-        Dataset<Row> ds = spark.newSession().sql("SELECT * FROM mycatalog.debezium_database_history_storage_test");
-        ds.show(false);
-        return ds.count() >= 5;
+        return Lists.newArrayList(getTableDataV2(TableIdentifier.of("mycatalog", "debezium_database_history_storage_test"))).size() >= 5;
} catch (Exception e) {
e.printStackTrace();
return false;
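Note: the second check goes through the TableIdentifier overload of getTableDataV2 because the schema-history table does not follow the CDC naming convention that the String overload applies (see the BaseSparkTest changes below). The two call shapes, side by side:

// String overload: prefixes "debeziumcdc_", replaces dots with underscores,
// and reads from the default "debeziumevents" catalog.
CloseableIterable<Record> cdcRows = getTableDataV2("testc.inventory.customers");

// TableIdentifier overload: takes the identifier verbatim, so tables outside
// the CDC naming convention, like the schema-history table, stay reachable.
CloseableIterable<Record> historyRows =
    getTableDataV2(TableIdentifier.of("mycatalog", "debezium_database_history_storage_test"));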
BaseSparkTest.java
@@ -8,21 +8,25 @@

package io.debezium.server.iceberg.testresources;

+import io.debezium.server.iceberg.IcebergChangeConsumer;
import io.debezium.server.iceberg.IcebergUtil;

import java.util.HashMap;
import java.util.Map;
+import javax.inject.Inject;

import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.eclipse.microprofile.config.ConfigProvider;
-import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.BeforeAll;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;

@@ -38,12 +42,8 @@ public class BaseSparkTest {
.setMaster("local[2]");
private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
protected static SparkSession spark;
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@Inject
IcebergChangeConsumer consumer;

@BeforeAll
static void setup() {
@@ -87,10 +87,6 @@ public static void PGCreateTestDataTable() throws Exception {
SourcePostgresqlDB.runSQL(sql);
}

-  public static int PGLoadTestDataTable(int numRows) throws Exception {
-    return PGLoadTestDataTable(numRows, false);
-  }
-
public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
int numInsert = 0;
do {
@@ -118,37 +114,6 @@ public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
return numInsert;
}

-  public static void mysqlCreateTestDataTable() throws Exception {
-    // create test table
-    String sql = "\n" +
-        "    CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
-        "        c_id INTEGER ,\n" +
-        "        c_text TEXT,\n" +
-        "        c_varchar TEXT\n" +
-        "    );";
-    SourceMysqlDB.runSQL(sql);
-  }
-
-  public static int mysqlLoadTestDataTable(int numRows) throws Exception {
-    int numInsert = 0;
-    do {
-      String sql = "INSERT INTO inventory.test_data (c_id, c_text, c_varchar ) " +
-          "VALUES ";
-      StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')");
-      for (int i = 0; i < 10; i++) {
-        values.append("\n,(").append(TestUtil.randomInt(15, 32)).append(", '").append(TestUtil.randomString(524)).append("', '").append(TestUtil.randomString(524)).append("')");
-      }
-      SourceMysqlDB.runSQL(sql + values);
-      numInsert += 10;
-    } while (numInsert <= numRows);
-    return numInsert;
-  }
-
-  protected org.apache.iceberg.Table getTable(String table) {
-    HadoopCatalog catalog = getIcebergCatalog();
-    return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
-  }
-
protected HadoopCatalog getIcebergCatalog() {
// loop and set hadoopConf
Configuration hadoopConf = new Configuration();
@@ -173,4 +138,18 @@ public Dataset<Row> getTableData(String table) {
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
}

+  public CloseableIterable<Record> getTableDataV2(String table) {
+    return getTableDataV2("debeziumevents", table);
+  }
+
+  public CloseableIterable<Record> getTableDataV2(String catalog, String table) {
+    String tableName = "debeziumcdc_" + table.replace(".", "_");
+    return getTableDataV2(TableIdentifier.of(catalog, tableName));
+  }
+
+  public CloseableIterable<Record> getTableDataV2(TableIdentifier table) {
+    Table iceTable = consumer.loadIcebergTable(table, null);
+    return IcebergGenerics.read(iceTable).build();
+  }

}
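Note: all three getTableDataV2 overloads return a CloseableIterable<Record> wrapping a live table scan, and the callers above never close it. A hedged sketch of a leak-free variant for new tests (the table name, expected count, and the static assertTrue import from org.junit.jupiter.api.Assertions are illustrative assumptions):

@Test
public void testReadViaIcebergGenerics() throws Exception {
  // try-with-resources releases the underlying scan once the records
  // have been drained into a list for counting
  try (CloseableIterable<Record> rows = getTableDataV2("testc.inventory.customers")) {
    assertTrue(Lists.newArrayList(rows).size() >= 3);
  }
}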
