Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ acceptedBreaks:
new: "method boolean org.apache.iceberg.expressions.BoundTerm<T>::isEquivalentTo(org.apache.iceberg.expressions.BoundTerm<?>)"
justification: "new API method"
- code: "java.method.addedToInterface"
new: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::addedFiles(org.apache.iceberg.io.FileIO)"
new: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::addedDataFiles(org.apache.iceberg.io.FileIO)"
justification: "Allow adding a new method to the interface - old method is deprecated"
- code: "java.method.addedToInterface"
new: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::deletedFiles(org.apache.iceberg.io.FileIO)"
new: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::removedDataFiles(org.apache.iceberg.io.FileIO)"
justification: "Allow adding a new method to the interface - old method is deprecated"
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.ManifestFile> org.apache.iceberg.Snapshot::allManifests(org.apache.iceberg.io.FileIO)"
Expand Down
51 changes: 38 additions & 13 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public interface Snapshot extends Serializable {
@Deprecated
List<ManifestFile> deleteManifests();


/**
* Return a {@link ManifestFile} for each delete manifest in this snapshot.
*
Expand All @@ -133,50 +132,76 @@ public interface Snapshot extends Serializable {
Map<String, String> summary();

/**
* Return all files added to the table in this snapshot.
* Return all data files added to the table in this snapshot.
* <p>
* The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @return all files added to the table in this snapshot.
* @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#addedFiles(FileIO)} instead.
* @return all data files added to the table in this snapshot.
* @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#addedDataFiles(FileIO)} instead.
*/
@Deprecated
Iterable<DataFile> addedFiles();

/**
* Return all files added to the table in this snapshot.
* Return all data files added to the table in this snapshot.
* <p>
* The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all files added to the table in this snapshot.
* @return all data files added to the table in this snapshot.
*/
Iterable<DataFile> addedFiles(FileIO io);
Iterable<DataFile> addedDataFiles(FileIO io);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The renamed methods haven't been released yet, so we won't break anyone.


/**
* Return all files deleted from the table in this snapshot.
* Return all data files deleted from the table in this snapshot.
* <p>
* The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @return all files deleted from the table in this snapshot.
* @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#deletedFiles(FileIO)} instead.
* @return all data files deleted from the table in this snapshot.
* @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#removedDataFiles(FileIO)} instead.
*/
@Deprecated
Iterable<DataFile> deletedFiles();

/**
* Return all files deleted from the table in this snapshot.
* Return all data files removed from the table in this snapshot.
* <p>
* The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all data files removed from the table in this snapshot.
*/
Iterable<DataFile> removedDataFiles(FileIO io);

/**
* Return all delete files added to the table in this snapshot.
* <p>
* The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all files deleted from the table in this snapshot.
* @return all delete files added to the table in this snapshot
*/
Iterable<DataFile> deletedFiles(FileIO io);
default Iterable<DeleteFile> addedDeleteFiles(FileIO io) {
  // Default for implementations that do not override this method: fail and name the concrete class.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(implementationName + " doesn't implement addedDeleteFiles");
}

/**
 * Return every delete file that this snapshot removed from the table.
 * <p>
 * The files returned include the following columns: file_path, file_format, partition,
 * record_count, and file_size_in_bytes. Other columns will be null.
 * <p>
 * This default implementation always throws, so implementations must override it to support the call.
 *
 * @param io a {@link FileIO} instance used for reading files from storage
 * @return all delete files removed from the table in this snapshot
 * @throws UnsupportedOperationException if the implementing class does not override this method
 */
default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(implementationName + " doesn't implement removedDeleteFiles");
}

/**
* Return the location of this snapshot's manifest list, or null if it is not separate.
Expand Down
96 changes: 73 additions & 23 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -28,6 +29,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -54,8 +56,10 @@ class BaseSnapshot implements Snapshot {
private transient List<ManifestFile> allManifests = null;
private transient List<ManifestFile> dataManifests = null;
private transient List<ManifestFile> deleteManifests = null;
private transient List<DataFile> cachedAdds = null;
private transient List<DataFile> cachedDeletes = null;
private transient List<DataFile> addedDataFiles = null;
private transient List<DataFile> removedDataFiles = null;
private transient List<DeleteFile> addedDeleteFiles = null;
private transient List<DeleteFile> removedDeleteFiles = null;

/**
* For testing only.
Expand Down Expand Up @@ -235,55 +239,101 @@ public List<ManifestFile> deleteManifests() {
}

@Override
public List<DataFile> addedFiles(FileIO fileIO) {
if (cachedAdds == null) {
cacheChanges(fileIO);
public List<DataFile> addedDataFiles(FileIO fileIO) {
if (addedDataFiles == null) {
cacheDataFileChanges(fileIO);
}
return cachedAdds;
return addedDataFiles;
}

/**
* @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#addedFiles(FileIO)} instead.
* @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#addedDataFiles(FileIO)} instead.
*/
@Override
@Deprecated
public List<DataFile> addedFiles() {
if (cachedAdds == null) {
cacheChanges(io);
if (addedDataFiles == null) {
cacheDataFileChanges(io);
}
return cachedAdds;
return addedDataFiles;
}

@Override
public List<DataFile> deletedFiles(FileIO fileIO) {
if (cachedDeletes == null) {
cacheChanges(fileIO);
public List<DataFile> removedDataFiles(FileIO fileIO) {
if (removedDataFiles == null) {
cacheDataFileChanges(fileIO);
}
return cachedDeletes;
return removedDataFiles;
}

/**
* @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#deletedFiles(FileIO)} instead.
* @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#removedDataFiles(FileIO)} instead.
*/
@Override
@Deprecated
public List<DataFile> deletedFiles() {
if (cachedDeletes == null) {
cacheChanges(io);
if (removedDataFiles == null) {
cacheDataFileChanges(io);
}
return cachedDeletes;
return removedDataFiles;
}

@Override
public Iterable<DeleteFile> addedDeleteFiles(FileIO fileIO) {
  // Serve the cached list when it has already been built.
  if (addedDeleteFiles != null) {
    return addedDeleteFiles;
  }

  // First access: cacheDeleteFileChanges assigns both the added and removed delete file lists.
  cacheDeleteFileChanges(fileIO);
  return addedDeleteFiles;
}

@Override
public Iterable<DeleteFile> removedDeleteFiles(FileIO fileIO) {
  // Serve the cached list when it has already been built.
  if (removedDeleteFiles != null) {
    return removedDeleteFiles;
  }

  // First access: cacheDeleteFileChanges assigns both the added and removed delete file lists.
  cacheDeleteFileChanges(fileIO);
  return removedDeleteFiles;
}

@Override
public String manifestListLocation() {
  // Plain accessor for the manifest list location held by this snapshot; no I/O is performed.
  return this.manifestListLocation;
}

private void cacheChanges(FileIO fileIO) {
if (fileIO == null) {
throw new IllegalArgumentException("Cannot cache changes: FileIO is null");
private void cacheDeleteFileChanges(FileIO fileIO) {
Preconditions.checkArgument(fileIO != null, "Cannot cache delete file changes: FileIO is null");

ImmutableList.Builder<DeleteFile> adds = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> deletes = ImmutableList.builder();

Iterable<ManifestFile> changedManifests = Iterables.filter(deleteManifests(fileIO),
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));

for (ManifestFile manifest : changedManifests) {
try (ManifestReader<DeleteFile> reader = ManifestFiles.readDeleteManifest(manifest, fileIO, null)) {
Copy link
Contributor Author

@aokolnychyi aokolnychyi Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warning: the spec map is null.

If I remember correctly, ManifestReader will use the Avro header metadata to parse the schema and spec. While it is generally not reliable as the schema may be old, I think it should be fine as long as we don't do any binding or filtering (like in this case).

for (ManifestEntry<DeleteFile> entry : reader.entries()) {
switch (entry.status()) {
case ADDED:
adds.add(entry.file().copy());
break;
case DELETED:
deletes.add(entry.file().copyWithoutStats());
break;
default:
// ignore existing
}
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close manifest reader", e);
}
}

this.addedDeleteFiles = adds.build();
this.removedDeleteFiles = deletes.build();
}

private void cacheDataFileChanges(FileIO fileIO) {
Preconditions.checkArgument(fileIO != null, "Cannot cache data file changes: FileIO is null");

ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();

Expand All @@ -310,8 +360,8 @@ private void cacheChanges(FileIO fileIO) {
throw new RuntimeIOException(e, "Failed to close entries while caching changes");
}

this.cachedAdds = adds.build();
this.cachedDeletes = deletes.build();
this.addedDataFiles = adds.build();
this.removedDataFiles = deletes.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public CherryPickOperation cherrypick(long snapshotId) {
set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(snapshotId));

// Pick modifications from the snapshot
for (DataFile addedFile : cherrypickSnapshot.addedFiles(io)) {
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
add(addedFile);
}

Expand All @@ -106,13 +106,13 @@ public CherryPickOperation cherrypick(long snapshotId) {

// copy adds from the picked snapshot
this.replacedPartitions = PartitionSet.create(specsById);
for (DataFile addedFile : cherrypickSnapshot.addedFiles(io)) {
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
add(addedFile);
replacedPartitions.add(addedFile.specId(), addedFile.partition());
}

// copy deletes from the picked snapshot
for (DataFile deletedFile : cherrypickSnapshot.deletedFiles(io)) {
for (DataFile deletedFile : cherrypickSnapshot.removedDataFiles(io)) {
delete(deletedFile);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public static List<DataFile> newFiles(
return newFiles;
}

Iterables.addAll(newFiles, currentSnapshot.addedFiles(io));
Iterables.addAll(newFiles, currentSnapshot.addedDataFiles(io));
}

ValidationException.check(Objects.equals(lastSnapshot.parentId(), baseSnapshotId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ public void testWithExpiringDanglingStageCommit() {
expectedDeletes.add(snapshotA.manifestListLocation());

// Files should be deleted of dangling staged snapshot
snapshotB.addedFiles(table.io()).forEach(i -> {
snapshotB.addedDataFiles(table.io()).forEach(i -> {
expectedDeletes.add(i.path().toString());
});

Expand Down Expand Up @@ -982,7 +982,7 @@ public void testWithCherryPickTableSnapshot() {

// Make sure no dataFiles are deleted for the B, C, D snapshots
Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
i.addedFiles(table.io()).forEach(item -> {
i.addedDataFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
Expand Down Expand Up @@ -1035,7 +1035,7 @@ public void testWithExpiringStagedThenCherrypick() {

// Make sure no dataFiles are deleted for the staged snapshot
Lists.newArrayList(snapshotB).forEach(i -> {
i.addedFiles(table.io()).forEach(item -> {
i.addedDataFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
Expand All @@ -1048,7 +1048,7 @@ public void testWithExpiringStagedThenCherrypick() {

// Make sure no dataFiles are deleted for the staged and cherry-pick
Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
i.addedFiles(table.io()).forEach(item -> {
i.addedDataFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
Expand Down
Loading