improve documentation #15

Merged: 9 commits, merged Aug 15, 2021
48 changes: 48 additions & 0 deletions BLOGPOST.md
@@ -0,0 +1,48 @@
# Using Debezium to Create an ACID Data Lakehouse

Do you need to build a flexible data lakehouse but don't know where to start? Do you want your data pipeline to be near real-time and to support ACID transactions and updates?
It's possible using two great projects, Debezium and Apache Iceberg, without any dependency on Kafka or Spark.

#### Debezium
Debezium is an open source distributed platform for change data capture.
Debezium extracts real-time database changes as JSON, Avro, or Protobuf events and delivers them to event streaming platforms
(Kafka, Kinesis, Google Pub/Sub, and Pulsar are just some of the [supported sinks](https://debezium.io/documentation/reference/operations/debezium-server.html#_sink_configuration)).
It also provides a simple interface to [implement a new sink](https://debezium.io/documentation/reference/operations/debezium-server.html#_implementation_of_a_new_sink), as sketched below.
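A rough sketch of such a sink (the class, package, and sink name below are hypothetical; only the `DebeziumEngine.ChangeConsumer` interface comes from Debezium):

```java
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

import javax.enterprise.context.Dependent;
import javax.inject.Named;
import java.util.List;

// Hypothetical sink, selected at runtime with debezium.sink.type=mysink
@Named("mysink")
@Dependent
public class MySinkChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

  @Override
  public void handleBatch(List<ChangeEvent<Object, Object>> records,
                          DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
      throws InterruptedException {
    for (ChangeEvent<Object, Object> record : records) {
      // deliver record.value() (the serialized change event) to the target system here
      committer.markProcessed(record); // mark each record as processed for offset tracking
    }
    committer.markBatchFinished(); // flush offsets once the whole batch is done
  }
}
```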

#### Apache Iceberg
Apache Iceberg is an open table format for huge analytic datasets. It supports concurrent ACID writes, inserts, and row-level deletes (updates), [plus many other benefits](https://iceberg.apache.org).
Apache Iceberg has a great foundation and a flexible API, currently supported by Spark, Presto, Trino, Flink, and Hive.

## debezium-server-iceberg

[@TODO visual architecture diagram]

This project puts the two together and enables a real-time data pipeline to any cloud storage or HDFS destination.
It becomes possible to combine the best features of both projects: a real-time structured data feed and an ACID table format with update support.

### Extending Debezium Server with Iceberg sink
The Iceberg sink plugs into the [Debezium Server Quarkus application](https://debezium.io/documentation/reference/operations/debezium-server.html#_installation).

The Iceberg sink receives real-time JSON events, converts them to Iceberg rows, and processes them using the Iceberg API.
Received rows are either appended to or upserted into the destination Iceberg table as Parquet files. Since Iceberg supports many storage backends, the destination can be
any Hadoop or cloud storage location. With debezium-server-iceberg it becomes easy to replicate your RDBMS to cloud storage, as in the example configuration below.
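For illustration, a minimal configuration selecting the Iceberg sink could look like the following (bucket, warehouse path, and catalog/namespace names are placeholders taken from the README example):

```properties
debezium.sink.type=iceberg
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.warehouse=s3a://MY_S3_BUCKET/iceberg_warehouse
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
```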

### Update, append
By default the Iceberg consumer works in upsert mode. When a row is updated on the source table, the destination row is replaced with the up-to-date record.
With upsert mode, data at the destination is always deduplicated and kept up to date.
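The relevant settings (defaults shown, as documented in the project README):

```properties
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
```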


With Iceberg v0.12 deleted records can be retained as soft deletes, keeping the last version of the deleted record in the table.
### Optimizing batch size (batch-size wait)

The consumer can delay commits by reading Debezium metrics (another great feature of Debezium) and waiting until the desired batch size is reached.
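For example, the metrics-based wait can be enabled with settings like these (taken from the README example; the MBean names assume a PostgreSQL connector named `testc`):

```properties
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.sink.batch.batch-size-wait.max-wait-ms=30000
debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
```
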
### Destination and Iceberg catalog

### Contributing
Contributions are welcome, see [CONTRIBUTING.md](CONTRIBUTING.md).

## Links
- [Apache Iceberg](https://iceberg.apache.org/)
- [Apache Iceberg GitHub](https://github.com/apache/iceberg)
- [Debezium](https://debezium.io/)
- [Debezium GitHub](https://github.com/debezium/debezium)
41 changes: 41 additions & 0 deletions CONTRIBUTING.md
@@ -0,0 +1,41 @@
# Contributing
We love your input! We want to make contributing to this project as easy and transparent as possible, whether it's:

- Reporting a bug
- Discussing the current state of the code
- Submitting a fix
- Proposing new features
- Becoming a maintainer

## We Develop with GitHub
We use GitHub to host code, track issues and feature requests, and accept pull requests.

## We Use [Github Flow](https://guides.github.com/introduction/flow/index.html), So All Code Changes Happen Through Pull Requests
Pull requests are the best way to propose changes to the codebase. We actively welcome your pull requests:

1. Fork the repo and create your branch from `master`.
2. If you've added code that should be tested, add tests.
3. If you've changed APIs, update the documentation.
4. Ensure the test suite passes.
5. Make sure your code is formatted.
6. Issue that pull request!

## Any contributions you make will be under the Apache 2.0 License
In short, when you submit code changes, your submissions are understood to be under the same [Apache-2.0 License](https://github.com/memiiso/debezium-server-iceberg/blob/master/LICENSE) that covers the project. Feel free to contact the maintainers if that's a concern.

## Report bugs using Github's [issues](https://github.com/memiiso/debezium-server-iceberg/issues)
We use GitHub issues to track public bugs. Report a bug by [opening a new issue](https://github.com/memiiso/debezium-server-iceberg/issues); it's that easy!

## Write bug reports with detail, background, and sample code
**Good Bug Reports** tend to have:

- A quick summary and/or background
- Steps to reproduce
- Be specific!
- Give sample code if you can.
- What you expected would happen
- What actually happens
- Notes (possibly including why you think this might be happening, or stuff you tried that didn't work)

## License
By contributing, you agree that your contributions will be licensed under Apache 2.0 License.
196 changes: 107 additions & 89 deletions README.md
@@ -2,101 +2,153 @@

# Debezium Iceberg Consumers

This project adds an Iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to
replicate database changes to an Iceberg table (cloud storage, HDFS) without requiring Spark, Kafka, or a streaming platform.

## `iceberg` Consumer

The Iceberg consumer appends or upserts Debezium events to destination Iceberg tables. When event and key schemas are
enabled (`debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`) the destination Iceberg
tables are created automatically.

### Upsert

By default the Iceberg consumer runs in upsert mode (`debezium.sink.iceberg.upsert=true`).
For tables with a primary key definition the consumer does an upsert; for tables without a primary key it falls back to append mode.

#### Data Deduplication

In upsert mode data deduplication is done per batch. Deduplication is based on the `__source_ts_ms` value and the event type `__op`.
The deduplication field can be changed with `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms`; currently only the
`Long` field type is supported.

Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records with the same key and the same `__source_ts_ms`
value are received, the record with the higher `__op` priority is added to the destination table and the duplicate record is dropped.
For example, if an update (`u`, priority 3) and a delete (`d`, priority 4) arrive for the same key with identical `__source_ts_ms`
values, the delete is kept.

### Append
Setting `debezium.sink.iceberg.upsert=false` sets the operation mode to append. In append mode no data deduplication is done and all received records are appended to the destination table.
Note: for tables without a primary key the operation mode is append even when the configuration is set to upsert mode.

#### Keeping Deleted Records

By default (`debezium.sink.iceberg.upsert-keep-deletes=true`) deletes are kept in the destination Iceberg table: the last version of the
deleted record remains in the table as a soft delete. Setting it to `false` removes deleted records from the destination table as well.
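Putting the upsert-related settings together (these are the defaults, also shown in the example configuration further below):

```properties
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms
debezium.sink.iceberg.upsert-op-column=__op
```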

### Optimizing batch size (or commit interval)

Debezium extracts database events in real time, which can cause too-frequent commits and too many small files. This is not
optimal for batch processing, especially when a near real-time data feed is sufficient.
To avoid this problem the following batch-size-wait classes are used.

A batch-size wait adds a delay between consumer calls to increase the total number of events received per call while events are collected in memory.
It should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` Debezium properties.


#### NoBatchSizeWait

This is the default configuration; the consumer does not add any batch size wait.
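It can also be set explicitly:

```properties
debezium.sink.batch.batch-size-wait=NoBatchSizeWait
```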

#### DynamicBatchSizeWait

This wait strategy dynamically adds a wait to increase the batch size. The wait duration is calculated from the number of processed events in the
last three batches: if the last batch sizes are lower than `max.batch.size` the wait duration increases, and if they are
bigger than 90% of `max.batch.size` it decreases.

This strategy tries to keep the batch size between 85% and 90% of `max.batch.size`; it does not guarantee a consistent batch size.

Example setup to receive ~2048 events per commit, with the maximum wait set to 5 seconds:
```properties
debezium.source.max.queue.size=16000
debezium.source.max.batch.size=2048
debezium.sink.batch.batch-size-wait=DynamicBatchSizeWait
debezium.sink.batch.batch-size-wait.max-wait-ms=5000
```
#### MaxBatchSizeWait

MaxBatchSizeWait uses Debezium metrics to optimize the batch size and is more precise than DynamicBatchSizeWait.
It periodically reads the current size of the streaming queue and waits until it reaches `max.batch.size`.
The maximum wait and check intervals are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.

Example setup to receive ~2048 events per commit, with the maximum wait set to 30 seconds and the streaming queue size checked every 5 seconds:
```properties
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.max.batch.size=2048
debezium.source.max.queue.size=16000
debezium.sink.batch.batch-size-wait.max-wait-ms=30000
debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
```

### Destination Iceberg Table Names

Iceberg table names are created using the following rule: `table-namespace`.`table-prefix``database.server.name`_`database`_`table`

For example, with the following config:

```properties
debezium.sink.iceberg.table-namespace=default
database.server.name=testc
debezium.sink.iceberg.table-prefix=cdc_
```

the database table `inventory.customers` will be replicated to `default.testc_cdc_inventory_customers`.

## Debezium Event Flattening

The Iceberg consumer requires event flattening; currently nested events and complex data types (like Struct) are not supported.

```properties
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.add.headers=db
debezium.transforms.unwrap.delete.handling.mode=rewrite
```

### Configuring iceberg

All properties starting with `debezium.sink.iceberg.*` are passed to Iceberg and to the Hadoop configuration (`hadoopConf`).

```properties
debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!
```

### Example Configuration

```properties
debezium.sink.type=iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.fs.defaultFS=s3a://MY_S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://MY_S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
debezium.sink.iceberg.fs.s3a.access.key=S3_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.secret.key=S3_SECRET_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.endpoint=http://localhost:9000 # minio specific setting
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.upsert-op-column=__op
debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
```

## `icebergevents` Consumer

This is the second consumer in this project. It appends CDC events to a single Iceberg table as a JSON string.
The table is partitioned by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms`.

#### Example Configuration

````properties
debezium.sink.type=icebergevents
debezium.sink.iceberg.catalog-name=default
````

Iceberg table definition:
```java
static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA)
.asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
.build();
```

@@ -88,7 +88,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
boolean upsertKeepDeletes;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms")
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;
@@ -103,7 +103,6 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

private TableIdentifier tableIdentifier;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Catalog icebergCatalog;
Table eventTable;
@@ -120,7 +119,7 @@ void connect() throws InterruptedException {
"Supported (debezium.format.key=*) formats are {json,}!");
}

tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");

Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(this.hadoopConf::set);
@@ -37,9 +37,11 @@ public ConfigSource() {
config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds!
// iceberg
config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_");
config.put("debezium.sink.iceberg.table-namespace", "debeziumevents");
config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET);
config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
config.put("debezium.sink.iceberg.type", "hadoop");
config.put("debezium.sink.iceberg.catalog-name", "mycatalog");
config.put("debezium.sink.iceberg.catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog");

// enable disable schema
@@ -32,8 +32,8 @@
@QuarkusTest
@QuarkusTestResource(S3Minio.class)
@QuarkusTestResource(SourceMysqlDB.class)
@TestProfile(BatchSparkChangeConsumerMysqlTestProfile.class)
public class BatchSparkChangeConsumerMysqlTest extends BaseSparkTest {
@TestProfile(IcebergChangeConsumerMysqlTestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {


@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
@@ -51,6 +51,8 @@ public void testSimpleUpload() {
return false;
}
});

S3Minio.listFiles();
}

}