Merged
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -51,6 +51,17 @@ default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> fil
);
}

/**
* Add a rewrite that replaces one set of data files with another set that contains the same data.
* The sequence number provided will be used for all the data files added.
*
* @param filesToDelete files that will be replaced (deleted), cannot be null or empty.
* @param filesToAdd files that will be added, cannot be null or empty.
* @param sequenceNumber sequence number to use for all data files added
* @return this for method chaining
*/
RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd, long sequenceNumber);

/**
* Add a rewrite that replaces one set of files with another set that contains the same data.
*
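A minimal sketch of how the new overload might be invoked, mirroring the usage in TestRewriteFiles and RewriteDataFilesCommitManager further down in this PR; the Table handle and the two DataFile instances (hypothetical names replacedFile and compactedFile) are assumed to be supplied by the caller.

import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

class RewriteWithSequenceNumberSketch {
  static void rewriteKeepingSequenceNumber(Table table, DataFile replacedFile, DataFile compactedFile) {
    // Reuse the sequence number of the snapshot current at the start of the rewrite, so equality
    // deletes committed at a higher sequence number still apply to the newly added file.
    long startingSequenceNumber = table.currentSnapshot().sequenceNumber();

    Set<DataFile> filesToDelete = ImmutableSet.of(replacedFile);
    Set<DataFile> filesToAdd = ImmutableSet.of(compactedFile);

    table.newRewrite()
        .validateFromSnapshot(table.currentSnapshot().snapshotId())
        .rewriteFiles(filesToDelete, filesToAdd, startingSequenceNumber)
        .commit();
  }
}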
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -77,6 +77,17 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
*/
String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";

/**
* If the compaction should use the sequence number of the snapshot at compaction start time for new data files,
* instead of using the sequence number of the newly produced snapshot.
* <p>
* This avoids commit conflicts with updates that add newer equality deletes at a higher sequence number.
* <p>
* Defaults to true.
*/
String USE_STARTING_SEQUENCE_NUMBER = "use-starting-sequence-number";
boolean USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
Member: Now that this is true, do we have to ignore it with V1 Tables?

Contributor Author: Yeah, I was thinking about that right now. Technically I think it has no harm for v1 tables, because the sequence number is always 0, and it is not read or written anywhere. Let me add a unit test for v1. Do you see any place this might affect a v1 table?

Member: That's all I was thinking. V1 tables don't have sequence numbers, so I just wanted to make sure they don't break if we are trying to set them.


/**
* Choose BINPACK as a strategy for this rewrite operation
* @return this for method chaining
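A hedged example of setting the new property on the compaction action; the RewriteDataFiles action instance is assumed to be obtained from an engine-specific entry point (for example Spark's actions provider), and the option(...) call is the generic string option inherited from the actions API.

import org.apache.iceberg.actions.RewriteDataFiles;

class CompactionOptionSketch {
  // rewriteAction is assumed to be obtained elsewhere, e.g. from the engine's ActionsProvider.
  static RewriteDataFiles.Result compact(RewriteDataFiles rewriteAction) {
    return rewriteAction
        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true") // the default; set to "false" to opt out
        .execute();
  }
}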
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -21,6 +21,7 @@

import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
@@ -66,6 +67,12 @@ private void verifyInputAndOutputFiles(Set<DataFile> dataFilesToDelete, Set<Dele
}
}

@Override
public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd, long sequenceNumber) {
Contributor: Slightly unrelated concern: should we be using Set here? It seems needlessly restrictive. Plus, DataFile is an interface, so you could easily pass files that don't implement equals/hashCode and are always considered unique. Just one thing that's always made me wonder about this API.

Not a blocker for this PR though!

setNewFilesSequenceNumber(sequenceNumber);
return rewriteFiles(filesToDelete, ImmutableSet.of(), filesToAdd, ImmutableSet.of());
}

@Override
public RewriteFiles rewriteFiles(Set<DataFile> dataFilesToReplace, Set<DeleteFile> deleteFilesToReplace,
Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd) {
@@ -62,9 +62,13 @@ ManifestEntry<F> wrapExisting(Long newSnapshotId, Long newSequenceNumber, F newF
}

ManifestEntry<F> wrapAppend(Long newSnapshotId, F newFile) {
return wrapAppend(newSnapshotId, null, newFile);
}

ManifestEntry<F> wrapAppend(Long newSnapshotId, Long newSequenceNumber, F newFile) {
this.status = Status.ADDED;
this.snapshotId = newSnapshotId;
this.sequenceNumber = null;
this.sequenceNumber = newSequenceNumber;
this.file = newFile;
return this;
}
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -103,6 +103,19 @@ public void add(F addedFile) {
addEntry(reused.wrapAppend(snapshotId, addedFile));
}

/**
* Add an added entry for a file with a specific sequence number.
* <p>
* The entry's snapshot ID will be this manifest's snapshot ID.
* The entry's sequence number will be the provided sequence number.
*
* @param addedFile a data file
* @param sequenceNumber sequence number for the data file
*/
public void add(F addedFile, long sequenceNumber) {
addEntry(reused.wrapAppend(snapshotId, sequenceNumber, addedFile));
}

void add(ManifestEntry<F> entry) {
addEntry(reused.wrapAppend(snapshotId, entry.file()));
}
47 changes: 43 additions & 4 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -21,6 +21,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -80,6 +81,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final List<DataFile> newFiles = Lists.newArrayList();
private Long newFilesSequenceNumber;
private final Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
@@ -297,7 +299,8 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
*/
protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
Iterable<DataFile> dataFiles) {
validateNoNewDeletesForDataFiles(base, startingSnapshotId, null, dataFiles, true);
validateNoNewDeletesForDataFiles(base, startingSnapshotId, null, dataFiles, true,
newFilesSequenceNumber != null);
}

/**
@@ -313,6 +316,28 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin
protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
Contributor: Could you copy the javadoc for the previous version? I think it's helpful to have it.

Contributor Author: added back

Expression dataFilter, Iterable<DataFile> dataFiles,
boolean caseSensitive) {
validateNoNewDeletesForDataFiles(base, startingSnapshotId, dataFilter, dataFiles, caseSensitive, false);
}

/**
* Validates that no new delete files that must be applied to the given data files have been added to the table since
* a starting snapshot, with the option to ignore equality deletes during the validation.
Member: Nbd, but I would add a note here about why we want to ignore equality deletes, just so future readers could understand.

* <p>
* For example, in the case of rewriting data files, if the added data files have the same sequence number as the
* replaced data files, equality deletes added at a higher sequence number are still effective against the added
* data files, so there is no risk of commit conflict between RewriteFiles and RowDelta. In cases like this,
* validation against equality delete files can be omitted.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter a data filter
* @param dataFiles data files to validate have no new row deletes
* @param caseSensitive whether expression binding should be case-sensitive
* @param ignoreEqualityDeletes whether equality deletes should be ignored in validation
*/
private void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
Expression dataFilter, Iterable<DataFile> dataFiles,
boolean caseSensitive, boolean ignoreEqualityDeletes) {
// if there is no current table state, no files have been added
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
return;
@@ -327,8 +352,14 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin

for (DataFile dataFile : dataFiles) {
// if any delete is found that applies to files written in or before the starting snapshot, fail
if (deletes.forDataFile(startingSequenceNumber, dataFile).length > 0) {
throw new ValidationException("Cannot commit, found new delete for replaced data file: %s", dataFile);
DeleteFile[] deleteFiles = deletes.forDataFile(startingSequenceNumber, dataFile);
if (ignoreEqualityDeletes) {
ValidationException.check(
Arrays.stream(deleteFiles).noneMatch(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES),
"Cannot commit, found new position delete for replaced data file: %s", dataFile);
} else {
ValidationException.check(deleteFiles.length == 0,
"Cannot commit, found new delete for replaced data file: %s", dataFile);
}
}
}
@@ -360,6 +391,10 @@ protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapsho
dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
}

protected void setNewFilesSequenceNumber(long sequenceNumber) {
this.newFilesSequenceNumber = sequenceNumber;
}

private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshotId) {
if (staringSnapshotId != null && metadata.snapshot(staringSnapshotId) != null) {
Snapshot startingSnapshot = metadata.snapshot(staringSnapshotId);
@@ -591,7 +626,11 @@ private ManifestFile newFilesAsManifest() {
try {
ManifestWriter<DataFile> writer = newManifestWriter(dataSpec());
try {
writer.addAll(newFiles);
if (newFilesSequenceNumber == null) {
writer.addAll(newFiles);
} else {
newFiles.forEach(f -> writer.add(f, newFilesSequenceNumber));
}
} finally {
writer.close();
}
@@ -48,15 +48,21 @@ public class RewriteDataFilesCommitManager {

private final Table table;
private final long startingSnapshotId;
private final boolean useStartingSequenceNumber;

// constructor used for testing
public RewriteDataFilesCommitManager(Table table) {
this(table, table.currentSnapshot().snapshotId());
}

public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {
this(table, startingSnapshotId, RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
}

public RewriteDataFilesCommitManager(Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
this.table = table;
this.startingSnapshotId = startingSnapshotId;
this.useStartingSequenceNumber = useStartingSequenceNumber;
}

/**
Expand All @@ -72,9 +78,14 @@ public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
addedDataFiles = Sets.union(addedDataFiles, group.addedFiles());
}

RewriteFiles rewrite = table.newRewrite()
.validateFromSnapshot(startingSnapshotId)
.rewriteFiles(rewrittenDataFiles, addedDataFiles);
RewriteFiles rewrite = table.newRewrite().validateFromSnapshot(startingSnapshotId);
if (useStartingSequenceNumber) {
long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles, sequenceNumber);
} else {
rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles);
}

rewrite.commit();
}

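A small sketch of driving the commit through the new three-argument constructor; the table handle, the starting snapshot id (captured before any file groups were rewritten), and the set of rewritten file groups are assumed to come from a rewrite already in progress.

import java.util.Set;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;

class CommitManagerSketch {
  static void commitWithStartingSequenceNumber(Table table, long startingSnapshotId, Set<RewriteFileGroup> fileGroups) {
    // Passing true asks the manager to stamp new data files with the sequence number
    // of the snapshot identified by startingSnapshotId.
    RewriteDataFilesCommitManager commitManager =
        new RewriteDataFilesCommitManager(table, startingSnapshotId, true);
    commitManager.commitFileGroups(fileGroups);
  }
}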
11 changes: 11 additions & 0 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -469,6 +469,17 @@ protected DeleteFile newDeleteFile(int specId, String partitionPath) {
.build();
}

protected DeleteFile newEqualityDeleteFile(int specId, String partitionPath, int... fieldIds) {
PartitionSpec spec = table.specs().get(specId);
return FileMetadata.deleteFileBuilder(spec)
.ofEqualityDeletes(fieldIds)
.withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath(partitionPath)
.withRecordCount(1)
.build();
}

protected <T> PositionDelete<T> positionDelete(CharSequence path, long pos, T row) {
PositionDelete<T> positionDelete = PositionDelete.create();
return positionDelete.set(path, pos, row);
21 changes: 21 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -19,13 +19,16 @@

package org.apache.iceberg;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -87,6 +90,24 @@ public void testManifestPartitionStats() throws IOException {
Conversions.fromByteBuffer(Types.IntegerType.get(), partitionFieldSummary.upperBound()));
}

@Test
public void testWriteManifestWithSequenceNumber() throws IOException {
Assume.assumeTrue("sequence number is only valid for format version > 1", formatVersion > 1);
File manifestFile = temp.newFile("manifest.avro");
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, 1L);
writer.add(newFile(10, TestHelpers.Row.of(1)), 1000L);
writer.close();
ManifestFile manifest = writer.toManifestFile();
Assert.assertEquals("Manifest should have no sequence number", -1L, manifest.sequenceNumber());
ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifest, table.io());
for (ManifestEntry<DataFile> entry : manifestReader.entries()) {
Assert.assertEquals("Custom sequence number should be used for all manifest entries",
1000L, (long) entry.sequenceNumber());
}
}

private DataFile newFile(long recordCount) {
return newFile(recordCount, null);
}
67 changes: 66 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
@@ -264,10 +264,75 @@ public void testRewriteDataAndDeleteFiles() {
files(FILE_A_DELETES, FILE_B_DELETES),
statuses(DELETED, EXISTING));

// We should only get the 3 manifests that this test is expected to add.
// We should only get the 5 manifests that this test is expected to add.
Assert.assertEquals("Only 5 manifests should exist", 5, listManifestFiles().size());
}

@Test
public void testRewriteDataAndAssignOldSequenceNumber() {
Assume.assumeTrue("Sequence number is only supported in iceberg format v2. ", formatVersion > 1);
Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());

table.newRowDelta()
.addRows(FILE_A)
.addRows(FILE_B)
.addRows(FILE_C)
.addDeletes(FILE_A_DELETES)
.addDeletes(FILE_B_DELETES)
.commit();

TableMetadata base = readMetadata();
Snapshot baseSnap = base.currentSnapshot();
long baseSnapshotId = baseSnap.snapshotId();
Assert.assertEquals("Should create 2 manifests for initial write", 2, baseSnap.allManifests().size());
List<ManifestFile> initialManifests = baseSnap.allManifests();

validateManifestEntries(initialManifests.get(0),
ids(baseSnapshotId, baseSnapshotId, baseSnapshotId),
files(FILE_A, FILE_B, FILE_C),
statuses(ADDED, ADDED, ADDED));
validateDeleteManifest(initialManifests.get(1),
seqs(1, 1),
ids(baseSnapshotId, baseSnapshotId),
files(FILE_A_DELETES, FILE_B_DELETES),
statuses(ADDED, ADDED));

// Rewrite the files.
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
Snapshot pending = table.newRewrite()
.validateFromSnapshot(table.currentSnapshot().snapshotId())
.rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_D), oldSequenceNumber)
.apply();

Assert.assertEquals("Should contain 3 manifest", 3, pending.allManifests().size());
Assert.assertFalse("Should not contain data manifest from initial write",
pending.dataManifests().stream().anyMatch(initialManifests::contains));

long pendingId = pending.snapshotId();
ManifestFile newManifest = pending.allManifests().get(0);
validateManifestEntries(newManifest, ids(pendingId), files(FILE_D), statuses(ADDED));
for (ManifestEntry<DataFile> entry : ManifestFiles.read(newManifest, FILE_IO).entries()) {
Assert.assertEquals("Should have old sequence number for manifest entries",
oldSequenceNumber, (long) entry.sequenceNumber());
}
Assert.assertEquals("Should use new sequence number for the manifest file",
oldSequenceNumber + 1, newManifest.sequenceNumber());

validateManifestEntries(pending.allManifests().get(1),
ids(pendingId, baseSnapshotId, baseSnapshotId),
files(FILE_A, FILE_B, FILE_C),
statuses(DELETED, EXISTING, EXISTING));

validateDeleteManifest(pending.allManifests().get(2),
seqs(1, 1),
ids(baseSnapshotId, baseSnapshotId),
files(FILE_A_DELETES, FILE_B_DELETES),
statuses(ADDED, ADDED));

// We should only get the 4 manifests that this test is expected to add.
Assert.assertEquals("Only 4 manifests should exist", 4, listManifestFiles().size());
}

@Test
public void testFailure() {
table.newAppend()