From f000a1e736c7ab0d98e5f151615efbabbb015989 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 20 Apr 2023 15:19:42 +0200 Subject: [PATCH] Use Append commit when there is no delete files (#188) --- .../tableoperator/IcebergTableOperator.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 48ac9211..e1b143f5 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -23,10 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; -import org.apache.iceberg.RowDelta; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.*; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.BaseTaskWriter; import org.apache.iceberg.io.WriteResult; @@ -179,10 +176,16 @@ private void addToTablePerSchema(Table icebergTable, List ev writer.close(); WriteResult files = writer.complete(); - RowDelta newRowDelta = icebergTable.newRowDelta(); - Arrays.stream(files.dataFiles()).forEach(newRowDelta::addRows); - Arrays.stream(files.deleteFiles()).forEach(newRowDelta::addDeletes); - newRowDelta.commit(); + if (files.deleteFiles().length > 0) { + RowDelta newRowDelta = icebergTable.newRowDelta(); + Arrays.stream(files.dataFiles()).forEach(newRowDelta::addRows); + Arrays.stream(files.deleteFiles()).forEach(newRowDelta::addDeletes); + newRowDelta.commit(); + } else { + AppendFiles appendFiles = icebergTable.newAppend(); + Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile); + appendFiles.commit(); + } } catch (IOException ex) { throw new DebeziumException(ex);