---
layout: post
title: Using Debezium to Create an ACID Data Lake
date: 2021-10-01
tags: [ debezium, iceberg, datalake ]
author: isimsek
---

Do you need to build a Lakehouse with a near real-time data pipeline? Do you want it to support ACID transactions and updates on the data lake?
It is possible with "Debezium Server Iceberg", without any dependency on Kafka or Spark applications.

+++<!-- more -->+++

==== Debezium

https://debezium.io/[Debezium] is an open source distributed platform for change data capture.
Debezium extracts real-time database changes as JSON, Avro, or Protobuf events and delivers them to event streaming platforms
(Kafka, Kinesis, Google Pub/Sub and Pulsar are just some of the https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration[supported sinks]).
It also provides a simple interface to https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink[implement a new sink].

==== Apache Iceberg

https://iceberg.apache.org/[Apache Iceberg] is an open table format for huge analytic datasets.
Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table.
It supports ACID inserts as well as row-level deletes and updates. It has a flexible foundation and provides https://iceberg.apache.org[many features].
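
To get a feel for what "works just like a SQL table" means in practice, here is a minimal Spark SQL sketch; the catalog, namespace and table names are illustrative and assume a Spark session configured with an Iceberg catalog named `mycatalog` and the Iceberg SQL extensions enabled:

[source,sql]
----
-- create an Iceberg table in the (illustrative) catalog `mycatalog`
CREATE TABLE mycatalog.inventory.customers (
  customer_id BIGINT,
  name STRING,
  updated_at TIMESTAMP
) USING iceberg;

-- row-level changes are ACID operations on the table
INSERT INTO mycatalog.inventory.customers VALUES (1, 'John Doe', current_timestamp());
UPDATE mycatalog.inventory.customers SET name = 'Jane Doe' WHERE customer_id = 1;
DELETE FROM mycatalog.inventory.customers WHERE customer_id = 1;
----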

== Debezium Server Iceberg

The **Debezium Server Iceberg** project implements a new consumer which uses the Iceberg Java API to consume events.
The Iceberg consumer converts received events to the Iceberg format and commits them to the destination Iceberg table. With this consumer it's possible to use any cloud storage and any catalog supported by Iceberg.

On a high level, data processing works as follows::
* events are grouped per event destination,
* for each destination, the events are converted to Iceberg records. At this step the event schema is used to map the data to Iceberg records (`debezium.format.value.schemas.enable` must be set to `true`),
* the Iceberg records are then written to Iceberg parquet files (data files, plus delete files when running in upsert mode),
* as the last step these files are committed to the destination table (uploaded to the destination storage) as data and delete files using the Iceberg Java API.

If the destination table is not found in the destination catalog, the consumer will automatically try to create it using the event schema and the key schema (record key).

Currently, the Iceberg consumer supports only JSON events. With JSON events the event schema must be enabled to do correct type conversion to Iceberg records.
Complex nested data types are not supported, so event flattening must be enabled.

example configuration::
[source,properties]
----
debezium.sink.type=iceberg
# run with append mode
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.upsert-keep-deletes=true
# iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.value=json
# unwrap message
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
----
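
With a configuration like the one above, each captured source table ends up as its own Iceberg table. A row then carries the flattened payload plus the metadata fields added by the unwrap transform (prefixed with `__`) and the `__deleted` flag produced by `delete.handling.mode=rewrite`. A hypothetical query against such a table could look like this; the table and column names are illustrative:

[source,sql]
----
SELECT customer_id, name, __op, __table, __source_ts_ms, __db, __deleted
FROM debeziumevents.debeziumcdc_testc_inventory_customers
ORDER BY __source_ts_ms DESC;
----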

=== Upsert and Append modes

By default, the Iceberg sink runs in upsert mode (`debezium.sink.iceberg.upsert=true`). When a row is updated in the source table, the destination row is replaced with the new version, and deleted records are deleted from the destination. With upsert mode the data at the destination is kept identical to the source data. Upsert mode uses the Iceberg equality delete feature and creates delete files using the key of the Debezium event. To avoid duplicate data, deduplication is done within each batch.

Note: For tables without a record key (PK), the operation mode falls back to append even if the configuration is set to upsert mode.

==== Keeping Deleted Records

For some use cases it's useful to keep deleted records as soft deletes. This is possible by setting `debezium.sink.iceberg.upsert-keep-deletes` to `true`.
This setting keeps the latest version of deleted records (`__op=d`) in the Iceberg table. Setting it to `false` removes deleted records from the destination table.

==== Append mode

This is the most straightforward operation mode. Setting `debezium.sink.iceberg.upsert` to `false` sets the operation mode to append.
With append mode no data deduplication is done and all received records are appended to the destination table.

=== Optimizing batch size (commit interval)

Debezium extracts/consumes database events in real time, which could cause too frequent commits (and too many small files) to the Iceberg table.
This is not optimal for batch processing, especially when a near real-time data feed is sufficient.
To avoid this problem it's possible to use the following configuration and increase the batch size per commit.

**MaxBatchSizeWait**: uses Debezium metrics to optimize the batch size. It periodically reads the current size of the Debezium streaming queue and waits until it reaches `max.batch.size`.
The maximum wait and the check interval are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
During the wait, Debezium events are collected in memory (in the Debezium streaming queue); this way each commit receives a larger and more consistent batch.
Note: this setting should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` Debezium properties.

Note: It's also possible to do data compaction using Iceberg, compacting data and metadata files to get the best performance; see the Spark SQL sketch after the example setting below.

example setting::

[source,properties]
----
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.max.batch.size=50000
debezium.source.max.queue.size=400000
debezium.sink.batch.batch-size-wait.max-wait-ms=60000
debezium.sink.batch.batch-size-wait.wait-interval-ms=10000
----
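
Regarding the compaction note above: with the Iceberg Spark runtime (and an Iceberg version that ships the SQL stored procedures), compaction and snapshot expiration could be triggered roughly like this; the catalog and table names are illustrative:

[source,sql]
----
-- rewrite small data files into larger ones
CALL mycatalog.system.rewrite_data_files(table => 'debeziumevents.debeziumcdc_testc_inventory_customers');

-- expire old snapshots and remove the files they reference
CALL mycatalog.system.expire_snapshots(table => 'debeziumevents.debeziumcdc_testc_inventory_customers', older_than => TIMESTAMP '2021-09-01 00:00:00');
----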

=== Destination, iceberg catalog

The Iceberg consumer uses an Iceberg catalog to read and commit data to the destination table; the destination can be any cloud storage and any catalog supported by Iceberg.
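
As the example configuration above suggests, catalog settings are passed with the `debezium.sink.iceberg.` prefix. Purely as an illustration, switching from the Hadoop catalog to a Hive catalog might look roughly like the following; the property names follow Iceberg's standard catalog properties and the metastore URI is hypothetical, so check the project documentation for the exact settings it supports:

[source,properties]
----
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hive.HiveCatalog
debezium.sink.iceberg.uri=thrift://hive-metastore:9083
debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
----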

== Next: Data Warehouse, Curated Layer

Now we have a raw layer with a near real-time data feed, on top of which we can build a curated layer, an analytic layer, or a data warehouse.

For example, I could easily use Spark SQL (or PrestoDB, Trino, Flink) and process this data into the next layer :)

[source,sql]
----
MERGE INTO dwh.consumers t
USING (
    -- new data goes to insert
    SELECT customer_id, name, effective_date, to_date('9999-12-31', 'yyyy-MM-dd') as end_date FROM debezium.consumers
    UNION ALL
    -- update existing records and close them
    SELECT t.customer_id, t.name, t.effective_date, s.effective_date as end_date FROM debezium.consumers s
    INNER JOIN dwh.consumers t ON s.customer_id = t.customer_id AND t.current = true
) s
ON s.customer_id = t.customer_id AND s.effective_date = t.effective_date
-- close the last record
WHEN MATCHED
  THEN UPDATE SET t.current = false, t.end_date = s.end_date
WHEN NOT MATCHED THEN
  INSERT(customer_id, name, current, effective_date, end_date)
  VALUES(s.customer_id, s.name, true, s.effective_date, s.end_date);
----

It's also possible to use a delete-and-insert pattern instead of `MERGE`; a minimal sketch, reusing the illustrative table and column names from the example above:
[source,sql]
----
-- remove the rows that are about to be replaced...
DELETE FROM dwh.consumers
WHERE customer_id IN (SELECT customer_id FROM debezium.consumers);

-- ...then insert the fresh versions as the current records
INSERT INTO dwh.consumers
SELECT customer_id, name, true as current, effective_date,
       to_date('9999-12-31', 'yyyy-MM-dd') as end_date
FROM debezium.consumers;
----

In the https://github.com/ismailsimsek/iceberg-examples[iceberg examples] project you can find more examples and experiment with Iceberg and Spark.

=== Contribution

This project is new and there are many things to improve. Please feel free to test it, give feedback, open feature requests or send pull requests.

- https://github.com/memiiso/debezium-server-iceberg[For more details please see the project]
- https://github.com/memiiso/debezium-server-iceberg/releases[Releases]