Skip to content

Commit

Permalink
Write Blog Post
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Sep 1, 2021
1 parent 30ee24c commit 879ae45
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 32 deletions.
58 changes: 26 additions & 32 deletions BLOGPOST.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,33 @@
# Using Debezium to Create ACID Data Lake

Do you need to build flexible Data Lake? do you want your data pipeline to be near realtime and support ACID transactions, support updates on data lake?
Now its possible with Debezium Server Iceberg project( build on "Debezium" and "Apache Iceberg" projects) without any dependency to kafka or spark applications
Do you need to build Data Lake with near realtime data pipeline, do you want it to support ACID transactions and updates on data lake?
It is possible with Debezium Server Iceberg( build using "Debezium" and "Apache Iceberg" projects). and it has no dependency to kafka or spark applications

#### Debezium
Debezium is an open source distributed platform for change data capture.
[Debezium](https://debezium.io/) is an open source distributed platform for change data capture.
Debezium extracts realtime database changes as json, avro, protobuf events and delivers to event streaming platforms
(Kafka, Kinesis, Google Pub/Sub, Pulsar are just some of [supported sinks](https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration)),
it provides simple interface to [implement new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink)

#### Apache Iceberg
Apache Iceberg is an open table format for huge analytic datasets, with Concurrent ACID writes. It supports Insert and Row level Deletes(Update)[it has many other benefits](https://iceberg.apache.org)
Apache iceberg has great foundation and flexible API which currently integrated by Spark, Presto, Trino, Flink and Hive engines
[Apache Iceberg](https://iceberg.apache.org/) is an open table format for huge analytic datasets.
Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table.
It supports Insert, Row level Deletes/Updates. It has great and flexible foundation, its has [many other features](https://iceberg.apache.org)

## Debezium Server Iceberg

[@TODO visual architecture diagram]
**Debezium Server Iceberg** project ads Iceberg consumer to debezium server.
Iceberg consumer converts received events to iceberg format and commits them to destination iceberg table. With the consumer it's possible to configure any supported iceberg destination/catalog.
If destination table not found in the destination catalog consumer will automatically try to create it using event schema and key schema(record key)

**Debezium Server Iceberg** project ads Iceberg consumer,
Iceberg consumer processes received events and then commits them to destination iceberg table.
Its possible to configure any supported iceberg destination/catalog.
If destination table not found in the destination catalog consumer will try to create it using event(table schema) and key schema(record key)
on high level iceberg consumer
groups batch of events to event destination,
for each destination, events are converted to iceberg records. at this step event schema used to do type mapping to iceberg record that's why `debezium.format.value.schemas.enable` should be enabled(true).
After debezium events converted to iceberg records, they are saved to iceberg parquet files(iceberg data and delete files(for upsert)),
as last step these files are committed to destination table as data and delete file using iceberg java API.

Consumer groups batch of events to event destination,
for each destination events are converted to iceberg records, event schema used to do data type mapping to iceberg record.
After debezium events converted to iceberg records, they are saved to iceberg parquet files(data, delete files),
as last step these files are committed to destination table using iceberg java API.

Iceberg Consumer is based on json events it requires event schema to do data type conversion,
and currently nested data types are not supported, so it requires flattening.
Currently, Iceberg Consumer works only with json events. With json events it requires event schema to do data type conversion.
Currently, nested data types are not supported, so it requires event flattening.

example configuration
```properties
Expand Down Expand Up @@ -58,6 +57,7 @@ debezium.transforms.unwrap.drop.tombstones=true
### update, append
By default, Iceberg sink is running with upsert mode `debezium.sink.iceberg.upsert=true`. When a row updated on source table destination row replaced with the new updated version.
With upsert mode data at destination kept identical to source data. Update mode uses iceberg equality delete feature and creates delete files using record key of target table
With update mode to avoid duplicate data deduplication is done on each batch

Note: For the tables without record key operation mode falls back to append even configuration is set to upsert mode

Expand All @@ -67,19 +67,18 @@ For some use cases it's useful to keep deleted records as soft deletes, this is
this setting will keep the latest version of deleted records (`__op=d`) in the iceberg table. Setting it to false will remove deleted records from the destination table.

### Append
Setting `debezium.sink.iceberg.upsert` to false sets the operation mode to append, with append mode data deduplication is not done and all received records are appended to destination table

Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode
this is most straightforward operation mode, setting `debezium.sink.iceberg.upsert` to false sets the operation mode to append,
with append mode data deduplication is not done and all received records are appended to destination table

### Optimizing batch size (commit interval)

Debezium extracts/consumes database events in real time and this could cause too frequent commits(too many small files) to iceberg table,
Debezium extracts/consumes database events in real time and this could cause too frequent commits( and too many small files) to iceberg table,
which is not optimal for batch processing especially when near realtime data feed is sufficient.
To avoid this problem its possible to use following config and increase batch size per commit
To avoid this problem it's possible to use following configuration and increase batch size per commit

**MaxBatchSizeWait**: This setting adds delay based on debezium metrics,
it periodically monitors streaming queue size, and it starts processing events when it reaches `debezium.source.max.batch.size` value
during wait debezium events are collected in memory (in debezium streaming queue)
during the wait debezium events are collected in memory (in debezium streaming queue) and this way each commit receives more and consistent batch size
this setting should be configured together with `debezium.source.max.queue.size` and `debezium.source.max.batch.size` debezium properties

example setting:
Expand All @@ -95,15 +94,10 @@ debezium.sink.batch.batch-size-wait.wait-interval-ms=10000
```

### destination, iceberg catalog
The consumer uses iceberg catalog to read and commit data to destination table, all the catalog types and storage types used by Iceberg are supported.

### Contribution
This project is very new and there are many improvements to do new features, please feel free to test it , give feedback, open feature request or send pull request.

- [Project](https://github.com/memiiso/debezium-server-iceberg)
- [Releases](https://github.com/memiiso/debezium-server-iceberg/releases)
This project is very new and there are many things to improve, please feel free to test it, give feedback, open feature request or send pull request.

## Links
- [Apache iceberg](https://iceberg.apache.org/)
- [Apache iceberg Github](https://github.com/apache/iceberg)
- [Debezium](https://debezium.io/)
- [Debezium Github](https://github.com/debezium/debezium)
- [for more details please see the project](https://github.com/memiiso/debezium-server-iceberg)
- [Releases](https://github.com/memiiso/debezium-server-iceberg/releases)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/
@Dependent
@Named("DynamicBatchSizeWait")
@Deprecated
public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);

Expand Down

0 comments on commit 879ae45

Please sign in to comment.