From 75844967e07925cf4afa3f6087a9a02c28497bbe Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Thu, 12 Aug 2021 10:27:22 +0200 Subject: [PATCH 1/8] Write Blog Post --- blog1.md | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 blog1.md diff --git a/blog1.md b/blog1.md new file mode 100644 index 00000000..1bade012 --- /dev/null +++ b/blog1.md @@ -0,0 +1,43 @@ + +@TODO cleanup documentation +@TODO add help page contributing page + +# Using Debezium to Create ACID Data Lake House + +Do you need to build flexible Data Lakehouse but dont know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates +its possible using two great projects Debezium and Apache Iceberg without any dependency to kafka or spark + +#### Debezium +Debezium is an open source distributed platform for change data capture. +Debezium extracts realtime database changes as json, avro, protobuf events and delivers to event streaming platforms +(Kafka, Kinesis, Google Pub/Sub, Pulsar are just some of them) LINK here, it provides very simple interface to extend and write new sinks + +#### Apache Iceberg +Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Update queries, plus many other features listed here +Link +Apache iceberg has API fundation which supported by Spark and Presto and Trino, + +## debezium-server-iceberg + +[@TODO visual architecture diagram] + +Project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination +with this project its becomes possible to use best features from both projects enjoy realtime structured data feed and ACID table format with update support + +### Extending Debezium Server with custom sink +debezium-server-iceberg adds custom sink to Debezium server quarkus application [link here], +with custom sink received realtime json events converted to iceberg rows and processed using iceberg api +received rows are either appended or updated target table using iceberg api, since iceberg supports many cloud storage its eaily porrible to configure destination which could be +any of hadoop storage cloud storage location, consumed events are added to destination table as parquet files + +# update, append + +# destination, iceberg catalog + +# wait delay batch size + + +@Contribution + + +thanks to two projects \ No newline at end of file From 5a39b2b2e9257111b3146fa07b34f9ded49ea523 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 10:05:27 +0200 Subject: [PATCH 2/8] Write Blog Post --- blog1.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/blog1.md b/blog1.md index 1bade012..82441785 100644 --- a/blog1.md +++ b/blog1.md @@ -15,7 +15,7 @@ Debezium extracts realtime database changes as json, avro, protobuf events and d #### Apache Iceberg Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Update queries, plus many other features listed here Link -Apache iceberg has API fundation which supported by Spark and Presto and Trino, +Apache iceberg has great flexible foundation/API which integrated by Spark, Presto, Trino, Flink and Hive ## debezium-server-iceberg @@ -26,18 +26,18 @@ with this project its becomes possible to use best features from both projects e ### Extending Debezium Server with custom sink debezium-server-iceberg adds custom sink to Debezium server quarkus 
application [link here], -with custom sink received realtime json events converted to iceberg rows and processed using iceberg api -received rows are either appended or updated target table using iceberg api, since iceberg supports many cloud storage its eaily porrible to configure destination which could be -any of hadoop storage cloud storage location, consumed events are added to destination table as parquet files +with custom sink received realtime json events converted to iceberg rows and processed using iceberg API +received rows are either appended or updated to destination iceberg table as Parquet files, since iceberg supports many cloud storage its easily possible to configure destination which could be +any of hadoop storage cloud storage location. with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage # update, append - -# destination, iceberg catalog - +V 0.12 iceberg +retain deletes as soft delete! # wait delay batch size - -@Contribution +wait by reading debezium metrics! another great feature of debezium +# destination, iceberg catalog +@Contribution thanks to two projects \ No newline at end of file From efff3bfb569dec0415222a2a23b0943c23aad6b5 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 10:06:27 +0200 Subject: [PATCH 3/8] Write Blog Post --- blog1.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/blog1.md b/blog1.md index 82441785..ba746c48 100644 --- a/blog1.md +++ b/blog1.md @@ -1,6 +1,6 @@ @TODO cleanup documentation -@TODO add help page contributing page +@TODO add contributing page # Using Debezium to Create ACID Data Lake House @@ -37,7 +37,7 @@ retain deletes as soft delete! wait by reading debezium metrics! another great feature of debezium # destination, iceberg catalog -@Contribution +@Contribution ..etc -thanks to two projects \ No newline at end of file +thanks to two projects making it possible Apache iceberg[link], Debezium[link] \ No newline at end of file From b58e6d9cdb37163c31b6ed17346a07ad6920e9df Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 16:41:46 +0200 Subject: [PATCH 4/8] Update documentation --- CONTRIBUTING.md | 41 ++++ README.md | 196 ++++++++++-------- .../server/iceberg/IcebergChangeConsumer.java | 2 +- .../iceberg/IcebergEventsChangeConsumer.java | 3 +- .../BatchSparkChangeConsumerMysqlTest.java | 6 +- ...chSparkChangeConsumerMysqlTestProfile.java | 2 +- .../debezium/server/iceberg/ConfigSource.java | 2 + 7 files changed, 157 insertions(+), 95 deletions(-) create mode 100644 CONTRIBUTING.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..e46fb930 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,41 @@ +# Contributing +We love your input! We want to make contributing to this project as easy and transparent as possible, whether it's: + +- Reporting a bug +- Discussing the current state of the code +- Submitting a fix +- Proposing new features +- Becoming a maintainer + +## We Develop with Github +We use github to host code, to track issues and feature requests, as well as accept pull requests. + +## We Use [Github Flow](https://guides.github.com/introduction/flow/index.html), So All Code Changes Happen Through Pull Requests +Pull requests are the best way to propose changes to the codebase. We actively welcome your pull requests: + +1. Fork the repo and create your branch from `master`. +2. If you've added code that should be tested, add tests. +3. If you've changed APIs, update the documentation. +4. 
Ensure the test suite passes.
5. Make sure your code is formatted.
6. Issue that pull request!

## Any contributions you make will be under the Apache 2.0 License
In short, when you submit code changes, your submissions are understood to be under the same [Apache-2.0 License](https://github.com/memiiso/debezium-server-iceberg/blob/master/LICENSE) that covers the project. Feel free to contact the maintainers if that's a concern.

## Report bugs using GitHub's [issues](https://github.com/memiiso/debezium-server-iceberg/issues)
We use GitHub issues to track public bugs. Report a bug by [opening a new issue](); it's that easy!

## Write bug reports with detail, background, and sample code
**Good Bug Reports** tend to have:

- A quick summary and/or background
- Steps to reproduce
  - Be specific!
  - Give sample code if you can.
- What you expected would happen
- What actually happens
- Notes (possibly including why you think this might be happening, or stuff you tried that didn't work)

## License
By contributing, you agree that your contributions will be licensed under the Apache 2.0 License.

diff --git a/README.md b/README.md
index 4d821860..f4144ed3 100644
--- a/README.md
+++ b/README.md
@@ -2,101 +2,153 @@
# Debezium Iceberg Consumers
------
-This project adds iceberg batch consumers
-to [debezium server](https://debezium.io/documentation/reference/operations/debezium-server.html). it could be used to
-replicate database changes to iceberg table, without requiring Spark, Kafka or Streaming platform.
+This project adds an Iceberg consumer to the [Debezium Server application](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to
+replicate database changes to an Iceberg table (on cloud storage or HDFS) without requiring Spark, Kafka or a streaming platform.

## `iceberg` Consumer

-Appends json events to destination iceberg tables. Destination tables are created automatically if event and key schemas
-enabled `debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`
-when destination table is not exists Consumer will print a warning message and continue replication of other tables
+The Iceberg consumer appends or upserts Debezium events to destination Iceberg tables. When event and key schemas are
+enabled (`debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`), destination Iceberg
+tables are created automatically.

### Upsert

-By default `debezium.sink.iceberg.upsert=true` upsert feature is enabled, for tables with Primary Key definition it will
-do upsert, for the tables without Primary Key it falls back to append mode
-
-Setting `debezium.sink.iceberg.upsert=false` will change insert mode to append.
+By default the Iceberg consumer runs in upsert mode (`debezium.sink.iceberg.upsert=true`);
+for tables with a primary key definition the consumer does upserts, and for tables without a primary key it falls back to append mode.

#### Data Deduplication

-when iceberg consumer is doing upsert it does data deduplication for the batch, deduplication is done based
-on `__source_ts_ms` field and event type `__op`
-its is possible to change field using `debezium.sink.iceberg.upsert-source-ts-ms-column=__source_ts_ms`, Currently only
+In upsert mode deduplication is done per batch, based on the `__source_ts_ms` value and the event type `__op`.
+The deduplication field can be changed with `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms`; currently only the
Long field type supported
-operation type priorities are `{"c":1, "r":2, "u":3, "d":4}` when two record with same Key having same `__source_ts_ms`
-values then the record with higher `__op` priority is kept
+Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`; when two records with the same key have the same `__source_ts_ms`
+value, the record with the higher `__op` priority is written to the destination table and the duplicate record is dropped.
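+
+As an illustration, the deduplication rule described above behaves roughly like the following sketch. This is illustrative only, assuming the default `__op` priorities and a Long `__source_ts_ms` value; it is not the consumer's actual implementation:
+
+```java
+import java.util.Map;
+
+class UpsertDedupRuleSketch {
+  static final Map<String, Integer> OP_PRIORITY = Map.of("c", 1, "r", 2, "u", 3, "d", 4);
+
+  // decides whether a newly received event replaces the current event with the same key
+  static boolean replaces(long currentTsMs, String currentOp, long receivedTsMs, String receivedOp) {
+    if (receivedTsMs != currentTsMs) {
+      return receivedTsMs > currentTsMs; // later __source_ts_ms wins
+    }
+    // equal __source_ts_ms: the higher __op priority wins (d > u > r > c)
+    return OP_PRIORITY.get(receivedOp) > OP_PRIORITY.get(currentOp);
+  }
+}
+```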
+
+### Append
+Setting `debezium.sink.iceberg.upsert=false` switches the operation mode to append. In append mode no data deduplication is done and all received records are appended to the destination table.
+Note: for tables without a primary key the operation mode is append even when the configuration is set to upsert mode.

#### Keeping Deleted Records

By default `debezium.sink.iceberg.upsert-keep-deletes=true` will keep deletes in the iceberg table, setting it to false
-will remove deleted records from the iceberg table too. with this feature its possible to keep last version of the
-deleted record.
+will remove deleted records from the destination Iceberg table. With this config it is possible to keep the last version of a deleted
+record in the table (a soft delete).
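+
+For example, a configuration that retains soft deletes could combine these settings. Both appear in other sections of this README; they are shown together here only for illustration:
+
+```properties
+debezium.sink.iceberg.upsert=true
+# keep the last version of deleted records as soft deletes
+debezium.sink.iceberg.upsert-keep-deletes=true
+# 'rewrite' delete handling (see Debezium Event Flattening below) marks deleted rows with a __deleted field
+debezium.transforms.unwrap.delete.handling.mode=rewrite
+```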
+
+### Optimizing batch size (or commit interval)
+
+Debezium extracts database events in real time, and this can cause too-frequent commits and too many small files,
+which is not optimal for batch processing, especially when a near-realtime data feed is sufficient.
+To avoid this problem the following batch-size-wait classes are used.
+
+A batch size wait adds a delay between consumer calls to increase the total number of events received per call while events are collected in memory;
+it should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` Debezium properties.
+
+#### NoBatchSizeWait
+
+This is the default configuration; by default the consumer does not use any batch size wait.
+
+#### DynamicBatchSizeWait
+
+This wait strategy dynamically adds a wait to increase the batch size. The wait duration is calculated based on the number of processed events in the
+last 3 batches: if the last batch sizes are lower than `max.batch.size` the wait duration increases, and if the last batch sizes
+are bigger than 90% of `max.batch.size` the wait duration decreases.
+
+This strategy tries to keep the batch size between 85%-90% of `max.batch.size`; it does not guarantee a consistent batch size.
+
+Example setup to receive ~2048 events per commit, with the maximum wait set to 5 seconds:
+```properties
+debezium.source.max.queue.size=16000
+debezium.source.max.batch.size=2048
+debezium.sink.batch.batch-size-wait=DynamicBatchSizeWait
+debezium.sink.batch.batch-size-wait.max-wait-ms=5000
+```
+#### MaxBatchSizeWait
+
+MaxBatchSizeWait uses Debezium metrics to optimize the batch size; this strategy is more precise than DynamicBatchSizeWait.
+It periodically reads the current size of the streaming queue and waits until it reaches `max.batch.size`.
+The maximum wait and the check interval are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
+
+Example setup to receive ~2048 events per commit, with the maximum wait set to 30 seconds and the streaming queue size checked every 5 seconds:
+```properties
+debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
+debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
+debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
+debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
+debezium.source.max.batch.size=2048
+debezium.source.max.queue.size=16000
+debezium.sink.batch.batch-size-wait.max-wait-ms=30000
+debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
+```
+
+### Destination Iceberg Table Names
+
+Iceberg table names are created by the following rule: `table-namespace`.`table-prefix``database.server.name`_`database`_`table`
+
+For example, with the following config
+
+```properties
+debezium.sink.iceberg.table-namespace=default
+database.server.name=testc
+debezium.sink.iceberg.table-prefix=cdc_
+```
+
+the database table `inventory.customers` will be replicated to `default.testc_cdc_inventory_customers`.
+
+## Debezium Event Flattening
+
+The Iceberg consumer requires event flattening; currently nested events and complex data types (like Struct) are not supported.
+
+```properties
+debezium.transforms=unwrap
+debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
+debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
+debezium.transforms.unwrap.add.headers=db
+debezium.transforms.unwrap.delete.handling.mode=rewrite
+```
+
+### Configuring iceberg
+
+All the properties starting with the `debezium.sink.iceberg.` prefix are passed to Iceberg, and to hadoopConf.
+
+```properties
+debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!
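+# for example this property, taken from the example configuration below, reaches iceberg and hadoopConf as-is (illustrative):
+debezium.sink.iceberg.fs.s3a.endpoint=http://localhost:9000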
+``` + ### Example Configuration ```properties -debezium.sink.type = iceberg -debezium.sink.iceberg.table-prefix = debeziumcdc_ -debezium.sink.iceberg.table-namespace = default -debezium.sink.iceberg.catalog-name = default -debezium.sink.iceberg.fs.defaultFS = s3a://MY_S3_BUCKET -debezium.sink.iceberg.warehouse = s3a://MY_S3_BUCKET/iceberg_warehouse -debezium.sink.iceberg.user.timezone = UTC -debezium.sink.iceberg.com.amazonaws.services.s3.enableV4 = true -debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4 = true -debezium.sink.iceberg.fs.s3a.aws.credentials.provider = com.amazonaws.auth.DefaultAWSCredentialsProviderChain -debezium.sink.iceberg.fs.s3a.access.key = S3_ACCESS_KEY -debezium.sink.iceberg.fs.s3a.secret.key = S3_SECRET_KEY -debezium.sink.iceberg.fs.s3a.path.style.access = true -debezium.sink.iceberg.fs.s3a.endpoint = http://localhost:9000 # minio specific setting -debezium.sink.iceberg.fs.s3a.impl = org.apache.hadoop.fs.s3a.S3AFileSystem -debezium.sink.iceberg.dynamic-wait=true -debezium.sink.batch.dynamic-wait.max-wait-ms=300000 +debezium.sink.type=iceberg +debezium.sink.iceberg.table-prefix=debeziumcdc_ +debezium.sink.iceberg.catalog-name=mycatalog +debezium.sink.iceberg.table-namespace=debeziumevents +debezium.sink.iceberg.fs.defaultFS=s3a://MY_S3_BUCKET +debezium.sink.iceberg.warehouse=s3a://MY_S3_BUCKET/iceberg_warehouse +debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true +debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true +debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain +debezium.sink.iceberg.fs.s3a.access.key=S3_ACCESS_KEY +debezium.sink.iceberg.fs.s3a.secret.key=S3_SECRET_KEY +debezium.sink.iceberg.fs.s3a.path.style.access=true +debezium.sink.iceberg.fs.s3a.endpoint=http://localhost:9000 # minio specific setting +debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem debezium.sink.iceberg.upsert=true debezium.sink.iceberg.upsert-keep-deletes=true -debezium.sink.iceberg.upsert-op-column=__op -debezium.sink.iceberg.upsert-source-ts-ms-column=__source_ts_ms debezium.format.value.schemas.enable=true debezium.format.key.schemas.enable=true ``` -All the properties starting with `debezium.sink.iceberg.**` are passed to iceberg, and hadoopConf - -```properties -debezium.sink.iceberg.{iceberg.prop.name} = xyz-value # passed to iceberg! -``` - ## `icebergevents` Consumer -This consumer appends CDC events to single iceberg table as json string. This table is partitioned -by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms` +This is second consumer developed with this project, This consumer appends CDC events to single iceberg table as json string. 
+This table partitioned by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms` #### Example Configuration ````properties -debezium.sink.type = icebergevents -debezium.sink.iceberg.catalog-name = default -debezium.sink.iceberg.dynamic-wait=true -debezium.sink.batch.dynamic-wait.max-wait-ms=300000 +debezium.sink.type=icebergevents +debezium.sink.iceberg.catalog-name=default ```` Iceberg table definition: @@ -118,37 +170,3 @@ static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA) .asc("event_sink_epoch_ms", NullOrder.NULLS_LAST) .build(); ``` - -## Debezium Event Flattening - -Iceberg consumer requires event flattening, Currently nested events and complex data types(like maps) are not supported - -```properties -debezium.transforms = unwrap -debezium.transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState -debezium.transforms.unwrap.add.fields = op,table,lsn,source.ts_ms -debezium.transforms.unwrap.add.headers = db -debezium.transforms.unwrap.delete.handling.mode = rewrite -``` - -## Controlling Batch Size - -`max.batch.size` Positive integer value that specifies the maximum size of each batch of events that should be processed -during each iteration of this connector. Defaults to 2048. - -`debezium.sink.batch.dynamic-wait.max-wait-ms` Positive integer value that specifies the maximum number of milliseconds -dynamic wait could add delay to increase batch size. dynamic wait is calculated based on number of processed events in -last 3 batches. if last batch sizes are lower than `max.batch.size` max-wait-ms will increase and if last batch sizes -are bigger than 90% of `max.batch.size` max-wait-ms will decrease - -it tries to keep batch size between 85%-90% of the `max.batch.size`, it does not guarantee consistent batch size. - -Change `debezium.source.max.batch.size` and `debezium.sink.batch.dynamic-wait.max-wait-ms` if you want to have less -frequent commits with larger batch size - -```properties -# Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048. 
-debezium.source.max.batch.size = 2 -debezium.sink.iceberg.dynamic-wait=true -debezium.sink.batch.dynamic-wait.max-wait-ms=300000 -``` \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 2143e655..b06a0050 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -88,7 +88,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu boolean upsertKeepDeletes; @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op") String opColumn; - @ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms") + @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") String sourceTsMsColumn; @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") String batchSizeWaitName; diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java index 89513ba0..1d81b270 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java @@ -103,7 +103,6 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D Instance batchSizeWaitInstances; InterfaceBatchSizeWait batchSizeWait; - private TableIdentifier tableIdentifier; Map icebergProperties = new ConcurrentHashMap<>(); Catalog icebergCatalog; Table eventTable; @@ -120,7 +119,7 @@ void connect() throws InterruptedException { "Supported (debezium.format.key=*) formats are {json,}!"); } - tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events"); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events"); Map conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX); conf.forEach(this.hadoopConf::set); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java index 9c888b2e..77f092d6 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java @@ -32,8 +32,8 @@ @QuarkusTest @QuarkusTestResource(S3Minio.class) @QuarkusTestResource(SourceMysqlDB.class) -@TestProfile(BatchSparkChangeConsumerMysqlTestProfile.class) -public class BatchSparkChangeConsumerMysqlTest extends BaseSparkTest { +@TestProfile(IcebergChangeConsumerMysqlTestProfile.class) +public class IcebergChangeConsumerMysqlTest extends BaseSparkTest { @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000") @@ -51,6 +51,8 @@ public void testSimpleUpload() { return false; } }); + + S3Minio.listFiles(); } } diff --git 
a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java index b1107adf..a951bf99 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java @@ -13,7 +13,7 @@ import java.util.HashMap; import java.util.Map; -public class BatchSparkChangeConsumerMysqlTestProfile implements QuarkusTestProfile { +public class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile { //This method allows us to override configuration properties. @Override diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index 593324d1..df8c35be 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -37,6 +37,8 @@ public ConfigSource() { config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds! // iceberg config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_"); + config.put("debezium.sink.iceberg.catalog-name", "mycatalog"); + config.put("debezium.sink.iceberg.table-namespace", "debeziumevents"); config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET); config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse"); config.put("debezium.sink.iceberg.type", "hadoop"); From d0802b9d3b6c85f54fb744a7a321f8689206056c Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 16:42:53 +0200 Subject: [PATCH 5/8] Update documentation --- ...ConsumerMysqlTest.java => IcebergChangeConsumerMysqlTest.java} | 0 ...estProfile.java => IcebergChangeConsumerMysqlTestProfile.java} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/{BatchSparkChangeConsumerMysqlTest.java => IcebergChangeConsumerMysqlTest.java} (100%) rename debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/{BatchSparkChangeConsumerMysqlTestProfile.java => IcebergChangeConsumerMysqlTestProfile.java} (100%) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java similarity index 100% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java similarity index 100% rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java From f5e30af90db5b4878da3ce8e86aabcc73423d6f7 Mon Sep 17 00:00:00 
2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 16:49:11 +0200 Subject: [PATCH 6/8] Update documentation --- .../debezium/server/iceberg/testresources/BaseSparkTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java index 1a80d171..a0609777 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java @@ -39,9 +39,6 @@ public class BaseSparkTest { Testing.Files.createTestingFile(ConfigSource.OFFSET_STORE_PATH); } - // @ConfigProperty(name = "debezium.sink.iceberg.bucket-name", defaultValue = "") - String bucket = S3_BUCKET; - @BeforeAll static void setup() { Map appSparkConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), SPARK_PROP_PREFIX); @@ -142,7 +139,7 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception { } public Dataset getTableData(String table) { - return spark.newSession().sql("SELECT input_file_name() as input_file, * FROM default.debeziumcdc_" + table.replace(".", "_")); + return spark.newSession().sql("SELECT input_file_name() as input_file, * FROM debeziumevents.debeziumcdc_" + table.replace(".", "_")); } } From bd8f674a02bd2ca828f8cd5fe54320022f6c1dcd Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Sun, 15 Aug 2021 16:56:24 +0200 Subject: [PATCH 7/8] Update documentation --- .../src/test/java/io/debezium/server/iceberg/ConfigSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index df8c35be..dadb2f5f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -37,11 +37,11 @@ public ConfigSource() { config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds! 
// iceberg
config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_");
-    config.put("debezium.sink.iceberg.catalog-name", "mycatalog");
config.put("debezium.sink.iceberg.table-namespace", "debeziumevents");
config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET);
config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
config.put("debezium.sink.iceberg.type", "hadoop");
+    config.put("debezium.sink.iceberg.catalog-name", "mycatalog");
config.put("debezium.sink.iceberg.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog");

// enable disable schema

From 93cd4781843939de1a38528f1203a7f3fa5c40f0 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Sun, 15 Aug 2021 17:39:19 +0200
Subject: [PATCH 8/8] Update documentation

---
 BLOGPOST.md | 48 +++++++++++++++++++++++++++++++++++++++++++++++
 blog1.md    | 43 -------------------------------------------
 .../IcebergEventsChangeConsumerTest.java | 2 +-
 3 files changed, 49 insertions(+), 44 deletions(-)
 create mode 100644 BLOGPOST.md
 delete mode 100644 blog1.md

diff --git a/BLOGPOST.md b/BLOGPOST.md
new file mode 100644
index 00000000..f78b4914
--- /dev/null
+++ b/BLOGPOST.md
@@ -0,0 +1,48 @@
+# Using Debezium to Create an ACID Data Lakehouse
+
+Do you need to build a flexible data lakehouse but don't know where to start? Do you want your data pipeline to be near realtime and to support ACID transactions and updates?
+It is possible using two great projects, Debezium and Apache Iceberg, without any dependency on Kafka or Spark.
+
+#### Debezium
+Debezium is an open source distributed platform for change data capture.
+Debezium extracts realtime database changes as JSON, Avro or Protobuf events and delivers them to event streaming platforms
+(Kafka, Kinesis, Google Pub/Sub and Pulsar are just some of the [supported sinks](https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration)),
+and it provides a simple interface to [implement a new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink).
+
+#### Apache Iceberg
+Apache Iceberg is an open table format for huge analytic datasets. It supports concurrent ACID writes, inserts and row-level deletes (updates), [plus many other benefits](https://iceberg.apache.org).
+Apache Iceberg has a great foundation and a flexible API, currently supported by Spark, Presto, Trino, Flink and Hive.
+
+## debezium-server-iceberg
+
+[@TODO visual architecture diagram]
+
+This project puts the two together and enables a realtime data pipeline to any cloud storage or HDFS destination.
+With it, it becomes possible to combine the best features of both projects: a realtime structured data feed and an ACID table format with update support.
+
+### Extending Debezium Server with an Iceberg sink
+This project adds an Iceberg sink to the [Debezium Server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation).
+
+The Iceberg sink converts received realtime JSON events to Iceberg rows and processes them using the Iceberg API;
+rows are either appended or upserted to the destination Iceberg table as Parquet files. Since Iceberg supports many storage systems, the destination can be
+any Hadoop or cloud storage location. With debezium-server-iceberg it becomes easy to replicate your RDBMS to cloud storage.
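+
+A custom Debezium Server sink is a Quarkus CDI bean implementing the `DebeziumEngine.ChangeConsumer` interface, which is also how this project's `IcebergChangeConsumer` is wired in. A minimal sketch follows; the class and sink name here are hypothetical, not the project's actual implementation:
+
+```java
+import java.util.List;
+import javax.enterprise.context.Dependent;
+import javax.inject.Named;
+
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.server.BaseChangeConsumer;
+
+// selected at runtime with debezium.sink.type=mysink
+@Named("mysink")
+@Dependent
+public class MySinkChangeConsumer extends BaseChangeConsumer
+    implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
+
+  @Override
+  public void handleBatch(List<ChangeEvent<Object, Object>> records,
+                          DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
+      throws InterruptedException {
+    for (ChangeEvent<Object, Object> record : records) {
+      // convert record.value() to rows and append/upsert them to the destination here
+      committer.markProcessed(record);
+    }
+    committer.markBatchFinished();
+  }
+}
+```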
+
+## Update, append
+By default the Iceberg consumer works in upsert mode: when a row is updated on the source table, the destination row is replaced with the up-to-date record.
+In upsert mode the data at the destination is always deduplicated and kept up to date.
+
+With Iceberg v0.12 it is also possible to retain deletes as soft deletes, keeping the last version of a deleted record in the destination table.
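+
+The relevant settings, taken from the project README, look like this:
+
+```properties
+debezium.sink.type=iceberg
+# upsert mode is the default
+debezium.sink.iceberg.upsert=true
+# keep the last version of deleted records as soft deletes
+debezium.sink.iceberg.upsert-keep-deletes=true
+```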
+
+## Wait delay, batch size
+Another great feature of Debezium: the consumer can wait between batches by reading Debezium metrics, increasing the number of events received per commit (see `MaxBatchSizeWait` in the README).
+
+## Destination, Iceberg catalog
+Destination tables are managed through an Iceberg catalog, so any storage supported by Iceberg (Hadoop, S3 and other cloud storage) can be configured as the destination; see the example configuration in the README.
+
+## Contribution
+Contributions are welcome; see [CONTRIBUTING.md](CONTRIBUTING.md).
+
+## Links
+- [Apache Iceberg](https://iceberg.apache.org/)
+- [Apache Iceberg GitHub](https://github.com/apache/iceberg)
+- [Debezium](https://debezium.io/)
+- [Debezium GitHub](https://github.com/debezium/debezium)
\ No newline at end of file
diff --git a/blog1.md b/blog1.md
deleted file mode 100644
index ba746c48..00000000
--- a/blog1.md
+++ /dev/null
@@ -1,43 +0,0 @@
-
-@TODO cleanup documentation
-@TODO add contributing page
-
-# Using Debezium to Create ACID Data Lake House
-
-Do you need to build flexible Data Lakehouse but dont know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates
-its possible using two great projects Debezium and Apache Iceberg without any dependency to kafka or spark
-
-#### Debezium
-Debezium is an open source distributed platform for change data capture.
-Debezium extracts realtime database changes as json, avro, protobuf events and delivers to event streaming platforms
-(Kafka, Kinesis, Google Pub/Sub, Pulsar are just some of them) LINK here, it provides very simple interface to extend and write new sinks
-
-#### Apache Iceberg
-Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Update queries, plus many other features listed here
-Link
-Apache iceberg has great flexible foundation/API which integrated by Spark, Presto, Trino, Flink and Hive
-
-## debezium-server-iceberg
-
-[@TODO visual architecture diagram]
-
-Project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination
-with this project its becomes possible to use best features from both projects enjoy realtime structured data feed and ACID table format with update support
-
-### Extending Debezium Server with custom sink
-debezium-server-iceberg adds custom sink to Debezium server quarkus application [link here],
-with custom sink received realtime json events converted to iceberg rows and processed using iceberg API
-received rows are either appended or updated to destination iceberg table as Parquet files, since iceberg supports many cloud storage its easily possible to configure destination which could be
-any of hadoop storage cloud storage location. with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage
-
-# update, append
-V 0.12 iceberg
-retain deletes as soft delete!
-# wait delay batch size
-
-wait by reading debezium metrics! another great feature of debezium
-# destination, iceberg catalog
-
-@Contribution ..etc
-
-thanks to two projects making it possible Apache iceberg[link], Debezium[link]
\ No newline at end of file
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
index ddcf79c8..06e42296 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
@@ -42,7 +42,7 @@ public void testIcebergEvents() throws Exception {
     Assertions.assertEquals(sinkType, "icebergevents");
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
-        Dataset ds = spark.newSession().sql("SELECT * FROM default.debezium_events");
+        Dataset ds = spark.newSession().sql("SELECT * FROM debeziumevents.debezium_events");
         ds.show();
         return ds.count() >= 5
             && ds.select("event_destination").distinct().count() >= 2;