reworked partition field implementation.
Partitioning is now decoupled from upsert config key. Any field can be specified.

based on memiiso/debezium-server-iceberg#108
wobu committed Nov 21, 2022
1 parent b627277 commit 76ad0b8
Showing 8 changed files with 393 additions and 336 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## [Unreleased]

- removed 'table.write-format', can be replaced with 'iceberg.table-default.write.format.default'
+- Reworked partition field implementation. Added config key 'table.partition-field'. Removed dependency on 'upsert' feature

## [0.2.0] - 2022-11-16

45 changes: 19 additions & 26 deletions README.md
@@ -12,21 +12,22 @@ mvn clean package

### Configuration reference

-| Key | Type | Default value | Description |
-|-------------------------|---------|----------------|----------------------------------------------------------------------------------------------------------------------------------------|
-| upsert | boolean | true | When *true* Iceberg rows will be updated based on table primary key. When *false* all modification will be added as separate rows. |
-| upsert.keep-deletes | boolean | true | When *true* delete operation will leave a tombstone that will have only a primary key and *__deleted** flag set to true |
-| upsert.dedup-column | String | __source_ts_ms | Column used to check which state is newer during upsert |
-| upsert.op-column | String | __op | Column used to check which state is newer during upsert when *upsert.dedup-column* is not enough to resolve |
-| allow-field-addition | boolean | true | When *true* sink will be adding new columns to Iceberg tables on schema changes |
-| table.auto-create | boolean | false | When *true* sink will automatically create new Iceberg tables |
-| table.namespace | String | default | Table namespace. In Glue it will be used as database name |
-| table.prefix | String | *empty string* | Prefix added to all table names |
-| iceberg.name | String | default | Iceberg catalog name |
-| iceberg.catalog-impl | String | *null* | Iceberg catalog implementation (Only one of iceberg.catalog-impl and iceberg.type can be set to non null value at the same time |
-| iceberg.type | String | *null* | Iceberg catalog type (Only one of iceberg.catalog-impl and iceberg.type can be set to non null value at the same time) |
-| iceberg.* | | | All properties with this prefix will be passed to Iceberg Catalog implementation |
-| iceberg.table-default.* | | | Iceberg specific table settings can be changed with this prefix, e.g. 'iceberg.table-default.write.format.default' can be set to 'orc' |
+| Key | Type | Default value | Description |
+|-------------------------|---------|----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
+| upsert | boolean | true | When *true* Iceberg rows will be updated based on table primary key. When *false* all modification will be added as separate rows. |
+| upsert.keep-deletes | boolean | true | When *true* delete operation will leave a tombstone that will have only a primary key and *__deleted** flag set to true |
+| upsert.dedup-column | String | __source_ts_ms | Column used to check which state is newer during upsert |
+| upsert.op-column | String | __op | Column used to check which state is newer during upsert when *upsert.dedup-column* is not enough to resolve |
+| allow-field-addition | boolean | true | When *true* sink will be adding new columns to Iceberg tables on schema changes |
+| table.auto-create | boolean | false | When *true* sink will automatically create new Iceberg tables |
+| table.namespace | String | default | Table namespace. In Glue it will be used as database name |
+| table.prefix | String | *empty string* | Prefix added to all table names |
+| table.partition-field | String | *null* | Field name used for partitioning. When a Timestamp or Date field is used, the partitioning is transformed by 'day()', otherwise 'identity()' is used. |
+| iceberg.name | String | default | Iceberg catalog name |
+| iceberg.catalog-impl | String | *null* | Iceberg catalog implementation (Only one of iceberg.catalog-impl and iceberg.type can be set to non null value at the same time |
+| iceberg.type | String | *null* | Iceberg catalog type (Only one of iceberg.catalog-impl and iceberg.type can be set to non null value at the same time) |
+| iceberg.* | | | All properties with this prefix will be passed to Iceberg Catalog implementation |
+| iceberg.table-default.* | | | Iceberg specific table settings can be changed with this prefix, e.g. 'iceberg.table-default.write.format.default' can be set to 'orc' |


### REST / Manual based installation
@@ -302,18 +303,10 @@ Rows cannot be updated nor removed unless primary key is defined. In case of del

### Iceberg partitioning support

-Currently, partitioning is done automatically based on event time. Partitioning only works when Debezium is configured in append-only mode (`upsert: false`).
+When the Iceberg table is auto-created, a partition spec can be configured by setting the config key `table.partition-field` to the field to partition by.
+When the field is of [type](https://iceberg.apache.org/spec/#partition-transforms) `date` or `timestamp` the [partition transformation](https://iceberg.apache.org/spec/#partition-transforms) `day()` is applied, otherwise `identity()` is used.

-Any event produced by debezium source contains a source time at which the transaction was committed:
-
-```sql
-"sourceOffset": {
-...
-"ts_ms": "1482918357011"
-}
-```
-
-From this value day part is extracted and used as partition.
+Note: At the moment only the fields `__ts_ms` and `__source_ts_ms` are converted to the type `timestamp`. These fields are normally added by a Debezium transformation, e.g. [New Record State Extraction](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html#_adding_metadata)

## Debezium change event format support

@@ -60,7 +60,7 @@ private Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId,
if (!configuration.isTableAutoCreate()) {
throw new ConnectException(String.format("Table '%s' not found! Set '%s' to true to create tables automatically!", tableId, TABLE_AUTO_CREATE));
}
-return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), !configuration.isUpsert());
+return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), configuration.getTablePartitionField());
});
}
}
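
The body of `IcebergUtil.createIcebergTable` is not part of this excerpt, so the following is only a minimal sketch of the transform selection that the README change describes: `day()` for `date` and `timestamp` fields, `identity()` for everything else. The class `PartitionTransformSketch`, the `FieldType` enum, and the method `partitionExpression` are hypothetical names made up for illustration and are not the Iceberg API. Treating a `null` or empty field name as "unpartitioned" is an assumption based on the documented default of *null*.

```java
public class PartitionTransformSketch {

    /** Hypothetical stand-in for the column types that matter for this decision. */
    public enum FieldType { DATE, TIMESTAMP, STRING, LONG }

    /**
     * Returns a textual partition expression for the configured field:
     * day(field) for DATE/TIMESTAMP columns, identity(field) otherwise.
     * Assumption: a null or empty field name leaves the table unpartitioned.
     */
    public static String partitionExpression(String fieldName, FieldType type) {
        if (fieldName == null || fieldName.isEmpty()) {
            return "unpartitioned";
        }
        switch (type) {
            case DATE:
            case TIMESTAMP:
                // Temporal columns are bucketed per day.
                return "day(" + fieldName + ")";
            default:
                // Every other type is partitioned by its raw value.
                return "identity(" + fieldName + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionExpression("__source_ts_ms", FieldType.TIMESTAMP)); // day(__source_ts_ms)
        System.out.println(partitionExpression("customer_id", FieldType.LONG));         // identity(customer_id)
        System.out.println(partitionExpression(null, FieldType.STRING));                // unpartitioned
    }
}
```

In the real sink the equivalent decision would presumably be made while building an Iceberg partition spec from the table schema; the string output here only makes the branching visible.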