diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 245c8eb0..b78a1de4 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -163,7 +163,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
 
     // consume list of events for each destination table
     for (Map.Entry<String, List<IcebergChangeEvent>> tableEvents : result.entrySet()) {
-      Table icebergTable = this.loadIcebergTable(icebergCatalog, mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
+      Table icebergTable = this.loadIcebergTable(mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
       icebergTableOperator.addToTable(icebergTable, tableEvents.getValue());
     }
 
@@ -180,12 +180,11 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
   }
 
   /**
-   * @param icebergCatalog iceberg catalog
-   * @param tableId        iceberg table identifier
-   * @param sampleEvent    sample debezium event. event schema used to create iceberg table when table not found
+   * @param tableId     iceberg table identifier
+   * @param sampleEvent sample debezium event; its schema is used to create the iceberg table when the table is not found
    * @return iceberg table, throws RuntimeException when table not found and it's not possible to create it
    */
-  public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
+  public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
     return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
       if (!eventSchemaEnabled) {
         throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
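
Note: with the catalog injected into the consumer, callers resolve a destination to a table identifier and pass a sample event whose schema seeds auto-creation. A minimal caller-side sketch of that flow, restating the handleBatch loop above with illustrative comments (assumed context: inside IcebergChangeConsumer, with result mapping each destination to its events; no new API assumed):

    // For each destination: resolve (or lazily create) the Iceberg table, then append the whole batch.
    for (Map.Entry<String, List<IcebergChangeEvent>> tableEvents : result.entrySet()) {
      TableIdentifier tableId = mapDestination(tableEvents.getKey());          // destination -> identifier
      Table table = loadIcebergTable(tableId, tableEvents.getValue().get(0));  // first event supplies the schema
      icebergTableOperator.addToTable(table, tableEvents.getValue());
    }
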
" + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java index 9d926f30..4d81de9c 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java @@ -20,8 +20,9 @@ import java.util.HashMap; import java.util.Map; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; +import com.google.common.collect.Lists; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; @@ -37,18 +38,6 @@ public class IcebergChangeConsumerMysqlTest extends BaseSparkTest { @Test public void testSimpleUpload() throws Exception { - - Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> { - try { - Dataset df = getTableData("testc.inventory.customers"); - return df.filter("id is not null").count() >= 4; - } catch (Exception e) { - return false; - } - }); - // S3Minio.listFiles(); - - // create test table String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" + " c_id INTEGER ," + " c_id2 INTEGER ," + @@ -62,6 +51,16 @@ public void testSimpleUpload() throws Exception { SourceMysqlDB.runSQL(sqlCreate); SourceMysqlDB.runSQL(sqlInsert); + Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> { + try { + + CloseableIterable result = getTableDataV2("testc.inventory.test_delete_table"); + return Lists.newArrayList(result).size() == 4; + } catch (Exception e) { + return false; + } + }); + SourceMysqlDB.runSQL(sqlDelete); SourceMysqlDB.runSQL(sqlInsert); SourceMysqlDB.runSQL(sqlDelete); @@ -69,9 +68,10 @@ public void testSimpleUpload() throws Exception { Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { try { - Dataset df = getTableData("testc.inventory.test_delete_table"); - df.show(); - return df.count() == 20; // 4X3 insert 4X2 delete! 
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java
index be6939b0..67d7d152 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java
@@ -20,8 +20,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
@@ -39,8 +39,7 @@ public class IcebergSchemaHistoryTest extends BaseSparkTest {
   public void testSimpleUpload() {
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
-        Dataset<Row> ds = getTableData("testc.inventory.customers");
-        return ds.count() >= 3;
+        return Lists.newArrayList(getTableDataV2("testc.inventory.customers")).size() >= 3;
       } catch (Exception e) {
         return false;
       }
@@ -49,9 +48,7 @@ public void testSimpleUpload() {
     // test nested data(struct) consumed
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
-        Dataset<Row> ds = spark.newSession().sql("SELECT * FROM mycatalog.debezium_database_history_storage_test");
-        ds.show(false);
-        return ds.count() >= 5;
+        return Lists.newArrayList(getTableDataV2(TableIdentifier.of("mycatalog", "debezium_database_history_storage_test"))).size() >= 5;
       } catch (Exception e) {
         e.printStackTrace();
         return false;
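
Note: the TableIdentifier overload matters here because the schema-history table does not follow the debeziumcdc_ naming that getTableDataV2(String) applies to CDC destinations. The two identifier shapes used above, spelled out (values as in the diff):

    // CDC destination, as derived by getTableDataV2(String, String) in BaseSparkTest:
    //   "testc.inventory.customers" -> debeziumevents.debeziumcdc_testc_inventory_customers
    TableIdentifier cdcTable = TableIdentifier.of("debeziumevents", "debeziumcdc_testc_inventory_customers");
    // History table, addressed directly because it sits outside that naming scheme:
    TableIdentifier historyTable = TableIdentifier.of("mycatalog", "debezium_database_history_storage_test");
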
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
index 31dd2953..0067827c 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
@@ -8,21 +8,25 @@
 
 package io.debezium.server.iceberg.testresources;
 
+import io.debezium.server.iceberg.IcebergChangeConsumer;
 import io.debezium.server.iceberg.IcebergUtil;
 import java.util.HashMap;
 import java.util.Map;
+import javax.inject.Inject;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.eclipse.microprofile.config.ConfigProvider;
-import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.junit.jupiter.api.BeforeAll;
 
 import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;
@@ -38,12 +42,8 @@ public class BaseSparkTest {
       .setMaster("local[2]");
   private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
   protected static SparkSession spark;
-  @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
-  String tablePrefix;
-  @ConfigProperty(name = "debezium.sink.iceberg.warehouse")
-  String warehouseLocation;
-  @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
-  String namespace;
+  @Inject
+  IcebergChangeConsumer consumer;
 
   @BeforeAll
   static void setup() {
@@ -87,10 +87,6 @@ public static void PGCreateTestDataTable() throws Exception {
     SourcePostgresqlDB.runSQL(sql);
   }
 
-  public static int PGLoadTestDataTable(int numRows) throws Exception {
-    return PGLoadTestDataTable(numRows, false);
-  }
-
   public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
     int numInsert = 0;
     do {
@@ -118,37 +114,6 @@ public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
     return numInsert;
   }
 
-  public static void mysqlCreateTestDataTable() throws Exception {
-    // create test table
-    String sql = "\n" +
-        "        CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
-        "          c_id INTEGER ,\n" +
-        "          c_text TEXT,\n" +
-        "          c_varchar TEXT\n" +
-        "        );";
-    SourceMysqlDB.runSQL(sql);
-  }
-
-  public static int mysqlLoadTestDataTable(int numRows) throws Exception {
-    int numInsert = 0;
-    do {
-      String sql = "INSERT INTO inventory.test_data (c_id, c_text, c_varchar ) " +
-                   "VALUES ";
-      StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')");
-      for (int i = 0; i < 10; i++) {
-        values.append("\n,(").append(TestUtil.randomInt(15, 32)).append(", '").append(TestUtil.randomString(524)).append("', '").append(TestUtil.randomString(524)).append("')");
-      }
-      SourceMysqlDB.runSQL(sql + values);
-      numInsert += 10;
-    } while (numInsert <= numRows);
-    return numInsert;
-  }
-
-  protected org.apache.iceberg.Table getTable(String table) {
-    HadoopCatalog catalog = getIcebergCatalog();
-    return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
-  }
-
   protected HadoopCatalog getIcebergCatalog() {
     // loop and set hadoopConf
     Configuration hadoopConf = new Configuration();
@@ -173,4 +138,18 @@ public Dataset<Row> getTableData(String table) {
     return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
   }
 
+  public CloseableIterable<Record> getTableDataV2(String table) {
+    return getTableDataV2("debeziumevents", table);
+  }
+
+  public CloseableIterable<Record> getTableDataV2(String catalog, String table) {
+    String tableName = "debeziumcdc_" + table.replace(".", "_");
+    return getTableDataV2(TableIdentifier.of(catalog, tableName));
+  }
+
+  public CloseableIterable<Record> getTableDataV2(TableIdentifier table) {
+    Table iceTable = consumer.loadIcebergTable(table, null);
+    return IcebergGenerics.read(iceTable).build();
+  }
+
 }
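
Note: getTableDataV2 reads through the consumer's own catalog, so the tests observe exactly the committed table state without a Spark session in between. Passing null as the sample event implies the table must already exist: if it is missing, loadIcebergTable can only throw (schemas disabled) or fail deriving a schema from a null event. A hedged usage sketch (assumed context: a test method that throws Exception in a BaseSparkTest subclass, with JUnit's assertTrue in scope):

    // Scan a CDC table and materialize the rows; try-with-resources releases the file readers,
    // which the test code above leaves to the GC.
    try (CloseableIterable<Record> rows = getTableDataV2("testc.inventory.customers")) {
      List<Record> records = Lists.newArrayList(rows);
      assertTrue(records.size() >= 3);
    }
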