Update documentation #94

Merged: 1 commit, Aug 12, 2022
README.md (10 changes: 6 additions & 4 deletions)
@@ -2,12 +2,14 @@
![contributions welcome](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)
![Java CI](https://github.com/memiiso/debezium-server-iceberg/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master)

# Debezium Iceberg Consumer

This project adds an Iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to replicate database CDC changes to an Iceberg table (Cloud Storage, HDFS) in real time, without requiring Spark, Kafka, or a streaming platform. Data can be consumed in append or upsert mode.

More detail is available in the [Documentation Page](docs/DOCS.md).
Also, check the [caveats](docs/CAVEATS.md) for a better understanding of the current limitations and the proper workarounds.

![Debezium Iceberg](docs/images/debezium-iceberg.png)

IcebergChangeConsumer.java
@@ -83,8 +83,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String keyFormat;
@ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
String defaultFs;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp", defaultValue = "")
protected Optional<String> destinationRegexp;
@ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "")

IcebergEventsChangeConsumer.java
@@ -99,8 +99,6 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
String defaultFs;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
@@ -131,10 +129,6 @@ void connect() {
conf.forEach(this.hadoopConf::set);
this.icebergProperties.putAll(conf);

- if (warehouseLocation == null || warehouseLocation.trim().isEmpty()) {
-   warehouseLocation = defaultFs + "/iceberg_warehouse";
- }

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);

// create table if not exists
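
With `fs.defaultFS` removed, the warehouse location is presumably always taken from the sink configuration; a sketch, with a placeholder path:

```properties
# required: explicit warehouse root; no longer derived from fs.defaultFS
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
```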

docs/CAVEATS.md (11 changes: 9 additions & 2 deletions)
@@ -3,12 +3,19 @@
This connector only writes using the Iceberg table V2 spec (delete events are written to delete files instead of rewriting data files).

## No automatic schema evolution
Currently, there is no handler to detect schema changes and automatically evolve the schema. Schema change events can make the connector throw an error. To work around this, turn off schema change events in the `source` settings.

- For SQL Server, set `debezium.source.include.schema.changes=false`

## Specific tables replication
By default, the Debezium connector publishes snapshots of all tables in the database, which leads to unnecessary Iceberg snapshots of all those tables. Unless you want to replicate every table from the database into Iceberg, set `debezium.source.table.include.list` to the specific tables you want to replicate. This way you avoid replicating tables you don't actually need. More on this setting in the [Debezium server source documentation](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-table-include-list).
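
For illustration, a minimal sketch of this setting; the schema and table names are hypothetical placeholders:

```properties
# replicate only these two tables; all other tables are ignored
debezium.source.table.include.list=inventory.customers,inventory.orders
```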

## AWS S3 credentials
You can set up AWS credentials in the following ways:

docs/DOCS.md (103 changes: 61 additions & 42 deletions)
@@ -6,66 +6,81 @@ Replicates database CDC events to Iceberg(Cloud storage, hdfs) without using Spa

## `iceberg` Consumer

The Iceberg consumer replicates database CDC events to destination Iceberg tables, in either upsert or append mode. When event and key schemas are enabled (`debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`), destination Iceberg tables are created automatically on the initial run.
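
For reference, a minimal sketch of the two schema settings named above as they would appear in a Debezium Server `application.properties`:

```properties
# both schemas must be enabled so destination tables can be created automatically
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
```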

#### Configuration properties

| Config | Default | Description |
|----------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------|
| `debezium.sink.iceberg.warehouse` | | The root path of the Iceberg data warehouse |
| `debezium.sink.iceberg.catalog-name` | `default` | User-specified catalog name. |
| `debezium.sink.iceberg.table-namespace` | `default` | A namespace in the catalog. ex: `SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table` |
| `debezium.sink.iceberg.table-prefix` | `` | Prefix added to destination iceberg table names. |
| `debezium.sink.iceberg.write.format.default` | `parquet` | Default file format for the table; `parquet`, `avro`, or `orc` |
| `debezium.sink.iceberg.allow-field-addition` | `true` | Allow field addition to target tables |
| `debezium.sink.iceberg.upsert` | `true` | Run in upsert mode, overwriting updated rows; explained below. |
| `debezium.sink.iceberg.upsert-keep-deletes` | `true` | With upsert mode, keep deleted rows in the target table. |
| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ms` | With upsert mode, used to deduplicate data; the row with the highest `__source_ts_ms` is kept. _Don't change!_ |
| `debezium.sink.iceberg.upsert-op-column` | `__op` | Used with upsert mode. _Don't change!_ |
| `debezium.sink.iceberg.destination-regexp` | `` | Regexp used to modify the destination table name. Makes it possible to map `table_ptt1`,`table_ptt2` to `table_combined`. |
| `debezium.sink.iceberg.destination-regexp-replace` | `` | Replacement part of the regexp used to modify the destination table name |
| `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy, used to optimize data files and upload interval; explained below. |
| `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) passed to Iceberg, and to hadoopConf |
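
For orientation, a minimal sink configuration sketch combining the properties above; the warehouse path, catalog name, namespace, and prefix values are placeholders:

```properties
debezium.sink.type=iceberg
# placeholder warehouse location (S3, HDFS, local path, ...)
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.write.format.default=parquet
```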

### Upsert

By default, the Iceberg consumer runs in upsert mode (`debezium.sink.iceberg.upsert=true`). Upsert mode uses the source primary key and does an upsert on the target table (a delete followed by an insert). For tables without a primary key, the consumer falls back to append mode.

#### Upsert Mode Data Deduplication

In upsert mode, data is deduplicated per batch. Deduplication is based on the `__source_ts_ms` value and the event type `__op`. It is possible to change the deduplication field with `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms` (currently only the Long field type is supported).

Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records with the same key and the same `__source_ts_ms` value are received, the record with the higher `__op` priority is kept and added to the destination table, and the duplicate record is dropped from the stream. For example, if an update (`u`) and a delete (`d`) arrive for the same key with an identical `__source_ts_ms`, the delete wins and is the record that is kept.

### Append

Setting `debezium.sink.iceberg.upsert=false` sets the operation mode to append. In append mode no data deduplication is done, and all received records are appended to the destination table.
Note: for tables without a primary key, the operation mode falls back to append even when the configuration is set to upsert mode.

#### Keeping Deleted Records

By default `debezium.sink.iceberg.upsert-keep-deletes=true` keeps deleted records in the Iceberg table; setting it to false removes them from the destination Iceberg table as well. Keeping deletes makes it possible to retain the last version of a record in the destination Iceberg table (a soft delete: for these records `__deleted` is set to `true`).
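
A sketch of the soft-delete behaviour described above; both values are the documented defaults:

```properties
debezium.sink.iceberg.upsert=true
# keep the last version of deleted records, with __deleted set to true
debezium.sink.iceberg.upsert-keep-deletes=true
```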

### Optimizing batch size (or commit interval)

Debezium extracts database events in real time, and this can cause too-frequent commits and too many small files, which is not optimal for batch processing, especially when a near-realtime data feed is sufficient. To avoid this problem, the following batch-size-wait classes are available to adjust the batch size and commit interval.

A batch size wait adds a delay between consumer calls to increase the total number of events consumed per call; meanwhile, events are collected in memory. This setting should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` Debezium properties.

#### NoBatchSizeWait

This is the default configuration; the consumer does not use any wait and all events are consumed immediately.

#### DynamicBatchSizeWait
**Deprecated**
This wait strategy dynamically adds a wait to increase batch size. The wait duration is calculated from the number of events processed in the last three batches. If recent batch sizes are smaller than `max.batch.size`, the wait duration increases; if they are bigger than 90% of `max.batch.size`, it decreases.

This strategy optimizes the batch size to between 85% and 90% of `max.batch.size`; it does not guarantee a consistent batch size.

Example setup to receive ~2048 events per commit, with the maximum wait set to 5 seconds:
```properties
debezium.source.max.queue.size=16000
debezium.source.max.batch.size=2048
debezium.sink.batch.batch-size-wait=DynamicBatchSizeWait
debezium.sink.batch.batch-size-wait.max-wait-ms=5000
```
#### MaxBatchSizeWait

MaxBatchSizeWait uses Debezium metrics to optimize the batch size. It periodically reads the current size of the streaming queue and waits until it reaches `max.batch.size`. The maximum wait and the check interval are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
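
For symmetry with the example above, a sketch of a MaxBatchSizeWait setup; the wait values are illustrative, not recommendations:

```properties
debezium.source.max.queue.size=16000
debezium.source.max.batch.size=2048
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
# stop waiting after 30s even if the queue has not reached max.batch.size
debezium.sink.batch.batch-size-wait.max-wait-ms=30000
# check the queue size every 10s while waiting
debezium.sink.batch.batch-size-wait.wait-interval-ms=10000
```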

@@ -119,16 +134,21 @@ Read [application.properties.example](../debezium-server-iceberg-sink/src/main/r

## Schema Change Behaviour

It is possible for source and target table schemas to get out of sync, for example when the source database changes its schema by adding or dropping a field. The possible schema changes and the current behavior of the Iceberg consumer are documented below.

#### Adding new column to source (A column missing in destination iceberg table)
When `debezium.sink.iceberg.allow-field-addition` is `false`, data of the new column is ignored until the same column is added to the destination Iceberg table.

For example: if a column is not found in the Iceberg table, its data is ignored and not copied to the target. Once the same column is added to the Iceberg table, data for this column is recognized and populated.
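
Field addition is controlled by a single flag; shown here with its documented default:

```properties
# when true, newly added source columns are created on the destination table automatically
debezium.sink.iceberg.allow-field-addition=true
```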

#### Removing column from source (An extra column in iceberg table)
These columns are populated with null values.

#### Renaming column in source
This is a combination of the two cases above: the old column is populated with null values, and the new column is not recognized or populated until it is added to the Iceberg table.
@@ -143,17 +163,16 @@

## `icebergevents` Consumer

This consumer appends CDC events to a single Iceberg table as JSON strings. The table is partitioned by `event_destination,event_sink_timestamptz` ~~and sorted by `event_sink_epoch_ms`~~ (sorting is WIP).

#### Example Configuration

````properties
debezium.sink.type=icebergevents
debezium.sink.iceberg.catalog-name=default
````

Destination table definition:

```java
static final String TABLE_NAME="debezium_events";
```