

improve documentation
ismailsimsek committed Jun 30, 2021
1 parent e71c447 commit 494b0da
Showing 2 changed files with 56 additions and 48 deletions.
101 changes: 55 additions & 46 deletions README.md
![Java CI with Maven](https://github.com/memiiso/debezium-server-iceberg/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master)

# Any contribution is welcome

This is a new project and there are many things to improve. Please feel free to send a PR or open bug/feature requests.

# Debezium Iceberg Consumers

-----
...replicate database changes to iceberg table, without requiring Spark, Kafka or S...

## `iceberg` Consumer

Appends Debezium events to destination Iceberg tables. When event and key schemas are
enabled (`debezium.format.value.schemas.enable=true`, `debezium.format.key.schemas.enable=true`) the destination Iceberg
tables are created automatically. When a destination table does not exist, the consumer prints a warning message and
continues replicating the other tables.
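
For reference, these are the two schema settings mentioned above (both keys and values are taken from the paragraph):

```properties
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
```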

### Upsert

By default (`debezium.sink.iceberg.upsert=true`) the upsert feature is enabled: for tables with a primary key definition
the consumer does upserts, and for tables without a primary key it falls back to append mode.

Setting `debezium.sink.iceberg.upsert=false` changes the insert mode to append.
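
A minimal sketch of the upsert toggle, using only the key named above; the commented-out line shows the append-mode alternative:

```properties
# default: upsert mode (tables without a primary key still fall back to append)
debezium.sink.iceberg.upsert=true
# uncomment to switch all tables to append mode
#debezium.sink.iceberg.upsert=false
```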

#### Data Deduplication

When the iceberg consumer is doing upserts it deduplicates the batch. Deduplication is done based
on the `__source_ts_ms` field and the event type `__op`.
It is possible to change the deduplication field using `debezium.sink.iceberg.upsert-source-ts-ms-column=__source_ts_ms`;
currently only the Long field type is supported.

Operation type priorities are `{"c":1, "r":2, "u":3, "d":4}`. When two records with the same key have the same
`__source_ts_ms` value, the record with the higher `__op` priority is kept.
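
As an illustration only — not the consumer's actual implementation — the ordering rule above can be sketched like this (class and method names are made up for the example):

```java
import java.util.Map;

// Sketch of the deduplication ordering described above (illustrative, not the consumer's code).
public class DedupOrderSketch {

  // documented operation priorities: create < read/snapshot < update < delete
  static final Map<String, Integer> OP_PRIORITY = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

  /** Decides whether a candidate event replaces the currently kept event for the same key. */
  static boolean replaces(long keptSourceTsMs, String keptOp, long candidateSourceTsMs, String candidateOp) {
    if (candidateSourceTsMs != keptSourceTsMs) {
      // different __source_ts_ms: the newer event wins
      return candidateSourceTsMs > keptSourceTsMs;
    }
    // same __source_ts_ms: the event with the higher __op priority is kept
    return OP_PRIORITY.get(candidateOp) > OP_PRIORITY.get(keptOp);
  }

  public static void main(String[] args) {
    // same key, same __source_ts_ms: delete ("d") outranks update ("u")
    System.out.println(replaces(1625000000000L, "u", 1625000000000L, "d")); // true
  }
}
```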

#### Keeping Deleted Records

By default `debezium.sink.iceberg.upsert-keep-deletes=true` keeps deleted records in the Iceberg table; setting it to false
removes deleted records from the Iceberg table as well. With this config it is possible to keep the last version of a
deleted record.
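
A small sketch of the delete-handling switch, using only the key named above:

```properties
# default: keep the last version of deleted records in the destination table
debezium.sink.iceberg.upsert-keep-deletes=true
# set to false to remove deleted records from the iceberg table as well
#debezium.sink.iceberg.upsert-keep-deletes=false
```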

### Iceberg Table Names

Iceberg table names are created using the following rule: `table-namespace`.`table-prefix``database.server.name`_`database`_`table`

For example

```properties
debezium.sink.iceberg.table-namespace=default
database.server.name=testc
debezium.sink.iceberg.table-prefix=cdc_
```

The database table `inventory.customers` will be replicated to `default.testc_cdc_inventory_customers`.

### Optimizing batch size (or commit interval)

@TODO dynamic wait

### Example Configuration

```properties
debezium.sink.type=iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.table-namespace=default
debezium.sink.iceberg.catalog-name=default
debezium.sink.iceberg.fs.defaultFS=s3a://MY_S3_BUCKET
debezium.sink.iceberg.warehouse=s3a://MY_S3_BUCKET/iceberg_warehouse
debezium.sink.iceberg.user.timezone=UTC
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
debezium.sink.iceberg.fs.s3a.access.key=S3_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.secret.key=S3_SECRET_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
# minio specific setting
debezium.sink.iceberg.fs.s3a.endpoint=http://localhost:9000
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
debezium.sink.iceberg.dynamic-wait=true
debezium.sink.batch.dynamic-wait.max-wait-ms=300000
debezium.sink.iceberg.upsert=true
debezium.format.key.schemas.enable=true
```
All the properties starting with `debezium.sink.iceberg.**` are passed to Iceberg and to the Hadoop configuration (hadoopConf).

```properties
# passed to iceberg!
debezium.sink.iceberg.{iceberg.prop.name}=xyz-value
```
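
For instance, a hypothetical Iceberg property could be forwarded this way; `write.format.default` below is only an illustrative choice, not something configured by this commit:

```properties
# illustrative only: forwards iceberg's write.format.default setting through the sink prefix
debezium.sink.iceberg.write.format.default=parquet
```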

## `icebergevents` Consumer

This consumer appends CDC events to a single Iceberg table as JSON strings. The table is partitioned
by `event_destination,event_sink_timestamptz` and sorted by `event_sink_epoch_ms`.

#### Example Configuration

````properties
debezium.sink.type=icebergevents
debezium.sink.iceberg.catalog-name=default
debezium.sink.iceberg.dynamic-wait=true
debezium.sink.batch.dynamic-wait.max-wait-ms=300000
````
Iceberg table definition:

```java
static final String TABLE_NAME="debezium_events";
static final Schema TABLE_SCHEMA = new Schema(
    required(1, "event_destination", Types.StringType.get()),
    optional(2, "event_key", Types.StringType.get()),
    optional(3, "event_value", Types.StringType.get()),
    optional(4, "event_sink_epoch_ms", Types.LongType.get()),
    optional(5, "event_sink_timestamptz", Types.TimestampType.withZone())
);
static final PartitionSpec TABLE_PARTITION = PartitionSpec.builderFor(TABLE_SCHEMA)
    .identity("event_destination")
    .hour("event_sink_timestamptz")
    .build();
static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA)
    .asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
    .build();
```

The Iceberg consumer requires event flattening. Currently, nested events and complex data types (like maps) are not supported.

```properties
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
debezium.transforms.unwrap.add.headers=db
debezium.transforms.unwrap.delete.handling.mode=rewrite
```

## Controlling Batch Size

```properties
# Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.
debezium.source.max.batch.size=2
debezium.sink.iceberg.dynamic-wait=true
debezium.sink.batch.dynamic-wait.max-wait-ms=300000
```
public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D...
@Inject
BatchDynamicWait dynamicWait;

Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Catalog icebergCatalog;
Table eventTable;
void connect() throws InterruptedException {
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
}

TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");

Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(this.hadoopConf::set);
