From b98f42cafa1baf75e42c2243bd3131894697e022 Mon Sep 17 00:00:00 2001 From: Ninh Chu Date: Sat, 27 Nov 2021 22:08:43 +0700 Subject: [PATCH 1/2] enhance documentation --- README.md | 14 +++- .../conf/application.properties.example | 78 +++++++++++++------ docs/CAVEATS.md | 17 ++++ docs/DOCS.md | 29 +------ 4 files changed, 87 insertions(+), 51 deletions(-) create mode 100644 docs/CAVEATS.md diff --git a/README.md b/README.md index 0427256f..81f17916 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,23 @@ This project adds iceberg consumer to [Debezium Server](https://debezium.io/docu replicate database CDC changes to Iceberg table (Cloud Storage, HDFS) in realtime, without requiring Spark, Kafka or Streaming platform. More detail available in [documentation page](docs/DOCS.md) +Also, check [caveats](docs/CAVEATS.md) for better understanding the current limitation and proper workaround ![Debezium Iceberg](docs/images/debezium-iceberg.png) +# Install from source +- Requirements: + - JDK 11 + - Maven +- Clone from repo: `git clone https://github.com/memiiso/debezium-server-iceberg.git` +- From the root of the project: + - Build and package debezium server: `mvn -Passembly -Dmaven.test.skip package` + - After building, unzip your server distribution: `debezium-server-iceberg-dist/target/debezium-server-iceberg-dist*.zip -d appdist` + - cd into unzipped folder: `cd appdist` + - Run the server using provided script: `bash run.sh` + # Contributing -The Memiiso community welcomes anyone that wants to help out in any way, whether that includes reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. See [contributing document](CONTRIBUTE.md) for details. +The Memiiso community welcomes anyone that wants to help out in any way, whether that includes reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. See [contributing document](CONTRIBUTING.md) for details. ### Contributors diff --git a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example index 8572f6ab..58fc8384 100644 --- a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example +++ b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example @@ -1,5 +1,37 @@ +# Use iceberg sink debezium.sink.type=iceberg -debezium.sink.kinesis.region=eu-central-1 + +# Run without Kafka, use local file to store checkpoints +debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory +debezium.source.database.history.file.filename=data/status.dat + +# Iceberg sink config +debezium.sink.iceberg.table-prefix=debeziumcdc_ +debezium.sink.iceberg.upsert=true +debezium.sink.iceberg.upsert-keep-deletes=true +debezium.sink.iceberg.write.format.default=parquet +debezium.sink.iceberg.catalog-name=mycatalog +debezium.sink.iceberg.type=hadoop +debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse +debezium.sink.iceberg.table-namespace=debeziumevents + +# S3 config +debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket +debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true +debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true +debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain +debezium.sink.iceberg.fs.s3a.access.key=my-aws-access-key +debezium.sink.iceberg.fs.s3a.secret.key=my-secret-access-key +debezium.sink.iceberg.fs.s3a.path.style.access=true +debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem + +# enable event schemas - mandate +debezium.format.value.schemas.enable=true +debezium.format.key.schemas.enable=true +debezium.format.value=json +debezium.format.key=json + +# postgres source debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 @@ -11,33 +43,31 @@ debezium.source.database.dbname=postgres debezium.source.database.server.name=tutorial debezium.source.schema.include.list=inventory -# configure batch behaviour/size -debezium.source.max.batch.size=2048 -debezium.source.poll.interval.ms=10000 # 5 seconds! - -# enable schemas -debezium.format.value.schemas.enable=true -debezium.format.key.schemas.enable=true +# sql server source +debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector +debezium.source.offset.storage.file.filename=data/offsets.dat +debezium.source.offset.flush.interval.ms=0 +debezium.source.database.hostname=localhost +debezium.source.database.port=5432 +debezium.source.database.user=debezium +debezium.source.database.password=debezium +debezium.source.database.dbname=debezium +debezium.source.database.server.name=tutorial +debezium.source.schema.include.list=inventory +# mandate for sql server source, avoid error when snapshot and schema change +debezium.source.include.schema.changes=false -# debezium unwrap message +# do event flattening. unwrap message! debezium.transforms=unwrap debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState -debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms,db +debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db debezium.transforms.unwrap.delete.handling.mode=rewrite +debezium.transforms.unwrap.drop.tombstones=true -# iceberg settings -debezium.sink.iceberg.upsert=false -debezium.sink.iceberg.upsert-keep-deletes=true -debezium.sink.iceberg.table-prefix=debeziumcdc_ -debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET -debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse -debezium.sink.iceberg.type=hadoop -debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog - -# set logging levels +# ############ SET LOG LEVELS ############ quarkus.log.level=INFO -quarkus.log.category."io.debezium.server.iceberg".level=DEBUG -quarkus.log.category."org.apache.hadoop".level=ERROR +# hadoop, parquet +quarkus.log.category."org.apache.hadoop".level=WARN quarkus.log.category."org.apache.parquet".level=WARN -quarkus.log.category."org.eclipse.jetty".level=WARN -quarkus.log.category."org.apache.iceberg".level=ERROR \ No newline at end of file +# Ignore messages below warning level from Jetty, because it's a bit verbose +quarkus.log.category."org.eclipse.jetty".level=WARN \ No newline at end of file diff --git a/docs/CAVEATS.md b/docs/CAVEATS.md new file mode 100644 index 00000000..97ffaf7e --- /dev/null +++ b/docs/CAVEATS.md @@ -0,0 +1,17 @@ +# Caveats +## Only iceberg V2 table supported +This connector only writes using iceberg table V2 spec (no rewrite for delete and upsert - merge on read only) + +## Only hadoop catalog supported +This connector only packages with support for `hadoop` catalog. + +## No automatic schema evolution +Currently, there is no handler to detect schema changes and auto evolve the schema. Schema change events can make the connector throw error. To workaround this, turn off schema change event in `source` setting. + +- For SQL Server, set `debezium.source.include.schema.changes=false` + +## Specific tables replication +By default, debezium connector will publish all snapshot of the tables in the database, that leads to unnessesary iceberg table snapshot of all tables. Unless you want to replicate all table from the database into iceberg table, set `debezium.source.table.include.list` to specific tables that you want to replicate. By this way, you avoid replicate too many table that you don't really want to. + +## AWS S3 credentials +You should inject environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY` to write to S3 or setup proper `HADOOP_HOME` env then add s3a configuration into `core-site.xml`, more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3). \ No newline at end of file diff --git a/docs/DOCS.md b/docs/DOCS.md index ca1f2853..f8d600a2 100644 --- a/docs/DOCS.md +++ b/docs/DOCS.md @@ -75,8 +75,8 @@ debezium.sink.batch.batch-size-wait=MaxBatchSizeWait debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector -debezium.source.max.batch.size=2048"); -debezium.source.max.queue.size=16000"); +debezium.source.max.batch.size=2048; +debezium.source.max.queue.size=16000"; debezium.sink.batch.batch-size-wait.max-wait-ms=30000 debezium.sink.batch.batch-size-wait.wait-interval-ms=5000 ``` @@ -115,30 +115,7 @@ debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg! ``` ### Example Configuration - -```properties -debezium.sink.type=iceberg -# run with append mode -debezium.sink.iceberg.upsert=false -debezium.sink.iceberg.upsert-keep-deletes=true -# iceberg -debezium.sink.iceberg.table-prefix=debeziumcdc_ -debezium.sink.iceberg.table-namespace=debeziumevents -debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET); -debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse -debezium.sink.iceberg.type=hadoop -debezium.sink.iceberg.catalog-name=mycatalog -debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog -# enable event schemas -debezium.format.value.schemas.enable=true -debezium.format.value=json -# complex nested data types are not supported, do event flattening. unwrap message! -debezium.transforms=unwrap -debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState -debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db -debezium.transforms.unwrap.delete.handling.mode=rewrite -debezium.transforms.unwrap.drop.tombstones=true -``` +Read [application.properties.example](../debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example) ## Schema Change Behaviour From 2dac340fd528f36ddb3b37e8527e7ffa37f437b8 Mon Sep 17 00:00:00 2001 From: Ninh Chu Date: Sun, 28 Nov 2021 21:57:36 +0700 Subject: [PATCH 2/2] more clarification and explaination --- README.md | 3 +- .../conf/application.properties.example | 47 ++++++++++--------- docs/CAVEATS.md | 12 ++--- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 81f17916..3f21d24c 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,9 @@ Also, check [caveats](docs/CAVEATS.md) for better understanding the current limi - Clone from repo: `git clone https://github.com/memiiso/debezium-server-iceberg.git` - From the root of the project: - Build and package debezium server: `mvn -Passembly -Dmaven.test.skip package` - - After building, unzip your server distribution: `debezium-server-iceberg-dist/target/debezium-server-iceberg-dist*.zip -d appdist` + - After building, unzip your server distribution: `unzip debezium-server-iceberg-dist/target/debezium-server-iceberg-dist*.zip -d appdist` - cd into unzipped folder: `cd appdist` + - Create `application.properties` file and config it: `nano conf/application.properties`, you can check the example configuration in [application.properties.example](debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example) - Run the server using provided script: `bash run.sh` # Contributing diff --git a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example index 58fc8384..9b518ddb 100644 --- a/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example +++ b/debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example @@ -11,6 +11,7 @@ debezium.sink.iceberg.upsert=true debezium.sink.iceberg.upsert-keep-deletes=true debezium.sink.iceberg.write.format.default=parquet debezium.sink.iceberg.catalog-name=mycatalog +# Hadoop catalog, you can use other catalog supported by iceberg as well debezium.sink.iceberg.type=hadoop debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse debezium.sink.iceberg.table-namespace=debeziumevents @@ -20,8 +21,8 @@ debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain -debezium.sink.iceberg.fs.s3a.access.key=my-aws-access-key -debezium.sink.iceberg.fs.s3a.secret.key=my-secret-access-key +debezium.sink.iceberg.fs.s3a.access.key=AWS_ACCESS_KEY +debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY debezium.sink.iceberg.fs.s3a.path.style.access=true debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem @@ -32,30 +33,30 @@ debezium.format.value=json debezium.format.key=json # postgres source -debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector -debezium.source.offset.storage.file.filename=data/offsets.dat -debezium.source.offset.flush.interval.ms=0 -debezium.source.database.hostname=localhost -debezium.source.database.port=5432 -debezium.source.database.user=postgres -debezium.source.database.password=postgres -debezium.source.database.dbname=postgres -debezium.source.database.server.name=tutorial -debezium.source.schema.include.list=inventory +#debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector +#debezium.source.offset.storage.file.filename=data/offsets.dat +#debezium.source.offset.flush.interval.ms=0 +#debezium.source.database.hostname=localhost +#debezium.source.database.port=5432 +#debezium.source.database.user=postgres +#debezium.source.database.password=postgres +#debezium.source.database.dbname=postgres +#debezium.source.database.server.name=tutorial +#debezium.source.schema.include.list=inventory # sql server source -debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector -debezium.source.offset.storage.file.filename=data/offsets.dat -debezium.source.offset.flush.interval.ms=0 -debezium.source.database.hostname=localhost -debezium.source.database.port=5432 -debezium.source.database.user=debezium -debezium.source.database.password=debezium -debezium.source.database.dbname=debezium -debezium.source.database.server.name=tutorial -debezium.source.schema.include.list=inventory +#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector +#debezium.source.offset.storage.file.filename=data/offsets.dat +#debezium.source.offset.flush.interval.ms=0 +#debezium.source.database.hostname=localhost +#debezium.source.database.port=5432 +#debezium.source.database.user=debezium +#debezium.source.database.password=debezium +#debezium.source.database.dbname=debezium +#debezium.source.database.server.name=tutorial +#debezium.source.schema.include.list=inventory # mandate for sql server source, avoid error when snapshot and schema change -debezium.source.include.schema.changes=false +#debezium.source.include.schema.changes=false # do event flattening. unwrap message! debezium.transforms=unwrap diff --git a/docs/CAVEATS.md b/docs/CAVEATS.md index 97ffaf7e..68821cb0 100644 --- a/docs/CAVEATS.md +++ b/docs/CAVEATS.md @@ -1,9 +1,6 @@ # Caveats ## Only iceberg V2 table supported -This connector only writes using iceberg table V2 spec (no rewrite for delete and upsert - merge on read only) - -## Only hadoop catalog supported -This connector only packages with support for `hadoop` catalog. +This connector only writes using iceberg table V2 spec (delete events will be written to delete files instead of rewrite data files) ## No automatic schema evolution Currently, there is no handler to detect schema changes and auto evolve the schema. Schema change events can make the connector throw error. To workaround this, turn off schema change event in `source` setting. @@ -11,7 +8,10 @@ Currently, there is no handler to detect schema changes and auto evolve the sche - For SQL Server, set `debezium.source.include.schema.changes=false` ## Specific tables replication -By default, debezium connector will publish all snapshot of the tables in the database, that leads to unnessesary iceberg table snapshot of all tables. Unless you want to replicate all table from the database into iceberg table, set `debezium.source.table.include.list` to specific tables that you want to replicate. By this way, you avoid replicate too many table that you don't really want to. +By default, debezium connector will publish all snapshot of the tables in the database, that leads to unnessesary iceberg table snapshot of all tables. Unless you want to replicate all table from the database into iceberg table, set `debezium.source.table.include.list` to specific tables that you want to replicate. By this way, you avoid replicate too many tables that you don't really want to. More on this setting in [Debezium server source](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-table-include-list). ## AWS S3 credentials -You should inject environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY` to write to S3 or setup proper `HADOOP_HOME` env then add s3a configuration into `core-site.xml`, more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3). \ No newline at end of file +You can setup aws credentials in the following ways: +- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key` in `application.properties` +- Option 2: inject credentials to environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY` +- Option 3: setup proper `HADOOP_HOME` env then add s3a configuration into `core-site.xml`, more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3). \ No newline at end of file