diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index e46fb930..e4c793c1 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -1,41 +1,8 @@
# Contributing
-We love your input! We want to make contributing to this project as easy and transparent as possible, whether it's:
-- Reporting a bug
-- Discussing the current state of the code
-- Submitting a fix
-- Proposing new features
-- Becoming a maintainer
+Debezium Iceberg consumer is a young project looking for new maintainers. There are definitely many small and big improvements to make, from documentation and new features to bug reports.
-## We Develop with Github
-We use github to host code, to track issues and feature requests, as well as accept pull requests.
-
-## We Use [Github Flow](https://guides.github.com/introduction/flow/index.html), So All Code Changes Happen Through Pull Requests
-Pull requests are the best way to propose changes to the codebase. We actively welcome your pull requests:
-
-1. Fork the repo and create your branch from `master`.
-2. If you've added code that should be tested, add tests.
-3. If you've changed APIs, update the documentation.
-4. Ensure the test suite passes.
-5. Make sure your code is formatted.
-6. Issue that pull request!
-
-## Any contributions you make will be under the Apache 2.0 License
-In short, when you submit code changes, your submissions are understood to be under the same [Apache-2.0 License](https://github.com/memiiso/debezium-server-iceberg/blob/master/LICENSE) that covers the project. Feel free to contact the maintainers if that's a concern.
-
-## Report bugs using Github's [issues](https://github.com/memiiso/debezium-server-iceberg/issues)
-We use GitHub issues to track public bugs. Report a bug by [opening a new issue](); it's that easy!
-
-## Write bug reports with detail, background, and sample code
-**Good Bug Reports** tend to have:
-
-- A quick summary and/or background
-- Steps to reproduce
-  - Be specific!
-  - Give sample code if you can.
-- What you expected would happen
-- What actually happens
-- Notes (possibly including why you think this might be happening, or stuff you tried that didn't work)
+Please feel free to send pull requests, report bugs or open feature requests.

## License
By contributing, you agree that your contributions will be licensed under Apache 2.0 License.
diff --git a/README.md b/README.md
index 4bf3278c..0427256f 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
This project adds iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It could be used to replicate database CDC changes to Iceberg table (Cloud Storage, HDFS) in realtime, without requiring Spark, Kafka or Streaming platform.
-More detail available in [Debezium Iceberg Consumer Documentation](docs/DOCS.md)
+More details are available on the [documentation page](docs/DOCS.md)

![Debezium Iceberg](docs/images/debezium-iceberg.png)
diff --git a/debezium-server-dist/README.md b/debezium-server-dist/README.md
index 9407d479..74e22f72 100644
--- a/debezium-server-dist/README.md
+++ b/debezium-server-dist/README.md
@@ -2,4 +2,4 @@
Copy of Debezium [debezium-server-dist](https://github.com/debezium/debezium/tree/master/debezium-server/debezium-server-dist) project
-Authors are : Debezium Authors!
\ No newline at end of file +Authors : Debezium Authors \ No newline at end of file diff --git a/debezium-server-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-dist/src/main/resources/distro/conf/application.properties.example index 3a1d93d5..70c9297b 100644 --- a/debezium-server-dist/src/main/resources/distro/conf/application.properties.example +++ b/debezium-server-dist/src/main/resources/distro/conf/application.properties.example @@ -1,10 +1,14 @@ debezium.sink.type=iceberg debezium.sink.iceberg.table-prefix=debeziumcdc_ +debezium.sink.iceberg.upsert=true +debezium.sink.iceberg.upsert-keep-deletes=true +debezium.sink.iceberg.write.format.default=parquet debezium.sink.iceberg.catalog-name=mycatalog +debezium.sink.iceberg.type=hadoop +debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse debezium.sink.iceberg.table-namespace=debeziumevents +# s3 conf debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket -debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse -debezium.sink.iceberg.type=hadoop debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain @@ -12,8 +16,6 @@ debezium.sink.iceberg.fs.s3a.access.key=my-aws-access-key debezium.sink.iceberg.fs.s3a.secret.key=my-secret-access-key debezium.sink.iceberg.fs.s3a.path.style.access=true debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem -debezium.sink.iceberg.upsert=true -debezium.sink.iceberg.upsert-keep-deletes=true # enable event schemas debezium.format.value.schemas.enable=true debezium.format.key.schemas.enable=true @@ -32,7 +34,7 @@ debezium.source.database.dbname=postgres debezium.source.database.server.name=tutorial debezium.source.schema.include.list=inventory -# complex nested data types are not supported, do event flattening. unwrap message! 
+# do event flattening. unwrap message! debezium.transforms=unwrap debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db diff --git a/debezium-server-dist/src/main/resources/distro/run.sh b/debezium-server-dist/src/main/resources/distro/run.sh index 989963fc..c23cfea2 100755 --- a/debezium-server-dist/src/main/resources/distro/run.sh +++ b/debezium-server-dist/src/main/resources/distro/run.sh @@ -13,6 +13,12 @@ else JAVA_BINARY="$JAVA_HOME/bin/java" fi +if [ "$OSTYPE" = "msys" ] || [ "$OSTYPE" = "cygwin" ]; then + PATH_SEP=";" +else + PATH_SEP=":" +fi + RUNNER=$(ls debezium-server-*runner.jar) -exec $JAVA_BINARY $DEBEZIUM_OPTS $JAVA_OPTS -cp "$RUNNER:conf:lib/*" io.debezium.server.Main +exec $JAVA_BINARY $DEBEZIUM_OPTS $JAVA_OPTS -cp "$RUNNER"$PATH_SEP"conf"$PATH_SEP"lib/*" io.debezium.server.Main diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 5cbc4587..c394c0bc 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -150,7 +150,7 @@ public void testSchemaChanges() throws Exception { // TEST add new columns to iceberg table then check if its data populated! Table table = getTable("testc.inventory.customers"); - // NOTE column list below are in reverse order!! testing the behaviour purpose! + // NOTE column list below are in reverse order!! testing the behaviour! 
table.updateSchema()
// NOTE test_date_column is Long type because debezium serializes date type as number
.addColumn("test_date_column", Types.LongType.get())
@@ -236,6 +236,7 @@ public void testDataTypeChanges() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset ds = getTableData("testc.inventory.data_type_changes");
+ ds.printSchema();
ds.show();
return ds.where("__op == 'r'").count() == 19;
} catch (Exception e) {
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
index d706be38..4670edac 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
@@ -54,10 +54,11 @@ public void testNestedGeomJsonRecord() throws JsonProcessingException {
JsonNode jsonSchema = jsonData.get("schema");
List schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
- System.out.println(schema);
- assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));
- GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
+ GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
+ //System.out.println(schema);
+ //System.out.println(record);
+ assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));
GenericRecord g = (GenericRecord) record.getField("g");
GenericRecord h = (GenericRecord) record.getField("h");
assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));
diff --git a/docs/DOCS.md b/docs/DOCS.md
index 5509afed..ca1f2853 100644
--- a/docs/DOCS.md
+++ b/docs/DOCS.md
@@ -1,38 +1,38 @@
# Debezium Iceberg Consumers
-Replicate database changes to Iceberg table(Cloud storage, hdfs) without requiring
Spark, Kafka or Streaming platform.
+Replicates database CDC events to Iceberg (cloud storage, HDFS) without using Spark, Kafka or a streaming platform.

![Debezium Iceberg](images/debezium-iceberg.png)

## `iceberg` Consumer
-Iceberg consumer appends or upserts debezium events to destination Iceberg tables. When event and key schemas
+Iceberg consumer replicates Debezium CDC events to destination Iceberg tables. It is possible to replicate the source database one to one, or to run in append mode and keep all change events in the Iceberg table. When event and key schemas are
enabled (`debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`) destination Iceberg
-tables created automatically.
+tables are created automatically by the initial job.

### Upsert
By default, Iceberg consumer is running with upsert mode `debezium.sink.iceberg.upsert=true`.
-for the tables with Primary Key definition consumer does upsert, for the tables without Primary Key consumer falls back to append mode
+Upsert mode uses the source Primary Key and does an upsert on the target table (a delete followed by an insert). For tables without a Primary Key the consumer falls back to append mode.

#### Data Deduplication
-With upsert mode data deduplication is done per batch, deduplication is done based on `__source_ts_ms` value and event type `__op`
-its is possible to change field using `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms`, Currently only
-Long field type supported
+With upsert mode, data deduplication is done per batch, based on the `__source_ts_ms` value and the event type `__op`.
+It is possible to change the deduplication field using `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms`. Currently only
+the Long field type is supported.

-operation type priorities are `{"c":1, "r":2, "u":3, "d":4}` when two records with same Key and same `__source_ts_ms`
-values received then the record with higher `__op` priority is added to destination table and duplicate record is dropped.
+Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records with the same key and the same `__source_ts_ms`
+value are received, the record with the higher `__op` priority is kept and added to the destination table, and the duplicate record is dropped.

### Append
-Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append, with append mode data deduplication is not done, all received records are appended to destination table
+Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append. With append mode data deduplication is not done and all received records are appended to the destination table.

Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode

#### Keeping Deleted Records
-By default `debezium.sink.iceberg.upsert-keep-deletes=true` will keep deletes in the Iceberg table, setting it to false
-will remove deleted records from the destination Iceberg table. With this config its possible to keep last version of the deleted
-record in the table(possible to do soft deletes).
+By default `debezium.sink.iceberg.upsert-keep-deletes=true` keeps deletes in the Iceberg table; setting it to false
+will remove deleted records from the destination Iceberg table. With this config it's possible to keep the last version of a
+record in the destination Iceberg table (a soft delete).

### Optimizing batch size (or commit interval)

@@ -40,13 +40,13 @@ Debezium extracts database events in real time and this could cause too frequent
which is not optimal for batch processing especially when near realtime data feed is sufficient. To avoid this problem following batch-size-wait classes are used.
-Batch size wait adds delay between consumer calls to increase total number of events received per call and meanwhile events are collected in memory
-this should be configured with `debezium.source.max.queue.size` and `debezium.source.max.batch.size` debezium properties
+Batch size wait adds delay between consumer calls to increase the total number of events received per call; meanwhile events are collected in memory.
+This setting should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` Debezium properties.

#### NoBatchSizeWait

-This is default configuration by default consumer will not use any batch size wait
+This is the default configuration; the consumer will not use any wait. All events are consumed immediately.

#### DynamicBatchSizeWait
**Deprecated**
@@ -54,7 +54,7 @@ This wait strategy dynamically adds wait to increase batch size. Wait duration is calculated based on
last 3 batches. if last batch sizes are lower than `max.batch.size` Wait duration will increase and if last batch sizes are bigger than 90% of `max.batch.size` Wait duration will decrease

-This strategy tries to keep batch size between 85%-90% of the `max.batch.size`, it does not guarantee consistent batch size.
+This strategy keeps the batch size between 85%-90% of `max.batch.size`; it does not guarantee a consistent batch size.

example setup to receive ~2048 events per commit.
maximum wait is set to 5 seconds
```properties
@@ -81,11 +81,11 @@ debezium.sink.batch.batch-size-wait.max-wait-ms=30000
debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
```

-### Destination Iceberg Table Names
+### Table Name Mapping

-Iceberg table names created by following rule : `table-namespace`.`table-prefix``database.server.name`_`database`_`table`
+Iceberg tables are named by the following rule: `table-namespace`.`table-prefix``database.server.name`_`database`_`table`

-For example with following config
+For example:

```properties
debezium.sink.iceberg.table-namespace=default
@@ -93,7 +93,7 @@ database.server.name=testc
debezium.sink.iceberg.table-prefix=cdc_
```

-database table = `inventory.customers` will be replicated to `default.testc_cdc_inventory_customers`
+With the above config, the database table `inventory.customers` is replicated to `default.testc_cdc_inventory_customers`

## Debezium Event Flattening

@@ -118,28 +118,55 @@ debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!
```properties
debezium.sink.type=iceberg
+# run with append mode
+debezium.sink.iceberg.upsert=false
+debezium.sink.iceberg.upsert-keep-deletes=true
+# iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
-debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.table-namespace=debeziumevents
-debezium.sink.iceberg.fs.defaultFS=s3a://MY_S3_BUCKET
-debezium.sink.iceberg.warehouse=s3a://MY_S3_BUCKET/iceberg_warehouse
-debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
-debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
-debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-debezium.sink.iceberg.fs.s3a.access.key=S3_ACCESS_KEY
-debezium.sink.iceberg.fs.s3a.secret.key=S3_SECRET_KEY
-debezium.sink.iceberg.fs.s3a.path.style.access=true
-debezium.sink.iceberg.fs.s3a.endpoint=http://localhost:9000 # minio specific setting
-debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
-debezium.sink.iceberg.upsert=true
-debezium.sink.iceberg.upsert-keep-deletes=true
+debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
+debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
+debezium.sink.iceberg.type=hadoop
+debezium.sink.iceberg.catalog-name=mycatalog
+debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
+# enable event schemas
debezium.format.value.schemas.enable=true
-debezium.format.key.schemas.enable=true
+debezium.format.value=json
+# complex nested data types are not supported, do event flattening. unwrap message!
+debezium.transforms=unwrap
+debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
+debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
+debezium.transforms.unwrap.delete.handling.mode=rewrite
+debezium.transforms.unwrap.drop.tombstones=true
+```
+
+## Schema Change Behaviour
+
+It is possible for the schemas of the source and target tables to get out of sync.
For example, the source database may change its schema by adding or dropping a field. Possible schema changes and the current behaviour of the Iceberg consumer are documented here.
+
+#### Adding new column to source (A column missing in destination iceberg table)
+Data of the new column is ignored till the same column is added to the destination iceberg table.
+
+For example: if a column is not found in the iceberg table, its data is ignored and not copied to the target!
+Once the same column is added to the iceberg table, data for this column is recognized and populated.
+
+#### Removing column from source (An extra column in iceberg table)
+These columns are populated with null values.
+
+#### Renaming column in source
+This is a combination of the two cases above: the old column will be populated with null values, and the new column will not be recognized and populated till it's added to the iceberg table.
+
+#### Different Data Types
+This is the scenario when the source field type and the target Iceberg field type are different. In this case the consumer converts the source field value to the destination type value. Conversion is done by Jackson; if the representation cannot be converted to the destination type then the default value is returned.
+
+For example, this is the conversion rule to the Long type (quoted from the Jackson javadoc):
+```Method that will try to convert value of this node to a Java long. Numbers are coerced using default Java rules; booleans convert to 0 (false) and 1 (true), and Strings are parsed using default Java language integer parsing rules.
If representation cannot be converted to a long (including structured types like Objects and Arrays), default value of 0 will be returned; no exceptions are thrown. ```
## `icebergevents` Consumer
-This is second consumer developed with this project, This consumer appends CDC events to single Iceberg table as json string.
This table partitioned by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms` #### Example Configuration