diff --git a/BLOGPOST.md b/BLOGPOST.md
deleted file mode 100644
index 81c6d9ae..00000000
--- a/BLOGPOST.md
+++ /dev/null
@@ -1,48 +0,0 @@
-# Using Debezium to Create ACID Data Lake House
-
-Do you need to build flexible Data Lakehouse but don't know where to start, do you want your data pipeline to be near realtime and support ACID transactions and updates
-its possible using two great projects Debezium and Apache Iceberg without any dependency to kafka or spark
-
-#### Debezium
-Debezium is an open source distributed platform for change data capture.
-Debezium extracts realtime database changes as json, avro, protobuf events and delivers to event streaming platforms
-(Kafka, Kinesis, Google Pub/Sub, Pulsar are just some of [supported sinks](https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration)),
-it provides simple interface to [implement new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink)
-
-#### Apache Iceberg
-Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes, it supports Insert and Row level Deletes(Update)[it has many other benefits](https://iceberg.apache.org)
-Apache iceberg has great foundation and flexible API which currently supported by Spark, Presto, Trino, Flink and Hive
-
-## Debezium Server Iceberg
-
-[@TODO visual architecture diagram]
-
-Debezium Server Iceberg project puts both projects together and enables realtime data pipeline to any cloud storage, hdfs destination supported by iceberg
-Debezium Server Iceberg it is possible to use best features from both projects like realtime structured data pipeline and ACID table format with update support
-
-Debezium Iceberg sink extends [Debezium server quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation),
-
-Iceberg consumer converts debezium json events to iceberg rows and commits them to destination iceberg table using iceberg API
-It's possible to append database events to iceberg tables or do upsert using source table primary key
-since iceberg supports many cloud storage its easily possible to configure destination which could be any of hadoop storage cloud storage location.
-with debezium-server-iceberg its easily possible to replicate your RDBMS to cloud storage
-
-# update, append
-Iceberg consumer by default works with upsert mode. When a row updated on source table destination row replaced with up-to-date record.
-with upsert mode data at destination is always deduplicate and kept up to date
-
-
-V 0.12 iceberg
-retain deletes as soft delete!
-# wait delay batch size
-
-wait by reading debezium metrics! another great feature of debezium
-# destination, iceberg catalog
-
-@Contribution ..etc
-
-# Links
-[Apache iceberg](https://iceberg.apache.org/)
-[Apache iceberg Github](https://github.com/apache/iceberg)
-[Debezium](https://debezium.io/)
-[Debezium Github](https://github.com/debezium/debezium)
\ No newline at end of file
diff --git a/README.md b/README.md
index f4144ed3..1d287335 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,8 @@
 This project adds iceberg consumer to [debezium server application](https://debezium.io/documentation/reference/operations/debezium-server.html). it could be used to
 replicate database changes to iceberg table(Cloud storage, hdfs) without requiring Spark, Kafka or Streaming platform.
 
+![Debezium Iceberg](docs/images/Debezium-Iceberg.png)
+
 ## `iceberg` Consumer
 
 Iceberg consumer appends or upserts debezium events to destination iceberg tables. When event and key schemas
@@ -27,13 +29,13 @@ values received then the record with higher `__op` priority is added to destinat
 
 ### Append
 Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append, with append mode data deduplication is not done, all received records are appended to destination table
-Note: For the tables without primary key operation mode is append even configuration is set to upsert mode
+Note: For tables without a primary key the operation mode falls back to append, even when the configuration is set to upsert mode
 
 #### Keeping Deleted Records
 
 By default `debezium.sink.iceberg.upsert-keep-deletes=true` will keep deletes in the iceberg table, setting it to false will
 remove deleted records from the destination iceberg table. With this config its possible to keep last version of the deleted
-record in the table(to do soft deletes).
+record in the table (a soft delete).
 
 ### Optimizing batch size (or commit interval)
 
@@ -50,7 +52,7 @@ this should be configured with `debezium.source.max.queue.size` and `debezium.so
 This is default configuration by default consumer will not use any batch size wait
 
 #### DynamicBatchSizeWait
-
+**Deprecated**
 This wait strategy dynamically adds wait to increase batch size. Wait duration is calculated based on number of processed events in last 3 batches.
 if last batch sizes are lower than `max.batch.size` Wait duration will increase and if last batch sizes are bigger than 90% of `max.batch.size` Wait duration will decrease
@@ -66,9 +68,9 @@ debezium.sink.batch.batch-size-wait.max-wait-ms=5000
 ```
 
 #### MaxBatchSizeWait
-MaxBatchSizeWait uses debezium metrics to optimize batch size, this strategy is more precise compared to DynamicBatchSizeWait
-DynamicBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`
-maximum wait and check intervals are controlled by `debezium.sink.batch.batch-size-wait.max-wait-ms`, `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties
+MaxBatchSizeWait uses debezium metrics to optimize batch size; this strategy is more precise than DynamicBatchSizeWait.
+MaxBatchSizeWait periodically reads the current streaming queue size and waits until it reaches `max.batch.size`.
+Maximum wait and check intervals are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
 
 example setup to receive ~2048 events per commit. maximum wait is set to 30 seconds, streaming queue current size checked every 5 seconds
 ```properties
@@ -98,7 +100,7 @@ database table = `inventory.customers` will be replicated to `default.testc_cdc_
 
 ## Debezium Event Flattening
 
-Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported
+Iceberg consumer requires event flattening; currently, nested events and complex data types (like Struct) are not supported.
 
 ```properties
 debezium.transforms=unwrap
diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml
index d84881c8..c5d595df 100644
--- a/debezium-server-iceberg-sink/pom.xml
+++ b/debezium-server-iceberg-sink/pom.xml
@@ -161,7 +161,6 @@
     <dependency>
       <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
-      <version>8.0.22</version>
       <scope>test</scope>
    </dependency>
 
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java
index ef6de4f9..c0733014 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java
@@ -25,6 +25,7 @@
  */
 @Dependent
 @Named("DynamicBatchSizeWait")
+@Deprecated
 public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);
diff --git a/docs/images/Debezium-Iceberg.png b/docs/images/Debezium-Iceberg.png
new file mode 100644
index 00000000..8fadb487
Binary files /dev/null and b/docs/images/Debezium-Iceberg.png differ
diff --git a/pom.xml b/pom.xml
index 901b25b1..cb26509d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,13 +17,6 @@
     <version>${revision}</version>
     <packaging>pom</packaging>
 
-    <repositories>
-        <repository>
-            <id>nexus-orgapacheiceberg</id>
-            <url>https://repository.apache.org/content/repositories/orgapacheiceberg-1018/</url>
-        </repository>
-    </repositories>
-
     <properties>
         <revision>0.1.0-SNAPSHOT</revision>
@@ -43,7 +36,8 @@
         2.16.88
         1.11.1
-        1.7.0.Alpha1
+        1.7.0.CR1
+        <version.mysql.driver>8.0.26</version.mysql.driver>
         2.0.3.Final
@@ -77,6 +71,12 @@
                 <scope>import</scope>
             </dependency>
+
+            <dependency>
+                <groupId>mysql</groupId>
+                <artifactId>mysql-connector-java</artifactId>
+                <version>${version.mysql.driver}</version>
+            </dependency>
             <dependency>
                 <groupId>io.debezium</groupId>