diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 48ac9211..e1b143f5 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -23,10 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; -import org.apache.iceberg.RowDelta; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.*; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.BaseTaskWriter; import org.apache.iceberg.io.WriteResult; @@ -179,10 +176,16 @@ private void addToTablePerSchema(Table icebergTable, List ev writer.close(); WriteResult files = writer.complete(); - RowDelta newRowDelta = icebergTable.newRowDelta(); - Arrays.stream(files.dataFiles()).forEach(newRowDelta::addRows); - Arrays.stream(files.deleteFiles()).forEach(newRowDelta::addDeletes); - newRowDelta.commit(); + if (files.deleteFiles().length > 0) { + RowDelta newRowDelta = icebergTable.newRowDelta(); + Arrays.stream(files.dataFiles()).forEach(newRowDelta::addRows); + Arrays.stream(files.deleteFiles()).forEach(newRowDelta::addDeletes); + newRowDelta.commit(); + } else { + AppendFiles appendFiles = icebergTable.newAppend(); + Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile); + appendFiles.commit(); + } } catch (IOException ex) { throw new DebeziumException(ex);