-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
4 changed files
with
98 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
97 changes: 64 additions & 33 deletions
97
debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,43 +1,74 @@ | ||
# Use iceberg sink | ||
debezium.sink.type=iceberg | ||
debezium.sink.kinesis.region=eu-central-1 | ||
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector | ||
debezium.source.offset.storage.file.filename=data/offsets.dat | ||
debezium.source.offset.flush.interval.ms=0 | ||
debezium.source.database.hostname=localhost | ||
debezium.source.database.port=5432 | ||
debezium.source.database.user=postgres | ||
debezium.source.database.password=postgres | ||
debezium.source.database.dbname=postgres | ||
debezium.source.database.server.name=tutorial | ||
debezium.source.schema.include.list=inventory | ||
|
||
# configure batch behaviour/size | ||
debezium.source.max.batch.size=2048 | ||
debezium.source.poll.interval.ms=10000 # 10 seconds | ||
|
||
# enable schemas | ||
|
||
# Run without Kafka, use local file to store checkpoints | ||
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory | ||
debezium.source.database.history.file.filename=data/status.dat | ||
|
||
# Iceberg sink config | ||
debezium.sink.iceberg.table-prefix=debeziumcdc_ | ||
debezium.sink.iceberg.upsert=true | ||
debezium.sink.iceberg.upsert-keep-deletes=true | ||
debezium.sink.iceberg.write.format.default=parquet | ||
debezium.sink.iceberg.catalog-name=mycatalog | ||
# Hadoop catalog, you can use other catalog supported by iceberg as well | ||
debezium.sink.iceberg.type=hadoop | ||
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse | ||
debezium.sink.iceberg.table-namespace=debeziumevents | ||
|
||
# S3 config | ||
debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket | ||
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true | ||
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true | ||
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain | ||
debezium.sink.iceberg.fs.s3a.access.key=AWS_ACCESS_KEY | ||
debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY | ||
debezium.sink.iceberg.fs.s3a.path.style.access=true | ||
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem | ||
|
||
# enable event schemas - mandatory | ||
debezium.format.value.schemas.enable=true | ||
debezium.format.key.schemas.enable=true | ||
debezium.format.value=json | ||
debezium.format.key=json | ||
|
||
# debezium unwrap message | ||
# postgres source | ||
#debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector | ||
#debezium.source.offset.storage.file.filename=data/offsets.dat | ||
#debezium.source.offset.flush.interval.ms=0 | ||
#debezium.source.database.hostname=localhost | ||
#debezium.source.database.port=5432 | ||
#debezium.source.database.user=postgres | ||
#debezium.source.database.password=postgres | ||
#debezium.source.database.dbname=postgres | ||
#debezium.source.database.server.name=tutorial | ||
#debezium.source.schema.include.list=inventory | ||
|
||
# sql server source | ||
#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector | ||
#debezium.source.offset.storage.file.filename=data/offsets.dat | ||
#debezium.source.offset.flush.interval.ms=0 | ||
#debezium.source.database.hostname=localhost | ||
#debezium.source.database.port=5432 | ||
#debezium.source.database.user=debezium | ||
#debezium.source.database.password=debezium | ||
#debezium.source.database.dbname=debezium | ||
#debezium.source.database.server.name=tutorial | ||
#debezium.source.schema.include.list=inventory | ||
# mandatory for sql server source, avoids errors during snapshot and schema change | ||
#debezium.source.include.schema.changes=false | ||
|
||
# do event flattening. unwrap message! | ||
debezium.transforms=unwrap | ||
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState | ||
debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms,db | ||
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db | ||
debezium.transforms.unwrap.delete.handling.mode=rewrite | ||
debezium.transforms.unwrap.drop.tombstones=true | ||
|
||
# iceberg settings | ||
debezium.sink.iceberg.upsert=false | ||
debezium.sink.iceberg.upsert-keep-deletes=true | ||
debezium.sink.iceberg.table-prefix=debeziumcdc_ | ||
debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET | ||
debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse | ||
debezium.sink.iceberg.type=hadoop | ||
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog | ||
|
||
# set logging levels | ||
# ############ SET LOG LEVELS ############ | ||
quarkus.log.level=INFO | ||
quarkus.log.category."io.debezium.server.iceberg".level=DEBUG | ||
quarkus.log.category."org.apache.hadoop".level=ERROR | ||
# hadoop, parquet | ||
quarkus.log.category."org.apache.hadoop".level=WARN | ||
quarkus.log.category."org.apache.parquet".level=WARN | ||
quarkus.log.category."org.eclipse.jetty".level=WARN | ||
quarkus.log.category."org.apache.iceberg".level=ERROR | ||
# Ignore messages below warning level from Jetty, because it's a bit verbose | ||
quarkus.log.category."org.eclipse.jetty".level=WARN |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Caveats | ||
## Only iceberg V2 table supported | ||
This connector only writes using the Iceberg table V2 spec (delete events will be written to delete files instead of rewriting data files) | ||
|
||
## No automatic schema evolution | ||
Currently, there is no handler to detect schema changes and auto-evolve the schema. Schema change events can make the connector throw an error. To work around this, turn off schema change events in the `source` settings. | ||
|
||
- For SQL Server, set `debezium.source.include.schema.changes=false` | ||
|
||
## Specific tables replication | ||
By default, the Debezium connector will publish snapshots of all tables in the database, which leads to unnecessary Iceberg table snapshots of all tables. Unless you want to replicate every table from the database into Iceberg, set `debezium.source.table.include.list` to the specific tables that you want to replicate. This way, you avoid replicating tables that you don't really want. More on this setting in [Debezium server source](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-table-include-list). | ||
|
||
## AWS S3 credentials | ||
You can set up AWS credentials in the following ways: | ||
- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key` in `application.properties` | ||
- Option 2: inject credentials to environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY` | ||
- Option 3: set up a proper `HADOOP_HOME` environment, then add the s3a configuration to `core-site.xml`; more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters