Improve documentation and document schema change behaviour (#51)
* improve docs
ismailsimsek committed Nov 3, 2021
1 parent 032f0bc commit 95534a6
Showing 8 changed files with 86 additions and 83 deletions.
37 changes: 2 additions & 35 deletions CONTRIBUTING.md
@@ -1,41 +1,8 @@
# Contributing
We love your input! We want to make contributing to this project as easy and transparent as possible, whether it's:

- Reporting a bug
- Discussing the current state of the code
- Submitting a fix
- Proposing new features
- Becoming a maintainer
Debezium Iceberg consumer is a very young project and is looking for new maintainers. There are definitely many small and big improvements to make, from documentation and new features to bug reports.

## We Develop with GitHub
We use GitHub to host code, to track issues and feature requests, as well as to accept pull requests.

## We Use [Github Flow](https://guides.github.com/introduction/flow/index.html), So All Code Changes Happen Through Pull Requests
Pull requests are the best way to propose changes to the codebase. We actively welcome your pull requests:

1. Fork the repo and create your branch from `master`.
2. If you've added code that should be tested, add tests.
3. If you've changed APIs, update the documentation.
4. Ensure the test suite passes.
5. Make sure your code is formatted.
6. Issue that pull request!

## Any contributions you make will be under the Apache 2.0 License
In short, when you submit code changes, your submissions are understood to be under the same [Apache-2.0 License](https://github.com/memiiso/debezium-server-iceberg/blob/master/LICENSE) that covers the project. Feel free to contact the maintainers if that's a concern.

## Report bugs using GitHub's [issues](https://github.com/memiiso/debezium-server-iceberg/issues)
We use GitHub issues to track public bugs. Report a bug by [opening a new issue](); it's that easy!

## Write bug reports with detail, background, and sample code
**Good Bug Reports** tend to have:

- A quick summary and/or background
- Steps to reproduce
  - Be specific!
  - Give sample code if you can.
- What you expected would happen
- What actually happens
- Notes (possibly including why you think this might be happening, or stuff you tried that didn't work)
Please feel free to send pull requests, report bugs, or open feature requests.

## License
By contributing, you agree that your contributions will be licensed under Apache 2.0 License.
2 changes: 1 addition & 1 deletion README.md
@@ -7,7 +7,7 @@
This project adds an Iceberg consumer to [Debezium Server](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to
replicate database CDC changes to an Iceberg table (cloud storage, HDFS) in real time, without requiring Spark, Kafka, or a streaming platform.

More detail available in [Debezium Iceberg Consumer Documentation](docs/DOCS.md)
More detail is available on the [documentation page](docs/DOCS.md)

![Debezium Iceberg](docs/images/debezium-iceberg.png)

2 changes: 1 addition & 1 deletion debezium-server-dist/README.md
@@ -2,4 +2,4 @@ Copy of
Debezium [debezium-server-dist](https://github.com/debezium/debezium/tree/master/debezium-server/debezium-server-dist)
project

Authors are : Debezium Authors!
Authors: Debezium Authors
@@ -1,19 +1,21 @@
debezium.sink.type=iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.table-namespace=debeziumevents
# s3 conf
debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
debezium.sink.iceberg.fs.s3a.access.key=my-aws-access-key
debezium.sink.iceberg.fs.s3a.secret.key=my-secret-access-key
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
@@ -32,7 +34,7 @@ debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.include.list=inventory

# complex nested data types are not supported, do event flattening. unwrap message!
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
8 changes: 7 additions & 1 deletion debezium-server-dist/src/main/resources/distro/run.sh
@@ -13,6 +13,12 @@ else
JAVA_BINARY="$JAVA_HOME/bin/java"
fi

if [ "$OSTYPE" = "msys" ] || [ "$OSTYPE" = "cygwin" ]; then
PATH_SEP=";"
else
PATH_SEP=":"
fi

RUNNER=$(ls debezium-server-*runner.jar)

exec $JAVA_BINARY $DEBEZIUM_OPTS $JAVA_OPTS -cp "$RUNNER:conf:lib/*" io.debezium.server.Main
exec $JAVA_BINARY $DEBEZIUM_OPTS $JAVA_OPTS -cp "$RUNNER"$PATH_SEP"conf"$PATH_SEP"lib/*" io.debezium.server.Main
@@ -150,7 +150,7 @@ public void testSchemaChanges() throws Exception {

// TEST add new columns to iceberg table then check if its data populated!
Table table = getTable("testc.inventory.customers");
// NOTE column list below are in reverse order!! testing the behaviour purpose!
// NOTE: the columns below are listed in reverse order on purpose, to test the behaviour!
table.updateSchema()
// NOTE test_date_column is Long type because debezium serializes date type as number
.addColumn("test_date_column", Types.LongType.get())
@@ -236,6 +236,7 @@ public void testDataTypeChanges() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
try {
Dataset<Row> ds = getTableData("testc.inventory.data_type_changes");
ds.printSchema();
ds.show();
return ds.where("__op == 'r'").count() == 19;
} catch (Exception e) {
@@ -54,10 +54,10 @@ public void testNestedGeomJsonRecord() throws JsonProcessingException {
JsonNode jsonSchema = jsonData.get("schema");
List<Types.NestedField> schemaFields = IcebergUtil.getIcebergSchema(jsonSchema);
Schema schema = new Schema(schemaFields);
System.out.println(schema);
assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));

GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload);
//System.out.println(schema);
//System.out.println(record);
assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>"));
GenericRecord g = (GenericRecord) record.getField("g");
GenericRecord h = (GenericRecord) record.getField("h");
assertEquals("AQEAAAAAAAAAAADwPwAAAAAAAPA/", g.get(0, Types.StringType.get().typeId().javaClass()));
99 changes: 63 additions & 36 deletions docs/DOCS.md
@@ -1,60 +1,60 @@
# Debezium Iceberg Consumers

Replicate database changes to Iceberg table(Cloud storage, hdfs) without requiring Spark, Kafka or Streaming platform.
Replicates database CDC events to Iceberg (cloud storage, HDFS) without using Spark, Kafka, or a streaming platform.

![Debezium Iceberg](images/debezium-iceberg.png)

## `iceberg` Consumer

Iceberg consumer appends or upserts debezium events to destination Iceberg tables. When event and key schemas
The Iceberg consumer replicates Debezium CDC events to destination Iceberg tables. It is possible to replicate the source database one-to-one, or to run in append mode and keep all change events in the Iceberg table. When event and key schemas are
enabled (`debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`), destination Iceberg
tables created automatically.
tables are created automatically by the initial job.

### Upsert

By default, the Iceberg consumer runs in upsert mode (`debezium.sink.iceberg.upsert=true`).
for the tables with Primary Key definition consumer does upsert, for the tables without Primary Key consumer falls back to append mode
Upsert mode uses the source primary key and performs an upsert on the target table (a delete followed by an insert). For tables without a primary key, the consumer falls back to append mode.

#### Data Deduplication

With upsert mode data deduplication is done per batch, deduplication is done based on `__source_ts_ms` value and event type `__op`
its is possible to change field using `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms`, Currently only
Long field type supported
In upsert mode, data deduplication is done per batch. Deduplication is based on the `__source_ts_ms` value and the event type `__op`.
It is possible to change the deduplication field using `debezium.sink.iceberg.upsert-dedup-column=__source_ts_ms`. Currently only the
Long field type is supported.

operation type priorities are `{"c":1, "r":2, "u":3, "d":4}` when two records with same Key and same `__source_ts_ms`
values received then the record with higher `__op` priority is added to destination table and duplicate record is dropped.
Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records with the same key and the same `__source_ts_ms`
value are received, the record with the higher `__op` priority is kept and added to the destination table; the duplicate record is dropped.
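As an illustration of this rule, per-batch deduplication could be sketched in Java as follows. This is a hedged sketch, not the project's actual implementation; the `Event` record and `deduplicate` helper are hypothetical.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: keep, for each key, the event with the highest __source_ts_ms,
// breaking ties by __op priority ({"c":1, "r":2, "u":3, "d":4}).
public class DedupSketch {
  static final Map<String, Integer> OP_PRIORITY = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

  record Event(String key, long sourceTsMs, String op) {}

  static List<Event> deduplicate(List<Event> batch) {
    Comparator<Event> byTsThenOp = Comparator
        .comparingLong(Event::sourceTsMs)
        .thenComparing(e -> OP_PRIORITY.getOrDefault(e.op(), 0));
    Map<String, Event> latest = new HashMap<>();
    for (Event e : batch) {
      // merge passes (existing, incoming); the higher-ranked event wins
      latest.merge(e.key(), e, (a, b) -> byTsThenOp.compare(b, a) > 0 ? b : a);
    }
    return List.copyOf(latest.values());
  }
}
```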

### Append
Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append, with append mode data deduplication is not done, all received records are appended to destination table
Setting `debezium.sink.iceberg.upsert=false` sets the operation mode to append. In append mode no data deduplication is done, and all received records are appended to the destination table.
Note: For tables without a primary key, the operation mode falls back to append even when the configuration is set to upsert mode.

#### Keeping Deleted Records

By default `debezium.sink.iceberg.upsert-keep-deletes=true` will keep deletes in the Iceberg table, setting it to false
will remove deleted records from the destination Iceberg table. With this config its possible to keep last version of the deleted
record in the table(possible to do soft deletes).
By default `debezium.sink.iceberg.upsert-keep-deletes=true` keeps deletes in the Iceberg table; setting it to false
will remove deleted records from the destination Iceberg table. With this config it is possible to keep the last version of a
record in the destination Iceberg table (a soft delete).
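For example, the kept soft-deleted rows can be inspected with the same Spark pattern this commit's tests use; `getTableData` is the tests' helper for loading a destination table into Spark, so this is a sketch rather than a complete program.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch: show the last version of each deleted record kept in the table.
void showSoftDeletes() {
  Dataset<Row> ds = getTableData("testc.inventory.customers"); // tests' helper
  ds.where("__op == 'd'").show();
}
```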

### Optimizing batch size (or commit interval)

Debezium extracts database events in real time, and this can cause too frequent commits or too many small files,
which is not optimal for batch processing, especially when a near-realtime data feed is sufficient.
To avoid this problem, the following batch-size-wait classes are used.

Batch size wait adds delay between consumer calls to increase total number of events received per call and meanwhile events are collected in memory
this should be configured with `debezium.source.max.queue.size` and `debezium.source.max.batch.size` debezium properties
Batch size wait adds a delay between consumer calls to increase the total number of events received per call while events are collected in memory.
This setting should be configured together with the `debezium.source.max.queue.size` and `debezium.source.max.batch.size` Debezium properties.


#### NoBatchSizeWait

This is default configuration by default consumer will not use any batch size wait
This is the default configuration; the consumer does not use any wait, and all events are consumed immediately.

#### DynamicBatchSizeWait
**Deprecated**
This wait strategy dynamically adds a wait to increase batch size. The wait duration is calculated from the number of events processed in the
last 3 batches. If the last batch sizes are lower than `max.batch.size`, the wait duration increases; if the last batch sizes
are bigger than 90% of `max.batch.size`, the wait duration decreases.

This strategy tries to keep batch size between 85%-90% of the `max.batch.size`, it does not guarantee consistent batch size.
This strategy optimizes the batch size to between 85% and 90% of `max.batch.size`; it does not guarantee a consistent batch size.
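A rough illustration of the idea: a controller that nudges a sleep duration up while batches run small and back down once batches approach `max.batch.size`. The class name, step size, and thresholds here are invented; this is not the project's code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: adjust the wait from the average of the last 3 batch sizes.
class DynamicWaitSketch {
  private final Deque<Integer> lastBatchSizes = new ArrayDeque<>();
  private long waitMs = 0;

  long adjust(int batchSize, int maxBatchSize, long maxWaitMs) {
    if (lastBatchSizes.size() == 3) lastBatchSizes.removeFirst();
    lastBatchSizes.addLast(batchSize);
    double avg = lastBatchSizes.stream().mapToInt(Integer::intValue).average().orElse(0);
    if (avg > 0.90 * maxBatchSize) {
      waitMs = Math.max(0, waitMs - 100);           // batches nearly full: wait less
    } else if (avg < 0.85 * maxBatchSize) {
      waitMs = Math.min(maxWaitMs, waitMs + 100);   // batches small: wait more
    }
    return waitMs;
  }
}
```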

Example setup to receive ~2048 events per commit; the maximum wait is set to 5 seconds.
```properties
@@ -81,19 +81,19 @@ debezium.sink.batch.batch-size-wait.max-wait-ms=30000
debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
```

### Destination Iceberg Table Names
### Table Name Mapping

Iceberg table names created by following rule : `table-namespace`.`table-prefix``database.server.name`_`database`_`table`
Iceberg tables are named by the following rule: `table-namespace`.`table-prefix``database.server.name`_`database`_`table`

For example with following config
For example:

```properties
debezium.sink.iceberg.table-namespace=default
database.server.name=testc
debezium.sink.iceberg.table-prefix=cdc_
```

database table = `inventory.customers` will be replicated to `default.testc_cdc_inventory_customers`
With the above config, the database table `inventory.customers` is replicated to `default.cdc_testc_inventory_customers`
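A minimal sketch of this naming rule (a hypothetical helper, not the project's code):

```java
// Hypothetical helper illustrating the rule
// `table-namespace`.`table-prefix``database.server.name`_`database`_`table`.
public class TableNames {
  static String icebergTableName(String tableNamespace, String tablePrefix,
                                 String serverName, String database, String table) {
    return tableNamespace + "." + tablePrefix + serverName + "_" + database + "_" + table;
  }
  // icebergTableName("default", "cdc_", "testc", "inventory", "customers")
  //   -> "default.cdc_testc_inventory_customers"
}
```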

## Debezium Event Flattening

@@ -118,28 +118,55 @@ debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!

```properties
debezium.sink.type=iceberg
# run with append mode
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.upsert-keep-deletes=true
# iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.table-namespace=debeziumevents
debezium.sink.iceberg.fs.defaultFS=s3a://MY_S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://MY_S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
debezium.sink.iceberg.fs.s3a.access.key=S3_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.secret.key=S3_SECRET_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.endpoint=http://localhost:9000 # minio specific setting
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.catalog-name=mycatalog
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
# complex nested data types are not supported, do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
```

## Schema Change Behaviour

It is possible for schemas to get out of sync between the source and target tables, for example when the source database changes its schema by adding or dropping a field. Here we document the possible schema changes and the current behaviour of the Iceberg consumer.

#### Adding a new column to the source (a column missing in the destination Iceberg table)
Data of the new column is ignored until the same column is added to the destination Iceberg table.

For example: if a column is not found in the Iceberg table, its data is ignored and not copied to the target.
Once the same column is added to the Iceberg table, data for this column is recognized and populated.
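To start capturing the new column, it can be added to the destination table with the Iceberg schema-evolution API, mirroring this commit's schema-change test. The column name and type below are examples; the `Table` instance would come from your catalog.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

// Sketch: evolve the destination table so the consumer starts populating
// the new column. Long is used because Debezium serializes dates as numbers.
void addMissingColumn(Table table) {
  table.updateSchema()
      .addColumn("test_date_column", Types.LongType.get()) // example name/type
      .commit();
}
```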

#### Removing a column from the source (an extra column in the Iceberg table)
These columns are populated with null values.

#### Renaming a column in the source
This is a combination of the two cases above: the old column is populated with null values, and the new column is not recognized or populated until it is added to the Iceberg table.

#### Different Data Types
This is the scenario where the source field type and the target Iceberg field type are different. In this case the consumer converts the source field value to the destination type. Conversion is done by Jackson; if the value cannot be converted to the destination type, the default value for that type is returned.

For example, this is the conversion rule for the Long type (from the Jackson documentation):
```
Method that will try to convert value of this node to a Java long. Numbers are coerced using default Java rules; booleans convert to 0 (false) and 1 (true), and Strings are parsed using default Java language integer parsing rules.
If representation cannot be converted to a long (including structured types like Objects and Arrays), default value of 0 will be returned; no exceptions are thrown.
```
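A small demonstration of these coercion rules using Jackson's `asLong()`; the JSON document is an arbitrary example.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CoercionDemo {
  public static void main(String[] args) throws Exception {
    JsonNode node = new ObjectMapper()
        .readTree("{\"s\": \"123\", \"b\": true, \"o\": {\"x\": 1}}");
    System.out.println(node.get("s").asLong()); // 123 -> string parsed with Java rules
    System.out.println(node.get("b").asLong()); // 1   -> boolean true coerces to 1
    System.out.println(node.get("o").asLong()); // 0   -> objects fall back to default 0
  }
}
```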

## `icebergevents` Consumer

This is second consumer developed with this project, This consumer appends CDC events to single Iceberg table as json string.
This is the second consumer in this project. This consumer appends CDC events to a single Iceberg table as a JSON string.
This table is partitioned by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms`.

#### Example Configuration
