Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(IcebergTableOperatorUpsert): use DeleteWriteBuilder.forTable instead of setting table params manually #44

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -26,7 +27,6 @@
import javax.inject.Named;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericRecord;
Expand Down Expand Up @@ -63,11 +63,10 @@ public void initialize() {
icebergTableAppend.initialize();
}

private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {
private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
Set<Integer> equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds();

EqualityDeleteWriter<Record> deleteWriter;

Expand All @@ -82,23 +81,18 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRe
).collect(Collectors.toList());

if (deleteRows.size() == 0) {
return null;
return Optional.empty();
}

try {
LOGGER.debug("Writing data to equality delete file: {}!", out);

deleteWriter = Parquet.writeDeletes(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.rowSchema(icebergTable.sortOrder().schema())
.withSpec(icebergTable.spec())
.equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds()))
.metricsConfig(MetricsConfig.fromProperties(icebergTable.properties()))
.forTable(icebergTable)
.equalityFieldIds(List.copyOf(icebergTable.schema().identifierFieldIds()))
.withSortOrder(icebergTable.sortOrder())
.setAll(icebergTable.properties())
.buildEqualityWriter()
;
.buildEqualityWriter();

try (Closeable toClose = deleteWriter) {
deleteWriter.deleteAll(deleteRows);
Expand All @@ -111,15 +105,15 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRe
LOGGER.debug("Creating iceberg equality delete file!");
// Equality delete files identify deleted rows in a collection of data files by one or more column values,
// and may optionally contain additional columns of the deleted row.
return FileMetadata.deleteFileBuilder(icebergTable.spec())
return Optional.of(FileMetadata.deleteFileBuilder(icebergTable.spec())
.ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(deleteWriter.length())
.withFileSizeInBytes(deleteWriter.length())
.withRecordCount(deleteRows.size())
.withSortOrder(icebergTable.sortOrder())
.build();
.build());
}

private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
Expand Down Expand Up @@ -171,16 +165,12 @@ public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>
// DO UPSERT >>> DELETE + INSERT
ArrayList<Record> icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path());
Optional<DeleteFile> deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile.isPresent(), dataFile.path());
RowDelta c = icebergTable
.newRowDelta()
.addRows(dataFile);

if (deleteDataFile != null) {
c.addDeletes(deleteDataFile)
.validateDeletedFiles();
}
deleteDataFile.ifPresent(deleteFile -> c.addDeletes(deleteFile).validateDeletedFiles());

c.commit();
LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
Expand Down