diff --git a/README.md b/README.md index 0427256f..3f21d24c 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,24 @@ This project adds iceberg consumer to [Debezium Server](https://debezium.io/docu replicate database CDC changes to Iceberg table (Cloud Storage, HDFS) in realtime, without requiring Spark, Kafka or Streaming platform. More detail available in [documentation page](docs/DOCS.md) +Also, check [caveats](docs/CAVEATS.md) for better understanding the current limitation and proper workaround ![Debezium Iceberg](docs/images/debezium-iceberg.png) +# Install from source +- Requirements: + - JDK 11 + - Maven +- Clone from repo: `git clone https://github.com/memiiso/debezium-server-iceberg.git` +- From the root of the project: + - Build and package debezium server: `mvn -Passembly -Dmaven.test.skip package` + - After building, unzip your server distribution: `unzip debezium-server-iceberg-dist/target/debezium-server-iceberg-dist*.zip -d appdist` + - cd into unzipped folder: `cd appdist` + - Create `application.properties` file and config it: `nano conf/application.properties`, you can check the example configuration in [application.properties.example](debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example) + - Run the server using provided script: `bash run.sh` + # Contributing -The Memiiso community welcomes anyone that wants to help out in any way, whether that includes reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. See [contributing document](CONTRIBUTE.md) for details. +The Memiiso community welcomes anyone that wants to help out in any way, whether that includes reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. See [contributing document](CONTRIBUTING.md) for details. ### Contributors diff --git a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example index 8572f6ab..9b518ddb 100644 --- a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example +++ b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example @@ -1,43 +1,74 @@ +# Use iceberg sink debezium.sink.type=iceberg -debezium.sink.kinesis.region=eu-central-1 -debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector -debezium.source.offset.storage.file.filename=data/offsets.dat -debezium.source.offset.flush.interval.ms=0 -debezium.source.database.hostname=localhost -debezium.source.database.port=5432 -debezium.source.database.user=postgres -debezium.source.database.password=postgres -debezium.source.database.dbname=postgres -debezium.source.database.server.name=tutorial -debezium.source.schema.include.list=inventory - -# configure batch behaviour/size -debezium.source.max.batch.size=2048 -debezium.source.poll.interval.ms=10000 # 5 seconds! - -# enable schemas + +# Run without Kafka, use local file to store checkpoints +debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory +debezium.source.database.history.file.filename=data/status.dat + +# Iceberg sink config +debezium.sink.iceberg.table-prefix=debeziumcdc_ +debezium.sink.iceberg.upsert=true +debezium.sink.iceberg.upsert-keep-deletes=true +debezium.sink.iceberg.write.format.default=parquet +debezium.sink.iceberg.catalog-name=mycatalog +# Hadoop catalog, you can use other catalog supported by iceberg as well +debezium.sink.iceberg.type=hadoop +debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse +debezium.sink.iceberg.table-namespace=debeziumevents + +# S3 config +debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket +debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true +debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true +debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain +debezium.sink.iceberg.fs.s3a.access.key=AWS_ACCESS_KEY +debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY +debezium.sink.iceberg.fs.s3a.path.style.access=true +debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem + +# enable event schemas - mandate debezium.format.value.schemas.enable=true debezium.format.key.schemas.enable=true +debezium.format.value=json +debezium.format.key=json -# debezium unwrap message +# postgres source +#debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector +#debezium.source.offset.storage.file.filename=data/offsets.dat +#debezium.source.offset.flush.interval.ms=0 +#debezium.source.database.hostname=localhost +#debezium.source.database.port=5432 +#debezium.source.database.user=postgres +#debezium.source.database.password=postgres +#debezium.source.database.dbname=postgres +#debezium.source.database.server.name=tutorial +#debezium.source.schema.include.list=inventory + +# sql server source +#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector +#debezium.source.offset.storage.file.filename=data/offsets.dat +#debezium.source.offset.flush.interval.ms=0 +#debezium.source.database.hostname=localhost +#debezium.source.database.port=5432 +#debezium.source.database.user=debezium +#debezium.source.database.password=debezium +#debezium.source.database.dbname=debezium +#debezium.source.database.server.name=tutorial +#debezium.source.schema.include.list=inventory +# mandate for sql server source, avoid error when snapshot and schema change +#debezium.source.include.schema.changes=false + +# do event flattening. unwrap message! debezium.transforms=unwrap debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState -debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms,db +debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db debezium.transforms.unwrap.delete.handling.mode=rewrite +debezium.transforms.unwrap.drop.tombstones=true -# iceberg settings -debezium.sink.iceberg.upsert=false -debezium.sink.iceberg.upsert-keep-deletes=true -debezium.sink.iceberg.table-prefix=debeziumcdc_ -debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET -debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse -debezium.sink.iceberg.type=hadoop -debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog - -# set logging levels +# ############ SET LOG LEVELS ############ quarkus.log.level=INFO -quarkus.log.category."io.debezium.server.iceberg".level=DEBUG -quarkus.log.category."org.apache.hadoop".level=ERROR +# hadoop, parquet +quarkus.log.category."org.apache.hadoop".level=WARN quarkus.log.category."org.apache.parquet".level=WARN -quarkus.log.category."org.eclipse.jetty".level=WARN -quarkus.log.category."org.apache.iceberg".level=ERROR \ No newline at end of file +# Ignore messages below warning level from Jetty, because it's a bit verbose +quarkus.log.category."org.eclipse.jetty".level=WARN \ No newline at end of file diff --git a/docs/CAVEATS.md b/docs/CAVEATS.md new file mode 100644 index 00000000..68821cb0 --- /dev/null +++ b/docs/CAVEATS.md @@ -0,0 +1,17 @@ +# Caveats +## Only iceberg V2 table supported +This connector only writes using iceberg table V2 spec (delete events will be written to delete files instead of rewrite data files) + +## No automatic schema evolution +Currently, there is no handler to detect schema changes and auto evolve the schema. Schema change events can make the connector throw error. To workaround this, turn off schema change event in `source` setting. + +- For SQL Server, set `debezium.source.include.schema.changes=false` + +## Specific tables replication +By default, debezium connector will publish all snapshot of the tables in the database, that leads to unnessesary iceberg table snapshot of all tables. Unless you want to replicate all table from the database into iceberg table, set `debezium.source.table.include.list` to specific tables that you want to replicate. By this way, you avoid replicate too many tables that you don't really want to. More on this setting in [Debezium server source](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-table-include-list). + +## AWS S3 credentials +You can setup aws credentials in the following ways: +- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key` in `application.properties` +- Option 2: inject credentials to environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY` +- Option 3: setup proper `HADOOP_HOME` env then add s3a configuration into `core-site.xml`, more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3). \ No newline at end of file diff --git a/docs/DOCS.md b/docs/DOCS.md index ca1f2853..f8d600a2 100644 --- a/docs/DOCS.md +++ b/docs/DOCS.md @@ -75,8 +75,8 @@ debezium.sink.batch.batch-size-wait=MaxBatchSizeWait debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector -debezium.source.max.batch.size=2048"); -debezium.source.max.queue.size=16000"); +debezium.source.max.batch.size=2048; +debezium.source.max.queue.size=16000"; debezium.sink.batch.batch-size-wait.max-wait-ms=30000 debezium.sink.batch.batch-size-wait.wait-interval-ms=5000 ``` @@ -115,30 +115,7 @@ debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg! ``` ### Example Configuration - -```properties -debezium.sink.type=iceberg -# run with append mode -debezium.sink.iceberg.upsert=false -debezium.sink.iceberg.upsert-keep-deletes=true -# iceberg -debezium.sink.iceberg.table-prefix=debeziumcdc_ -debezium.sink.iceberg.table-namespace=debeziumevents -debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET); -debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse -debezium.sink.iceberg.type=hadoop -debezium.sink.iceberg.catalog-name=mycatalog -debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog -# enable event schemas -debezium.format.value.schemas.enable=true -debezium.format.value=json -# complex nested data types are not supported, do event flattening. unwrap message! -debezium.transforms=unwrap -debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState -debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db -debezium.transforms.unwrap.delete.handling.mode=rewrite -debezium.transforms.unwrap.drop.tombstones=true -``` +Read [application.properties.example](../debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example) ## Schema Change Behaviour