Commit

Improve documentation (#258)
* Improve code comments

* Improve code comments

* Improve code comments

* Improve documentation

* Improve documentation

* Improve documentation

(cherry picked from commit 514b4d0)
ismailsimsek committed Jan 1, 2024
1 parent fe69d7d commit 6cbd9b8
Showing 12 changed files with 99 additions and 141 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -26,7 +26,7 @@ Also, check [caveats](docs/CAVEATS.md) for better understanding the current limi
- cd into unzipped folder: `cd appdist`
- Create `application.properties` file and config it: `nano conf/application.properties`, you can check the example
configuration
-in [application.properties.example](debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example)
+in [application.properties.example](debezium-server-iceberg-dist%2Fsrc%2Fmain%2Fresources%2Fdistro%2Fconf%2Fapplication.properties.example)
- Run the server using provided script: `bash run.sh`

# Debezium python runner
@@ -54,8 +54,10 @@ debezium.format.value=json
debezium.format.key=json

# saving debezium state data to destination, iceberg tables
+# see https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
+# see https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test

@@ -143,7 +143,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
throws InterruptedException {
Instant start = Instant.now();

-//group events by destination
+//group events by destination (per iceberg table)
Map<String, List<IcebergChangeEvent>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
@@ -167,7 +167,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
icebergTableOperator.addToTable(icebergTable, tableEvents.getValue());
}

-// workaround! somehow offset is not saved to file unless we call committer.markProcessed
+// workaround! somehow offset is not saved to file unless we call committer.markProcessed per event
// even though it should be saved to file periodically
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Processed event '{}'", record);
@@ -176,13 +176,14 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
committer.markBatchFinished();
this.logConsumerProgress(records.size());

+// waiting to group events as batches
batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
}
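
For context, the grouping step above can be sketched as follows; the event type and destination names here are hypothetical stand-ins for the connector's `IcebergChangeEvent`, not its actual API:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByDestinationSketch {

  // Hypothetical stand-in for IcebergChangeEvent.
  record Event(String destination, String payload) {}

  public static void main(String[] args) {
    List<Event> records = List.of(
        new Event("testc.inventory.orders", "{}"),
        new Event("testc.inventory.customers", "{}"),
        new Event("testc.inventory.orders", "{}"));

    // Group events by destination so each iceberg table receives one batched write.
    Map<String, List<Event>> byDestination = records.stream()
        .collect(Collectors.groupingBy(Event::destination));

    byDestination.forEach((table, events) ->
        System.out.printf("%s -> %d events%n", table, events.size()));
  }
}
```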

/**
* @param tableId iceberg table identifier
* @param sampleEvent sample debezium event. event schema used to create iceberg table when table not found
-* @return iceberg table, throws RuntimeException when table not found and it's not possible to create it
+* @return iceberg table; throws RuntimeException when the table is not found and it's not possible to create it
*/
public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
@@ -200,7 +201,9 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample
}

/**
-* @param numUploadedEvents periodically log number of events consumed
+* Periodically logs the number of events consumed.
+*
+* @param numUploadedEvents number of events consumed
*/
protected void logConsumerProgress(long numUploadedEvents) {
numConsumedEvents += numUploadedEvents;
@@ -26,6 +26,9 @@
import java.util.concurrent.atomic.AtomicReference;

/**
+*
+* Converts a Debezium JSON event into an Iceberg GenericRecord: extracts the event schema and key fields, and converts the event schema to an Iceberg Schema.
+*
* @author Ismail Simsek
*/
public class IcebergChangeEvent {
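
As a rough illustration of the conversion target (not this class's actual implementation), an event carrying an `id` and a `name` field would end up as an Iceberg `GenericRecord`; the schema and field names below are made up for the example:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

public class GenericRecordSketch {
  public static void main(String[] args) {
    // Iceberg schema derived from the (hypothetical) event schema.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

    // The converted event: one GenericRecord matching that schema.
    GenericRecord record = GenericRecord.create(schema);
    record.setField("id", 1L);
    record.setField("name", "example");

    System.out.println(record);
  }
}
```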
@@ -9,7 +9,7 @@
package io.debezium.server.iceberg.batchsizewait;

/**
-* Implementation of the consumer that delivers the messages to iceberg tables.
+* When enabled, adds a wait to the consumer to control the batch size. This turns the processing into batch processing.
*
* @author Ismail Simsek
*/
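
The idea can be sketched as below; the class name, settings, and thresholds are illustrative assumptions, not the module's actual implementation (only the `waitMs` signature mirrors the call seen in `handleBatch` above):

```java
public class BatchSizeWaitSketch {
  // Assumed settings; real values would come from configuration.
  private final int maxBatchSize = 2048;
  private final int maxWaitMs = 30_000;

  // Called after each batch: when a batch comes in under-sized, pause the
  // consumer so the next poll accumulates closer to maxBatchSize events.
  public void waitMs(int numRecordsProcessed, int processingTimeMs) throws InterruptedException {
    if (numRecordsProcessed < maxBatchSize && processingTimeMs < maxWaitMs) {
      Thread.sleep(Math.min(maxWaitMs - processingTimeMs, 5_000));
    }
  }
}
```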
@@ -53,7 +53,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;

/**
-* A {@link SchemaHistory} implementation that stores the schema history to database table
+* A {@link SchemaHistory} implementation that stores the schema history in an Iceberg table
*
* @author Ismail Simsek
*/
@@ -55,7 +55,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;

/**
-* Implementation of OffsetBackingStore that saves data to database table.
+* Implementation of OffsetBackingStore that saves data to an Iceberg table.
*/
@Dependent
public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implements OffsetBackingStore {
@@ -32,7 +32,7 @@
import org.slf4j.LoggerFactory;

/**
-* Wrapper to perform operations in iceberg tables
+* Wrapper to perform operations on iceberg tables
*
* @author Rafael Acevedo
*/
@@ -17,6 +17,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+/**
+* Iceberg Table Writer Factory: returns the correct TaskWriter for a table, depending on the upsert mode.
+*/
@Dependent
public class IcebergTableWriterFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
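
Illustrative only: the real factory returns Iceberg `TaskWriter` instances, while this self-contained sketch with hypothetical types just shows the dispatch the javadoc describes:

```java
public class WriterFactorySketch {

  // Hypothetical writer abstraction standing in for Iceberg's TaskWriter.
  interface Writer { void write(String row); }

  static Writer appendWriter() { return row -> System.out.println("append: " + row); }
  static Writer upsertWriter() { return row -> System.out.println("equality-delete upsert: " + row); }

  // The upsert mode decides which writer the caller gets.
  static Writer writerFor(boolean upsertMode) {
    return upsertMode ? upsertWriter() : appendWriter();
  }
}
```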

This file was deleted.

18 changes: 12 additions & 6 deletions docs/CAVEATS.md
@@ -1,12 +1,14 @@
# Caveats

## Only iceberg V2 table supported
-This connector only writes using iceberg table V2 spec (delete events will be written to delete files instead of rewrite data files)
+
+This connector only writes using iceberg table V2 spec (delete events will be written to delete files (merge on read)
+instead of rewrite data files)

## No automatic schema evolution
-Currently, there is no handler to detect schema changes and auto evolve the schema. Schema change events can make the
-connector throw error. To work around this, turn off schema change event in `source` setting.
-
-- For SQL Server, set `debezium.source.include.schema.changes=false`
+
+Full schema evolution is not supported. But schema expansion, like field addition, is supported;
+see the `debezium.sink.iceberg.allow-field-addition` setting.
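
A hedged example of that setting in `application.properties` (assuming it takes a boolean, as the name suggests):

```properties
# Assumed usage: allow new source columns to be added to the iceberg table schema
debezium.sink.iceberg.allow-field-addition=true
```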

## Specific tables replication

@@ -18,7 +20,11 @@ in [Debezium server source](https://debezium.io/documentation/reference/stable/c
.

## AWS S3 credentials

You can setup aws credentials in the following ways:
-- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key` in `application.properties`
+
+- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key`
+  in `application.properties`
- Option 2: inject credentials to environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY`
-- Option 3: setup proper `HADOOP_HOME` env then add s3a configuration into `core-site.xml`, more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3).
+- Option 3: setup proper `HADOOP_HOME` env then add s3a configuration into `core-site.xml`; more information can be
+  found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3).
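
For reference, Option 1 as an `application.properties` sketch (the property names come from the text above; values are placeholders):

```properties
# Option 1: static credentials in application.properties
debezium.sink.iceberg.fs.s3a.access.key=<AWS_ACCESS_KEY_ID>
debezium.sink.iceberg.fs.s3a.secret.key=<AWS_SECRET_ACCESS_KEY>
```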
