Add delete test (#21)
ismailsimsek committed Aug 16, 2021
1 parent 0429ef6 commit 32ed5cf
Showing 3 changed files with 42 additions and 11 deletions.
20 changes: 10 additions & 10 deletions BLOGPOST.md
@@ -1,6 +1,6 @@
# Using Debezium to Create an ACID Data Lakehouse

Do you need to build flexible Data Lakehouse but dont know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates
Do you need to build a flexible Data Lakehouse but don't know where to start? Do you want your data pipeline to be near realtime and to support ACID transactions and updates?
It's possible using two great projects, Debezium and Apache Iceberg, without any dependency on Kafka or Spark.

#### Debezium
@@ -10,22 +10,22 @@ Debezium extracts realtime database changes as json, avro, protobuf events and d
It provides a simple interface to [implement a new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink).
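
As a rough sketch of that interface (not this project's actual consumer): a sink is a Quarkus bean implementing Debezium's `ChangeConsumer`, where the sink name below is hypothetical:

```java
import java.util.List;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

// Selected at runtime with debezium.sink.type=mysink ("mysink" is a made-up name)
@Named("mysink")
@Dependent
public class MySinkChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

  @Override
  public void handleBatch(List<ChangeEvent<Object, Object>> records,
                          DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
      throws InterruptedException {
    for (ChangeEvent<Object, Object> record : records) {
      // write record.value() to the destination, then acknowledge the event
      committer.markProcessed(record);
    }
    committer.markBatchFinished();
  }
}
```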

#### Apache Iceberg
Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Row level Deletes(Update) [plus many other benefits](https://iceberg.apache.org)
Apache Iceberg is an open table format for huge analytic datasets with concurrent ACID writes; it supports inserts and row-level deletes (updates), and [it has many other benefits](https://iceberg.apache.org).
Apache Iceberg has a great foundation and a flexible API, which is currently supported by Spark, Presto, Trino, Flink and Hive.
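
To give a feel for that API, here is a minimal sketch of an atomic append through an Iceberg catalog; the catalog wiring and the already-written `DataFile` are assumed:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class IcebergAppendSketch {
  // `catalog` is a configured catalog (Hadoop, Hive, ...); `dataFile` is a Parquet
  // data file that has already been written out
  static void appendAtomically(Catalog catalog, DataFile dataFile) {
    Table table = catalog.loadTable(TableIdentifier.of("inventory", "customers"));
    // commit() publishes a new table snapshot atomically; this is the ACID write
    table.newAppend()
        .appendFile(dataFile)
        .commit();
  }
}
```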

## debezium-server-iceberg
## Debezium Server Iceberg

[@TODO visual architecture diagram]

This project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination
with this project its becomes possible to use best features from both projects enjoy realtime structured data feed and ACID table format with update support
The Debezium Server Iceberg project puts both projects together and enables a realtime data pipeline to any cloud storage or HDFS destination supported by Iceberg.
With Debezium Server Iceberg it is possible to combine the best features of both projects: a realtime structured data pipeline and an ACID table format with update support.

### Extending Debezium Server with an Iceberg sink
debezium-server Iceberg sink to [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation),
The Debezium Iceberg sink extends the [Debezium Server Quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation).

debezium-server Iceberg sink received realtime json events converted to iceberg rows and processed using iceberg API
received rows are either appended or updated to destination iceberg table as Parquet files, since iceberg supports many cloud storage its easily possible to configure destination which could be
any of hadoop storage cloud storage location. with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage
The Iceberg consumer converts Debezium JSON events to Iceberg rows and commits them to the destination Iceberg table using the Iceberg API.
It is possible to append database events to Iceberg tables or to upsert them using the source table's primary key.
Since Iceberg supports many storage backends, the destination can be any Hadoop or cloud storage location.
With debezium-server-iceberg it is easy to replicate your RDBMS to cloud storage.

## Update, Append
By default the Iceberg consumer works in upsert mode: when a row is updated in the source table, the destination row is replaced with the new, up-to-date record.
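
A hedged sketch of how this could be wired up, written in the same `config.put` style as the test `ConfigSource` shown below; `debezium.sink.type=iceberg` follows Debezium Server's standard sink-selection convention, but the `debezium.sink.iceberg.upsert` key is an assumption, so check the project README for the exact property names:

```java
// Select the Iceberg sink (standard Debezium Server sink selection)
config.put("debezium.sink.type", "iceberg");
// Assumed property name: toggle between upsert (default) and plain append
config.put("debezium.sink.iceberg.upsert", "true");
// With upsert disabled, every event would simply be appended as a new row:
// config.put("debezium.sink.iceberg.upsert", "false");
```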
@@ -52,6 +52,7 @@ public ConfigSource() {
config.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db");
config.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite");
config.put("debezium.transforms.unwrap.drop.tombstones", "true");

// DEBEZIUM SOURCE conf
config.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
@@ -63,7 +64,7 @@ public ConfigSource() {
config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," +
"inventory.table_datatypes,inventory.test_date_table");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
config.put("debezium.source.include.schema.changes", "false");

config.put("quarkus.log.level", "INFO");
@@ -39,6 +39,36 @@ public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {
@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
Integer maxBatchSize;

@Test
public void testTombstoneEvents() throws Exception {
// create test table
String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" +
" c_id INTEGER ," +
" c_id2 INTEGER ," +
" c_data TEXT," +
" PRIMARY KEY (c_id, c_id2)" +
" );";
String sqlInsert =
"INSERT INTO inventory.test_delete_table (c_id, c_id2, c_data ) " +
"VALUES (1,1,'data'),(1,2,'data'),(1,3,'data'),(1,4,'data') ;";
String sqlDelete = "DELETE FROM inventory.test_delete_table where c_id = 1 ;";

SourceMysqlDB.runSQL(sqlCreate);
SourceMysqlDB.runSQL(sqlInsert);
SourceMysqlDB.runSQL(sqlDelete);
SourceMysqlDB.runSQL(sqlInsert);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.test_delete_table");
df.show();
return df.count() >= 12; // 4 inserts + 4 delete (rewrite) events + 4 re-inserts
} catch (Exception e) {
return false;
}
});
}

@Test
public void testSimpleUpload() {
Testing.Print.enable();