Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling delete events and logging message #366

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.debezium.server.iceberg.tableoperator;

import com.google.common.collect.Sets;
import io.debezium.DebeziumException;
import org.apache.iceberg.*;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
Expand All @@ -13,6 +14,8 @@
import java.io.IOException;
import java.util.List;

import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.opFieldName;

abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {

private final Schema schema;
Expand Down Expand Up @@ -50,16 +53,20 @@ InternalRecordWrapper wrapper() {
@Override
public void write(Record row) throws IOException {
RowDataDeltaWriter writer = route(row);
// @TODO the __op field name should not be hardcoded! When the event is unwrapped it is `__op`; when it is not, it is `op`.
if (upsert && !row.getField("__op").equals("c")) {// anything which not an insert is upsert
final Object opFieldValue = row.getField(opFieldName);
if (opFieldValue == null) {
throw new DebeziumException("The value for field `" + opFieldName + "` is missing. " +
"This field is required when updating or deleting data, when running in upsert mode."
);
}
if (upsert && !opFieldValue.equals("c")) {// anything which not an insert is upsert
writer.delete(row);
}
// If it's a deleted row and upsertKeepDeletes = true, then add the deleted record to the target table;
// otherwise deleted records are removed from the target table.
// @TODO the __op field name should not be hardcoded! When the event is unwrapped it is `__op`; when it is not, it is `op`.
if (
upsertKeepDeletes
|| !(row.getField("__op").equals("d")))// anything which not an insert is upsert
|| !(opFieldValue.equals("d")))// anything which not an insert is upsert
{
writer.write(row);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* Wrapper to perform operations on iceberg tables
*
Expand All @@ -41,10 +40,9 @@ public class IcebergTableOperator {

static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
protected static final String opFieldName = "__op";
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
Expand Down Expand Up @@ -97,9 +95,9 @@ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
result = cdcOperations.getOrDefault(lhs.get(opColumn).asText("c"), -1)
result = cdcOperations.getOrDefault(lhs.get(opFieldName).asText("c"), -1)
.compareTo(
cdcOperations.getOrDefault(rhs.get(opColumn).asText("c"), -1)
cdcOperations.getOrDefault(rhs.get(opFieldName).asText("c"), -1)
);
}

Expand Down Expand Up @@ -195,7 +193,7 @@ private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> ev
appendFiles.commit();
}
} catch (IOException ex) {
throw new DebeziumException(ex);
throw new DebeziumException("Failed to write data to table:`" + icebergTable.name() + "`", ex);
}

LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
Expand Down
1 change: 0 additions & 1 deletion docs/DOCS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ tables created automatically with the first start.
| `debezium.sink.iceberg.upsert` | `true` | Runs the consumer in upsert mode, overwriting updated rows. Explained below. |
| `debezium.sink.iceberg.upsert-keep-deletes` | `true` | When running with upsert mode, keeps deleted rows in target table (soft delete). |
| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ms` | Used in upsert mode to deduplicate data: the row with the highest `__source_ts_ms` (the last change event) is kept. _Don't change!_ |
| `debezium.sink.iceberg.upsert-op-column` | `__op` | Used in upsert mode to determine event priority (order). _Don't change!_ |
| `debezium.sink.iceberg.create-identifier-fields` | `true` | When set to `false`, the consumer creates tables without identifier fields. Useful when the user wants to consume nested events in append-only mode. |
| `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify the destination Iceberg table name. For example, with this setting it's possible to combine several tables (`table_ptt1`, `table_ptt2`) into one `table_combined`. |
| `debezium.sink.iceberg.destination-regexp-replace` | `` | Replacement part of the regexp used to modify the destination Iceberg table name. |
Expand Down