diff --git a/README.md b/README.md
index 54ea9f1c..f4144ed3 100644
--- a/README.md
+++ b/README.md
@@ -1,52 +1,92 @@
 ![Java CI with Maven](https://github.com/memiiso/debezium-server-iceberg/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master)
-# Any contribution is welcome
-
-This is new project and there are many thins to improve, please feel free to send PR or open bug,feature requests
-
 # Debezium Iceberg Consumers
------
-This project adds iceberg batch consumers
-to [debezium server](https://debezium.io/documentation/reference/operations/debezium-server.html). it could be used to
-replicate database changes to iceberg table, without requiring Spark, Kafka or Streaming platform.
+This project adds an Iceberg consumer to the [Debezium Server application](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to
+replicate database changes to an Iceberg table (on cloud storage or HDFS) without requiring Spark, Kafka, or a streaming platform.
 
 ## `iceberg` Consumer
 
-Appends debezium events to destination iceberg tables. When event and key schemas
+The Iceberg consumer appends or upserts Debezium events to destination Iceberg tables. When event and key schemas are
 enabled (`debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`) destination iceberg
-tables created automatically. When destination table is not exists Consumer will print warning message and continue with
-replication of other tables.
+tables are created automatically.
 
 ### Upsert
 
-By default (`debezium.sink.iceberg.upsert=true`) upsert feature enabled, for tables with Primary Key definition consumer
-will do upsert, for the tables without Primary Key consumer will falls back to append mode
-
-Setting `debezium.sink.iceberg.upsert=false` will change insert mode to append.
+By default the Iceberg consumer runs in upsert mode (`debezium.sink.iceberg.upsert=true`).
+For tables with a primary key definition the consumer does an upsert; for tables without a primary key it falls back to append mode.
 
 #### Data Deduplication
 
-When iceberg consumer is doing upsert it does data deduplication for the batch, deduplication is done based
-on `__source_ts_ms` field and event type `__op`
-its is possible to change field using `debezium.sink.iceberg.upsert-source-ts-ms-column=__source_ts_ms`, Currently only
+In upsert mode data deduplication is done per batch, based on the `__source_ts_ms` value and the event type `__op`.
+It is possible to change the deduplication field using `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms`; currently only
 Long field type supported
-operation type priorities are `{"c":1, "r":2, "u":3, "d":4}` when two record with same Key having same `__source_ts_ms`
-values then the record with higher `__op` priority is kept.
+Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records with the same key and the same `__source_ts_ms`
+value are received, the record with the higher `__op` priority is added to the destination table and the duplicate record is dropped.
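+
+This rule can be illustrated with a minimal sketch. It is not the consumer's actual implementation, only the
+comparison logic described above (class and method names are made up):
+
+```java
+import java.util.Map;
+
+class DedupSketch {
+  // operation type priorities from the rule above
+  static final Map<String, Integer> OP_PRIORITY = Map.of("c", 1, "r", 2, "u", 3, "d", 4);
+
+  /** Returns true when the candidate event should replace the current event for the same key. */
+  static boolean replaces(long currentTsMs, String currentOp, long candidateTsMs, String candidateOp) {
+    if (candidateTsMs != currentTsMs) {
+      return candidateTsMs > currentTsMs; // newer __source_ts_ms wins
+    }
+    // same __source_ts_ms: higher __op priority wins, the duplicate is dropped
+    return OP_PRIORITY.get(candidateOp) > OP_PRIORITY.get(currentOp);
+  }
+}
+```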
+
+### Append
+
+Setting `debezium.sink.iceberg.upsert=false` sets the operation mode to append. In append mode no data deduplication
+is done and all received records are appended to the destination table.
+Note: for tables without a primary key the operation mode is append even when the configuration is set to upsert mode.
 
 #### Keeping Deleted Records
 
 By default `debezium.sink.iceberg.upsert-keep-deletes=true` will keep deletes in the iceberg table, setting it to false
-will remove deleted records from the iceberg table. With this config its possible to keep last version of the deleted
-record.
+will remove deleted records from the destination iceberg table. With this setting it is possible to keep the last
+version of a deleted record in the table (a soft delete).
+
+### Optimizing batch size (or commit interval)
+
+Debezium extracts database events in real time, which can cause too-frequent commits and too many small files. This is
+not optimal for batch processing, especially when a near-realtime data feed is sufficient.
+To avoid this problem the following batch-size-wait classes can be used.
+
+A batch-size wait adds a delay between consumer calls to increase the total number of events received per call;
+meanwhile events are collected in memory. It should be configured together with the `debezium.source.max.queue.size`
+and `debezium.source.max.batch.size` Debezium properties.
+
+#### NoBatchSizeWait
+
+This is the default configuration; the consumer does not use any batch-size wait.
+
+#### DynamicBatchSizeWait
+
+This strategy dynamically adds a wait to increase the batch size. The wait duration is calculated from the number of
+events processed in the last three batches: if the recent batch sizes are lower than `max.batch.size` the wait
+duration increases, and if they are bigger than 90% of `max.batch.size` it decreases.
+
+This strategy tries to keep the batch size between 85% and 90% of `max.batch.size`; it does not guarantee a consistent batch size.
+
+Example setup to receive ~2048 events per commit, with the maximum wait set to 5 seconds:
+```properties
+debezium.source.max.queue.size=16000
+debezium.source.max.batch.size=2048
+debezium.sink.batch.batch-size-wait=DynamicBatchSizeWait
+debezium.sink.batch.batch-size-wait.max-wait-ms=5000
+```
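+
+A minimal sketch of this calculation (the step size and the averaging are assumptions, not the actual implementation):
+
+```java
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+class DynamicWaitSketch {
+  final int maxBatchSize;  // debezium.source.max.batch.size
+  final long maxWaitMs;    // debezium.sink.batch.batch-size-wait.max-wait-ms
+  final Deque<Integer> lastBatchSizes = new ArrayDeque<>();
+  long waitMs = 0;
+
+  DynamicWaitSketch(int maxBatchSize, long maxWaitMs) {
+    this.maxBatchSize = maxBatchSize;
+    this.maxWaitMs = maxWaitMs;
+  }
+
+  // called once per consumed batch; returns the delay before the next consumer call
+  long nextWaitMs(int batchSize) {
+    if (lastBatchSizes.size() == 3) {
+      lastBatchSizes.removeFirst(); // keep only the last three batch sizes
+    }
+    lastBatchSizes.addLast(batchSize);
+    double avg = lastBatchSizes.stream().mapToInt(Integer::intValue).average().orElse(0);
+    if (avg < maxBatchSize * 0.85) {
+      waitMs = Math.min(maxWaitMs, waitMs + 250); // batches too small: wait longer
+    } else if (avg > maxBatchSize * 0.90) {
+      waitMs = Math.max(0, waitMs - 250);         // batches large enough: wait less
+    }
+    return waitMs;
+  }
+}
+```
+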
+#### MaxBatchSizeWait
+
+MaxBatchSizeWait uses Debezium metrics to optimize the batch size; this strategy is more precise than DynamicBatchSizeWait.
+It periodically reads the current size of the streaming queue and waits until it reaches `max.batch.size`.
+The maximum wait and the check interval are controlled by the `debezium.sink.batch.batch-size-wait.max-wait-ms` and
+`debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
+
+Example setup to receive ~2048 events per commit, with the maximum wait set to 30 seconds and the streaming queue
+size checked every 5 seconds:
+```properties
+debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
+debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
+debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
+debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
+debezium.source.max.batch.size=2048
+debezium.source.max.queue.size=16000
+debezium.sink.batch.batch-size-wait.max-wait-ms=30000
+debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
+```
 
-### Iceberg Table Names
+### Destination Iceberg Table Names
 
-iceberg table names created by following rule : `table-namespace`
-.`table-prefix``database.server.name`_`database`_`table`
+Iceberg table names are created using the following rule: `table-namespace`.`table-prefix``database.server.name`_`database`_`table`
 
-For example
+For example, with the following config:
 
 ```properties
 debezium.sink.iceberg.table-namespace=default
@@ -56,20 +96,35 @@ debezium.sink.iceberg.table-prefix=cdc_
 database table = `inventory.customers` will be replicated to `default.testc_cdc_inventory_customers`
 
-### Optimizing batch size (or commit interval)
+## Debezium Event Flattening
+
+The Iceberg consumer requires event flattening; nested events and complex data types (like Struct) are currently not supported.
+
+```properties
+debezium.transforms=unwrap
+debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
+debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
+debezium.transforms.unwrap.add.headers=db
+debezium.transforms.unwrap.delete.handling.mode=rewrite
+```
+
+### Configuring iceberg
 
-@TODO dynamic wait
+All the properties starting with the `debezium.sink.iceberg.` prefix are passed on to Iceberg and to the Hadoop configuration:
+
+```properties
+debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!
+```
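+
+As an illustration of this pass-through, a hedged sketch (assumed class, not the project's actual code): every
+property under the prefix is copied, with the prefix stripped, into the Hadoop `Configuration` used by the Iceberg catalog.
+
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+class IcebergConfigSketch {
+  static final String PREFIX = "debezium.sink.iceberg.";
+
+  static Configuration hadoopConf() {
+    Config config = ConfigProvider.getConfig();
+    Configuration hadoopConf = new Configuration();
+    for (String name : config.getPropertyNames()) {
+      if (name.startsWith(PREFIX)) {
+        // e.g. debezium.sink.iceberg.fs.s3a.endpoint -> fs.s3a.endpoint
+        hadoopConf.set(name.substring(PREFIX.length()), config.getValue(name, String.class));
+      }
+    }
+    return hadoopConf;
+  }
+}
+```
+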
 ### Example Configuration
 
 ```properties
 debezium.sink.type=iceberg
 debezium.sink.iceberg.table-prefix=debeziumcdc_
-debezium.sink.iceberg.table-namespace=default
-debezium.sink.iceberg.catalog-name=default
+debezium.sink.iceberg.catalog-name=mycatalog
+debezium.sink.iceberg.table-namespace=debeziumevents
 debezium.sink.iceberg.fs.defaultFS=s3a://MY_S3_BUCKET
 debezium.sink.iceberg.warehouse=s3a://MY_S3_BUCKET/iceberg_warehouse
-debezium.sink.iceberg.user.timezone=UTC
 debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
 debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
 debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
@@ -78,86 +133,40 @@ debezium.sink.iceberg.fs.s3a.secret.key=S3_SECRET_KEY
 debezium.sink.iceberg.fs.s3a.path.style.access=true
 debezium.sink.iceberg.fs.s3a.endpoint=http://localhost:9000 # minio specific setting
 debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
-debezium.sink.iceberg.dynamic-wait=true
-debezium.sink.batch.dynamic-wait.max-wait-ms=300000
 debezium.sink.iceberg.upsert=true
 debezium.sink.iceberg.upsert-keep-deletes=true
-debezium.sink.iceberg.upsert-op-column=__op
-debezium.sink.iceberg.upsert-source-ts-ms-column=__source_ts_ms
 debezium.format.value.schemas.enable=true
 debezium.format.key.schemas.enable=true
 ```
 
-All the properties starting with `debezium.sink.iceberg.**` are passed to iceberg, and hadoopConf
-
-```properties
-debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!
-```
-
 ## `icebergevents` Consumer
 
-This consumer appends CDC events to single iceberg table as json string. This table partitioned
-by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms`
+This is the second consumer in this project. It appends CDC events to a single Iceberg table as a JSON string.
+The table is partitioned by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms`.
 
 #### Example Configuration
 
 ````properties
 debezium.sink.type=icebergevents
 debezium.sink.iceberg.catalog-name=default
-debezium.sink.iceberg.dynamic-wait=true
-debezium.sink.batch.dynamic-wait.max-wait-ms=300000
 ````
 
 Iceberg table definition:
 
 ```java
 static final String TABLE_NAME="debezium_events";
-static final Schema TABLE_SCHEMA=new Schema(
-    required(1,"event_destination",Types.StringType.get()),
-    optional(2,"event_key",Types.StringType.get()),
-    optional(3,"event_value",Types.StringType.get()),
-    optional(4,"event_sink_epoch_ms",Types.LongType.get()),
-    optional(5,"event_sink_timestamptz",Types.TimestampType.withZone())
+static final Schema TABLE_SCHEMA = new Schema(
+    required(1, "event_destination", Types.StringType.get()),
+    optional(2, "event_key", Types.StringType.get()),
+    optional(3, "event_value", Types.StringType.get()),
+    optional(4, "event_sink_epoch_ms", Types.LongType.get()),
+    optional(5, "event_sink_timestamptz", Types.TimestampType.withZone())
 );
-static final PartitionSpec TABLE_PARTITION=PartitionSpec.builderFor(TABLE_SCHEMA)
+static final PartitionSpec TABLE_PARTITION = PartitionSpec.builderFor(TABLE_SCHEMA)
     .identity("event_destination")
     .hour("event_sink_timestamptz")
     .build();
-static final SortOrder TABLE_SORT_ORDER=SortOrder.builderFor(TABLE_SCHEMA)
-    .asc("event_sink_epoch_ms",NullOrder.NULLS_LAST)
+static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA)
+    .asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
     .build();
 ```
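+
+The identity partition on `event_destination` plus the hourly partition on `event_sink_timestamptz` lets queries that
+filter on destination and sink time prune partitions, and the sort on `event_sink_epoch_ms` keeps events within each
+data file roughly in arrival order.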
-
-## Debezium Event Flattening
-
-Iceberg consumer requires event flattening, Currently nested events and complex data types(like maps) are not supported
-
-```properties
-debezium.transforms=unwrap
-debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
-debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
-debezium.transforms.unwrap.add.headers=db
-debezium.transforms.unwrap.delete.handling.mode=rewrite
-```
-
-## Controlling Batch Size
-
-`max.batch.size` Positive integer value that specifies the maximum size of each batch of events that should be processed
-during each iteration of this connector. Defaults to 2048.
-
-`debezium.sink.batch.dynamic-wait.max-wait-ms` Positive integer value that specifies the maximum number of milliseconds
-dynamic wait could add delay to increase batch size. dynamic wait is calculated based on number of processed events in
-last 3 batches. if last batch sizes are lower than `max.batch.size` max-wait-ms will increase and if last batch sizes
-are bigger than 90% of `max.batch.size` max-wait-ms will decrease
-
-it tries to keep batch size between 85%-90% of the `max.batch.size`, it does not guarantee consistent batch size.
-
-Change `debezium.source.max.batch.size` and `debezium.sink.batch.dynamic-wait.max-wait-ms` if you want to have less
-frequent commits with larger batch size
-
-```properties
-# Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.
-debezium.source.max.batch.size=2
-debezium.sink.iceberg.dynamic-wait=true
-debezium.sink.batch.dynamic-wait.max-wait-ms=300000
-```
\ No newline at end of file
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 2143e655..b06a0050 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -88,7 +88,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
   boolean upsertKeepDeletes;
   @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
   String opColumn;
-  @ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms")
+  @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
   String sourceTsMsColumn;
   @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
   String batchSizeWaitName;
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
index 593324d1..df8c35be 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
@@ -37,6 +37,8 @@ public ConfigSource() {
     config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds!
     // iceberg
     config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_");
+    config.put("debezium.sink.iceberg.catalog-name", "mycatalog");
+    config.put("debezium.sink.iceberg.table-namespace", "debeziumevents");
     config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET);
     config.put("debezium.sink.iceberg.warehouse", "s3a://" + S3_BUCKET + "/iceberg_warehouse");
     config.put("debezium.sink.iceberg.type", "hadoop");
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
similarity index 90%
rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java
rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
index 9c888b2e..77f092d6 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java
@@ -32,8 +32,8 @@
 @QuarkusTest
 @QuarkusTestResource(S3Minio.class)
 @QuarkusTestResource(SourceMysqlDB.class)
-@TestProfile(BatchSparkChangeConsumerMysqlTestProfile.class)
-public class BatchSparkChangeConsumerMysqlTest extends BaseSparkTest {
+@TestProfile(IcebergChangeConsumerMysqlTestProfile.class)
+public class IcebergChangeConsumerMysqlTest extends BaseSparkTest {
 
 
   @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
@@ -51,6 +51,8 @@ public void testSimpleUpload() {
         return false;
       }
     });
+
+    S3Minio.listFiles();
   }
 
 }
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java
similarity index 89%
rename from debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java
rename to debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java
index b1107adf..a951bf99 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/BatchSparkChangeConsumerMysqlTestProfile.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTestProfile.java
@@ -13,7 +13,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
-public class BatchSparkChangeConsumerMysqlTestProfile implements QuarkusTestProfile {
+public class IcebergChangeConsumerMysqlTestProfile implements QuarkusTestProfile {
 
   //This method allows us to override configuration properties.
   @Override