Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,35 @@
*/
package org.apache.iceberg.deletes;

import java.util.Collection;
import java.util.List;
import java.util.function.LongConsumer;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.roaringbitmap.longlong.Roaring64Bitmap;

class BitmapPositionDeleteIndex implements PositionDeleteIndex {
private final Roaring64Bitmap roaring64Bitmap;
private final List<DeleteFile> deleteFiles;

/** Creates an empty index with no known source delete files. */
BitmapPositionDeleteIndex() {
  this.roaring64Bitmap = new Roaring64Bitmap();
  this.deleteFiles = Lists.newArrayList();
}

/**
 * Creates an index that tracks positions originating from the given delete files.
 *
 * @param deleteFiles source delete files for this index (copied defensively into a new list)
 */
BitmapPositionDeleteIndex(Collection<DeleteFile> deleteFiles) {
this.roaring64Bitmap = new Roaring64Bitmap();
this.deleteFiles = Lists.newArrayList(deleteFiles);
}

/**
 * Creates an index that tracks positions from a single source delete file.
 *
 * @param deleteFile the source delete file, or null when the origin is unknown
 */
BitmapPositionDeleteIndex(DeleteFile deleteFile) {
  this.roaring64Bitmap = new Roaring64Bitmap();
  this.deleteFiles = Lists.newArrayList();
  if (deleteFile != null) {
    this.deleteFiles.add(deleteFile);
  }
}

// Merges another bitmap index into this one: positions are OR-ed together and
// the source delete files of both indexes are combined.
void merge(BitmapPositionDeleteIndex that) {
roaring64Bitmap.or(that.roaring64Bitmap);
deleteFiles.addAll(that.deleteFiles);
}

@Override
Expand All @@ -48,6 +65,7 @@ public void merge(PositionDeleteIndex that) {
merge((BitmapPositionDeleteIndex) that);
} else {
that.forEach(this::delete);
deleteFiles.addAll(that.deleteFiles());
}
}

Expand All @@ -65,4 +83,9 @@ public boolean isEmpty() {
// Invokes the consumer for every deleted position stored in the bitmap.
// NOTE(review): callers elsewhere in this change rely on ascending iteration
// order from the Roaring64Bitmap iterator — confirm forEach matches.
public void forEach(LongConsumer consumer) {
roaring64Bitmap.forEach(consumer::accept);
}

@Override
public Collection<DeleteFile> deleteFiles() {
// NOTE(review): exposes the internal mutable list; visible callers only read
// it, but returning an unmodifiable view would be safer — confirm callers.
return deleteFiles;
}
}
32 changes: 29 additions & 3 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
Expand Down Expand Up @@ -123,6 +124,11 @@ public static StructLikeSet toEqualitySet(
}
}

public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
Copy link
Contributor Author

@aokolnychyi aokolnychyi Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Purely to avoid breaking the API.

CloseableIterable<T> posDeletes) {
return toPositionIndexes(posDeletes, null /* unknown delete file */);
}

/**
* Builds a map of position delete indexes by path.
*
Expand All @@ -131,18 +137,19 @@ public static StructLikeSet toEqualitySet(
* entire delete file content is needed (e.g. caching).
*
* @param posDeletes position deletes
* @param file the source delete file for the deletes
* @return the map of position delete indexes by path
*/
public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
CloseableIterable<T> posDeletes) {
CloseableIterable<T> posDeletes, DeleteFile file) {
CharSequenceMap<PositionDeleteIndex> indexes = CharSequenceMap.create();

try (CloseableIterable<T> deletes = posDeletes) {
for (T delete : deletes) {
CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete);
long position = (long) POSITION_ACCESSOR.get(delete);
PositionDeleteIndex index =
indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex());
indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex(file));
index.delete(position);
}
} catch (IOException e) {
Expand All @@ -152,6 +159,20 @@ public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPosi
return indexes;
}

/**
 * Builds a position delete index for a single data file location.
 *
 * @param dataLocation the data file location whose deletes should be indexed
 * @param posDeletes position delete records (rows with file path and position columns)
 * @param file the source delete file for the deletes, or null if unknown
 * @return an index of delete positions that apply to the data file
 */
public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
    CharSequence dataLocation, CloseableIterable<T> posDeletes, DeleteFile file) {
  CloseableIterable<Long> positions = extractPositions(dataLocation, posDeletes);
  // guard against NPE from ImmutableList.of(null); stays consistent with
  // BitmapPositionDeleteIndex(DeleteFile), which tolerates an unknown (null) file
  List<DeleteFile> files = file != null ? ImmutableList.of(file) : ImmutableList.of();
  return toPositionIndex(positions, files);
}

// Filters the given delete rows down to those referencing the data file location
// and projects each remaining row to its delete position.
private static <T extends StructLike> CloseableIterable<Long> extractPositions(
    CharSequence dataLocation, CloseableIterable<T> rows) {
  CloseableIterable<T> matchingRows = new DataFileFilter<T>(dataLocation).filter(rows);
  return CloseableIterable.transform(matchingRows, row -> (Long) POSITION_ACCESSOR.get(row));
}

public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool());
Expand All @@ -176,8 +197,13 @@ public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
}

// Builds a position delete index with no known source delete files.
public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDeletes) {
return toPositionIndex(posDeletes, ImmutableList.of());
}

private static PositionDeleteIndex toPositionIndex(
CloseableIterable<Long> posDeletes, List<DeleteFile> files) {
try (CloseableIterable<Long> deletes = posDeletes) {
PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex();
PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(files);
deletes.forEach(positionDeleteIndex::delete);
return positionDeleteIndex;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public static <T> PositionDelete<T> create() {

private PositionDelete() {}

/**
 * Sets the file path and position for this delete, clearing any previously set row.
 *
 * @param newPath the path of the data file containing the deleted row
 * @param newPos the position of the deleted row within the data file
 * @return this for method chaining
 */
public PositionDelete<R> set(CharSequence newPath, long newPos) {
this.path = newPath;
this.pos = newPos;
this.row = null;
return this;
}

public PositionDelete<R> set(CharSequence newPath, long newPos, R newRow) {
this.path = newPath;
this.pos = newPos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.iceberg.deletes;

import java.util.Collection;
import java.util.function.LongConsumer;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public interface PositionDeleteIndex {
/**
Expand All @@ -42,6 +45,9 @@ public interface PositionDeleteIndex {
* @param that the other index to merge
*/
default void merge(PositionDeleteIndex that) {
// the default implementation cannot track source delete files, so refuse to
// merge an index that carries them rather than silently dropping that metadata
if (!that.deleteFiles().isEmpty()) {
throw new UnsupportedOperationException(getClass().getName() + " does not support merge");
}
that.forEach(this::delete);
}

Expand Down Expand Up @@ -72,6 +78,15 @@ default void forEach(LongConsumer consumer) {
}
}

/**
 * Returns delete files that this index was created from or an empty collection if unknown.
 *
 * <p>The default implementation always reports an unknown origin by returning an immutable
 * empty list.
 *
 * @return delete files that this index was created from
 */
default Collection<DeleteFile> deleteFiles() {
return ImmutableList.of();
}

/** Returns an empty immutable position delete index. */
static PositionDeleteIndex empty() {
return EmptyPositionDeleteIndex.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.CharSequenceSet;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.apache.iceberg.util.ContentFileUtil;

/**
* A position delete writer that is capable of handling unordered deletes without rows.
Expand All @@ -41,14 +42,20 @@
* records are not ordered by file and position as required by the spec. If the incoming deletes are
* ordered by an external process, use {@link PositionDeleteWriter} instead.
*
* <p>If configured, this writer can also load previous deletes using the provided function and
* merge them with incoming ones prior to flushing the deletes into a file. Callers must ensure only
* previous file-scoped deletes are loaded because partition-scoped deletes can apply to multiple
* data files and can't be safely discarded.
*
* <p>Note this writer stores only positions. It does not store deleted records.
*/
public class SortingPositionOnlyDeleteWriter<T>
implements FileWriter<PositionDelete<T>, DeleteWriteResult> {

private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
private final DeleteGranularity granularity;
private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
private final CharSequenceMap<PositionDeleteIndex> positionsByPath;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an area we'd want to explore using Map<String, PositionDeleteIndex> instead of the CharSequenceMap? Doesn't need to be in this PR, more so just wondering

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it'll probably make more sense to look at that when I do the update to use location instead of the deprecated path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to keep this as CharSequenceMap as writers may use arbitrary CharSequence implementations and it is a bit different from DataFile/DeleteFile structs.

private final Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes;
private DeleteWriteResult result = null;

public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, DeleteWriteResult> writer) {
Expand All @@ -58,17 +65,26 @@ public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, DeleteWrite
public SortingPositionOnlyDeleteWriter(
Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
DeleteGranularity granularity) {
this(writers, granularity, path -> null /* no access to previous deletes */);
}

public SortingPositionOnlyDeleteWriter(
Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
DeleteGranularity granularity,
Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
this.writers = writers;
this.granularity = granularity;
this.positionsByPath = CharSequenceMap.create();
this.loadPreviousDeletes = loadPreviousDeletes;
}

@Override
public void write(PositionDelete<T> positionDelete) {
  CharSequence path = positionDelete.path();
  long position = positionDelete.pos();
  // buffer the position in a per-path bitmap index; files are written on result()
  PositionDeleteIndex positions =
      positionsByPath.computeIfAbsent(path, key -> new BitmapPositionDeleteIndex());
  positions.delete(position);
}

@Override
Expand Down Expand Up @@ -106,14 +122,16 @@ private DeleteWriteResult writePartitionDeletes() throws IOException {
// Writes one delete file per referenced data file path (file-scoped granularity)
// and aggregates the per-path write results, including rewritten delete files.
private DeleteWriteResult writeFileDeletes() throws IOException {
  List<DeleteFile> deleteFiles = Lists.newArrayList();
  CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
  List<DeleteFile> rewrittenDeleteFiles = Lists.newArrayList();

  for (CharSequence path : positionsByPath.keySet()) {
    DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path));
    deleteFiles.addAll(writeResult.deleteFiles());
    referencedDataFiles.addAll(writeResult.referencedDataFiles());
    rewrittenDeleteFiles.addAll(writeResult.rewrittenDeleteFiles());
  }

  return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles);
}

@SuppressWarnings("CollectionUndefinedEquality")
Expand All @@ -123,22 +141,38 @@ private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IO
}

FileWriter<PositionDelete<T>, DeleteWriteResult> writer = writers.get();
List<DeleteFile> rewrittenDeleteFiles = Lists.newArrayList();

try {
PositionDelete<T> positionDelete = PositionDelete.create();
for (CharSequence path : sort(paths)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another aspect I'm curious about, have we ever compared with using a TreeMap instead of sorting? It'll be the same time complexity in the end but interested in seeing if there's any significant differences in practice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say TreeMap starts to make sense if we access the collection in sorted order more than once. Otherwise, paying the extra cost during inserts may not be worth it.

// the iterator provides values in ascending sorted order
PeekableLongIterator positions = positionsByPath.get(path).getLongIterator();
while (positions.hasNext()) {
long position = positions.next();
writer.write(positionDelete.set(path, position, null /* no row */));
PositionDeleteIndex positions = positionsByPath.get(path);
PositionDeleteIndex previousPositions = loadPreviousDeletes.apply(path);
if (previousPositions != null && previousPositions.isNotEmpty()) {
validatePreviousDeletes(previousPositions);
positions.merge(previousPositions);
rewrittenDeleteFiles.addAll(previousPositions.deleteFiles());
}
positions.forEach(position -> writer.write(positionDelete.set(path, position)));
}
} finally {
writer.close();
}

return writer.result();
DeleteWriteResult writerResult = writer.result();
List<DeleteFile> deleteFiles = writerResult.deleteFiles();
CharSequenceSet referencedDataFiles = writerResult.referencedDataFiles();
return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles);
}

// Ensures previously loaded deletes are file-scoped; partition-scoped deletes can
// apply to multiple data files and must not be discarded after a rewrite.
private void validatePreviousDeletes(PositionDeleteIndex index) {
Preconditions.checkArgument(
index.deleteFiles().stream().allMatch(this::isFileScoped),
"Previous deletes must be file-scoped");
}

// A delete file is file-scoped when it records a single referenced data file
private boolean isFileScoped(DeleteFile deleteFile) {
return ContentFileUtil.referencedDataFile(deleteFile) != null;
}

private Collection<CharSequence> sort(Collection<CharSequence> paths) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.iceberg.io;

import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
Expand All @@ -45,6 +47,8 @@ public class FanoutPositionOnlyDeleteWriter<T>
private final DeleteGranularity granularity;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;
private final List<DeleteFile> rewrittenDeleteFiles;
private final Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes;

public FanoutPositionOnlyDeleteWriter(
FileWriterFactory<T> writerFactory,
Expand All @@ -60,13 +64,31 @@ public FanoutPositionOnlyDeleteWriter(
FileIO io,
long targetFileSizeInBytes,
DeleteGranularity granularity) {
this(
writerFactory,
fileFactory,
io,
targetFileSizeInBytes,
granularity,
path -> null /* no access to previous deletes */);
}

/**
 * Creates a fanout position delete writer that can merge previously written deletes.
 *
 * @param writerFactory factory for creating delete file writers
 * @param fileFactory factory for output files
 * @param io file IO used when writing delete files
 * @param targetFileSizeInBytes target size for rolled delete files
 * @param granularity delete granularity (file or partition scoped)
 * @param loadPreviousDeletes function that loads previous deletes for a data file path,
 *     returning null when there is no access to previous deletes
 */
public FanoutPositionOnlyDeleteWriter(
FileWriterFactory<T> writerFactory,
OutputFileFactory fileFactory,
FileIO io,
long targetFileSizeInBytes,
DeleteGranularity granularity,
Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.granularity = granularity;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
this.rewrittenDeleteFiles = Lists.newArrayList();
this.loadPreviousDeletes = loadPreviousDeletes;
}

@Override
Expand All @@ -76,17 +98,19 @@ protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
() ->
new RollingPositionDeleteWriter<>(
writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition),
granularity);
granularity,
loadPreviousDeletes);
}

@Override
protected void addResult(DeleteWriteResult result) {
// accumulate per-partition results, including delete files replaced by this write
deleteFiles.addAll(result.deleteFiles());
referencedDataFiles.addAll(result.referencedDataFiles());
rewrittenDeleteFiles.addAll(result.rewrittenDeleteFiles());
}

@Override
protected DeleteWriteResult aggregatedResult() {
  // combine results across all partitions, propagating rewritten delete files
  return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles);
}
}
Loading