Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public String partition() {

private final Map<Integer, PartitionSpec> specsById;
private final PartitionSet deleteFilePartitions;
private final Set<F> deleteFiles = newFileSet();
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final Set<F> deleteFiles = newFileSet();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean failAnyDelete = false;
Expand Down
69 changes: 34 additions & 35 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.ValidationException;
Expand All @@ -42,7 +41,6 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
Expand Down Expand Up @@ -82,11 +80,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final ManifestFilterManager<DeleteFile> deleteFilterManager;

// update data
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final DataFileSet newDataFiles = DataFileSet.create();
private final DeleteFileSet newDeleteFiles = DeleteFileSet.create();
private final Map<PartitionSpec, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... When did we start using PartitionSpec as keys? This makes all operations more expensive. We always used Integer when indexing by specs, like PartitionMap or even newDeleteFilesBySpec below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this was introduced in #9860.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can follow up on this in a separate PR and change it to `Map<Integer, DataFileSet>`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think we should. Thanks!

private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
Expand Down Expand Up @@ -161,12 +157,9 @@ protected Expression rowFilter() {
}

protected List<DataFile> addedDataFiles() {
return ImmutableList.copyOf(
newDataFilesBySpec.values().stream().flatMap(List::stream).collect(Collectors.toList()));
}

protected Map<PartitionSpec, List<DataFile>> addedDataFilesBySpec() {
return ImmutableMap.copyOf(newDataFilesBySpec);
return newDataFilesBySpec.values().stream()
.flatMap(Set::stream)
.collect(ImmutableList.toImmutableList());
}

protected void failAnyDelete() {
Expand Down Expand Up @@ -236,43 +229,49 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFiles.add(file)) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkArgument(
fileSpec != null,
"Cannot find partition spec %s for data file: %s",
file.specId(),
file.path());

addedFilesSummary.addedFile(fileSpec, file);
PartitionSpec spec = spec(file.specId());
Preconditions.checkArgument(
spec != null,
"Cannot find partition spec %s for data file: %s",
file.specId(),
file.location());

DataFileSet dataFiles =
newDataFilesBySpec.computeIfAbsent(spec, ignored -> DataFileSet.create());
if (dataFiles.add(file)) {
addedFilesSummary.addedFile(spec, file);
hasNewDataFiles = true;
List<DataFile> dataFiles =
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
dataFiles.add(file);
}
}

/**
 * Resolves a partition spec by ID via {@code ops.current().spec(specId)}.
 *
 * <p>NOTE(review): callers in this file null-check the result, so this presumably returns
 * {@code null} for an unknown ID - confirm against the metadata API.
 *
 * @param specId ID of the partition spec to look up
 * @return the spec returned by the lookup
 */
private PartitionSpec spec(int specId) {
return ops.current().spec(specId);
}

/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
add(new DeleteFileHolder(file));
add(new PendingDeleteFile(file));
}

/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file, long dataSequenceNumber) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
add(new DeleteFileHolder(file, dataSequenceNumber));
add(new PendingDeleteFile(file, dataSequenceNumber));
}

private void add(DeleteFileHolder fileHolder) {
int specId = fileHolder.deleteFile().specId();
PartitionSpec fileSpec = ops.current().spec(specId);
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());

if (newDeleteFiles.add(fileHolder.deleteFile())) {
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
/**
 * Registers a pending delete file under its partition spec ID.
 *
 * <p>The summary and the {@code hasNewDeleteFiles} flag are only touched when the per-spec set
 * reports the file as newly added.
 *
 * @param file pending delete file to register
 * @throws IllegalArgumentException if the file's partition spec cannot be resolved
 */
private void add(PendingDeleteFile file) {
  PartitionSpec partitionSpec = spec(file.specId());
  Preconditions.checkArgument(
      partitionSpec != null,
      "Cannot find partition spec %s for delete file: %s",
      file.specId(),
      file.location());

  // create the per-spec set on first use, then try to insert the file
  boolean newlyAdded =
      newDeleteFilesBySpec
          .computeIfAbsent(partitionSpec.specId(), unused -> DeleteFileSet.create())
          .add(file);
  if (!newlyAdded) {
    return;
  }

  addedFilesSummary.addedFile(partitionSpec, file);
  hasNewDeleteFiles = true;
}
Expand Down
153 changes: 143 additions & 10 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -595,20 +596,22 @@ private List<ManifestFile> writeDataFileGroup(
}

protected List<ManifestFile> writeDeleteManifests(
Collection<DeleteFileHolder> files, PartitionSpec spec) {
Collection<DeleteFile> files, PartitionSpec spec) {
return writeManifests(files, group -> writeDeleteFileGroup(group, spec));
}

private List<ManifestFile> writeDeleteFileGroup(
Collection<DeleteFileHolder> files, PartitionSpec spec) {
Collection<DeleteFile> files, PartitionSpec spec) {
RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);

try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
for (DeleteFileHolder file : files) {
for (DeleteFile file : files) {
Preconditions.checkArgument(
file instanceof PendingDeleteFile, "Invalid delete file: must be PendingDeleteFile");
if (file.dataSequenceNumber() != null) {
closableWriter.add(file.deleteFile(), file.dataSequenceNumber());
closableWriter.add(file, file.dataSequenceNumber());
} else {
closableWriter.add(file.deleteFile());
closableWriter.add(file);
}
}
} catch (IOException e) {
Expand Down Expand Up @@ -752,7 +755,7 @@ private static void updateTotal(
}
}

protected static class DeleteFileHolder {
protected static class PendingDeleteFile implements DeleteFile {
private final DeleteFile deleteFile;
private final Long dataSequenceNumber;

Expand All @@ -762,7 +765,7 @@ protected static class DeleteFileHolder {
* @param deleteFile delete file
* @param dataSequenceNumber data sequence number to apply
*/
DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
PendingDeleteFile(DeleteFile deleteFile, long dataSequenceNumber) {
this.deleteFile = deleteFile;
// the primitive long is boxed into the nullable Long field
this.dataSequenceNumber = dataSequenceNumber;
}
Expand All @@ -772,17 +775,147 @@ protected static class DeleteFileHolder {
*
* @param deleteFile delete file
*/
DeleteFileHolder(DeleteFile deleteFile) {
PendingDeleteFile(DeleteFile deleteFile) {
this.deleteFile = deleteFile;
// null means no explicit data sequence number; dataSequenceNumber() returns it as-is
this.dataSequenceNumber = null;
}

public DeleteFile deleteFile() {
return deleteFile;
/**
 * Wraps the given file in a new {@code PendingDeleteFile}, carrying over this instance's data
 * sequence number when one is set.
 *
 * @param file delete file to wrap
 * @return a new wrapper around {@code file}
 */
private PendingDeleteFile wrap(DeleteFile file) {
  if (dataSequenceNumber == null) {
    return new PendingDeleteFile(file);
  }

  return new PendingDeleteFile(file, dataSequenceNumber);
}

/**
 * Returns the data sequence number supplied at construction, or {@code null} if none was given.
 */
@Override
public Long dataSequenceNumber() {
return dataSequenceNumber;
}

/** Returns the file sequence number reported by the wrapped delete file. */
@Override
public Long fileSequenceNumber() {
return deleteFile.fileSequenceNumber();
}

/** Copies the wrapped delete file and re-wraps the copy via {@code wrap}. */
@Override
public DeleteFile copy() {
  DeleteFile copied = deleteFile.copy();
  return wrap(copied);
}

/** Copies the wrapped delete file without stats and re-wraps the copy via {@code wrap}. */
@Override
public DeleteFile copyWithoutStats() {
  DeleteFile copied = deleteFile.copyWithoutStats();
  return wrap(copied);
}

/**
 * Copies the wrapped delete file via {@code copyWithStats} and re-wraps the copy via
 * {@code wrap}.
 *
 * @param requestedColumnIds column IDs passed through to the wrapped file's copy
 */
@Override
public DeleteFile copyWithStats(Set<Integer> requestedColumnIds) {
  DeleteFile copied = deleteFile.copyWithStats(requestedColumnIds);
  return wrap(copied);
}

/**
 * Copies the wrapped delete file and re-wraps the copy via {@code wrap}.
 *
 * @param withStats flag passed through to the wrapped file's copy
 */
@Override
public DeleteFile copy(boolean withStats) {
  DeleteFile copied = deleteFile.copy(withStats);
  return wrap(copied);
}

/** Returns the manifest location reported by the wrapped delete file. */
@Override
public String manifestLocation() {
return deleteFile.manifestLocation();
}

/** Returns the wrapped delete file's {@code pos()} value. */
@Override
public Long pos() {
return deleteFile.pos();
}

/** Returns the partition spec ID reported by the wrapped delete file. */
@Override
public int specId() {
return deleteFile.specId();
}

/** Returns the file content type reported by the wrapped delete file. */
@Override
public FileContent content() {
return deleteFile.content();
}

/** Returns the path reported by the wrapped delete file. */
@Override
public CharSequence path() {
return deleteFile.path();
}

/** Returns the location string reported by the wrapped delete file. */
@Override
public String location() {
return deleteFile.location();
}

/** Returns the file format reported by the wrapped delete file. */
@Override
public FileFormat format() {
return deleteFile.format();
}

/** Returns the partition struct reported by the wrapped delete file. */
@Override
public StructLike partition() {
return deleteFile.partition();
}

/** Returns the record count reported by the wrapped delete file. */
@Override
public long recordCount() {
return deleteFile.recordCount();
}

/** Returns the file size in bytes reported by the wrapped delete file. */
@Override
public long fileSizeInBytes() {
return deleteFile.fileSizeInBytes();
}

/** Returns the column sizes map reported by the wrapped delete file. */
@Override
public Map<Integer, Long> columnSizes() {
return deleteFile.columnSizes();
}

/** Returns the value counts map reported by the wrapped delete file. */
@Override
public Map<Integer, Long> valueCounts() {
return deleteFile.valueCounts();
}

/** Returns the null value counts map reported by the wrapped delete file. */
@Override
public Map<Integer, Long> nullValueCounts() {
return deleteFile.nullValueCounts();
}

/** Returns the NaN value counts map reported by the wrapped delete file. */
@Override
public Map<Integer, Long> nanValueCounts() {
return deleteFile.nanValueCounts();
}

/** Returns the lower bounds map reported by the wrapped delete file. */
@Override
public Map<Integer, ByteBuffer> lowerBounds() {
return deleteFile.lowerBounds();
}

/** Returns the upper bounds map reported by the wrapped delete file. */
@Override
public Map<Integer, ByteBuffer> upperBounds() {
return deleteFile.upperBounds();
}

/** Returns the key metadata buffer reported by the wrapped delete file. */
@Override
public ByteBuffer keyMetadata() {
return deleteFile.keyMetadata();
}

/** Returns the split offsets reported by the wrapped delete file. */
@Override
public List<Long> splitOffsets() {
return deleteFile.splitOffsets();
}

/** Returns the equality field IDs reported by the wrapped delete file. */
@Override
public List<Integer> equalityFieldIds() {
return deleteFile.equalityFieldIds();
}

/** Returns the sort order ID reported by the wrapped delete file. */
@Override
public Integer sortOrderId() {
return deleteFile.sortOrderId();
}
}
}