enhance documentation #61

Merged — 2 commits, merged on Nov 30, 2021
15 changes: 14 additions & 1 deletion README.md
@@ -8,11 +8,24 @@ This project adds iceberg consumer to [Debezium Server](https://debezium.io/docu
replicate database CDC changes to an Iceberg table (Cloud Storage, HDFS) in real time, without requiring Spark, Kafka, or a streaming platform.

More detail is available on the [documentation page](docs/DOCS.md)
Also, check the [caveats](docs/CAVEATS.md) to better understand the current limitations and proper workarounds

![Debezium Iceberg](docs/images/debezium-iceberg.png)

# Install from source
- Requirements:
  - JDK 11
  - Maven
- Clone the repo: `git clone https://github.com/memiiso/debezium-server-iceberg.git`
- From the root of the project:
  - Build and package debezium server: `mvn -Passembly -Dmaven.test.skip package`
  - After building, unzip the server distribution: `unzip debezium-server-iceberg-dist/target/debezium-server-iceberg-dist*.zip -d appdist`
  - cd into the unzipped folder: `cd appdist`
  - Create an `application.properties` file and configure it: `nano conf/application.properties`; you can check the example configuration in [application.properties.example](debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example)
  - Run the server using the provided script: `bash run.sh`

# Contributing
The Memiiso community welcomes anyone that wants to help out in any way, whether that includes reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. See [contributing document](CONTRIBUTING.md) for details.

### Contributors
<a href="https://github.com/memiiso/debezium-server-iceberg/graphs/contributors">
debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example
@@ -1,43 +1,74 @@
# Use iceberg sink
debezium.sink.type=iceberg
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.include.list=inventory

# configure batch behaviour/size
debezium.source.max.batch.size=2048
debezium.source.poll.interval.ms=10000 # 10 seconds

# enable schemas

# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat

# Iceberg sink config
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=mycatalog
# Hadoop catalog; you can use other catalogs supported by iceberg as well
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.table-namespace=debeziumevents

# S3 config
debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
debezium.sink.iceberg.fs.s3a.access.key=AWS_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# debezium unwrap message
# postgres source
#debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
#debezium.source.offset.storage.file.filename=data/offsets.dat
#debezium.source.offset.flush.interval.ms=0
#debezium.source.database.hostname=localhost
#debezium.source.database.port=5432
#debezium.source.database.user=postgres
#debezium.source.database.password=postgres
#debezium.source.database.dbname=postgres
#debezium.source.database.server.name=tutorial
#debezium.source.schema.include.list=inventory

# sql server source
#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
#debezium.source.offset.storage.file.filename=data/offsets.dat
#debezium.source.offset.flush.interval.ms=0
#debezium.source.database.hostname=localhost
#debezium.source.database.port=5432
#debezium.source.database.user=debezium
#debezium.source.database.password=debezium
#debezium.source.database.dbname=debezium
#debezium.source.database.server.name=tutorial
#debezium.source.schema.include.list=inventory
# mandatory for sql server source; avoids errors during snapshot and schema change
#debezium.source.include.schema.changes=false

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# iceberg settings
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.category."io.debezium.server.iceberg".level=DEBUG
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
quarkus.log.category."org.apache.iceberg".level=ERROR
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
17 changes: 17 additions & 0 deletions docs/CAVEATS.md
@@ -0,0 +1,17 @@
# Caveats
## Only Iceberg V2 tables supported
This connector only writes using the Iceberg table V2 spec (delete events are written to delete files instead of rewriting data files).

## No automatic schema evolution
Currently, there is no handler to detect schema changes and automatically evolve the schema. Schema change events can make the connector throw an error. To work around this, turn off schema change events in the `source` settings.
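As a minimal sketch of the workaround, the relevant Debezium source property would be set in `application.properties` like this:

```properties
# skip emitting schema change events; the iceberg consumer cannot store them
debezium.source.include.schema.changes=false
```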
Member commented:
Have you had a chance to test it? I think schema change events are not processed and should not cause any error. Schema change events are saved to a file if it's configured with `debezium.source.database.history` and `debezium.source.database.history.file.filename`.

I believe we could remove this section:

> Schema change events can make the connector throw error. To workaround this, turn off schema change event in source setting.

and link this page to give more details about the current schema change behavior:
[Schema Change Behaviour](https://github.com/memiiso/debezium-server-iceberg/blob/master/docs/DOCS.md#schema-change-behaviour)

Contributor (author) commented:
This is what I found when using this with SQL Server; you can check the following log:

2021-11-28 14:38:51,416 INFO  [io.deb.ser.ice.tab.IcebergTableOperatorUpsert] (pool-7-thread-1) Committed 2047 events to table! s3a://test-iceberg/iceberg_warehouse8/debeziumevents/debeziumcdc_tutorial_dbo_person
2021-11-28 14:38:51,538 WARN  [io.deb.ser.ice.IcebergChangeConsumer] (pool-7-thread-1) Table not found: debeziumevents.debeziumcdc_tutorial
2021-11-28 14:38:51,539 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) {"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"int64","optional":true,"field":"event_serial_no"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":true,"field":"databaseName"},{"type":"string","optional":true,"field":"schemaName"},{"type":"string","optional":true,"field":"ddl"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"field":"id"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"defaultCharsetName"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"primaryKeyColumnNames"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"jdbcType"},{"type":"int32","optional":true,"field":"nativeType"},{"type":"string","optional":false,"field":"typeName"},{"type":"string","optional":true,"field":"typeExpression"},{"type":"string","optional":true,"field":"charsetName"},{"type":"int32","optional":true,"field":"length"},{"type":"int32","optional":true,"field":"scale"},{"type":"int32","optional":false,"field":"position"},{"type":"boolean","optional":true,"field":"optional"},{"type":"boolean","optional":true,"field":"autoIncremented"},{"type":"boolean","optional":true,"field":"generated"}],"optional":false,"name":"io.debezium.connector.schema.Column"},"optional":false,"field":"columns"}],"optional":false,"name":"io.debezium.connector.schema.Table","field":"table"}],"optional":false,"name":"io.debezium.connector.schema.Change"},"optional":false,"field":"tableChanges"}],"optional":false,"name":"io.debezium.connector.sqlserver.SchemaChangeValue"}
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Converting Schema of: ::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [1] .source::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Converting Schema of: source::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [2] source.version::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [3] source.connector::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [4] source.name::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [5] source.ts_ms::int64
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [6] source.snapshot::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [7] source.db::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [8] source.sequence::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [9] source.schema::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [10] source.table::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [11] source.change_lsn::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [12] source.commit_lsn::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [13] source.event_serial_no::int64
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [14] .databaseName::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [15] .schemaName::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [16] .ddl::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [17] .tableChanges::array
2021-11-28 14:38:51,541 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) Stopping down connector
2021-11-28 14:40:21,542 WARN  [io.deb.pip.ChangeEventSourceCoordinator] (pool-7-thread-1) Coordinator didn't stop in the expected time, shutting down executor now
2021-11-28 14:40:24,324 WARN  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Snapshot was interrupted before completion
2021-11-28 14:40:24,325 INFO  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Snapshot - Final stage
2021-11-28 14:40:24,325 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Removing locking timeout
2021-11-28 14:40:24,327 WARN  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Change event source executor was interrupted: java.lang.InterruptedException
        at java.base/java.lang.Object.wait(Native Method)
        at io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
        at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
        at io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:446)
        at io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:176)
        at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:89)
        at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:49)
        at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:165)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:386)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:315)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:135)
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

2021-11-28 14:40:24,328 INFO  [io.deb.pip.met.StreamingChangeEventSourceMetrics] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Connected metrics set to 'false'
2021-11-28 14:40:24,329 INFO  [io.deb.jdb.JdbcConnection] (pool-14-thread-1) Connection gracefully closed
2021-11-28 14:40:24,330 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-7-thread-1) Stopped FileOffsetBackingStore
2021-11-28 14:40:24,331 INFO  [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: 'tableChanges' has Array type, Array type not supported!', error = '{}': java.lang.RuntimeException: 'tableChanges' has Array type, Array type not supported!
        at io.debezium.server.iceberg.IcebergUtil.getIcebergSchema(IcebergUtil.java:74)
        at io.debezium.server.iceberg.IcebergUtil.getIcebergSchema(IcebergUtil.java:35)
        at io.debezium.server.iceberg.IcebergUtil.getIcebergFieldsFromEventSchema(IcebergUtil.java:199)
        at io.debezium.server.iceberg.IcebergChangeConsumer.createIcebergTable(IcebergChangeConsumer.java:199)
        at io.debezium.server.iceberg.IcebergChangeConsumer.lambda$handleBatch$2(IcebergChangeConsumer.java:159)
        at java.base/java.util.Optional.orElseGet(Optional.java:369)
        at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:159)
        at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:821)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
        at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:145)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

2021-11-28 14:40:24,348 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2021-11-28 14:40:24,348 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2021-11-28 14:40:24,367 INFO  [io.quarkus] (main) debezium-server-iceberg-dist stopped in 0.035s

This is the config I used when I didn't turn off schema change capture for SQL Server:

# sql server source
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=52.221.232.4
debezium.source.database.port=1433
debezium.source.database.user=debezium
debezium.source.database.password=debezium
debezium.source.database.dbname=dms_sample
debezium.source.database.server.name=tutorial
debezium.source.table.include.list=dbo.person
# mandate for sql server source, avoid error when snapshot and schema change
#debezium.source.include.schema.changes=false

Member commented:
It seems like the consumer is trying to create the table debeziumevents.debeziumcdc_tutorial to store schema changes, but it's failing because the field 'tableChanges' is of Array type. Currently the iceberg consumer does not support the Array data type; that's why the exception is thrown.

documentation

Not sure what is the best way to explain it. Maybe something like:

> Schema change topic has an unsupported data type (Array); it is recommended to disable it

Member commented:
Seems like even with Array data type support it's not possible to save the schema change topic — `tableChanges` seems to have a kind of special type. It makes sense to recommend disabling it for all connectors.

It's failing with:

Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.Object,java.lang.Object>` from Array value (token `JsonToken.START_ARRAY`)
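As a toy illustration (not the connector's actual code, and the event payload here is made up): `tableChanges` arrives as a JSON array, while a map/struct value is expected on the consumer side — the same mismatch Jackson reports above when it hits `START_ARRAY` expecting a `LinkedHashMap`.

```python
import json

# Hypothetical miniature of a schema-change event value; field names
# follow the log above, the payload contents are invented.
raw = '{"databaseName": "dms_sample", "tableChanges": [{"type": "CREATE", "id": "dbo.person"}]}'
event = json.loads(raw)

# tableChanges deserializes to a list (JSON array), not a dict --
# the Python analogue of Jackson refusing START_ARRAY for a LinkedHashMap.
print(type(event["tableChanges"]).__name__)  # -> list
```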

Contributor (author) commented:
Yes, I also checked the iceberg documentation when I found that error and decided to avoid it by turning off schema change events.


- For SQL Server, set `debezium.source.include.schema.changes=false`

## Specific tables replication
By default, the debezium connector will publish snapshots of all tables in the database, which leads to unnecessary iceberg snapshots of every table. Unless you want to replicate all tables from the database into iceberg, set `debezium.source.table.include.list` to the specific tables you want to replicate. This way, you avoid replicating many tables you don't actually need. More on this setting in [Debezium server source](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-table-include-list).
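For example, a sketch of such a filter (the table names below are hypothetical; use your own schema-qualified names):

```properties
# replicate only the listed tables instead of the whole database
debezium.source.table.include.list=inventory.customers,inventory.orders
```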

## AWS S3 credentials
You can set up aws credentials in the following ways:
- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key` in `application.properties`
Member commented:
Could we highlight that it is possible to pass any iceberg configuration using the `debezium.sink.iceberg.<my.iceberg.config>=xyz` format? It could be useful to know there are many iceberg configs and it's possible to set them via application.properties.

- Option 2: inject credentials to environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY`
- Option 3: set up a proper `HADOOP_HOME` env, then add the s3a configuration to `core-site.xml`; more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3).
29 changes: 3 additions & 26 deletions docs/DOCS.md
@@ -75,8 +75,8 @@ debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.max.batch.size=2048
debezium.source.max.queue.size=16000
debezium.sink.batch.batch-size-wait.max-wait-ms=30000
debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
```
@@ -115,30 +115,7 @@ debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!
```

### Example Configuration

```properties
debezium.sink.type=iceberg
# run with append mode
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.upsert-keep-deletes=true
# iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.value=json
# complex nested data types are not supported, do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
```
Read [application.properties.example](../debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example)

## Schema Change Behaviour
