82 changes: 82 additions & 0 deletions api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -32,15 +32,93 @@
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
* If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
* will throw a {@link ValidationException}.
*
* <p>Note that the new state of the table after each rewrite must be logically equivalent to the
* original table state.
*/
public interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
/**
* Remove a data file from the current table state.
*
* <p>This rewrite operation may change the size or layout of the data files. When applicable, it
* is also recommended to discard already deleted records while rewriting data files. However, the
* set of live data records must never change.
*
* @param dataFile a rewritten data file
* @return this for method chaining
*/
default RewriteFiles deleteFile(DataFile dataFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteFile");
}

/**
* Remove a delete file from the table state.
*
* <p>This rewrite operation may change the size or layout of the delete files. When applicable,
* it is also recommended to discard delete records for files that are no longer part of the table
* state. However, the set of applicable delete records must never change.
*
* @param deleteFile a rewritten delete file
* @return this for method chaining
*/
default RewriteFiles deleteFile(DeleteFile deleteFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteFile");
}

/**
* Add a new data file.
*
* <p>This rewrite operation may change the size or layout of the data files. When applicable, it
* is also recommended to discard already deleted records while rewriting data files. However, the
* set of live data records must never change.
*
* @param dataFile a new data file
* @return this for method chaining
*/
default RewriteFiles addFile(DataFile dataFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Add a new delete file.
*
* <p>This rewrite operation may change the size or layout of the delete files. When applicable,
* it is also recommended to discard delete records for files that are no longer part of the table
* state. However, the set of applicable delete records must never change.
*
* @param deleteFile a new delete file
* @return this for method chaining
*/
default RewriteFiles addFile(DeleteFile deleteFile) {
@RussellSpitzer (Member), May 2, 2023:

Do we want to specify that adding the same delete file/data file twice will have no effect? We required sets before.

PR author (Contributor) replied:

That's a good point, I'll add. That said, neither DataFile nor DeleteFile implementations provide proper equality at the moment, so Set was kind of useless.

Member replied:

True, sounds like we can drop it, although we may want to fix that in the future since we don't actually support multiple instances of the same file within a table.

throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Configure the data sequence number for this rewrite operation. This data sequence number will
* be used for all new data files that are added in this rewrite. This method is helpful to avoid
* commit conflicts between data compaction and adding equality deletes.
*
* @param sequenceNumber a data sequence number
* @return this for method chaining
*/
default RewriteFiles dataSequenceNumber(long sequenceNumber) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement dataSequenceNumber");
}
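
For reference, a minimal usage sketch of the new per-file API above (the helper method, its parameters, and the compaction scenario are hypothetical illustrations; only the RewriteFiles calls themselves come from this interface):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class RewriteFilesUsageSketch {
  // Replace one data file with its compacted equivalent. The data sequence
  // number is pinned to the snapshot the rewrite started from, which avoids
  // commit conflicts with equality deletes that are added concurrently.
  static void commitCompaction(Table table, DataFile rewrittenFile, DataFile compactedFile) {
    long startingSequenceNumber = table.currentSnapshot().sequenceNumber();

    table
        .newRewrite()                                // RewriteFiles
        .deleteFile(rewrittenFile)                   // data file being rewritten (removed)
        .addFile(compactedFile)                      // its compacted replacement
        .dataSequenceNumber(startingSequenceNumber)  // keep the original data sequence number
        .commit();
  }
}
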

/**
* Add a rewrite that replaces one set of data files with another set that contains the same data.
*
* @param filesToDelete files that will be replaced (deleted), cannot be null or empty.
* @param filesToAdd files that will be added, cannot be null or empty.
* @return this for method chaining
* @deprecated since 1.3.0, will be removed in 2.0.0
*/
@Deprecated
default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
return rewriteFiles(filesToDelete, ImmutableSet.of(), filesToAdd, ImmutableSet.of());
}
@@ -53,7 +131,9 @@ default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
* @param filesToAdd files that will be added, cannot be null or empty.
* @param sequenceNumber sequence number to use for all data files added
* @return this for method chaining
* @deprecated since 1.3.0, will be removed in 2.0.0
*/
@Deprecated
RewriteFiles rewriteFiles(
Set<DataFile> filesToDelete, Set<DataFile> filesToAdd, long sequenceNumber);

@@ -65,7 +145,9 @@ RewriteFiles rewriteFiles(
* @param dataFilesToAdd data files that will be added.
* @param deleteFilesToAdd delete files that will be added.
* @return this for method chaining.
* @deprecated since 1.3.0, will be removed in 2.0.0
*/
@Deprecated
RewriteFiles rewriteFiles(
Set<DataFile> dataFilesToReplace,
Set<DeleteFile> deleteFilesToReplace,
76 changes: 50 additions & 26 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -44,33 +44,41 @@ protected String operation() {
return DataOperations.REPLACE;
}

private void verifyInputAndOutputFiles(
Set<DataFile> dataFilesToDelete,
Set<DeleteFile> deleteFilesToDelete,
Set<DataFile> dataFilesToAdd,
Set<DeleteFile> deleteFilesToAdd) {
Preconditions.checkNotNull(dataFilesToDelete, "Data files to delete can not be null");
Preconditions.checkNotNull(deleteFilesToDelete, "Delete files to delete can not be null");
Preconditions.checkNotNull(dataFilesToAdd, "Data files to add can not be null");
Preconditions.checkNotNull(deleteFilesToAdd, "Delete files to add can not be null");
@Override
public RewriteFiles deleteFile(DataFile dataFile) {
replacedDataFiles.add(dataFile);
delete(dataFile);
return self();
}

int filesToDelete = 0;
filesToDelete += dataFilesToDelete.size();
filesToDelete += deleteFilesToDelete.size();
@Override
public RewriteFiles deleteFile(DeleteFile deleteFile) {
delete(deleteFile);
return self();
}

Preconditions.checkArgument(filesToDelete > 0, "Files to delete cannot be null or empty");
@Override
public RewriteFiles addFile(DataFile dataFile) {
add(dataFile);
return self();
}

if (deleteFilesToDelete.isEmpty()) {
Preconditions.checkArgument(
deleteFilesToAdd.isEmpty(),
"Delete files to add must be empty because there's no delete file to be rewritten");
}
@Override
public RewriteFiles addFile(DeleteFile deleteFile) {
add(deleteFile);
return self();
}

@Override
public RewriteFiles dataSequenceNumber(long sequenceNumber) {
setNewDataFilesDataSequenceNumber(sequenceNumber);
return self();
}

@Override
public RewriteFiles rewriteFiles(
Set<DataFile> filesToDelete, Set<DataFile> filesToAdd, long sequenceNumber) {
setNewFilesSequenceNumber(sequenceNumber);
setNewDataFilesDataSequenceNumber(sequenceNumber);
return rewriteFiles(filesToDelete, ImmutableSet.of(), filesToAdd, ImmutableSet.of());
}

@@ -80,24 +88,26 @@ public RewriteFiles rewriteFiles(
Set<DeleteFile> deleteFilesToReplace,
Set<DataFile> dataFilesToAdd,
Set<DeleteFile> deleteFilesToAdd) {
verifyInputAndOutputFiles(
dataFilesToReplace, deleteFilesToReplace, dataFilesToAdd, deleteFilesToAdd);
replacedDataFiles.addAll(dataFilesToReplace);

Preconditions.checkNotNull(dataFilesToReplace, "Replaced data files can't be null");
Preconditions.checkNotNull(deleteFilesToReplace, "Replaced delete files can't be null");
Preconditions.checkNotNull(dataFilesToAdd, "Added data files can't be null");
Preconditions.checkNotNull(deleteFilesToAdd, "Added delete files can't be null");

for (DataFile dataFile : dataFilesToReplace) {
delete(dataFile);
deleteFile(dataFile);
}

for (DeleteFile deleteFile : deleteFilesToReplace) {
delete(deleteFile);
deleteFile(deleteFile);
}

for (DataFile dataFile : dataFilesToAdd) {
add(dataFile);
addFile(dataFile);
}

for (DeleteFile deleteFile : deleteFilesToAdd) {
add(deleteFile);
addFile(deleteFile);
}

return this;
@@ -117,10 +127,24 @@ public BaseRewriteFiles toBranch(String branch) {

@Override
protected void validate(TableMetadata base, Snapshot parent) {
validateReplacedAndAddedFiles();
if (replacedDataFiles.size() > 0) {
// if there are replaced data files, there cannot be any new row-level deletes for those data
// files
validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, parent);
}
}

private void validateReplacedAndAddedFiles() {
Preconditions.checkArgument(
deletesDataFiles() || deletesDeleteFiles(), "Files to delete cannot be empty");

Preconditions.checkArgument(
deletesDataFiles() || !addsDataFiles(),
"Data files to add must be empty because there's no data file to be rewritten");

Preconditions.checkArgument(
deletesDeleteFiles() || !addsDeleteFiles(),
"Delete files to add must be empty because there's no delete file to be rewritten");
}
}
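
To make the effect of validateReplacedAndAddedFiles concrete, here is a hypothetical sketch of a rewrite that the new checks reject (table and newDeleteFile are assumed to exist; the error message is the one from the first precondition above):

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;

class RewriteValidationSketch {
  // An "add only" rewrite is not logically equivalent to the original table
  // state, so the validation added above rejects it when the update is applied.
  static void addOnlyRewriteIsRejected(Table table, DeleteFile newDeleteFile) {
    RewriteFiles rewrite = table.newRewrite();
    rewrite.addFile(newDeleteFile);
    rewrite.commit();  // fails with "Files to delete cannot be empty"
  }
}
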
Changes to another file:
@@ -165,6 +165,12 @@ void delete(CharSequence path) {
deletePaths.add(path);
}

boolean containsDeletes() {
return deletePaths.size() > 0
|| deleteExpression != Expressions.alwaysFalse()
|| dropPartitions.size() > 0;
}

/**
* Filter deleted files out of a list of manifests.
*
38 changes: 33 additions & 5 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -84,7 +84,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final List<DataFile> newFiles = Lists.newArrayList();
private Long newFilesSequenceNumber;
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
@@ -209,6 +209,22 @@ protected void delete(CharSequence path) {
filterManager.delete(path);
}

protected boolean deletesDataFiles() {
return filterManager.containsDeletes();
}

protected boolean deletesDeleteFiles() {
return deleteFilterManager.containsDeletes();
}

protected boolean addsDataFiles() {
return newFiles.size() > 0;
}

protected boolean addsDeleteFiles() {
return newDeleteFilesBySpec.size() > 0;
}

/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
@@ -392,7 +408,7 @@ private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles(
protected void validateNoNewDeletesForDataFiles(
TableMetadata base, Long startingSnapshotId, Iterable<DataFile> dataFiles, Snapshot parent) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, null, dataFiles, newFilesSequenceNumber != null, parent);
base, startingSnapshotId, null, dataFiles, newDataFilesDataSequenceNumber != null, parent);
}

/**
@@ -651,8 +667,20 @@ private CloseableIterable<ManifestEntry<DataFile>> deletedDataFiles(
return manifestGroup.entries();
}

/**
* Sets a data sequence number for new data files.
*
* @param sequenceNumber a data sequence number
* @deprecated since 1.3.0, will be removed in 1.4.0; use {@link
* #setNewDataFilesDataSequenceNumber(long)};
*/
@Deprecated
protected void setNewFilesSequenceNumber(long sequenceNumber) {
this.newFilesSequenceNumber = sequenceNumber;
setNewDataFilesDataSequenceNumber(sequenceNumber);
}

protected void setNewDataFilesDataSequenceNumber(long sequenceNumber) {
this.newDataFilesDataSequenceNumber = sequenceNumber;
}

private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshotId) {
@@ -926,10 +954,10 @@ private ManifestFile newFilesAsManifest() {
try {
ManifestWriter<DataFile> writer = newManifestWriter(dataSpec());
try {
if (newFilesSequenceNumber == null) {
if (newDataFilesDataSequenceNumber == null) {
writer.addAll(newFiles);
} else {
newFiles.forEach(f -> writer.add(f, newFilesSequenceNumber));
newFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
}
} finally {
writer.close();
Changes to another file:
@@ -44,7 +44,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeWrapper;
@@ -325,12 +324,18 @@ void doReplace(
Iterable<DataFile> addedDataFiles,
long startingSnapshotId) {
RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(startingSnapshotId);

for (DataFile dataFile : deletedDataFiles) {
rewriteFiles.deleteFile(dataFile);
}

for (DataFile dataFile : addedDataFiles) {
rewriteFiles.addFile(dataFile);
}

if (useStartingSequenceNumber) {
long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
rewriteFiles.rewriteFiles(
Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles), sequenceNumber);
} else {
rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
rewriteFiles.dataSequenceNumber(sequenceNumber);
}

commit(rewriteFiles);
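
Reading the flattened hunk above, the set-based rewriteFiles call is replaced by the per-file API; after this change the body of doReplace reads roughly as follows (reconstructed from the added lines, shown only as a sketch):

RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(startingSnapshotId);

for (DataFile dataFile : deletedDataFiles) {
  rewriteFiles.deleteFile(dataFile);
}

for (DataFile dataFile : addedDataFiles) {
  rewriteFiles.addFile(dataFile);
}

if (useStartingSequenceNumber) {
  // pin new data files to the sequence number of the snapshot the rewrite started from
  long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
  rewriteFiles.dataSequenceNumber(sequenceNumber);
}

commit(rewriteFiles);
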
Changes to another file:
@@ -21,12 +21,11 @@
import java.util.Set;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -53,18 +52,19 @@ public RewritePositionDeletesCommitManager(Table table) {
* @param fileGroups file groups to commit
*/
public void commit(Set<RewritePositionDeletesGroup> fileGroups) {
Set<DeleteFile> rewrittenDeleteFiles = Sets.newHashSet();
Set<DeleteFile> addedDeleteFiles = Sets.newHashSet();
RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(startingSnapshotId);

for (RewritePositionDeletesGroup group : fileGroups) {
rewrittenDeleteFiles.addAll(group.rewrittenDeleteFiles());
addedDeleteFiles.addAll(group.addedDeleteFiles());
for (DeleteFile file : group.rewrittenDeleteFiles()) {
rewriteFiles.deleteFile(file);
}

for (DeleteFile file : group.addedDeleteFiles()) {
rewriteFiles.addFile(file);
}
}

table
.newRewrite()
.validateFromSnapshot(startingSnapshotId)
.rewriteFiles(ImmutableSet.of(), rewrittenDeleteFiles, ImmutableSet.of(), addedDeleteFiles)
.commit();
rewriteFiles.commit();
}

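
Similarly, after this change the commit method of RewritePositionDeletesCommitManager builds a single RewriteFiles update from all file groups. Reconstructed from the added lines above, it reads roughly:

public void commit(Set<RewritePositionDeletesGroup> fileGroups) {
  RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(startingSnapshotId);

  for (RewritePositionDeletesGroup group : fileGroups) {
    // remove every rewritten position delete file and add its replacement
    for (DeleteFile file : group.rewrittenDeleteFiles()) {
      rewriteFiles.deleteFile(file);
    }

    for (DeleteFile file : group.addedDeleteFiles()) {
      rewriteFiles.addFile(file);
    }
  }

  rewriteFiles.commit();
}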