From 477b0c7f605b722df280002dc64c624309a1f6af Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Fri, 12 Aug 2022 22:35:14 +0200
Subject: [PATCH] Update documentation (#94)

---
 README.md                                     |  10 +-
 .../server/iceberg/IcebergChangeConsumer.java |   2 -
 .../iceberg/IcebergEventsChangeConsumer.java  |   6 -
 docs/CAVEATS.md                               |  11 +-
 docs/DOCS.md                                  | 103 +++++++++++-------
 5 files changed, 76 insertions(+), 56 deletions(-)

diff --git a/README.md b/README.md
index 3f21d24c..2e330ab1 100644
--- a/README.md
+++ b/README.md
@@ -2,12 +2,14 @@
 ![contributions welcome](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)
 ![Java CI](https://github.com/memiiso/debezium-server-iceberg/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master)

-# Debezium Iceberg Consumers
+# Debezium Iceberg Consumer

-This project adds iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It could be used to
-replicate database CDC changes to Iceberg table (Cloud Storage, HDFS) in realtime, without requiring Spark, Kafka or Streaming platform.
+This project adds an iceberg consumer
+to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to
+replicate database CDC changes to Iceberg tables (Cloud Storage, HDFS) in real time, without requiring Spark, Kafka, or
+a streaming platform. It is possible to consume data in append or upsert mode.

-More detail available in [documentation page](docs/DOCS.md)
+More details are available on the [Documentation Page](docs/DOCS.md).
 Also, check [caveats](docs/CAVEATS.md) for a better understanding of the current limitations and proper workarounds.

 ![Debezium Iceberg](docs/images/debezium-iceberg.png)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 3db022bc..9aa00bbc 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -83,8 +83,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
   String keyFormat;
   @ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
   String warehouseLocation;
-  @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
-  String defaultFs;
   @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp", defaultValue = "")
   protected Optional<String> destinationRegexp;
   @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "")

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
index 431cd557..d1115d60 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
@@ -99,8 +99,6 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   String valueFormat;
   @ConfigProperty(name = "debezium.format.key", defaultValue = "json")
   String keyFormat;
-  @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
-  String defaultFs;
   @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
"debezium.sink.iceberg.table-namespace", defaultValue = "default") String namespace; @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default") @@ -131,10 +129,6 @@ void connect() { conf.forEach(this.hadoopConf::set); this.icebergProperties.putAll(conf); - if (warehouseLocation == null || warehouseLocation.trim().isEmpty()) { - warehouseLocation = defaultFs + "/iceberg_warehouse"; - } - icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf); // create table if not exists diff --git a/docs/CAVEATS.md b/docs/CAVEATS.md index 68821cb0..209e622e 100644 --- a/docs/CAVEATS.md +++ b/docs/CAVEATS.md @@ -3,12 +3,19 @@ This connector only writes using iceberg table V2 spec (delete events will be written to delete files instead of rewrite data files) ## No automatic schema evolution -Currently, there is no handler to detect schema changes and auto evolve the schema. Schema change events can make the connector throw error. To workaround this, turn off schema change event in `source` setting. +Currently, there is no handler to detect schema changes and auto evolve the schema. Schema change events can make the +connector throw error. To work around this, turn off schema change event in `source` setting. - For SQL Server, set `debezium.source.include.schema.changes=false` ## Specific tables replication -By default, debezium connector will publish all snapshot of the tables in the database, that leads to unnessesary iceberg table snapshot of all tables. Unless you want to replicate all table from the database into iceberg table, set `debezium.source.table.include.list` to specific tables that you want to replicate. By this way, you avoid replicate too many tables that you don't really want to. More on this setting in [Debezium server source](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-table-include-list). + +By default, debezium connector will publish all snapshot of the tables in the database, that leads to unnecessary +iceberg table snapshot of all tables. Unless you want to replicate all table from the database into iceberg table, +set `debezium.source.table.include.list` to specific tables that you want to replicate. By this way, you avoid replicate +too many tables that you don't really want to. More on this setting +in [Debezium server source](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-table-include-list) +. ## AWS S3 credentials You can setup aws credentials in the following ways: diff --git a/docs/DOCS.md b/docs/DOCS.md index f8d600a2..ea6f210e 100644 --- a/docs/DOCS.md +++ b/docs/DOCS.md @@ -6,66 +6,81 @@ Replicates database CDC events to Iceberg(Cloud storage, hdfs) without using Spa ## `iceberg` Consumer -Iceberg consumer replicates debezium CDC events to destination Iceberg tables. It is possible to replicate source database one to one or run it with append mode and keep all change events in iceberg table. When event and key schema -enabled (`debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`) destination Iceberg +Iceberg consumer replicates database CDC events to destination Iceberg tables. It is possible to replicate source +database upsert or append modes. +When event and key schema enabled (`debezium.format.value.schemas.enable=true` +, `debezium.format.key.schemas.enable=true`) destination Iceberg tables created automatically with initial job. 
+#### Configuration properties
+
+| Config                                             | Default           | Description                                                                                                                |
+|----------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------|
+| `debezium.sink.iceberg.warehouse`                  |                   | The root path of the Iceberg data warehouse                                                                                |
+| `debezium.sink.iceberg.catalog-name`               | `default`         | User-specified catalog name.                                                                                               |
+| `debezium.sink.iceberg.table-namespace`            | `default`         | A namespace in the catalog. ex: `SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table`                |
+| `debezium.sink.iceberg.table-prefix`               | ``                | Prefix added to destination iceberg table names.                                                                           |
+| `debezium.sink.iceberg.write.format.default`       | `parquet`         | Default file format for the table; `parquet`, `avro`, or `orc`                                                             |
+| `debezium.sink.iceberg.allow-field-addition`       | `true`            | Allow field addition to target tables                                                                                      |
+| `debezium.sink.iceberg.upsert`                     | `true`            | Run in upsert mode, overwriting updated rows. Explained below.                                                             |
+| `debezium.sink.iceberg.upsert-keep-deletes`        | `true`            | With upsert mode, keep deleted rows in the target table.                                                                   |
+| `debezium.sink.iceberg.upsert-dedup-column`        | `__source_ts_ms`  | Column used with upsert mode to deduplicate data; the row with the highest `__source_ts_ms` is kept. _don't change!_       |
+| `debezium.sink.iceberg.upsert-op-column`           | `__op`            | Operation column used with upsert mode. _don't change!_                                                                    |
+| `debezium.sink.iceberg.destination-regexp`         | ``                | Regexp to modify the destination table name. With this it's possible to map `table_ptt1`,`table_ptt2` to `table_combined`. |
+| `debezium.sink.iceberg.destination-regexp-replace` | ``                | Replacement part of the regexp used to modify the destination table name                                                   |
+| `debezium.sink.batch.batch-size-wait`              | `NoBatchSizeWait` | Batch size wait strategy, used to optimize data file size and upload interval. Explained below.                            |
+| `debezium.sink.iceberg.{iceberg.prop.name}`        |                   | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) passed to Iceberg, and to hadoopConf               |
+
 ### Upsert

-By default, Iceberg consumer is running with upsert mode `debezium.sink.iceberg.upsert=true`.
-Upsert mode uses source Primary Key and does upsert on target table(delete followed by insert). For the tables without Primary Key consumer falls back to append mode.
+By default, the Iceberg consumer runs in upsert mode (`debezium.sink.iceberg.upsert=true`).
+Upsert mode uses the source primary key and does an upsert on the target table (a delete followed by an insert). For
+tables without a primary key the consumer falls back to append mode.

-#### Data Deduplication
+#### Upsert Mode Data Deduplication

-With upsert mode per batch data deduplication is done. Deduplication is done based on `__source_ts_ms` value and event type `__op`.
-its is possible to change field using `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms`. Currently only
-Long field type supported.
+With upsert mode, data is deduplicated within each batch. Deduplication is based on the `__source_ts_ms` value and the
+event type `__op`.
+It is possible to change this field using `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms` (currently only
+the Long field type is supported).

 Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records with same key and same `__source_ts_ms`
-values received then the record with higher `__op` priority is kept and added to destination table and duplicate record is dropped.
+values are received, the record with the higher `__op` priority is kept and added to the destination table; the
+duplicate record is dropped from the stream.
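+
+For illustration, an upsert setup that keeps soft-deleted records could be sketched as follows (all values shown are
+the defaults; the dedup and op columns are listed only for completeness and should not be changed):
+
+```properties
+debezium.sink.iceberg.upsert=true
+debezium.sink.iceberg.upsert-keep-deletes=true
+# per-batch deduplication columns, defaults shown
+debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms
+debezium.sink.iceberg.upsert-op-column=__op
+```
+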
 ### Append

-Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append. With append mode data deduplication is not done and all received records are appended to destination table.
-Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode

+Setting `debezium.sink.iceberg.upsert=false` sets the operation mode to append. In append mode no data deduplication
+is done and all received records are appended to the destination table.
+Note: For tables without a primary key the operation mode falls back to append even if the configuration is set to
+upsert mode.

 #### Keeping Deleted Records

 By default `debezium.sink.iceberg.upsert-keep-deletes=true` keeps deletes in the Iceberg table, setting it to false
-will remove deleted records from the destination Iceberg table. With this config it's possible to keep last version of a
-record in the destination Iceberg table(doing soft delete).
+will also remove deleted records from the destination Iceberg table. With this config it is possible to keep the last
+version of a record in the destination Iceberg table (a soft delete; for these records `__deleted` is set to `true`).

 ### Optimizing batch size (or commit interval)

 Debezium extracts database events in real time and this could cause too frequent commits or too many small files
-which is not optimal for batch processing especially when near realtime data feed is sufficient.
-To avoid this problem following batch-size-wait classes are used.
-
-Batch size wait adds delay between consumer calls to increase total number of events received per call and meanwhile events are collected in memory.
-This setting should be configured together with `debezium.source.max.queue.size` and `debezium.source.max.batch.size` debezium properties
+which is not optimal for batch processing, especially when a near-realtime data feed is sufficient.
+To avoid this problem the following batch-size-wait classes are available to adjust batch size and commit interval.
+Batch size wait adds a delay between consumer calls to increase the total number of events consumed per call.
+Meanwhile, events are collected in memory.
+This setting should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size`
+debezium properties.

 #### NoBatchSizeWait

 This is the default configuration; the consumer does not use any wait. All events are consumed immediately.

-#### DynamicBatchSizeWait
-**Deprecated**
-This wait strategy dynamically adds wait to increase batch size. Wait duration is calculated based on number of processed events in
-last 3 batches. if last batch sizes are lower than `max.batch.size` Wait duration will increase and if last batch sizes
-are bigger than 90% of `max.batch.size` Wait duration will decrease
-
-This strategy optimizes batch size between 85%-90% of the `max.batch.size`, it does not guarantee consistent batch size.
-
-example setup to receive ~2048 events per commit. maximum wait is set to 5 seconds
-```properties
-debezium.source.max.queue.size=16000
-debezium.source.max.batch.size=2048
-debezium.sink.batch.batch-size-wait=DynamicBatchSizeWait
-debezium.sink.batch.batch-size-wait.max-wait-ms=5000
-```

 #### MaxBatchSizeWait

-MaxBatchSizeWait uses debezium metrics to optimize batch size, this strategy is more precise compared to DynamicBatchSizeWait.
+MaxBatchSizeWait uses debezium metrics to optimize batch size.
 MaxBatchSizeWait periodically reads the current streaming queue size and waits until it reaches `max.batch.size`.
 Maximum wait and check intervals are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and
 `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
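+
+For example, a sketch of a setup aiming at roughly 2048 events per upload, checking the queue every 5 seconds and
+waiting at most 30 seconds (the numbers are illustrative, not recommendations):
+
+```properties
+debezium.source.max.queue.size=16000
+debezium.source.max.batch.size=2048
+debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
+debezium.sink.batch.batch-size-wait.max-wait-ms=30000
+debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
+```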
@@ -119,16 +134,21 @@ Read [application.properties.example](../debezium-server-iceberg-sink/src/main/r

 ## Schema Change Behaviour

-It is possible to get out of sync schemas between source and target tables. Foexample when the source database change its schema, adds or drops field. Here we documented possible schema changes and current behavior of the Iceberg consumer.
+It is possible for source and target table schemas to get out of sync, for example when the source database changes
+its schema by adding or dropping a field. Possible schema changes and the current behavior of the Iceberg consumer are
+documented below.

 #### Adding new column to source (A column missing in destination iceberg table)
-Data of the new column is ignored till same column added to destination iceberg table
-Dor example: if a column not found in iceberg table its data is dropped ignored and not copied to target!
+When `debezium.sink.iceberg.allow-field-addition` is `false`, data of the new column is ignored until the column is
+added to the destination iceberg table.
+
+For example: if a column is not found in the iceberg table, its data is ignored and not copied to the target!
 Once the iceberg table adds the same column, data for this column is recognized and populated.

 #### Removing column from source (An extra column in iceberg table)
+
+These columns are populated with null values.

 #### Renaming column in source
 This is a combination of the two cases above: the old column will be populated with null values and the new column will not be recognized or populated until it is added to the iceberg table.

@@ -143,17 +163,16 @@ If representation cannot be converted to a long (including structured types like

 ## `icebergevents` Consumer

-This is second consumer in this project. This consumer appends CDC events to single Iceberg table as json string.
-This table partitioned by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms`
+This consumer appends CDC events to a single Iceberg table as JSON strings.
+This table is partitioned by `event_destination,event_sink_timestamptz` ~~WIP and sorted by `event_sink_epoch_ms`~~

-#### Example Configuration
 ````properties
 debezium.sink.type=icebergevents
 debezium.sink.iceberg.catalog-name=default
 ````

-Iceberg table definition:
+Destination table definition:
 ```java
 static final String TABLE_NAME="debezium_events";