Skip to content

Commit

Permalink
Minor improvements (#20)
Browse files Browse the repository at this point in the history
Minor improvements
  • Loading branch information
ismailsimsek committed Sep 19, 2021
1 parent 32ed5cf commit b9e3ec9
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 64 deletions.
48 changes: 0 additions & 48 deletions BLOGPOST.md

This file was deleted.

16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
This project adds iceberg consumer to [debezium server application](https://debezium.io/documentation/reference/operations/debezium-server.html). it could be used to
replicate database changes to iceberg table(Cloud storage, hdfs) without requiring Spark, Kafka or Streaming platform.

![Debezium Iceberg](docs/images/Debezium-Iceberg.png)

## `iceberg` Consumer

Iceberg consumer appends or upserts debezium events to destination iceberg tables. When event and key schemas
Expand All @@ -27,13 +29,13 @@ values received then the record with higher `__op` priority is added to destinat

### Append
Setting `debezium.sink.iceberg.upsert=false` will set the operation mode to append, with append mode data deduplication is not done, all received records are appended to destination table
Note: For the tables without primary key operation mode is append even configuration is set to upsert mode
Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode

#### Keeping Deleted Records

By default `debezium.sink.iceberg.upsert-keep-deletes=true` will keep deletes in the iceberg table, setting it to false
will remove deleted records from the destination iceberg table. With this config its possible to keep last version of the deleted
record in the table(to do soft deletes).
record in the table(possible to do soft deletes).

### Optimizing batch size (or commit interval)

Expand All @@ -50,7 +52,7 @@ this should be configured with `debezium.source.max.queue.size` and `debezium.so
This is default configuration by default consumer will not use any batch size wait

#### DynamicBatchSizeWait

**Deprecated**
This wait strategy dynamically adds wait to increase batch size. Wait duration is calculated based on number of processed events in
last 3 batches. if last batch sizes are lower than `max.batch.size` Wait duration will increase and if last batch sizes
are bigger than 90% of `max.batch.size` Wait duration will decrease
Expand All @@ -66,9 +68,9 @@ debezium.sink.batch.batch-size-wait.max-wait-ms=5000
```
#### MaxBatchSizeWait

MaxBatchSizeWait uses debezium metrics to optimize batch size, this strategy is more precise compared to DynamicBatchSizeWait
DynamicBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`
maximum wait and check intervals are controlled by `debezium.sink.batch.batch-size-wait.max-wait-ms`, `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties
MaxBatchSizeWait uses debezium metrics to optimize batch size, this strategy is more precise compared to DynamicBatchSizeWait.
MaxBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`.
Maximum wait and check intervals are controlled by `debezium.sink.batch.batch-size-wait.max-wait-ms`, `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.

example setup to receive ~2048 events per commit. maximum wait is set to 30 seconds, streaming queue current size checked every 5 seconds
```properties
Expand Down Expand Up @@ -98,7 +100,7 @@ database table = `inventory.customers` will be replicated to `default.testc_cdc_

## Debezium Event Flattening

Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported
Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported.

```properties
debezium.transforms=unwrap
Expand Down
1 change: 0 additions & 1 deletion debezium-server-iceberg-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/
@Dependent
@Named("DynamicBatchSizeWait")
@Deprecated
public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);

Expand Down
Binary file added docs/images/Debezium-Iceberg.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
16 changes: 8 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@
<version>${revision}</version>
<packaging>pom</packaging>

<repositories>
<repository>
<id>nexus-orgapacheiceberg</id>
<url>https://repository.apache.org/content/repositories/orgapacheiceberg-1018/</url>
</repository>
</repositories>

<properties>
<revision>0.1.0-SNAPSHOT</revision>

Expand All @@ -43,7 +36,8 @@
<version.awssdk>2.16.88</version.awssdk>
<version.parquet>1.11.1</version.parquet>
<!-- Debezium -->
<version.debezium>1.7.0.Alpha1</version.debezium>
<version.debezium>1.7.0.CR1</version.debezium>
<version.mysql.driver>8.0.26</version.mysql.driver>
<!-- Quarkus -->
<version.quarkus>2.0.3.Final</version.quarkus>
</properties>
Expand Down Expand Up @@ -77,6 +71,12 @@
<scope>import</scope>
</dependency>

<!-- MySQL JDBC Driver, Binlog reader, Geometry support -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${version.mysql.driver}</version>
</dependency>
<!-- debezium server -->
<dependency>
<groupId>io.debezium</groupId>
Expand Down

0 comments on commit b9e3ec9

Please sign in to comment.