Minor improvements #20

Merged
merged 16 commits into from
Sep 19, 2021
48 changes: 0 additions & 48 deletions BLOGPOST.md

This file was deleted.

16 changes: 9 additions & 7 deletions README.md
@@ -5,6 +5,8 @@
This project adds an Iceberg consumer to the [Debezium Server application](https://debezium.io/documentation/reference/operations/debezium-server.html). It can be used to
replicate database changes to Iceberg tables (cloud storage, HDFS) without requiring Spark, Kafka, or a streaming platform.

+![Debezium Iceberg](docs/images/Debezium-Iceberg.png)
+
## `iceberg` Consumer

Iceberg consumer appends or upserts debezium events to destination iceberg tables. When event and key schemas
@@ -27,13 +29,13 @@ values received then the record with higher `__op` priority is added to destinat

### Append
Setting `debezium.sink.iceberg.upsert=false` sets the operation mode to append. In append mode, data deduplication is not done; all received records are appended to the destination table.
-Note: For the tables without primary key operation mode is append even configuration is set to upsert mode
+Note: For the tables without primary key operation mode falls back to append even configuration is set to upsert mode

#### Keeping Deleted Records

By default `debezium.sink.iceberg.upsert-keep-deletes=true` will keep deletes in the iceberg table, setting it to false
will remove deleted records from the destination iceberg table. With this config it's possible to keep the last version of the deleted
-record in the table(to do soft deletes).
+record in the table(possible to do soft deletes).

### Optimizing batch size (or commit interval)

@@ -50,7 +52,7 @@ this should be configured with `debezium.source.max.queue.size` and `debezium.so
This is the default configuration: by default, the consumer does not use any batch size wait.

#### DynamicBatchSizeWait

+**Deprecated**
This wait strategy dynamically adds wait to increase batch size. Wait duration is calculated based on the number of processed events in
the last 3 batches. If the last batch sizes are lower than `max.batch.size`, the wait duration increases; if the last batch sizes
are bigger than 90% of `max.batch.size`, the wait duration decreases.
@@ -66,9 +68,9 @@ debezium.sink.batch.batch-size-wait.max-wait-ms=5000
```
#### MaxBatchSizeWait

-MaxBatchSizeWait uses debezium metrics to optimize batch size, this strategy is more precise compared to DynamicBatchSizeWait
-DynamicBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`
-maximum wait and check intervals are controlled by `debezium.sink.batch.batch-size-wait.max-wait-ms`, `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties
+MaxBatchSizeWait uses debezium metrics to optimize batch size, this strategy is more precise compared to DynamicBatchSizeWait.
+MaxBatchSizeWait periodically reads streaming queue current size and waits until it reaches to `max.batch.size`.
+Maximum wait and check intervals are controlled by `debezium.sink.batch.batch-size-wait.max-wait-ms`, `debezium.sink.batch.batch-size-wait.wait-interval-ms` properties.
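
The polling behaviour described for MaxBatchSizeWait can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the project's implementation: the class `MaxBatchSizeWaitSketch`, the method `waitForBatch`, and the `IntSupplier` that stands in for the Debezium streaming-queue-size metric are all hypothetical names introduced here.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch: not the actual MaxBatchSizeWait class from this repository.
class MaxBatchSizeWaitSketch {

    /**
     * Polls the queue size every waitIntervalMs until it reaches maxBatchSize
     * or until maxWaitMs has been spent waiting, whichever comes first.
     *
     * @param queueSize      assumed stand-in for the streaming queue size metric
     * @param maxBatchSize   target number of queued events (cf. max.batch.size)
     * @param maxWaitMs      upper bound on total wait (cf. max-wait-ms)
     * @param waitIntervalMs pause between checks (cf. wait-interval-ms)
     * @return total milliseconds spent sleeping
     */
    static long waitForBatch(IntSupplier queueSize, int maxBatchSize,
                             long maxWaitMs, long waitIntervalMs) {
        long waited = 0;
        while (waited < maxWaitMs && queueSize.getAsInt() < maxBatchSize) {
            try {
                Thread.sleep(waitIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting early.
                Thread.currentThread().interrupt();
                break;
            }
            waited += waitIntervalMs;
        }
        return waited;
    }
}
```

With `maxWaitMs=30000` and `waitIntervalMs=5000`, this sketch would check the queue at most six times before giving up and letting the batch proceed with whatever has accumulated.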

Example setup to receive ~2048 events per commit: maximum wait is set to 30 seconds, and the streaming queue current size is checked every 5 seconds.
```properties
@@ -98,7 +100,7 @@ database table = `inventory.customers` will be replicated to `default.testc_cdc_

## Debezium Event Flattening

-Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported
+Iceberg consumer requires event flattening, Currently nested events and complex data types(like Struct) are not supported.

```properties
debezium.transforms=unwrap
1 change: 0 additions & 1 deletion debezium-server-iceberg-sink/pom.xml
@@ -161,7 +161,6 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
-<version>8.0.22</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -25,6 +25,7 @@
*/
@Dependent
@Named("DynamicBatchSizeWait")
+@Deprecated
public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);

Binary file added docs/images/Debezium-Iceberg.png
16 changes: 8 additions & 8 deletions pom.xml
@@ -17,13 +17,6 @@
<version>${revision}</version>
<packaging>pom</packaging>

-<repositories>
-<repository>
-<id>nexus-orgapacheiceberg</id>
-<url>https://repository.apache.org/content/repositories/orgapacheiceberg-1018/</url>
-</repository>
-</repositories>

<properties>
<revision>0.1.0-SNAPSHOT</revision>

@@ -43,7 +36,8 @@
<version.awssdk>2.16.88</version.awssdk>
<version.parquet>1.11.1</version.parquet>
<!-- Debezium -->
-<version.debezium>1.7.0.Alpha1</version.debezium>
+<version.debezium>1.7.0.CR1</version.debezium>
+<version.mysql.driver>8.0.26</version.mysql.driver>
<!-- Quarkus -->
<version.quarkus>2.0.3.Final</version.quarkus>
</properties>
@@ -77,6 +71,12 @@
<scope>import</scope>
</dependency>

+<!-- MySQL JDBC Driver, Binlog reader, Geometry support -->
+<dependency>
+<groupId>mysql</groupId>
+<artifactId>mysql-connector-java</artifactId>
+<version>${version.mysql.driver}</version>
+</dependency>
<!-- debezium server -->
<dependency>
<groupId>io.debezium</groupId>