diff --git a/README.md b/README.md
index 773e6e9d..65e41c98 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@ Also, check [caveats](docs/CAVEATS.md) for better understanding the current limi
 - cd into unzipped folder: `cd appdist`
 - Create `application.properties` file and config it: `nano conf/application.properties`, you can check the example configuration
-  in [application.properties.example](debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example)
+  in [application.properties.example](debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example)
 - Run the server using provided script: `bash run.sh`
 
 # Debezium python runner
diff --git a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
index 3344c46c..aa84ecad 100644
--- a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
+++ b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
@@ -54,8 +54,10 @@ debezium.format.value=json
 debezium.format.key=json
 
 # saving debezium state data to destination, iceberg tables
+# see https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming
 debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
 debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
+# see https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties
 debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
 debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 5e5d73c3..509b754a 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -143,7 +143,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
       throws InterruptedException {
     Instant start = Instant.now();
 
-    //group events by destination
+    //group events by destination (per iceberg table)
     Map<String, List<IcebergChangeEvent>> result = records.stream()
         .map((ChangeEvent<Object, Object> e)
@@ -167,7 +167,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
       icebergTableOperator.addToTable(icebergTable, tableEvents.getValue());
     }
 
-    // workaround! somehow offset is not saved to file unless we call committer.markProcessed
+    // workaround! somehow offset is not saved to file unless we call committer.markProcessed per event
     // even it's should be saved to file periodically
     for (ChangeEvent<Object, Object> record : records) {
       LOGGER.trace("Processed event '{}'", record);
@@ -176,13 +176,14 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     committer.markBatchFinished();
     this.logConsumerProgress(records.size());
 
+    // wait in order to group events into batches
    batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
   }
 
   /**
    * @param tableId     iceberg table identifier
    * @param sampleEvent sample debezium event. event schema used to create iceberg table when table not found
-   * @return iceberg table, throws RuntimeException when table not found and it's not possible to create it
+   * @return iceberg table, throws RuntimeException when the table is not found and it's not possible to create it
    */
   public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
     return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
@@ -200,7 +201,9 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample
   }
 
   /**
-   * @param numUploadedEvents periodically log number of events consumed
+   * periodically log the number of events consumed
+   *
+   * @param numUploadedEvents number of events consumed
    */
   protected void logConsumerProgress(long numUploadedEvents) {
     numConsumedEvents += numUploadedEvents;
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
index 5a4ff956..eb468f43 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
@@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /**
+ *
+ * Converts a Debezium JSON event to an Iceberg GenericRecord. Extracts the event schema and key fields, and converts the event schema to an Iceberg Schema.
+ *
  * @author Ismail Simsek
  */
 public class IcebergChangeEvent {
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java
index 60d328f0..46c904d9 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java
@@ -9,7 +9,7 @@ package io.debezium.server.iceberg.batchsizewait;
 
 /**
- * Implementation of the consumer that delivers the messages to iceberg tables.
+ * When enabled, adds a wait to the consumer to control the batch size. It turns the processing into batch processing.
  *
  * @author Ismail Simsek
  */
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
index 549f7c9c..a4c4573f 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
@@ -53,7 +53,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 /**
- * A {@link SchemaHistory} implementation that stores the schema history to database table
+ * A {@link SchemaHistory} implementation that stores the schema history in an Iceberg table
  *
  * @author Ismail Simsek
  */
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
index f9d945af..8e37e46c 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
@@ -55,7 +55,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 /**
- * Implementation of OffsetBackingStore that saves data to database table.
+ * Implementation of OffsetBackingStore that saves data to an Iceberg table.
  */
 @Dependent
 public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implements OffsetBackingStore {
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
index 767c235c..b9f07a3a 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 
 /**
- * Wrapper to perform operations in iceberg tables
+ * Wrapper to perform operations on iceberg tables
  *
  * @author Rafael Acevedo
  */
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
index c936be57..ca24631f 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
@@ -17,6 +17,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Iceberg table writer factory that returns the TaskWriter for a given table. The configured upsert mode determines which writer is returned.
+ *
+ */
 @Dependent
 public class IcebergTableWriterFactory {
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
diff --git a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example
deleted file mode 100644
index 0733217e..00000000
--- a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example
+++ /dev/null
@@ -1,75 +0,0 @@
-# Use iceberg sink
-debezium.sink.type=iceberg
-
-# Run without Kafka, use local file to store checkpoints
-debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
-debezium.source.database.history.file.filename=data/status.dat
-
-# Iceberg sink config
-debezium.sink.iceberg.table-prefix=debeziumcdc_
-debezium.sink.iceberg.upsert=true
-debezium.sink.iceberg.upsert-keep-deletes=true
-debezium.sink.iceberg.write.format.default=parquet
-debezium.sink.iceberg.catalog-name=mycatalog
-# Hadoop catalog, you can use other catalog supported by iceberg as well
-debezium.sink.iceberg.type=hadoop
-debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
-debezium.sink.iceberg.table-namespace=debeziumevents
-
-# S3 config
-debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
-debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
-debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
-debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-debezium.sink.iceberg.fs.s3a.access.key=AWS_ACCESS_KEY
-debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
-debezium.sink.iceberg.fs.s3a.path.style.access=true
-debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
-
-# enable event schemas - mandate
-debezium.format.value.schemas.enable=true
-debezium.format.key.schemas.enable=true
-debezium.format.value=json
-debezium.format.key=json
-
-# postgres source
-#debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
-#debezium.source.offset.storage.file.filename=data/offsets.dat
-#debezium.source.offset.flush.interval.ms=0
-#debezium.source.database.hostname=localhost
-#debezium.source.database.port=5432
-#debezium.source.database.user=postgres
-#debezium.source.database.password=postgres
-#debezium.source.database.dbname=postgres
-#debezium.source.database.server.name=tutorial
-#debezium.source.schema.include.list=inventory
-
-# sql server source
-#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
-#debezium.source.offset.storage.file.filename=data/offsets.dat
-#debezium.source.offset.flush.interval.ms=0
-#debezium.source.database.hostname=localhost
-#debezium.source.database.port=5432
-#debezium.source.database.user=debezium
-#debezium.source.database.password=debezium
-#debezium.source.database.dbname=debezium
-#debezium.source.database.server.name=tutorial
-#debezium.source.schema.include.list=inventory
-# mandate for sql server source, avoid error when snapshot and schema change
-#debezium.source.include.schema.changes=false
-
-# do event flattening. unwrap message!
-debezium.transforms=unwrap
-debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
-debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
-debezium.transforms.unwrap.delete.handling.mode=rewrite
-debezium.transforms.unwrap.drop.tombstones=true
-
-# ############ SET LOG LEVELS ############
-quarkus.log.level=INFO
-quarkus.log.console.json=false
-# hadoop, parquet
-quarkus.log.category."org.apache.hadoop".level=WARN
-quarkus.log.category."org.apache.parquet".level=WARN
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-quarkus.log.category."org.eclipse.jetty".level=WARN
\ No newline at end of file
diff --git a/docs/CAVEATS.md b/docs/CAVEATS.md
index 209e622e..c09ca5da 100644
--- a/docs/CAVEATS.md
+++ b/docs/CAVEATS.md
@@ -1,12 +1,14 @@
 # Caveats
+
 ## Only iceberg V2 table supported
-This connector only writes using iceberg table V2 spec (delete events will be written to delete files instead of rewrite data files)
+
+This connector only writes using the iceberg table V2 spec (delete events are written to delete files (merge-on-read)
+instead of rewriting data files)
 
 ## No automatic schema evolution
-Currently, there is no handler to detect schema changes and auto evolve the schema. Schema change events can make the
-connector throw error. To work around this, turn off schema change event in `source` setting.
-- For SQL Server, set `debezium.source.include.schema.changes=false`
+Full schema evolution is not supported. However, schema expansion, like field addition, is supported;
+see the `debezium.sink.iceberg.allow-field-addition` setting.
 
 ## Specific tables replication
 
@@ -18,7 +20,11 @@ in [Debezium server source](https://debezium.io/documentation/reference/stable/c
 .
 
 ## AWS S3 credentials
+
 You can setup aws credentials in the following ways:
-- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key` in `application.properties`
+
+- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key`
+  in `application.properties`
 - Option 2: inject credentials to environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY`
-- Option 3: setup proper `HADOOP_HOME` env then add s3a configuration into `core-site.xml`, more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3).
\ No newline at end of file
+- Option 3: setup proper `HADOOP_HOME` env then add s3a configuration into `core-site.xml`, more information can be
+  found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3).
\ No newline at end of file
diff --git a/docs/DOCS.md b/docs/DOCS.md
index e30914ef..b11342ff 100644
--- a/docs/DOCS.md
+++ b/docs/DOCS.md
@@ -1,37 +1,40 @@
 # Debezium Iceberg Consumers
 
-Replicates database CDC events to Iceberg(Cloud storage, hdfs) without using Spark, Kafka or Streaming platform.
+Replicates database CDC events to Iceberg tables (cloud storage, HDFS) without using Spark, Kafka or a streaming platform
+in between.
 
 ![Debezium Iceberg](images/debezium-iceberg.png)
 
 ## `iceberg` Consumer
 
 Iceberg consumer replicates database CDC events to destination Iceberg tables. It is possible to replicate source
-database upsert or append modes.
+data with upsert or append modes.
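+
+As an illustration, a minimal sink configuration could look like the sketch below; the property names come from the
+configuration table that follows, while the catalog name and warehouse path are placeholder values.
+
+```properties
+# illustrative values only
+debezium.sink.type=iceberg
+debezium.sink.iceberg.catalog-name=mycatalog
+debezium.sink.iceberg.table-namespace=debeziumevents
+debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
+# upsert mode is the default; set to false to switch to append mode
+debezium.sink.iceberg.upsert=true
+```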
 
 When event and key schema enabled (`debezium.format.value.schemas.enable=true` ,
 `debezium.format.key.schemas.enable=true`) destination Iceberg
-tables created automatically with initial job.
+tables are created automatically on the first start.
 
 #### Configuration properties
 
-| Config | Default | Description |
-|----------------------------------------------------|-------------------|------------------------------------------------------------------------------------------------------------------|
-| `debezium.sink.iceberg.warehouse` | | The root path of the Iceberg data warehouse |
-| `debezium.sink.iceberg.catalog-name` | `default` | User-specified catalog name. |
-| `debezium.sink.iceberg.table-namespace` | `default` | A namespace in the catalog. ex: `SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table` |
-| `debezium.sink.iceberg.table-prefix` | `` | Prefix added to destination iceberg table names. |
-| `debezium.sink.iceberg.write.format.default` | `parquet` | Default file format for the table; `parquet`, `avro`, or `orc` |
-| `debezium.sink.iceberg.allow-field-addition` | `true` | Allow field addition to target tables |
-| `debezium.sink.iceberg.upsert` | `true` | Running upsert mode overwriting updated rows. explained below. |
-| `debezium.sink.iceberg.upsert-keep-deletes` | `true` | With upsert mode, keeps deleted rows in target table. |
-| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ms` | With upsert mode used to deduplicate data. row with highest `__source_ts_ms` kept. _
-dont change!_ |
-| `debezium.sink.iceberg.upsert-op-column` | `__op` | Used with upsert mode. _dont
-change!_ |
-| `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify destination table. With this its possible to map `table_ptt1`,`table_ptt2` to `table_combined`. |
-| `debezium.sink.iceberg.destination-regexp-replace` | `` | Regexp Replace part to modify destination table |
-| `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy to optimize data files and upload interval. explained below. |
-| `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) passed to Iceberg, and to hadoopConf |
+| Config | Default | Description |
+|--------|---------|-------------|
+| `debezium.sink.iceberg.warehouse` | | Root path of the Iceberg data warehouse |
+| `debezium.sink.iceberg.catalog-name` | `default` | User-specified Iceberg catalog name. |
+| `debezium.sink.iceberg.table-namespace` | `default` | A namespace in the catalog. ex: `SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table` |
+| `debezium.sink.iceberg.table-prefix` | `` | Iceberg table name prefix, added to destination iceberg table names. |
+| `debezium.sink.iceberg.write.format.default` | `parquet` | Default file format for the table; `parquet`, `avro`, or `orc` |
+| `debezium.sink.iceberg.allow-field-addition` | `true` | Allow field addition to target tables |
+| `debezium.sink.iceberg.upsert` | `true` | Run in upsert mode, overwriting updated rows; explained below. |
+| `debezium.sink.iceberg.upsert-keep-deletes` | `true` | When running in upsert mode, keeps deleted rows in the target table. |
+| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ms` | Used in upsert mode to deduplicate data; the row with the highest `__source_ts_ms` is kept. _don't change!_ |
+| `debezium.sink.iceberg.upsert-op-column` | `__op` | Used with upsert mode. _don't change!_ |
+| `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify the destination table name. With this it's possible to map `table_ptt1`,`table_ptt2` to `table_combined`. |
+| `debezium.sink.iceberg.destination-regexp-replace` | `` | Regexp replace part to modify the destination table name |
+| `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy to optimize data files and upload interval; explained below. |
+| `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) passed to Iceberg |
+| `debezium.source.offset.storage` | `io.debezium.server.iceberg.offset.IcebergOffsetBackingStore` | The name of the Java class that is responsible for persistence of connector offsets. See [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming) |
+| `debezium.source.offset.storage.iceberg.table-name` | `debezium_offset_storage` | Destination table name to store connector offsets. |
+| `debezium.source.schema.history.internal` | `io.debezium.server.iceberg.history.IcebergSchemaHistory` | The name of the Java class that is responsible for persistence of the database schema history. See [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties) |
+| `debezium.source.schema.history.internal.iceberg.table-name` | `debezium_schema_history_storage` | Destination table name to store database schema history. |
 
 ### Upsert
 
@@ -48,25 +51,25 @@ Long field type supported.)
 Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records with same key and same `__source_ts_ms`
 values received then the record with higher `__op` priority is kept and added to destination table and duplicate record
-is dropped from stream.
+is dropped from the batch.
 
 ### Append
 
 Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append. With append mode data deduplication
 is not done and all received records are appended to destination table.
 
-Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode.
+Note: For tables without a primary key the operation mode falls back to append even when upsert mode is configured.
 
 #### Keeping Deleted Records
 
 By default `debezium.sink.iceberg.upsert-keep-deletes=true` keeps deletes in the Iceberg table, setting it to false
 will remove deleted records from the destination Iceberg table too. With this config it's possible to keep last version of a
-record in the destination Iceberg table(doing soft delete for this records `__deleted` is set to `true`).
+deleted record in the destination Iceberg table (a soft delete; for these records `__deleted` is set to `true`).
 
 ### Optimizing batch size (or commit interval)
 
-Debezium extracts database events in real time and this could cause too frequent commits or too many small files
-which is not optimal for batch processing especially when near realtime data feed is sufficient.
+Debezium extracts database events in real time and this could cause too frequent commits and too many small files, which
+is not optimal for batch processing, especially when a near-realtime data feed is sufficient.
 
 To avoid this problem following batch-size-wait classes are available to adjust batch size and interval.
 
 Batch size wait adds delay between consumer calls to increase total number of events consumed per call. Meanwhile,
@@ -81,10 +84,13 @@ This is default configuration by default consumer will not use any wait. All the
 #### MaxBatchSizeWait
 
 MaxBatchSizeWait uses debezium metrics to optimize batch size.
-MaxBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`.
-Maximum wait and check intervals are controlled by `debezium.sink.batch.batch-size-wait.max-wait-ms`, `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
+MaxBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`.
+Maximum wait and check intervals are controlled
+by `debezium.sink.batch.batch-size-wait.max-wait-ms`, `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
+
+Example setup to receive 2048 events per commit: the maximum wait is set to 30 seconds, and the streaming queue current size is checked
+every 5 seconds.
 
-example setup to receive 2048 events per commit. maximum wait is set to 30 seconds, streaming queue current size checked every 5 seconds
 ```properties
 debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
 debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
@@ -110,7 +116,7 @@ With above config database table = `inventory.customers` is replicated to `defau
 
 ## IcebergOffsetBackingStore Offset Storage
 
-This implementation saves CDC offset to an iceberg table.
+This implementation saves the CDC offset to an iceberg table. Debezium uses this offset to track its position in the source database log (for example, the binlog).
 
 ```
 debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
 debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
@@ -128,7 +134,7 @@ debezium.source.database.history.iceberg.table-name=debezium_database_history_st
 
 ## Debezium Event Flattening
 
-Iceberg consumer requires event flattening.
+Iceberg consumer requires event flattening; this configuration is mandatory.
 For further details on `Message transformations` please see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#engine-message-transformations)
 ```properties
 debezium.transforms=unwrap
 debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
 debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
@@ -138,7 +144,7 @@ debezium.transforms.unwrap.add.headers=db
 debezium.transforms.unwrap.delete.handling.mode=rewrite
 ```
 
-### Configuring iceberg 
+### Configuring iceberg
 
 All the properties starting with `debezium.sink.iceberg.__ICEBERG_CONFIG__` are passed to Iceberg, and to hadoopConf
 
@@ -147,6 +153,7 @@ debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!
 ```
 
 ### Example Configuration
+
 Read [application.properties.example](..%2Fdebezium-server-iceberg-dist%2Fsrc%2Fmain%2Fresources%2Fdistro%2Fconf%2Fapplication.properties.example)
 
 ## Schema Change Behaviour
@@ -155,25 +162,34 @@ It is possible to get out of sync schemas between source and target tables. For
 its schema, adds or drops field. Below possible schema changes and current behavior of the Iceberg consumer id
 documented.
 
+**NOTE**: Full schema evolution is not supported. However, schema expansion, like field addition, is supported;
+see the `debezium.sink.iceberg.allow-field-addition` setting.
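+
+For example, schema expansion can be toggled with a single property; a minimal sketch showing the documented default:
+
+```properties
+# when true, new source columns are added to the destination iceberg table automatically
+debezium.sink.iceberg.allow-field-addition=true
+```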
+
+
 #### Adding new column to source (A column missing in destination iceberg table)
 
 When `debezium.sink.iceberg.allow-field-addition` is `false` data of the new column is ignored till the column added to
 destination iceberg table.
 For example: if a column not found in iceberg table its data ignored and not copied to target!
-once iceberg table adds same column then data for this column recognized and populated
+once the same column is added to the iceberg table, its data is recognized and populated for new data.
 
 #### Removing column from source (An extra column in iceberg table)
 
-These columns are populated with null value.
+These columns are populated with null values for new data.
 
 #### Renaming column in source
-This is combination of above two cases : old column will be populated with null values and new column will not be recognized and populated till it's added to iceberg table
+
+This is a combination of the two cases above: the old column will be populated with null values and the new column will not be
+recognized or populated until it is added to the iceberg table
 
 #### Different Data Types
 
-This is the scenario when source field type and Target Iceberg field type are different. In this case consumer converts source field value to destination type value. Conversion is done by jackson If representation cannot be converted to destination type then default value is returned!
-for example this is conversion rule to Long type:
+This is the scenario when the source field type and the target Iceberg field type are different. In this case the consumer converts
+the source field value to the destination type. Conversion is done by Jackson; if the value cannot be converted to the
+destination type then a default value is returned!
+
+For example, this is the conversion rule for the Long type:
+
 ```Method that will try to convert value of this node to a Java long.
 Numbers are coerced using default Java rules; booleans convert to 0 (false) and 1 (true), and Strings are parsed using default Java language integer parsing rules.
 If representation cannot be converted to a long (including structured types like Objects and Arrays), default value of 0 will be returned; no exceptions are thrown.
 ```
 
@@ -181,8 +197,7 @@ ## `icebergevents` Consumer
 
 This consumer appends CDC events to single Iceberg table as json string.
-This table partitioned by `event_destination,event_sink_timestamptz` ~~WIP and sorted by `event_sink_epoch_ms`~~
-
+This table is partitioned by `event_destination,event_sink_timestamptz`
 
 ````properties
 debezium.sink.type=icebergevents
 debezium.sink.iceberg.catalog-name=default
 ````
 
 Destination table definition:
 
 ```java
-static final String TABLE_NAME="debezium_events";
+static final String TABLE_NAME = "debezium_events";
 static final Schema TABLE_SCHEMA = new Schema(
-        required(1, "event_destination", Types.StringType.get()),
-        optional(2, "event_key", Types.StringType.get()),
-        optional(3, "event_value", Types.StringType.get()),
-        optional(4, "event_sink_epoch_ms", Types.LongType.get()),
-        optional(5, "event_sink_timestamptz", Types.TimestampType.withZone())
-  );
+    required(1, "event_destination", Types.StringType.get()),
+    optional(2, "event_key", Types.StringType.get()),
+    optional(3, "event_value", Types.StringType.get()),
+    optional(4, "event_sink_epoch_ms", Types.LongType.get()),
+    optional(5, "event_sink_timestamptz", Types.TimestampType.withZone())
+);
 static final PartitionSpec TABLE_PARTITION = PartitionSpec.builderFor(TABLE_SCHEMA)
-        .identity("event_destination")
-        .hour("event_sink_timestamptz")
-        .build();
+    .identity("event_destination")
+    .hour("event_sink_timestamptz")
+    .build();
 static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA)
-        .asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
-        .build();
+    .asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
+    .build();
 ```
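+
+A slightly fuller sketch of an `icebergevents` setup, reusing the `debezium.sink.iceberg.*` settings documented above
+(the catalog type and warehouse path are illustrative values):
+
+```properties
+debezium.sink.type=icebergevents
+debezium.sink.iceberg.catalog-name=default
+debezium.sink.iceberg.type=hadoop
+debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
+debezium.sink.iceberg.table-namespace=debeziumevents
+```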