Skip to content

Commit

Permalink
Improve handling delete events and logging message (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jun 18, 2024
1 parent a96d4fc commit 4d635cd
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.debezium.server.iceberg.tableoperator;

import com.google.common.collect.Sets;
import io.debezium.DebeziumException;
import org.apache.iceberg.*;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
Expand All @@ -13,6 +14,8 @@
import java.io.IOException;
import java.util.List;

import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.opFieldName;

abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {

private final Schema schema;
Expand Down Expand Up @@ -50,16 +53,20 @@ InternalRecordWrapper wrapper() {
@Override
public void write(Record row) throws IOException {
RowDataDeltaWriter writer = route(row);
// @TODO __op field should not be hardcoded! when unwrapped its __op when not ist op
if (upsert && !row.getField("__op").equals("c")) {// anything which not an insert is upsert
final Object opFieldValue = row.getField(opFieldName);
if (opFieldValue == null) {
throw new DebeziumException("The value for field `" + opFieldName + "` is missing. " +
"This field is required when updating or deleting data, when running in upsert mode."
);
}
if (upsert && !opFieldValue.equals("c")) {// anything which not an insert is upsert
writer.delete(row);
}
// if its deleted row and upsertKeepDeletes = true then add deleted record to target table
// else deleted records are deleted from target table
// @TODO __op field should not be hardcoded! when unwrapped its __op when not ist op
if (
upsertKeepDeletes
|| !(row.getField("__op").equals("d")))// anything which not an insert is upsert
|| !(opFieldValue.equals("d")))// anything which not an insert is upsert
{
writer.write(row);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* Wrapper to perform operations on iceberg tables
*
Expand All @@ -41,10 +40,9 @@ public class IcebergTableOperator {

static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
protected static final String opFieldName = "__op";
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
Expand Down Expand Up @@ -97,9 +95,9 @@ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
result = cdcOperations.getOrDefault(lhs.get(opColumn).asText("c"), -1)
result = cdcOperations.getOrDefault(lhs.get(opFieldName).asText("c"), -1)
.compareTo(
cdcOperations.getOrDefault(rhs.get(opColumn).asText("c"), -1)
cdcOperations.getOrDefault(rhs.get(opFieldName).asText("c"), -1)
);
}

Expand Down Expand Up @@ -195,7 +193,7 @@ private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> ev
appendFiles.commit();
}
} catch (IOException ex) {
throw new DebeziumException(ex);
throw new DebeziumException("Failed to write data to table:`" + icebergTable.name() + "`", ex);
}

LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
Expand Down
1 change: 0 additions & 1 deletion docs/DOCS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ tables created automatically with the first start.
| `debezium.sink.iceberg.upsert` | `true` | Running consumer in upsert mode, overwriting updated rows. explained below. |
| `debezium.sink.iceberg.upsert-keep-deletes` | `true` | When running with upsert mode, keeps deleted rows in target table (soft delete). |
| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ms` | With upsert mode used to deduplicate data. row with highest `__source_ts_ms` kept(last change event). _dont change!_ |
| `debezium.sink.iceberg.upsert-op-column` | `__op` | Used with upsert mode to determine event priority(order). _dont change!_ |
| `debezium.sink.iceberg.create-identifier-fields` | `true` | When set to false the consumer will create tables without identifier fields. useful when user wants to consume nested events with append only mode. |
| `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify destination iceberg table name. For example with this setting, its possible to combine some tables `table_ptt1`,`table_ptt2` to one `table_combined`. |
| `debezium.sink.iceberg.destination-regexp-replace` | `` | Regexp replace part to modify destination iceberg table name |
Expand Down

0 comments on commit 4d635cd

Please sign in to comment.