Skip to content

Commit

Permalink
refactor(IcebergTableOperatorUpsert): use `DeleteWriteBuilder.forTable` instead of setting table params manually (#44)
Browse files Browse the repository at this point in the history

We were setting the table parameters "manually" even though the Iceberg
API already provides a `forTable` helper that does this.

Also, this refactors `getDeleteFile` to return an `Optional`,
making it clearer that a given batch may produce no delete file
at all.
  • Loading branch information
racevedoo committed Oct 25, 2021
1 parent 062f49f commit 0ad4efb
Showing 1 changed file with 11 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -26,7 +27,6 @@
import javax.inject.Named;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericRecord;
Expand Down Expand Up @@ -63,11 +63,10 @@ public void initialize() {
icebergTableAppend.initialize();
}

private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {
private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
Set<Integer> equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds();

EqualityDeleteWriter<Record> deleteWriter;

Expand All @@ -82,23 +81,18 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRe
).collect(Collectors.toList());

if (deleteRows.size() == 0) {
return null;
return Optional.empty();
}

try {
LOGGER.debug("Writing data to equality delete file: {}!", out);

deleteWriter = Parquet.writeDeletes(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.rowSchema(icebergTable.sortOrder().schema())
.withSpec(icebergTable.spec())
.equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds()))
.metricsConfig(MetricsConfig.fromProperties(icebergTable.properties()))
.forTable(icebergTable)
.equalityFieldIds(List.copyOf(icebergTable.schema().identifierFieldIds()))
.withSortOrder(icebergTable.sortOrder())
.setAll(icebergTable.properties())
.buildEqualityWriter()
;
.buildEqualityWriter();

try (Closeable toClose = deleteWriter) {
deleteWriter.deleteAll(deleteRows);
Expand All @@ -111,15 +105,15 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRe
LOGGER.debug("Creating iceberg equality delete file!");
// Equality delete files identify deleted rows in a collection of data files by one or more column values,
// and may optionally contain additional columns of the deleted row.
return FileMetadata.deleteFileBuilder(icebergTable.spec())
return Optional.of(FileMetadata.deleteFileBuilder(icebergTable.spec())
.ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(deleteWriter.length())
.withFileSizeInBytes(deleteWriter.length())
.withRecordCount(deleteRows.size())
.withSortOrder(icebergTable.sortOrder())
.build();
.build());
}

private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
Expand Down Expand Up @@ -171,16 +165,12 @@ public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>
// DO UPSERT >>> DELETE + INSERT
ArrayList<Record> icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path());
Optional<DeleteFile> deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile.isPresent(), dataFile.path());
RowDelta c = icebergTable
.newRowDelta()
.addRows(dataFile);

if (deleteDataFile != null) {
c.addDeletes(deleteDataFile)
.validateDeletedFiles();
}
deleteDataFile.ifPresent(deleteFile -> c.addDeletes(deleteFile).validateDeletedFiles());

c.commit();
LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
Expand Down

0 comments on commit 0ad4efb

Please sign in to comment.