From 1d3ef185df787b9a6379bc287e35bf1bca48a3d5 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Wed, 1 Sep 2021 13:37:30 +0200
Subject: [PATCH] Write Blog Post

---
 2021-10-01-debezium-server-iceberg.adoc | 156 ++++++++++++++++++++++++
 BLOGPOST.md                             | 103 ----------------
 2 files changed, 156 insertions(+), 103 deletions(-)
 create mode 100644 2021-10-01-debezium-server-iceberg.adoc
 delete mode 100644 BLOGPOST.md

diff --git a/2021-10-01-debezium-server-iceberg.adoc b/2021-10-01-debezium-server-iceberg.adoc
new file mode 100644
index 00000000..3cfaddbd
--- /dev/null
+++ b/2021-10-01-debezium-server-iceberg.adoc
@@ -0,0 +1,156 @@
---
layout: post
title: Using Debezium to Create an ACID Data Lake
date: 2021-10-01
tags: [ debezium, iceberg, datalake ]
author: isimsek
---

Do you need to build a Lakehouse with a near real-time data pipeline that supports ACID transactions and updates on the data lake?
It is possible with "Debezium Server Iceberg", without any dependency on Kafka or Spark applications.

==== Debezium

https://debezium.io/[Debezium] is an open source distributed platform for change data capture.
Debezium extracts real-time database changes as JSON, Avro, or Protobuf events and delivers them to event streaming platforms
(Kafka, Kinesis, Google Pub/Sub, and Pulsar are just some of the https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration[supported sinks]).
It also provides a simple interface to https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink[implement a new sink].

==== Apache Iceberg

https://iceberg.apache.org/[Apache Iceberg] is an open table format for huge analytic datasets.
Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive, using a high-performance table format that works just like a SQL table.
It supports ACID inserts as well as row-level deletes and updates. It has a flexible foundation and provides https://iceberg.apache.org[many more features].

== Debezium Server Iceberg

The **Debezium Server Iceberg** project implements a new consumer that uses the Iceberg Java API to consume events.
The Iceberg consumer converts the received events to the Iceberg format and commits them to the destination Iceberg table. With this consumer it's possible to use any cloud storage and catalog supported by Iceberg.

At a high level, data processing works as follows:

* Events are grouped per event destination.
* For each destination, the events are converted to Iceberg records. At this step the event schema is used to map the data to the Iceberg record, which is why `debezium.format.value.schemas.enable` must be set to `true`.
* The Iceberg records are then written to Iceberg Parquet files (data files, plus delete files when upsert is used).
* As the last step these files are committed to the destination table (uploaded to the destination storage) as data and delete files using the Iceberg Java API.

If the destination table is not found in the destination catalog, the consumer automatically tries to create it using the event schema and the key schema (record key).

Currently, the Iceberg consumer supports only JSON events. With JSON events the event schema must be enabled to do the correct type conversion to Iceberg records.
Complex nested data types are not supported yet, so event flattening must be enabled.
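
To illustrate, a flattened change event for a hypothetical `inventory.customers` table (captured by a connector named `testc`) might look roughly like the sketch below when the schema envelope is enabled and the `ExtractNewRecordState` transform is configured as in the example configuration that follows. The `id` and `name` columns and all values are made up for illustration, and the schema is trimmed for brevity:

[source,json]
----
{
  "schema": {
    "type": "struct",
    "name": "testc.inventory.customers.Value",
    "optional": false,
    "fields": [
      {"field": "id", "type": "int32", "optional": false},
      {"field": "name", "type": "string", "optional": true},
      {"field": "__op", "type": "string", "optional": true},
      {"field": "__table", "type": "string", "optional": true},
      {"field": "__source_ts_ms", "type": "int64", "optional": true},
      {"field": "__db", "type": "string", "optional": true},
      {"field": "__deleted", "type": "string", "optional": true}
    ]
  },
  "payload": {
    "id": 1004,
    "name": "Anne Kretchmar",
    "__op": "u",
    "__table": "customers",
    "__source_ts_ms": 1632486754000,
    "__db": "inventory",
    "__deleted": "false"
  }
}
----

The consumer uses the `schema` section to derive the Iceberg table schema and field types, and maps the `payload` section to an Iceberg record; the `__*` fields come from the `ExtractNewRecordState` transform settings shown in the configuration below.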

Example configuration::
[source,properties]
----
debezium.sink.type=iceberg
# run with append mode
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.upsert-keep-deletes=true
# iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.value=json
# unwrap message
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
----

=== Upsert and Append modes

By default, the Iceberg sink runs in upsert mode (`debezium.sink.iceberg.upsert=true`). When a row is updated in the source table, the destination row is replaced with the new version, and deleted records are removed from the destination. With upsert mode the data at the destination is kept identical to the source data. Upsert mode uses the Iceberg equality delete feature and creates delete files using the key of the Debezium event. To avoid duplicate data, deduplication is done within each batch.

Note: For tables without a record key (PK), the operation mode falls back to append even if the configuration is set to upsert mode.

==== Keeping Deleted Records

For some use cases it's useful to keep deleted records as soft deletes. This is possible by setting `debezium.sink.iceberg.upsert-keep-deletes` to `true`, which keeps the latest version of deleted records (`__op=d`) in the Iceberg table. Setting it to `false` removes deleted records from the destination table.

==== Append mode

This is the most straightforward operation mode. Setting `debezium.sink.iceberg.upsert` to `false` sets the operation mode to append. With append mode no data deduplication is done and all received records are appended to the destination table.

=== Optimizing batch size (commit interval)

Debezium extracts/consumes database events in real time, which can cause too frequent commits (and too many small files) in the Iceberg table.
This is not optimal for batch processing, especially when a near real-time data feed is sufficient.
To avoid this problem it's possible to use the following configuration and increase the batch size per commit.

**MaxBatchSizeWait**: uses Debezium metrics to optimize the batch size. It periodically reads the current size of the Debezium streaming queue and waits until it reaches `debezium.source.max.batch.size`.
The maximum wait and check intervals are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
During the wait the Debezium events are collected in memory (in the Debezium streaming queue), so each commit receives a larger and more consistent batch; see the example configuration below.

Note: This setting should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` Debezium properties.

Note: It's also possible to do data compaction using Iceberg itself, compacting data and metadata files to get the best performance.
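For example, with Spark and a sufficiently recent Iceberg release, the built-in table maintenance procedures can be called from Spark SQL. The sketch below uses the `mycatalog` catalog name from the example configuration above; the table name is illustrative only, and the retention timestamp is arbitrary, so adjust both to your setup and Iceberg/Spark versions:

[source,sql]
----
-- compact small data files produced by frequent commits (table name is a placeholder)
CALL mycatalog.system.rewrite_data_files(table => 'debeziumevents.debeziumcdc_testc_inventory_customers');

-- optionally expire old snapshots to clean up metadata and unreferenced files
CALL mycatalog.system.expire_snapshots(
  table => 'debeziumevents.debeziumcdc_testc_inventory_customers',
  older_than => TIMESTAMP '2021-10-01 00:00:00');
----

Running compaction as a separate, scheduled Spark job keeps the ingestion path simple while still keeping the small-files problem under control.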

Example configuration for MaxBatchSizeWait::

[source,properties]
----
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.max.batch.size=50000
debezium.source.max.queue.size=400000
debezium.sink.batch.batch-size-wait.max-wait-ms=60000
debezium.sink.batch.batch-size-wait.wait-interval-ms=10000
----

=== Destination, Iceberg catalog

The Iceberg consumer uses an Iceberg catalog to read and commit data to the destination table; the destination can be any cloud storage and any catalog supported by Iceberg.

== Next: Data Warehouse, Curated Layer

Now we have a solid raw layer with a near real-time data feed, on top of which we can build a curated layer, an analytic layer, or a data warehouse.

For example, we could easily use Spark SQL (or PrestoDB, Trino, Flink) to process this data into the next layer:

[source,sql]
----
MERGE INTO dwh.consumers t
  USING (
    -- new data goes to insert
    SELECT customer_id, name, effective_date, to_date('9999-12-31', 'yyyy-MM-dd') as end_date FROM debezium.consumers
    UNION ALL
    -- update existing records and close them
    SELECT t.customer_id, t.name, t.effective_date, s.effective_date as end_date FROM debezium.consumers s
    INNER JOIN dwh.consumers t on s.customer_id = t.customer_id AND t.current = true
  ) s
  ON s.customer_id = t.customer_id AND s.effective_date = t.effective_date
  -- close the last record
  WHEN MATCHED
    THEN UPDATE SET t.current = false, t.end_date = s.end_date
  WHEN NOT MATCHED THEN
    INSERT(customer_id, name, current, effective_date, end_date)
    VALUES(s.customer_id, s.name, true, s.effective_date, s.end_date);
----

It's also possible to use delete and insert statements instead of MERGE.

In the https://github.com/ismailsimsek/iceberg-examples[iceberg examples] project you can find more examples and experiment with Iceberg and Spark.

=== Contribution

This project is new and there are many things to improve. Please feel free to test it, give feedback, open a feature request, or send a pull request.

- https://github.com/memiiso/debezium-server-iceberg[For more details please see the project]
- https://github.com/memiiso/debezium-server-iceberg/releases[Releases]

diff --git a/BLOGPOST.md b/BLOGPOST.md
deleted file mode 100644
index b286c5ea..00000000
--- a/BLOGPOST.md
+++ /dev/null