17 changes: 14 additions & 3 deletions api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -54,12 +54,23 @@ default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> fil
/**
* Add a rewrite that replaces one set of files with another set that contains the same data.
*
* @param dataFilesToDelete data files that will be replaced (deleted).
* @param deleteFilesToDelete delete files that will be replaced (deleted).
* @param dataFilesToReplace data files that will be replaced (deleted).
* @param deleteFilesToReplace delete files that will be replaced (deleted).
* @param dataFilesToAdd data files that will be added.
* @param deleteFilesToAdd delete files that will be added.
* @return this for method chaining.
*/
RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
RewriteFiles rewriteFiles(Set<DataFile> dataFilesToReplace, Set<DeleteFile> deleteFilesToReplace,
Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd);

/**
* Set the snapshot ID used in any reads for this operation.
* <p>
* Validations will check changes after this snapshot ID. If this is not called, all ancestor snapshots through the
* table's initial snapshot are validated.
*
* @param snapshotId a snapshot ID
* @return this for method chaining
*/
RewriteFiles validateFromSnapshot(long snapshotId);
Member:
I see all the snapshotId values are set to ops.current().snapshotId() except in the test cases. I'm wondering whether it is necessary to introduce a method for validating from an older historical snapshot ID.

Member:
I was wondering if this needs to be an open API? It seems like we always want to set it to table.currentSnapshot().snapshotId().

Contributor Author:
This is necessary because the current snapshot's ID may not be the one that was used for the operation. This RewriteFiles operation may be created significantly later than the original planning that is done for compaction. For example, in the new Spark compaction code, the rewrite is created when a group is ready to commit. The table state may have changed since that time, so using the current snapshot ID could easily skip delta commits that were interleaved, leaving the same issue that we are fixing here.
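
For illustration, a minimal sketch of the intended flow, based on the commit-manager change later in this PR (variable names are illustrative):

// Capture the snapshot that compaction planning actually read from.
long startingSnapshotId = table.currentSnapshot().snapshotId();

// ... file groups are rewritten, possibly much later, while other commits land ...

// Validate against the planning-time snapshot, not whatever is current at commit time,
// so interleaved delta commits are not silently skipped.
table.newRewrite()
    .validateFromSnapshot(startingSnapshotId)
    .rewriteFiles(rewrittenDataFiles, addedDataFiles)
    .commit();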

Contributor Author:
You can also see this in the test case, where we actually create commits and then run the rewrite differently to simulate concurrent operations: https://github.com/apache/iceberg/pull/2865/files#diff-b914d99ace31a09150792c10e3b0f1e40331edb1c24233ee6912d4608cfde0c4R615-R643

Member:
OK, that makes sense!

}
27 changes: 23 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -21,8 +21,12 @@

import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
super(tableName, ops);

@@ -63,15 +67,16 @@ private void verifyInputAndOutputFiles(Set<DataFile> dataFilesToDelete, Set<Dele
}

@Override
public RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
public RewriteFiles rewriteFiles(Set<DataFile> dataFilesToReplace, Set<DeleteFile> deleteFilesToReplace,
Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd) {
verifyInputAndOutputFiles(dataFilesToDelete, deleteFilesToDelete, dataFilesToAdd, deleteFilesToAdd);
verifyInputAndOutputFiles(dataFilesToReplace, deleteFilesToReplace, dataFilesToAdd, deleteFilesToAdd);
Member:
Since we've already changed dataFilesToDelete and deleteFilesToDelete to dataFilesToReplace and deleteFilesToReplace, I think we should also change all the variables and arguments in verifyInputAndOutputFiles for consistency.

Contributor Author:
@openinx, I had a version where I did that and fixed the error messages, but it ended up being too many unrelated test changes so I'll do that in a follow-up.

replacedDataFiles.addAll(dataFilesToReplace);

for (DataFile dataFile : dataFilesToDelete) {
for (DataFile dataFile : dataFilesToReplace) {
delete(dataFile);
}

for (DeleteFile deleteFile : deleteFilesToDelete) {
for (DeleteFile deleteFile : deleteFilesToReplace) {
delete(deleteFile);
}

@@ -85,4 +90,18 @@ public RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile

return this;
}

@Override
public RewriteFiles validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
return this;
}

@Override
protected void validate(TableMetadata base) {
if (replacedDataFiles.size() > 0) {
// if there are replaced data files, there cannot be any new row-level deletes for those data files
validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles);
Member (@openinx, Jul 27, 2021):
I think we should also handle this case:

  • t1: the table has FILE_A and EQ_DELETE_FILE_B;
  • t2: a RewriteFiles action starts converting EQ_DELETE_FILE_B to POS_DELETE_FILE_C, which holds the file path and offsets from FILE_A; the txn is started but not committed;
  • t3: a DeleteFiles action commits a txn that deletes FILE_A;
  • t4: the RewriteFiles action tries to commit.

At t4, the RewriteFiles txn should be aborted because the data file FILE_A, which POS_DELETE_FILE_C relies on, has been deleted.

Do we plan to add this validation in a follow-up PR, or was this case not considered because we currently haven't introduced any RewriteFiles action that converts equality delete files into positional delete files?
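
A schematic of that sequence using the API in this PR (FILE_A, EQ_DELETE_FILE_B, and POS_DELETE_FILE_C stand for the symbolic files in the scenario above, not real objects):

// t2: start a rewrite that converts the equality delete into a positional delete referencing FILE_A
RewriteFiles rewrite = table.newRewrite()
    .rewriteFiles(
        ImmutableSet.of(),                     // no data files replaced
        ImmutableSet.of(EQ_DELETE_FILE_B),     // delete file being replaced
        ImmutableSet.of(),                     // no data files added
        ImmutableSet.of(POS_DELETE_FILE_C));   // new positional delete pointing into FILE_A

// t3: a concurrent DeleteFiles commit removes FILE_A
table.newDelete().deleteFile(FILE_A).commit();

// t4: committing here should be rejected by the validation discussed in this thread,
// because POS_DELETE_FILE_C now references a data file that no longer exists
rewrite.commit();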

Member (@openinx, Jul 27, 2021):
In my view, the data conflicts between a REPLACE operation and APPEND/OVERWRITE/DELETE operations come from the dependency of positional delete files on data files. There are two cases:

  1. The REPLACE operation removes data files that committed APPEND/OVERWRITE/DELETE operations rely on (among the three operations, currently only RowDelta introduces new positional delete files that reference committed data files). I think we handle this case perfectly in this PR.
  2. An APPEND/OVERWRITE/DELETE operation removes data files that an in-flight REPLACE operation relies on. The comment above is an example.

As a final solution, I think we should handle both cases, so that we won't encounter any unexpected interruption when reading the Iceberg table.

Contributor Author:
@openinx, I opened a draft PR, #2892, to fix this, but I'm not sure that we actually want to merge it.

Originally, I copied the validateDataFilesExist implementation from RowDelta and added tests. That works great and is a fairly simple commit if we want it. But as I was testing it, I added cases for delete, overwrite, rewrite, and a transactional delete and append to overwrite.

Then I realized that not all of those cases apply. For example, if I delete a file that has row deltas, that's perfectly okay. The operation actually deletes all rows that weren't already deleted and any dangling positional deletes will just never get applied. Similarly, an overwrite from, for example, a MERGE INTO operation that replaces a file will need to first read that file and then build its replacement. The read must merge the existing deletes, so the rows will already be removed from the replacement file. Then the deletes in that case are also fine to leave dangling even if they are rewritten into a new file.

I ended up leaving a validation that the data files are not rewritten, but I also think that may not be necessary because I don't think it would be reasonable to rewrite a data file without also applying the deletes. If we build a compaction operation, we will probably apply deletes during compaction. We could also build one that compacts just data files, but in order for that operation to be correct it would also need to rewrite delete files (that are pointing to the compacted data files) or use an older sequence number with equality deletes. If it rewrote delete files, then the delete rewrite would fail. If it changed sequence numbers, then we would have a lot more validation to do when building that plan.

After thinking about it, I don't think that we actually need to handle any cases where a compaction and delete file rewrite are concurrent, at least not from the delete file rewrite side. Any compaction should handle deletes as part of the compaction.

Member:
We could also build one that compacts just data files, but in order for that operation to be correct it would also need to rewrite delete files (that are pointing to the compacted data files) or use an older sequence number with equality deletes

You mean people will need to know whether a data-files-only compaction will break the existing data set? Doesn't that put too much burden on users? I think we still need to provide the validation, because we as developers are sure that those cases will break data correctness...

Contributor Author:
You mean people will need to know whether a data-files-only compaction will break the existing data set? Doesn't that put too much burden on users? I think we still need to provide the validation, because we as developers are sure that those cases will break data correctness...

The contract of the RewriteFiles operation is that it does not change the table data. That is explicitly stated in the rewriteFiles configuration method's javadoc:

Add a rewrite that replaces one set of files with another set that contains the same data.

I think that is a reasonable requirement for the operation. Compacting only data files without accounting for deletes would change the table data, even without a concurrent delete file rewrite. For example, if I have

file-a.parquet:         (1, 'x'), (2, 'y')
file-b.parquet:         (3, 'z')
file-a-deletes.parquet: ('file-a.parquet', 0)

Compacting file-a.parquet and file-b.parquet together would undelete row id=1. There is no way for Iceberg to check whether the rewrite ignored file-a-deletes.parquet or applied the deletes. The correctness problem is entirely in the compaction operation.

We have to trust the RewriteFiles caller to do the right thing and preserve correctness.

That means the caller must do one of the following:

  1. Remove the deleted row
  2. Rewrite file-a-deletes.parquet to point the delete at the new data file
  3. Write the new data file with a compatible sequence number (only valid if the deletes are equality deletes so they would still apply)

If the caller uses option 1, then it doesn't matter to concurrent operations that are rewriting the delete. It becomes a dangling delete that doesn't need to be applied. If the caller uses option 2, then there is a new file tracking the delete that would not be part of a concurrent rewrite operation; a concurrent rewrite may contain a dangling delete.
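
For concreteness, a sketch of an option-2 style commit using the four-argument rewriteFiles from this PR (the uppercase names stand for the example files above, and planningSnapshotId is assumed to be captured at planning time):

// The compaction replaces both data files and also rewrites the positional delete file so that the
// surviving delete points at the new, compacted data file instead of file-a.parquet.
table.newRewrite()
    .validateFromSnapshot(planningSnapshotId)
    .rewriteFiles(
        ImmutableSet.of(FILE_A, FILE_B),             // data files being compacted
        ImmutableSet.of(FILE_A_DELETES),             // old positional delete file
        ImmutableSet.of(COMPACTED_FILE),             // new data file
        ImmutableSet.of(COMPACTED_FILE_DELETES))     // rewritten deletes targeting the new file
    .commit();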

The only case we would need to worry about is option 3, and only if a concurrent delete rewrites an equality delete as a positional delete. To be careful, we could merge #2892 to be able to validate this case. But I'm skeptical that altering the sequence number of the compacted file is a good idea. It may not even be possible if the sequence number of a delete that must be applied is between the sequence numbers of the starting data files.

As I said, I'm okay not merging the validation for option 3 because I think it is unlikely that this is going to be a viable strategy. But I'm fine merging #2892 if you think it is needed.

}
}
}
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -94,6 +94,7 @@ protected void validate(TableMetadata base) {
validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
}

// TODO: does this need to check new delete files?
if (conflictDetectionFilter != null) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
}
12 changes: 10 additions & 2 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -312,6 +312,7 @@ static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
static class Builder {
private final FileIO io;
private final Set<ManifestFile> deleteManifests;
private long minSequenceNumber = 0L;
Member:
I remember that sequence number 0 is reserved for data files from Iceberg v1, so in theory the sequence numbers of delete files should start from 1. Setting the default to 0 therefore sounds correct.
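
A small sketch of how that default interacts with the new filter (io, deleteManifests, specsById, and startingSequenceNumber are assumed to come from the surrounding context, as in validateNoNewDeletesForDataFiles below):

// With the default minSequenceNumber = 0, the check entry.sequenceNumber() > 0 keeps every delete
// file entry, since delete files only exist in v2 tables where their sequence numbers start at 1.
DeleteFileIndex allDeletes = DeleteFileIndex.builderFor(io, deleteManifests)
    .specsById(specsById)
    .build();

// With afterSequenceNumber(startingSequenceNumber), only delete files committed after the starting
// snapshot's sequence number are indexed.
DeleteFileIndex newDeletes = DeleteFileIndex.builderFor(io, deleteManifests)
    .afterSequenceNumber(startingSequenceNumber)
    .specsById(specsById)
    .build();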

private Map<Integer, PartitionSpec> specsById = null;
private Expression dataFilter = Expressions.alwaysTrue();
private Expression partitionFilter = Expressions.alwaysTrue();
@@ -323,6 +324,11 @@ static class Builder {
this.deleteManifests = Sets.newHashSet(deleteManifests);
}

Builder afterSequenceNumber(long seq) {
this.minSequenceNumber = seq;
return this;
}

Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
this.specsById = newSpecsById;
return this;
@@ -357,8 +363,10 @@ DeleteFileIndex build() {
.run(deleteFile -> {
try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
for (ManifestEntry<DeleteFile> entry : reader) {
// copy with stats for better filtering against data file stats
deleteEntries.add(entry.copy());
if (entry.sequenceNumber() > minSequenceNumber) {
// copy with stats for better filtering against data file stats
deleteEntries.add(entry.copy());
}
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close");
118 changes: 78 additions & 40 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -41,6 +41,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -62,6 +63,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE, DataOperations.DELETE);
private static final Set<String> VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE);
// delete files can be added in "overwrite" or "delete" operations
private static final Set<String> VALIDATE_REPLACED_DATA_FILES_OPERATIONS =
Contributor:
nit: I am not sure the var name conveys its purpose, and it does not follow the convention used in the 2 vars above. If I understand correctly, this is a set of operations that may produce delete files.

Contributor Author:
I'll update this. And I'll also add "delete" as an operation. Just because we only create "overwrite" commits for row-level deletes today doesn't mean that we won't create "delete" commits later. I considered updating the RowDelta operation so that it created delete commits if only delete files were added, but didn't end up doing it because we don't do that for overwrites that only remove data files.

We can discuss whether to convert overwrite commits to delete commits later, but for now I think that we should add "delete" to this list because it would be a valid commit. Just because the reference implementation doesn't produce them doesn't mean they aren't valid.

Contributor (@aokolnychyi, Jul 27, 2021):
I had the same thought about using the DELETE operation type when we just add delete files.

Contributor Author:
Actually, I think the variable name does follow the convention. If we are validating replaced data files, then those are the types of commits that are needed for validation.

I'm going to update this by adding delete, but not renaming.

ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE);

private final String tableName;
private final TableOperations ops;
@@ -253,28 +257,10 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
return;
}

List<ManifestFile> manifests = Lists.newArrayList();
Set<Long> newSnapshots = Sets.newHashSet();

Long currentSnapshotId = base.currentSnapshot().snapshotId();
while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) {
Snapshot currentSnapshot = ops.current().snapshot(currentSnapshotId);

ValidationException.check(currentSnapshot != null,
"Cannot determine history between starting snapshot %s and current %s",
startingSnapshotId, currentSnapshotId);

if (VALIDATE_ADDED_FILES_OPERATIONS.contains(currentSnapshot.operation())) {
newSnapshots.add(currentSnapshotId);
for (ManifestFile manifest : currentSnapshot.dataManifests()) {
if (manifest.snapshotId() == (long) currentSnapshotId) {
manifests.add(manifest);
}
}
}

currentSnapshotId = currentSnapshot.parentId();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId, VALIDATE_ADDED_FILES_OPERATIONS, ManifestContent.DATA);
List<ManifestFile> manifests = history.first();
Set<Long> newSnapshots = history.second();

ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.caseSensitive(caseSensitive)
@@ -297,6 +283,39 @@
}
}

/**
* Validates that no new delete files that must be applied to the given data files have been added to the table since
* a starting snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFiles data files to validate have no new row deletes
*/
protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
Iterable<DataFile> dataFiles) {
// if there is no current table state, no files have been added
if (base.currentSnapshot() == null) {
return;
}

Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId, VALIDATE_REPLACED_DATA_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();

long startingSequenceNumber = startingSnapshotId == null ? 0 : base.snapshot(startingSnapshotId).sequenceNumber();
DeleteFileIndex deletes = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
.afterSequenceNumber(startingSequenceNumber)
.specsById(ops.current().specsById())
.build();

for (DataFile dataFile : dataFiles) {
Contributor (@aokolnychyi, Jul 26, 2021):
nit: Do we have to iterate through data files if the index is empty and we did not find any delete manifests? This logic will be triggered on top of various operations including copy-on-write MERGE.

Contributor Author:
The data files here are the ones that are in memory because they are being replaced. I don't think that this is going to be a significant slow-down since we're just doing an index check, but we can follow up with a couple of improvements to make it faster.

One improvement I'd opt for before avoiding this loop is to only read manifest files that were created in the new snapshots; that is, only consider delete files whose snapshot ID is one of the snapshots newer than the starting snapshot. We don't currently do that because the delete file index builder doesn't support it, and it looked more invasive to update the index builder (and we're trying to get 0.12.0 out).
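
A rough sketch of what that entry-level filter could look like inside the existing DeleteFileIndex.Builder loop (hypothetical; newSnapshots would have to be passed into the builder, which this PR does not do):

// Only index delete files that were added by one of the snapshots committed after the starting
// snapshot, in addition to the existing sequence-number filter.
for (ManifestEntry<DeleteFile> entry : reader) {
  if (newSnapshots.contains(entry.snapshotId()) && entry.sequenceNumber() > minSequenceNumber) {
    // copy with stats for better filtering against data file stats
    deleteEntries.add(entry.copy());
  }
}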

Contributor:
Oh, that would be way more important than skipping this loop.

// if any delete is found that applies to files written in or before the starting snapshot, fail
if (deletes.forDataFile(startingSequenceNumber, dataFile).length > 0) {
throw new ValidationException("Cannot commit, found new delete for replaced data file: %s", dataFile);
}
}
}

@SuppressWarnings("CollectionUndefinedEquality")
protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
CharSequenceSet requiredDataFiles, boolean skipDeletes) {
@@ -309,6 +328,31 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI
VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS :
VALIDATE_DATA_FILES_EXIST_OPERATIONS;

Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId, matchingOperations, ManifestContent.DATA);
List<ManifestFile> manifests = history.first();
Set<Long> newSnapshots = history.second();

ManifestGroup matchingDeletesGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.filterManifestEntries(entry -> entry.status() != ManifestEntry.Status.ADDED &&
newSnapshots.contains(entry.snapshotId()) && requiredDataFiles.contains(entry.file().path()))
.specsById(base.specsById())
.ignoreExisting();

try (CloseableIterator<ManifestEntry<DataFile>> deletes = matchingDeletesGroup.entries().iterator()) {
if (deletes.hasNext()) {
throw new ValidationException("Cannot commit, missing data files: %s",
Member:
"Cannot commit, deletes have been added to the table which refer to data files which no longer exist"?

I think I got that right here... It would help me a lot if the message were a bit more explanatory about what went wrong and why.

Contributor Author:
This is an existing error message. We can fix it, but I'd prefer to do it in a separate commit so that we don't have changes to more test cases than necessary. I also plan to fix the initial validation in RewriteFiles later for the same reason. Is it okay to leave this as-is?

Iterators.toString(Iterators.transform(deletes, entry -> entry.file().path().toString())));
}

} catch (IOException e) {
throw new UncheckedIOException("Failed to validate required files exist", e);
}
}

private Pair<List<ManifestFile>, Set<Long>> validationHistory(TableMetadata base, Long startingSnapshotId,
Set<String> matchingOperations,
ManifestContent content) {
List<ManifestFile> manifests = Lists.newArrayList();
Set<Long> newSnapshots = Sets.newHashSet();

@@ -322,31 +366,25 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI

if (matchingOperations.contains(currentSnapshot.operation())) {
newSnapshots.add(currentSnapshotId);
for (ManifestFile manifest : currentSnapshot.dataManifests()) {
if (manifest.snapshotId() == (long) currentSnapshotId) {
manifests.add(manifest);
if (content == ManifestContent.DATA) {
for (ManifestFile manifest : currentSnapshot.dataManifests()) {
if (manifest.snapshotId() == (long) currentSnapshotId) {
manifests.add(manifest);
}
}
} else {
for (ManifestFile manifest : currentSnapshot.deleteManifests()) {
if (manifest.snapshotId() == (long) currentSnapshotId) {
manifests.add(manifest);
}
}
}
}

currentSnapshotId = currentSnapshot.parentId();
}

ManifestGroup matchingDeletesGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.filterManifestEntries(entry -> entry.status() != ManifestEntry.Status.ADDED &&
newSnapshots.contains(entry.snapshotId()) && requiredDataFiles.contains(entry.file().path()))
.specsById(base.specsById())
.ignoreExisting();

try (CloseableIterator<ManifestEntry<DataFile>> deletes = matchingDeletesGroup.entries().iterator()) {
if (deletes.hasNext()) {
throw new ValidationException("Cannot commit, missing data files: %s",
Iterators.toString(Iterators.transform(deletes, entry -> entry.file().path().toString())));
}

} catch (IOException e) {
throw new UncheckedIOException("Failed to validate required files exist", e);
}
return Pair.of(manifests, newSnapshots);
}

@Override
@@ -197,8 +197,14 @@ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr)
@Override
public RewriteDataFilesActionResult execute() {
CloseableIterable<FileScanTask> fileScanTasks = null;
if (table.currentSnapshot() == null) {
return RewriteDataFilesActionResult.empty();
}

long startingSnapshotId = table.currentSnapshot().snapshotId();
try {
fileScanTasks = table.newScan()
.useSnapshot(startingSnapshotId)
.caseSensitive(caseSensitive)
.ignoreResiduals()
.filter(filter)
@@ -241,7 +247,7 @@ public RewriteDataFilesActionResult execute() {
List<DataFile> currentDataFiles = combinedScanTasks.stream()
.flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
.collect(Collectors.toList());
replaceDataFiles(currentDataFiles, addedDataFiles);
replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);

return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
}
@@ -262,10 +268,12 @@ private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
return tasksGroupedByPartition.asMap();
}

private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles,
long startingSnapshotId) {
try {
RewriteFiles rewriteFiles = table.newRewrite();
rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
RewriteFiles rewriteFiles = table.newRewrite()
.validateFromSnapshot(startingSnapshotId)
.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
commit(rewriteFiles);
} catch (Exception e) {
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
@@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -46,9 +47,16 @@ public class RewriteDataFilesCommitManager {
private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesCommitManager.class);

private final Table table;
private final long startingSnapshotId;

// constructor used for testing
public RewriteDataFilesCommitManager(Table table) {
this(table, table.currentSnapshot().snapshotId());
}

public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {
this.table = table;
this.startingSnapshotId = startingSnapshotId;
}

/**
Expand All @@ -64,9 +72,10 @@ public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
addedDataFiles = Sets.union(addedDataFiles, group.addedFiles());
}

table.newRewrite()
.rewriteFiles(rewrittenDataFiles, addedDataFiles)
.commit();
RewriteFiles rewrite = table.newRewrite()
.validateFromSnapshot(startingSnapshotId)
.rewriteFiles(rewrittenDataFiles, addedDataFiles);
rewrite.commit();
}

/**