Minor cleanup and code improvements #190

Merged · 2 commits · Apr 27, 2023
IcebergChangeConsumer.java
@@ -163,7 +163,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin

// consume list of events for each destination table
for (Map.Entry<String, List<IcebergChangeEvent>> tableEvents : result.entrySet()) {
- Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
+ Table icebergTable = this.loadIcebergTable(mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
icebergTableOperator.addToTable(icebergTable, tableEvents.getValue());
}

@@ -180,12 +180,11 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
}

/**
- * @param icebergCatalog iceberg catalog
- * @param tableId iceberg table identifier
- * @param sampleEvent sample debezium event. event schema used to create iceberg table when table not found
+ * @param tableId iceberg table identifier
+ * @param sampleEvent sample debezium event. event schema used to create iceberg table when table not found
* @return iceberg table, throws RuntimeException when table not found and it's not possible to create it
*/
- public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
+ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
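The change above drops the Catalog parameter from loadIcebergTable; the retained body still calls IcebergUtil.loadIcebergTable(icebergCatalog, tableId), so the consumer evidently holds the catalog itself and callers now pass only the table identifier and sample event. A minimal sketch of that shape, assuming a field-held catalog (the class and constructor here are illustrative, not the project's actual code):

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class CatalogHoldingConsumer {
  // Held once by the consumer instead of being threaded through every call.
  private final Catalog icebergCatalog;

  CatalogHoldingConsumer(Catalog icebergCatalog) {
    this.icebergCatalog = icebergCatalog;
  }

  Table loadIcebergTable(TableIdentifier tableId) {
    // Catalog.loadTable throws NoSuchTableException when the table is absent;
    // the real method instead falls back to creating the table from the
    // sample event's schema when schema support is enabled.
    return icebergCatalog.loadTable(tableId);
  }
}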
IcebergChangeConsumerMysqlTest.java
@@ -20,8 +20,9 @@
import java.util.HashMap;
import java.util.Map;

- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
+ import com.google.common.collect.Lists;
+ import org.apache.iceberg.data.Record;
+ import org.apache.iceberg.io.CloseableIterable;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

@@ -37,18 +38,6 @@ public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {

@Test
public void testSimpleUpload() throws Exception {

- Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
- try {
- Dataset<Row> df = getTableData("testc.inventory.customers");
- return df.filter("id is not null").count() >= 4;
- } catch (Exception e) {
- return false;
- }
- });
- // S3Minio.listFiles();

// create test table
String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" +
" c_id INTEGER ," +
" c_id2 INTEGER ," +
@@ -62,16 +51,27 @@ public void testSimpleUpload() throws Exception {

SourceMysqlDB.runSQL(sqlCreate);
SourceMysqlDB.runSQL(sqlInsert);
+ Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
+ try {
+ CloseableIterable<Record> result = getTableDataV2("testc.inventory.test_delete_table");
+ return Lists.newArrayList(result).size() == 4;
+ } catch (Exception e) {
+ return false;
+ }
+ });

SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);
SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
- Dataset<Row> df = getTableData("testc.inventory.test_delete_table");
- df.show();
- return df.count() == 20; // 4X3 insert 4X2 delete!
+ CloseableIterable<Record> result = getTableDataV2("testc.inventory.test_delete_table");
+ //result.forEach(System.out::println);
+ //System.out.println("======================");
+ return Lists.newArrayList(result).size() >= 20;
} catch (Exception e) {
return false;
}
IcebergSchemaHistoryTest.java
@@ -20,8 +20,8 @@
import java.util.HashMap;
import java.util.Map;

- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
+ import com.google.common.collect.Lists;
+ import org.apache.iceberg.catalog.TableIdentifier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

@@ -39,8 +39,7 @@ public class IcebergSchemaHistoryTest extends BaseSparkTest {
public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
- Dataset<Row> ds = getTableData("testc.inventory.customers");
- return ds.count() >= 3;
+ return Lists.newArrayList(getTableDataV2("testc.inventory.customers")).size() >= 3;
} catch (Exception e) {
return false;
}
@@ -49,9 +48,7 @@ public void testSimpleUpload() {
// test nested data(struct) consumed
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
- Dataset<Row> ds = spark.newSession().sql("SELECT * FROM mycatalog.debezium_database_history_storage_test");
- ds.show(false);
- return ds.count() >= 5;
+ return Lists.newArrayList(getTableDataV2(TableIdentifier.of("mycatalog", "debezium_database_history_storage_test"))).size() >= 5;
} catch (Exception e) {
e.printStackTrace();
return false;
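Both tests above replace Spark SQL reads (Dataset<Row> via getTableData) with direct Iceberg reads that return CloseableIterable<Record>. A self-contained sketch of that read path, assuming a Hadoop catalog and warehouse path purely for illustration (the tests instead obtain the table through the injected consumer, via the BaseSparkTest helpers below):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
import com.google.common.collect.Lists;

class GenericsReadSketch {
  static long countRecords(String warehousePath, TableIdentifier id) throws Exception {
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath);
    Table table = catalog.loadTable(id);
    try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
      // Materialize the scan to count rows, as the tests do with
      // Lists.newArrayList(result).size().
      return Lists.newArrayList(records).size();
    }
  }
}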
BaseSparkTest.java
@@ -8,21 +8,25 @@

package io.debezium.server.iceberg.testresources;

+ import io.debezium.server.iceberg.IcebergChangeConsumer;
import io.debezium.server.iceberg.IcebergUtil;

import java.util.HashMap;
import java.util.Map;
+ import javax.inject.Inject;

import org.apache.hadoop.conf.Configuration;
- import org.apache.iceberg.catalog.Namespace;
+ import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+ import org.apache.iceberg.data.IcebergGenerics;
+ import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
+ import org.apache.iceberg.io.CloseableIterable;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.eclipse.microprofile.config.ConfigProvider;
- import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.BeforeAll;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;

@@ -38,12 +42,8 @@ public class BaseSparkTest {
.setMaster("local[2]");
private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
protected static SparkSession spark;
- @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
- String tablePrefix;
- @ConfigProperty(name = "debezium.sink.iceberg.warehouse")
- String warehouseLocation;
- @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
- String namespace;
+ @Inject
+ IcebergChangeConsumer consumer;

@BeforeAll
static void setup() {
@@ -87,10 +87,6 @@ public static void PGCreateTestDataTable() throws Exception {
SourcePostgresqlDB.runSQL(sql);
}

- public static int PGLoadTestDataTable(int numRows) throws Exception {
- return PGLoadTestDataTable(numRows, false);
- }

public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
int numInsert = 0;
do {
@@ -118,37 +114,6 @@ public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
return numInsert;
}

- public static void mysqlCreateTestDataTable() throws Exception {
- // create test table
- String sql = "\n" +
- " CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
- " c_id INTEGER ,\n" +
- " c_text TEXT,\n" +
- " c_varchar TEXT\n" +
- " );";
- SourceMysqlDB.runSQL(sql);
- }
-
- public static int mysqlLoadTestDataTable(int numRows) throws Exception {
- int numInsert = 0;
- do {
- String sql = "INSERT INTO inventory.test_data (c_id, c_text, c_varchar ) " +
- "VALUES ";
- StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')");
- for (int i = 0; i < 10; i++) {
- values.append("\n,(").append(TestUtil.randomInt(15, 32)).append(", '").append(TestUtil.randomString(524)).append("', '").append(TestUtil.randomString(524)).append("')");
- }
- SourceMysqlDB.runSQL(sql + values);
- numInsert += 10;
- } while (numInsert <= numRows);
- return numInsert;
- }
-
- protected org.apache.iceberg.Table getTable(String table) {
- HadoopCatalog catalog = getIcebergCatalog();
- return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
- }

protected HadoopCatalog getIcebergCatalog() {
// loop and set hadoopConf
Configuration hadoopConf = new Configuration();
@@ -173,4 +138,18 @@ public Dataset<Row> getTableData(String table) {
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
}

+ public CloseableIterable<Record> getTableDataV2(String table) {
+ return getTableDataV2("debeziumevents", table);
+ }
+
+ public CloseableIterable<Record> getTableDataV2(String catalog, String table) {
+ String tableName = "debeziumcdc_" + table.replace(".", "_");
+ return getTableDataV2(TableIdentifier.of(catalog, tableName));
+ }
+
+ public CloseableIterable<Record> getTableDataV2(TableIdentifier table) {
+ Table iceTable = consumer.loadIcebergTable(table, null);
+ return IcebergGenerics.read(iceTable).build();
+ }

}
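Taken together, the new helpers let a test poll the destination table without going through a Spark session. A hypothetical test body in a BaseSparkTest subclass, mirroring the polling pattern of the updated tests (the table name and expected count are placeholders):

// Poll until the connector has written the expected rows; swallow
// exceptions while the destination table does not exist yet.
Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
  try {
    CloseableIterable<Record> rows = getTableDataV2("testc.inventory.customers");
    return Lists.newArrayList(rows).size() >= 4;
  } catch (Exception e) {
    return false;
  }
});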