From 32ed5cf528ecc3945e56984524ebdabda892fb97 Mon Sep 17 00:00:00 2001
From: ismail simsek
Date: Mon, 16 Aug 2021 11:06:46 +0200
Subject: [PATCH] Add delete test (#21)

Add delete test
---
 BLOGPOST.md                                    | 20 ++++++-------
 .../debezium/server/iceberg/ConfigSource.java  |  3 +-
 .../IcebergChangeConsumerMysqlTest.java        | 30 +++++++++++++++++++
 3 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/BLOGPOST.md b/BLOGPOST.md
index f78b4914..81c6d9ae 100644
--- a/BLOGPOST.md
+++ b/BLOGPOST.md
@@ -1,6 +1,6 @@
 # Using Debezium to Create ACID Data Lake House
 
-Do you need to build flexible Data Lakehouse but dont know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates
+Do you need to build a flexible Data Lakehouse but don't know where to start? Do you want your data pipeline to be near realtime and to support ACID transactions and updates?
 its possible using two great projects Debezium and Apache Iceberg without any dependency to kafka or spark
 
 #### Debezium
@@ -10,22 +10,22 @@ Debezium extracts realtime database changes as json, avro, protobuf events and d
 it provides simple interface to [implement new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink)
 
 #### Apache Iceberg
-Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Row level Deletes(Update) [plus many other benefits](https://iceberg.apache.org)
+Apache Iceberg is an open table format for huge analytic datasets with concurrent ACID writes; it supports inserts and row-level deletes (updates) and [has many other benefits](https://iceberg.apache.org)
 Apache iceberg has great foundation and flexible API which currently supported by Spark, Presto, Trino, Flink and Hive
 
-## debezium-server-iceberg
+## Debezium Server Iceberg
 
 [@TODO visual architecture diagram]
 
-This project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination
-with this project its becomes possible to use best features from both projects enjoy realtime structured data feed and ACID table format with update support
+The Debezium Server Iceberg project puts the two together and enables a realtime data pipeline to any cloud storage or HDFS destination supported by Iceberg.
+With Debezium Server Iceberg it is possible to use the best features of both projects: a realtime structured data pipeline and an ACID table format with update support.
 
-### Extending Debezium Server with Iceberg sink
-debezium-server Iceberg sink to [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation),
+The Debezium Iceberg sink extends the [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation).
 
-debezium-server Iceberg sink received realtime json events converted to iceberg rows and processed using iceberg API
-received rows are either appended or updated to destination iceberg table as Parquet files, since iceberg supports many cloud storage its easily possible to configure destination which could be
-any of hadoop storage cloud storage location.
-with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage
+The Iceberg consumer converts Debezium JSON events to Iceberg rows and commits them to the destination Iceberg table using the Iceberg API.
+Events can either be appended to the destination table or upserted using the source table's primary key.
+Since Iceberg supports many storage backends, the destination can easily be configured to be any Hadoop or cloud storage location.
+With debezium-server-iceberg it is easy to replicate your RDBMS to cloud storage.
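+
+A minimal sink configuration could look like the sketch below (an illustrative `application.properties` fragment: the `debezium.transforms.*` values mirror this project's test configuration, while `debezium.sink.type=iceberg` assumes the sink is registered under the name `iceberg`):
+
+```properties
+# route change events to the iceberg consumer
+debezium.sink.type=iceberg
+# flatten debezium's change event envelope into plain rows
+debezium.transforms=unwrap
+debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
+debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
+# keep delete events as rows flagged with a "__deleted" field, drop tombstones
+debezium.transforms.unwrap.delete.handling.mode=rewrite
+debezium.transforms.unwrap.drop.tombstones=true
+```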
 
 # update, append
 Iceberg consumer by default works with upsert mode. When a row updated on source table destination row replaced with up-to-date record.
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
index dadb2f5f..a47b25c4 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
@@ -52,6 +52,7 @@ public ConfigSource() {
     config.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
     config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db");
     config.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite");
+    config.put("debezium.transforms.unwrap.drop.tombstones", "true");
 
     // DEBEZIUM SOURCE conf
     config.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
@@ -63,7 +64,7 @@ public ConfigSource() {
     config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," +
         "inventory.table_datatypes,inventory.test_date_table");
     config.put("%postgresql.debezium.source.database.whitelist", "inventory");
-    config.put("%mysql.debezium.source.table.whitelist", "inventory.customers");
+    config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
     config.put("debezium.source.include.schema.changes", "false");
 
     config.put("quarkus.log.level", "INFO");
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
index 77f092d6..9c048a2f 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
@@ -39,6 +39,36 @@ public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {
   @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
   Integer maxBatchSize;
 
+  @Test
+  public void testTombstoneEvents() throws Exception {
+    // create test table
+    String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" +
+        " c_id INTEGER ," +
+        " c_id2 INTEGER ," +
+        " c_data TEXT," +
+        " PRIMARY KEY (c_id, c_id2)" +
+        " );";
+    String sqlInsert =
+        "INSERT INTO inventory.test_delete_table (c_id, c_id2, c_data ) " +
+            "VALUES (1,1,'data'),(1,2,'data'),(1,3,'data'),(1,4,'data') ;";
+    String sqlDelete = "DELETE FROM inventory.test_delete_table where c_id = 1 ;";
+
+    SourceMysqlDB.runSQL(sqlCreate);
+    SourceMysqlDB.runSQL(sqlInsert);
+    SourceMysqlDB.runSQL(sqlDelete);
+    SourceMysqlDB.runSQL(sqlInsert);
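+
+    // With the unwrap transform configured in ConfigSource (delete.handling.mode=rewrite,
+    // drop.tombstones=true) each deleted row should arrive as a rewrite record flagged
+    // "__deleted" = true while the trailing tombstone events are dropped, so the
+    // destination table should accumulate 4 insert + 4 delete + 4 re-insert events.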
+    Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
+      try {
+        Dataset<Row> df = getTableData("testc.inventory.test_delete_table");
+        df.show();
+        return df.count() >= 12; // 4 inserts + 4 deletes + 4 re-inserts
+      } catch (Exception e) {
+        return false;
+      }
+    });
+  }
+
   @Test
   public void testSimpleUpload() {
     Testing.Print.enable();