Skip to content

Commit

Permalink
Use Append commit when there is no delete files (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Apr 20, 2023
1 parent b9d1a67 commit f000a1e
Showing 1 changed file with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.*;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;
Expand Down Expand Up @@ -179,10 +176,16 @@ private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> ev

writer.close();
WriteResult files = writer.complete();
RowDelta newRowDelta = icebergTable.newRowDelta();
Arrays.stream(files.dataFiles()).forEach(newRowDelta::addRows);
Arrays.stream(files.deleteFiles()).forEach(newRowDelta::addDeletes);
newRowDelta.commit();
if (files.deleteFiles().length > 0) {
RowDelta newRowDelta = icebergTable.newRowDelta();
Arrays.stream(files.dataFiles()).forEach(newRowDelta::addRows);
Arrays.stream(files.deleteFiles()).forEach(newRowDelta::addDeletes);
newRowDelta.commit();
} else {
AppendFiles appendFiles = icebergTable.newAppend();
Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile);
appendFiles.commit();
}

} catch (IOException ex) {
throw new DebeziumException(ex);
Expand Down

0 comments on commit f000a1e

Please sign in to comment.